본문 바로가기
Data Engineering

[Delta Lake] DB Sink 되는 건들의 모수를 줄여보기

by GroovyArea 2024. 11. 30.

나는 백엔드 엔지니어이지만, Databricks 활용한 데이터 엔지니어링 업무도 겸하고 있다.

추후 개발 커리어를 데이터 쪽으로 전향하고 싶기도 하여, 현재 회사에 입사 이후 지속적인 면담에서 데이터 업무를 하고 싶다고 적극적으로 어필을 했고, 챕터 리드분은 이를 흔쾌히 받아들여주셨다. 원하는 업무를 겸하게 되어 정말 행복하다.

 

Databricks를 사용하며 사용하는 API는 Apache Spark를 사용한다.

사실 적극적인 데이터 엔지니어링보다는 데이터 플랫폼, 데이터 분석팀에서 만들어주신 Raw Data를 서비스에 맞게 가공하여 Delta table에 적재한 후, 이를 DB Sink 하는 용도의 개발을 진행하고 있다.

 

처음 파이프라인을 개발했을 때는, 아무것도 모르는 상태에서 공부하며 진행했었기에 모든 데이터를 DB에 upsert 치는 방식으로 개발을 했다.

개발을 완료하고 나서 직면한 문제는 실제로 변경이 된 데이터가 아닌 모든 데이터가 update된 다는 것을 알았고,

sink 된 데이터들의 정합성이 그리 좋지 않겠다는 단점과, sink 되는 데이터의 양이 많다는 점이 문제였다.

 

이를 어떻게 개선할 수 있을까라는 생각을 했고,

처음 생각한 방안은 실제로 변경 된 데이터들을 모아두는 Delta table을 따로 생성해서 적재해 두고, 상태 칼럼을 두어 해결할 수 있겠다는 생각을 했지만, spark api는 분명 이런 문제에 대응할 수 있는 기능을 제공하리라 생각했다.

 

어느 날, 현재 같은 프로젝트를 진행하며 협업하는 데이터 사이언티스트분께서 DB sink시 실제 변경 된 데이터만을 필터 할 수 있게 모수를 줄일 수 있는 기능에 대해 공유를 해주시는 회의에 초대를 해주셨고, 이때 delta lake에 대해 더 잘 알게 되었다.

그렇게 알게 된 기능을 토대로 실제 업데이트가 된 데이터들에 한해 sink를 진행할 수 있게 수정 개발하였고, 현재 운영 환경에서 정상적으로 db sink되는 배치로 운영하게 되었다.

그래서 그 해결 방법을 한번 공유해보려한다.

 

배치 프로세스 설명

1. 데이터 분석팀에서 쌓는 Master Delta Table 은 매일 오전에 업데이트되는 Job 이 실행된다.

2. 나는 매일 해당 Job 해당 Master Delta Table에서 필요한 데이터를 추출, 가공하여 백엔드 서비스에 사용할 DB 테이블 스키마와 동일한 형태의 Delta Table 에 쌓는다.

3. 쌓인 Delta Table에서, 오늘 업데이트가 된 데이터를 모두 DB에 sink 한다.

 

내가 진행했던 단계는 2, 3 단계이다.

 

문제가 시작되는 부분은 2번이다.

 

나는 Delta Table에 가공된 데이터를 쌓을 때, 아래와 같은 함수로 사용하는 편이었다.

def write_to_delta(table: str, df: DataFrame):
    try:
        (
            df
            .write
            .format("delta")
            .mode("overwrite")
            .option("mergeSchema", "true")
            .saveAsTable(table)
        )
    except Exception as e:
        print(f"delta table insert 중 예외가 발생했습니다: {e}")

 

문제는 모든 데이터를 overwrite 한다는 것이다. 업데이트되지 않은 데이터까지 모조리 덮어 씌워진다.

하여, 3번 단계에서는 이 모든 데이터를 읽어 DB에 sink 하게 된다. (updated_at 칼럼 기준으로 매일 가져와 sink)

이 부분을 실제 업데이트가 일어난 행만 delta table에 merge 되게 개선해야 할 것이다.

 

Detla Lake의 Versioning

훌륭하신 분들이 만들어 주신 Delta Lake에서는 Table Protocol Versioning이라는 기능이 있다.

Delta Table의 트랜잭션 로그에는 각 버전이 기록되며, 이를 아래와 같은 쿼리로 확인할 수 있다.

describe history data_strategy.product.test_delta;

 
그럼 아래와 같이, 버전을 확인할 수 있다. (자세한 정보는 생략했다.)

version timestamp userId userName operation

77 2024-11-29T 23:06:08.000 4627685283376877 user OPTIMIZE
76 2024-11-29T 23:05:52.000 4627685283376877 user MERGE
75 2024-11-28T 23:06:21.000 4627685283376877 user OPTIMIZE
74 2024-11-28T 23:05:59.000 4627685283376877 user MERGE

 

이 버저닝을 사용하려면, delta table 생성 후 아래와 같은 쿼리를 실행해 주면 된다.

ALTER TABLE data_strategy.product.test_delta SET TBLPROPERTIES (delta.enableChangeDataFeed = true);

 

Versioning 활용한 Delta Table Merge

이제 Detla Table의 versioning까지 사용 완료 했으니, 변경 본만 merge 하면 된다.

핵심은 가공된 현재 데이터와 기존 delta에 쌓인 데이터 (스냅숏) 과의 비교가 관건이다.

 

test_source_df = test_df
test_source_view = source_df.createOrReplaceTempView('source_view')

test_source_view_columns = test_source_view.columns

test_source_view_condition = ""
for column in test_source_view_columns[:-2]:
    # 컬럼 타입 가져오기
    column_type = dict(test_source_df.dtypes)[column]
    
    # 컬럼 타입에 따라 다르게 처리
    if column_type.startswith("map"):  # 컬럼 타입이 MAP인 경우
        test_source_view_condition += f"CAST(coalesce(target.{column}, map()) AS STRING) <> CAST(coalesce(source.{column}, map()) AS STRING) OR\n"
    elif column_type.startswith("timestamp"): # 컬럼 타입이 timestamp인 경우
        test_source_view_condition += f"ifnull(target.{column}, current_timestamp()) <> ifnull(source.{column}, current_timestamp()) OR\n"
    else:  # 컬럼 타입이 MAP이 아닌 경우
        test_source_view_condition += f"(target.{column} IS DISTINCT FROM source.{column}) OR\n"  # NULL 비교를 `IS DISTINCT FROM`으로 처리


test_source_view_condition = test_source_view_condition[:-3]

print(test_source_view_condition)

 

위와 같이, test_df (가공된 dataframe)을 기반으로 임시 view를 생성하고,

target과 비교하며, 각 칼럼을 순회하며 변경 본을 감지하기 위한 String을 만든다.

(target.id IS DISTINCT FROM source.id) OR
(target.registration_number IS DISTINCT FROM source.registration_number) OR
(target.public_id IS DISTINCT FROM source.public_id) OR
(target.name IS DISTINCT FROM source.name) OR
(target.industry IS DISTINCT FROM source.industry)

 

위와 같이 print 문에 찍힌 결과를 확인할 수 있다.

이를 통해, 실제 Delta에 Merge를 진행해 보자.

 

spark.sql("""
MERGE INTO {0} AS target
USING test_source_view AS source
ON 1=1
AND target.registration_number = source.registration_number
WHEN MATCHED AND ({1})
THEN
  UPDATE SET 
    target.registration_number = source.registration_number,
    target.name = source.name,
    target.industry = source.industry,
    target.updated_at = source.updated_at
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE""".format(test_delta_table_name, test_source_view_condition)).show()

 

위와 같은 형태로 변경 본에 한해서 Case When 문을 사용해 merge를 진행할 수 있다.

그렇게 결과는 아래와 같이 출력된다.

 

+-----------------+----------------+----------------+-----------------+
|num_affected_rows|num_updated_rows|num_deleted_rows|num_inserted_rows|
+-----------------+----------------+----------------+-----------------+
|            10404|            9728|             190|              486|
+-----------------+----------------+----------------+-----------------+

 

그럼 실제로 업데이트된 데이터만 delta table에 반영이 될 테고, 이를 읽고 sink만 진행하면 된다.

최신 버전과 그 이전 버전을 조회하며 delete 할 대상과 upsert 할 쿼리를 만들면 더 금상첨화일 것이다.

 

결론

Delta Lake의 세계는 알면 알 수록 무궁무진하다.

알면 알 수록 신기하지만, 정말 공부할 게 많다는 것을 깨달았다.

방대한 데이터를 쉽게 다루기 위한 API에 대한 공부만을 하기에도 오래 걸리겠지만, 실제로 어떻게 동작하는지도 궁금해진다.

 

현재는 데이터 추출 및 가공 및 싱크만 하고 있지만, 언젠가 의미 있는 데이터를 만들어내는 역할도 한번 해보고 싶다.

Delta Lake를 공부하는 분들께 많은 도움이 되길 바랍니다.

 

반응형