DataLake에서 update를 하는 방법

조회수 1899


DataLake는 (비정형) 원시 데이터를 적재하는 저장소의 개념을 띄고 있습니다. (비정형)이라는 것은 저장된 데이터가 스키마나 명세를 갖고 있지 않을 수도 있다는 것을 의미하고 원시데이터라 함은 특정 목적을 위해 데이터가 처리되지 않은 원래의 상태를 뜻합니다.전통적인 방식에 기반하여 이후의 데이터의 lifecycle을 정리해 보면 데이터를 사용할 수 있는 형태로 처리(ETL)하여 ODS(Operational Data Store)에 저장하고 이를 의사결정 및 각종 분석에 활용하기 위해 DW(Data Warehouse)에 집중시킵니다. 또 ODS나 DW의 데이터 중 발굴된 인사이트는 서비스로의 활용을 위해 별도로 처리되어 저장되는데 이곳을 Mart라고 칭하고 있습니다.

BigData 시대에 접어들면서 이러한 Data Structure 및 각 영역의 Role이 희미해져 가는 것 같은데요. 그러다 보니 DataLake에서 여러 Role을 중첩하는 경우가 많아졌습니다. 롤을 구분 지으면 관리 비용 증대와 Data Silo라는 부작용이 수반되니 어찌 보면 당연한 흐름이라고 봅니다. 여기에서 고민거리가 생기는데요, 대부분 DataLake는 대용량의 데이터를 저장하기 위해 HADOOP의 HDFS 혹은 이에 상응하는 Cloud의 Block Storage를 통하여 구현되게 됩니다. 이 파일시스템은 WORM(Write Once Read Many) 특징을 갖고 있어 한 번 쓴 데이터는 수정할 수 없으며 갱신하기 위해서는 이전 것을 지우고 새로 만들어야 하는 문제가 발생합니다.

한 예로, 여러분께서 인터넷 쇼핑을 하면 아래와 같은 이력이 만들어질 것입니다.



여러 개의 행태 이력이 만들어졌지만 모두 하나의 주문이 되기까지의 여정입니다. 


여정의 기록 저장소인 datalake의 DW화 or Mart화가 진행되다 보니

"주문번호를 보니 같은 주문번호가 많은데 어떻게 세야 하는 거야?",
"그러니 주문이 몇 건이야? 간단하게 셀 수 있게 만들어줘!",
"현재 유지되고 있는 카트는 몇 개고 이 중에 주문으로 연결된 비율은 얼마야?"

라는 요건들을 이곳에서 해소해야 할 경우가 있습니다.

사실 여기까지는 parse, aggregation, evaluation 그리고 loop을 하면 되니 그럭저럭 힘들긴 해도 할 만합니다. (REPL을 흉내 내봤습니다. 데이터 엔지니어와 데이터 사이언티스트 분들 힘내십쇼!)


그러나,

"8월 서비스 배포가 잘못되어 데이터에 이상이 있다, 8월 데이터 전체를 다시 전송하겠다."

"중복해서 보낸 데이터가 있으니 15:00 시점으로 롤백하세요."


이런 요청이 들어오기 시작했을 때 update가 되지 않는다고 하면 "아니 Mysql에서는 쉽게 되는데 BigData라면서 그런 것도 안 되냐?"라는 이슈가 발생하게 되고 또 dw, mart, datalake 등의 개론부터 설명하는 비용이 들기도 하고 기술적으로 해결하려다 보면 KUDU, HBase 등을 도입하고 Update나 Version 관리를 해야 하는데 이 또한 운영 비용의 급격한 증가와 더불어 Update나 TTL로 사라지는 이력 정보는 전체 이력을 저장해야 하는 DataLake의 특징을 희석시켜 버리게 합니다. 또, 클라우드 상에서 serverless로 운영하다 보면 OSS를 중첩시키는 것에 대한 어려움이 생기게 되고요. 결국 DataLake이지만 앞서 설명한 용도별 Data Structure를 구현해야 하는 상황이 발생하게 됩니다.


그래서 이런 문제를 해결하기 위해서 간단하면서도 서버리스 환경에서 운영가능한  "Updatable Datalake"를 개인적으로 PoC 해봤습니다.
먼저 간략한 구조를 그려보면 다음과 같습니다.


  1. 초기 기준 데이터를 만들기 위해 과거 전체를 집계합니다. (최초 스냅샷)
  2. 이를 업데이트 가능한 DB에 bulk insert 합니다.
  3. DB의 내용을 백업 및 재활용 용도로 Block Storage에 저장합니다.
  4. 오늘 데이터가 들어오면 (3)의 데이터와 left join하여 변경할 필요가 있는 데이터만 남기도록 필터링합니다.
  5. 필터링한 데이터를 DB에 upsert 할 수 있도록 SQL 쿼리문으로 변환시킵니다.
  6. upsert를하여 신규데이터는 insert가 되도록 기존에 존재하는 데이터는 update되도록 처리합니다.
  7. 다시 (3)의 과정으로 변경이 완료된 DB의 스냅샷을 BlockStorage에 저장합니다.


이런 흐름을 설계하게 되면 DataLake를 DW, ODS, Mart의 용도로 활용할 수 있게 됩니다. 또 BigData Env.에서 귀찮은 작업인 Update, Rollback 등을 쉽게 처리할 수 있고요. 그리고 모든 데이터를 유실 없는 원시데이터 형태로 보관 및 활용할 수 있는 특성도 계속 유지할 수 있습니다.

다만, PoC 시 아쉬웠던 점은 DB를 위해 RDS를 runtime에 프로비져닝하게되면 cold start 타임이 많이 소요되었고 그래서 4가 진행되기도 전에 RDB를 미리 띄우는 식의 트릭이 필요했습니다. 이는 RDB를 static instance나 상면 장비로 운영한다면 큰 문제는 없을 것으로 보입니다. 이와 같은 문제를 처리할 수 있는 다양한 방법이 있겠지만 최소한의 노력으로 최대한의 효과를 바라는 마음에 구성해봤고요. 참조하시면 조금이나마 도움이 되지 않을까 해서 글을 써봤습니다.

긴 글 시간 내어 읽어 주셔서 감사드리고요.
이 컨셉을 위해 만들어 놨던 코드 몇 개를 기록하며 마무리 하겠습니다.


2의 과정에서 pyspark에서 DB에 bulk insert 하는 구문 입니다. (가급적 overwrite는 빼시고 작업을 하시는게 human error를 ...)

order_df.write.format('jdbc') \ 
     .mode('overwrite') \ 
     .option("url", "jdbc:mysql://airguy.************.com:3306/test") \
     .option("dbtable", "shopify_order_master") \
     .option("user", "user") \
     .option("password", "password") \
     .save()


4의 과정에서 최초 master의 최초 생성 시각보다 빠른 데이터와 최근 변경 시각보다 느린 데이터만을 남겨두는 구문 입니다.

update_df = merge_df.withColumn('created_at'F.when(F.col('t_created_at') < F.col('m_created_at'), F.col('t_created_at')).otherwise(F.col('m_created_at'))) \
    .withColumn('field1'F.when(F.col('t_updated_at') > F.col('m_updated_at'), F.col('t_field1')).otherwise(F.col('m_field1'))) \
    .withColumn('field2'F.when(F.col('t_updated_at') > F.col('m_updated_at'), F.col('t_field2')).otherwise(F.col('m_field2'))) \
    ...
    .select('field1''field2', ...)
  • t_로 시작하는 컬럼은 오늘 유입된 데이터 m_으로 시작하는 컬럼은 master 데이터의 필드입니다. 시각을 비교해서 어떤 컬럼을 취할지 분기하는 코드입니다. SQL로 처리하면 좀 더 간단하겠지만 metastore가 준비되지 않은 상황에서 glue 등을 이용할 경우까지 고려하여 pure pyspark으로 작성했습니다.


5의 과정에서 dataframe을 upsert sql 구문으로 변환하는 모듈 입니다.

def create_upsert_sql(x) :

        global type_dict

        x_dict = x.asDict()

        column_list = []
        value_list = []
        key_list = []

        for key, value in x_dict.items() :
               column_list.append(key)

                if 'int' in type_dict[key] :
                     value_list.append(value)
                     key_list.append("{}={}".format(key, value))
                else :
                     value_list.append("'{}'".format(value))
                     key_list.append("{}='{}'".format(key, value))

        value_list = list(map(str,value_list))

        return "INSERT INTO test.shopify_order_master ({}) VALUES({}) ON DUPLICATE KEY UPDATE {};".format(",".join(column_list), ",".join(value_list), ",".join(key_list))

update_df.rdd.map(lambda x : create_upsert_sql(x)).coalesce(1, shuffle = True)\
               .saveAsTextFile("file:///airguy/poc/shopify_update_test/order_upsert_20210808"


이 과정을 거치면 dataframe으로 존재했던 데이터가 서버 로컬 디스크에 sql 구문 파일로 만들어 집니다. 

 INSERT INTO test.table_name (field1, field2, ...)
 VALUES(value1, value2, ...)
 ON DUPLICATE KEY UPDATE field1=value1, field2=value2, ...;


이것을 Mysql cli 등으로 구동하면  테이블의 key 기준으로 해당 데이터가 존재하지 않는 경우 insert가 실행되고 record가 존재하면 update 구문이 수행되는 upsert가 실행 됩니다.


전체데이터 중 0808에 변경된 이력





이 과정을 거치면 dataframe으로 존재했던 데이터가 서버 로컬 디스크에 sql 구문 파일로 만들어 집니다.



무료 인사이트 리포트와 최신 마케팅 트렌드 자료를 받아보고 싶다면
다이티 뉴스레터를 구독해보세요!