Data Engineering

[Delta Lake] Reducing the Number of Rows Sunk to the DB

by GroovyArea 2024. 11. 30.

I'm a backend engineer, but I also do data engineering work on Databricks.

I'd eventually like to move my career toward data. So ever since joining my current company, I've told my chapter lead in our regular one-on-ones that I want to do data work, and they gladly agreed. I'm really happy to be doing the work I wanted alongside my main role.

 

On Databricks, the API I work with is Apache Spark.

Honestly, it's less full-fledged data engineering and more this: I take the raw data built by the data platform and data analytics teams and transform it to fit our service. I then load it into a Delta table and sink that table into the DB.

 

When I first built the pipeline, I was learning from scratch as I went, so I wrote it to upsert every row into the DB.

Once it was finished, I realized that every row was being updated, not just the rows that had actually changed.

That caused two problems: the sunk data could not be trusted to reflect real changes, and the volume of data being sunk was large.

 

I started thinking about how to improve this.

My first idea was to create a separate Delta table that collects only the rows that actually changed, with a status column to track them. But I suspected Spark's API surely already offered a feature for this kind of problem.

 

One day, a data scientist I work with on the same project invited me to a meeting. They shared a feature that shrinks the set of rows to sink, so that only data that actually changed gets through to the DB. That meeting taught me a lot more about Delta Lake.

Using that feature, I reworked the pipeline so that only rows that were actually updated get sunk. It now runs in production as a batch job that sinks to the DB correctly.

So I'd like to share how I solved it.

 

Batch process overview

1. The master Delta table maintained by the data analytics team is updated by a job that runs every morning.

2. Every day, I extract the data I need from that master Delta table and transform it. I then write it to a Delta table with the same schema as the DB table used by the backend service.

3. From that Delta table, every row updated today is sunk into the DB.

 

I worked on steps 2 and 3.

 

The problem starts in step 2.

 

When writing processed data to a Delta table, I usually used a function like this:

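from pyspark.sql import DataFrame


def write_to_delta(table: str, df: DataFrame):
    try:
        (
            df
            .write
            .format("delta")
            .mode("overwrite")              # replaces the entire table contents on every run
            .option("mergeSchema", "true")  # lets new columns be merged into the table schema
            .saveAsTable(table)
        )
    except Exception as e:
        print(f"An exception occurred while writing to the Delta table: {e}")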

 

The problem is that it overwrites all the data. Even rows that did not change are completely rewritten.

As a result, step 3 reads all of that data and sinks it to the DB. (It pulls rows every day based on the updated_at column.)

This step needs to change so that only rows that were actually updated are merged into the Delta table.
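To see why a full overwrite inflates step 3, here is a minimal pure-Python simulation. It is not the author's code. It assumes, hypothetically, that the processing step stamps every output row with the run date in `updated_at`, which is effectively what rewriting the whole table does. Rows are plain dicts rather than Spark rows.

```python
from datetime import date


def process_with_overwrite(source_rows, run_date):
    """Rebuild every output row and stamp it with the run date (hypothetical step 2)."""
    return [{**row, "updated_at": run_date} for row in source_rows]


def rows_to_sink(table_rows, run_date):
    """Step 3: select the rows whose updated_at equals the run date."""
    return [row for row in table_rows if row["updated_at"] == run_date]


yesterday, today = date(2024, 11, 28), date(2024, 11, 29)
source = [
    {"registration_number": "A-1", "name": "Alpha"},
    {"registration_number": "B-2", "name": "Beta"},
    {"registration_number": "C-3", "name": "Gamma"},
]

table = process_with_overwrite(source, yesterday)
print(len(rows_to_sink(table, today)))  # 0: nothing has been stamped today yet

table = process_with_overwrite(source, today)  # identical source data
print(len(rows_to_sink(table, today)))  # 3: every row now looks "updated"
```

Nothing in the source changed between the two runs, yet the updated_at filter selects every row.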

 

Delta Lake Versioning

Delta Lake, built by some brilliant people, versions every table: each commit to the table creates a new table version.

Each version is recorded in the Delta table's transaction log, and you can list them with a query like this:

describe history data_strategy.product.test_delta;

 
This shows the versions like this (other columns omitted):

version  timestamp                userId            userName  operation
77       2024-11-29T23:06:08.000  4627685283376877  user      OPTIMIZE
76       2024-11-29T23:05:52.000  4627685283376877  user      MERGE
75       2024-11-28T23:06:21.000  4627685283376877  user      OPTIMIZE
74       2024-11-28T23:05:59.000  4627685283376877  user      MERGE
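The history also contains maintenance commits such as `OPTIMIZE`, which compact files without changing the data. So naively taking "the latest version" would land on 77 instead of the MERGE at 76. Below is a minimal pure-Python sketch over the rows shown above.

In PySpark, the same rows could come from `DESCRIBE HISTORY` or from `DeltaTable.forName(spark, name).history()` (delta-spark), collected to the driver. Two names are assumptions for this pipeline: the set of operations treated as data-changing, and the helper `last_two_data_versions`.

```python
# Assumption: operations this pipeline treats as changing table data.
DATA_CHANGING_OPS = {"WRITE", "MERGE", "UPDATE", "DELETE"}


def last_two_data_versions(history, data_ops=DATA_CHANGING_OPS):
    """Return (previous, latest) versions among data-changing commits."""
    versions = sorted(h["version"] for h in history if h["operation"] in data_ops)
    if len(versions) < 2:
        raise ValueError("need at least two data-changing commits")
    return versions[-2], versions[-1]


history = [
    {"version": 77, "operation": "OPTIMIZE"},
    {"version": 76, "operation": "MERGE"},
    {"version": 75, "operation": "OPTIMIZE"},
    {"version": 74, "operation": "MERGE"},
]
print(last_two_data_versions(history))  # (74, 76)
```

The resulting pair can bound a version range when comparing two snapshots of the table.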

 

The version history above is kept automatically for every Delta table. To also record which rows changed in each version (the Change Data Feed), run the following query after creating the Delta table:

ALTER TABLE data_strategy.product.test_delta SET TBLPROPERTIES (delta.enableChangeDataFeed = true);

 

Delta Table Merge Using Versioning

With versioning in place on the Delta table, all that remains is to merge only the changed rows.

The key is comparing the freshly processed data with the data already in the Delta table (the snapshot).
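That comparison has to be null-safe. In SQL, NULL <> 'x' evaluates to NULL rather than true, so a WHEN MATCHED AND (target.c <> source.c) condition would silently skip rows whose value changed to or from NULL. IS DISTINCT FROM treats two NULLs as equal and NULL versus a value as different; Spark SQL also offers the null-safe equality operator <=>, so NOT (a <=> b) is equivalent.

The following is a small Python model of both operators, with None standing in for SQL NULL. It is an illustration only, not Spark code.

```python
from typing import Optional


def sql_not_equal(a, b) -> Optional[bool]:
    """Three-valued `a <> b`: unknown (None) if either side is NULL."""
    if a is None or b is None:
        return None
    return a != b


def is_distinct_from(a, b) -> bool:
    """Null-safe inequality: two NULLs are equal, NULL vs. a value differs."""
    if a is None and b is None:
        return False
    if a is None or b is None:
        return True
    return a != b


# A value changing from NULL to "Retail" is a real change...
print(sql_not_equal(None, "Retail"))     # None: a MERGE condition that is not true
print(is_distinct_from(None, "Retail"))  # True: the row gets updated
print(is_distinct_from(None, None))      # False: the unchanged row is skipped
```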

 

test_source_df = test_df  # the processed DataFrame

# Register the DataFrame as the temp view that the MERGE below reads.
# createOrReplaceTempView() returns None, so columns and dtypes come from the DataFrame itself.
test_source_df.createOrReplaceTempView("test_source_view")

test_source_view_columns = test_source_df.columns
column_types = dict(test_source_df.dtypes)  # {column name: Spark type string}

conditions = []
# Skip the last two columns (presumably audit columns such as created_at / updated_at).
for column in test_source_view_columns[:-2]:
    column_type = column_types[column]

    if column_type.startswith("map"):
        # MAP values cannot be compared directly, so compare their string form.
        conditions.append(f"CAST(coalesce(target.{column}, map()) AS STRING) <> CAST(coalesce(source.{column}, map()) AS STRING)")
    elif column_type.startswith("timestamp"):
        # Replace NULL with current_timestamp() on both sides so NULL vs NULL compares equal.
        conditions.append(f"ifnull(target.{column}, current_timestamp()) <> ifnull(source.{column}, current_timestamp())")
    else:
        # IS DISTINCT FROM is null-safe: NULL vs NULL is equal, NULL vs a value is different.
        conditions.append(f"(target.{column} IS DISTINCT FROM source.{column})")

# A row counts as changed if any compared column differs.
test_source_view_condition = " OR\n".join(conditions)

print(test_source_view_condition)

 

As shown above, we create a temporary view from test_df (the processed DataFrame).

We then iterate over each column and build a condition string that detects changes between target and source.

(target.id IS DISTINCT FROM source.id) OR
(target.registration_number IS DISTINCT FROM source.registration_number) OR
(target.public_id IS DISTINCT FROM source.public_id) OR
(target.name IS DISTINCT FROM source.name) OR
(target.industry IS DISTINCT FROM source.industry)

 

The print statement produces output like the above.

Using this condition string, let's run the actual MERGE into the Delta table.
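The loop only needs column names and type strings, so it can be factored into a pure function and tested without a Spark session. This is a sketch under three assumptions:

- `dtypes` has the shape returned by `DataFrame.dtypes`, a list of `(name, type_string)` pairs.
- As in the original loop, the last two columns are audit columns to exclude.
- `build_change_condition` is a hypothetical name.

One deliberate difference: timestamps here use IS DISTINCT FROM, which is already null-safe, instead of the ifnull(..., current_timestamp()) comparison.

```python
def build_change_condition(dtypes, exclude_last=2):
    """Build a SQL predicate that is true when any compared column differs."""
    columns = dtypes[:-exclude_last] if exclude_last else dtypes
    clauses = []
    for name, col_type in columns:
        if col_type.startswith("map"):
            # MAP values are not directly comparable in Spark SQL; compare their string form.
            clauses.append(
                f"CAST(coalesce(target.{name}, map()) AS STRING) <> "
                f"CAST(coalesce(source.{name}, map()) AS STRING)"
            )
        else:
            # Null-safe inequality; also valid for timestamp columns.
            clauses.append(f"(target.{name} IS DISTINCT FROM source.{name})")
    return " OR\n".join(clauses)


# Hypothetical dtypes matching the printed example.
dtypes = [
    ("id", "bigint"),
    ("registration_number", "string"),
    ("public_id", "string"),
    ("name", "string"),
    ("industry", "string"),
    ("created_at", "timestamp"),
    ("updated_at", "timestamp"),
]
print(build_change_condition(dtypes))
```

With these dtypes the printed predicate is the same five lines shown above.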

 

# test_delta_table_name: name of the target Delta table.
# Matched and changed -> UPDATE; new key -> INSERT; key missing from source -> DELETE.
# WHEN NOT MATCHED BY SOURCE needs a recent Delta Lake / Databricks Runtime version.
spark.sql("""
MERGE INTO {0} AS target
USING test_source_view AS source
ON target.registration_number = source.registration_number
WHEN MATCHED AND ({1})
THEN
  UPDATE SET
    target.registration_number = source.registration_number,
    target.name = source.name,
    target.industry = source.industry,
    target.updated_at = source.updated_at
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE""".format(test_delta_table_name, test_source_view_condition)).show()

 

In this form, a conditional WHEN MATCHED AND (...) clause restricts the update to rows that actually changed.

The result is printed as follows.

 

+-----------------+----------------+----------------+-----------------+
|num_affected_rows|num_updated_rows|num_deleted_rows|num_inserted_rows|
+-----------------+----------------+----------------+-----------------+
|            10404|            9728|             190|              486|
+-----------------+----------------+----------------+-----------------+
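To make the three clauses concrete, here is a small in-memory model of this MERGE:

- Rows are keyed on `registration_number`.
- A matched row is updated only when a compared column differs (Python's `!=` on None behaves like IS DISTINCT FROM).
- New keys are inserted, and keys missing from the source are deleted.

It is a pure-Python illustration with hypothetical rows, not how Delta executes MERGE, and it replaces the whole matched row rather than only the SET columns. The metric names mirror the output above, where num_affected_rows is the sum of the other three (9728 + 190 + 486 = 10404).

```python
def merge_changed_only(target, source, key, compare_cols):
    """Apply MERGE-like semantics to lists of dict rows; return (rows, metrics)."""
    tgt = {row[key]: row for row in target}
    src = {row[key]: row for row in source}
    result, updated, inserted = {}, 0, 0
    for k, new_row in src.items():
        old_row = tgt.get(k)
        if old_row is None:
            result[k] = new_row  # WHEN NOT MATCHED THEN INSERT *
            inserted += 1
        elif any(old_row.get(c) != new_row.get(c) for c in compare_cols):
            result[k] = new_row  # WHEN MATCHED AND (<changed>) THEN UPDATE
            updated += 1
        else:
            result[k] = old_row  # matched but unchanged: keeps its old updated_at
    deleted = sum(1 for k in tgt if k not in src)  # WHEN NOT MATCHED BY SOURCE THEN DELETE
    metrics = {
        "num_affected_rows": updated + deleted + inserted,
        "num_updated_rows": updated,
        "num_deleted_rows": deleted,
        "num_inserted_rows": inserted,
    }
    return list(result.values()), metrics


target = [
    {"registration_number": "A-1", "name": "Alpha", "industry": None, "updated_at": "2024-11-28"},
    {"registration_number": "B-2", "name": "Beta", "industry": "Retail", "updated_at": "2024-11-28"},
    {"registration_number": "C-3", "name": "Gamma", "industry": "IT", "updated_at": "2024-11-28"},
]
source = [
    {"registration_number": "A-1", "name": "Alpha", "industry": "Retail", "updated_at": "2024-11-29"},  # NULL -> value
    {"registration_number": "B-2", "name": "Beta", "industry": "Retail", "updated_at": "2024-11-29"},   # unchanged
    {"registration_number": "D-4", "name": "Delta", "industry": "IT", "updated_at": "2024-11-29"},     # new key
]
rows, metrics = merge_changed_only(target, source, "registration_number", ["name", "industry"])
print(metrics)
```

The unchanged row B-2 keeps its previous updated_at, so a date-based read in step 3 no longer picks it up.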

 

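Now only rows that actually changed are reflected in the Delta table (unchanged rows keep their previous updated_at), so step 3 only needs to read those rows and sink them.

Going a step further, it would be even better to compare the latest version with the previous one and build the list of rows to delete along with the upsert query. Deleted rows disappear from the Delta table, so an updated_at filter alone cannot find them.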
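One way to realize that idea is Delta's Change Data Feed, which the delta.enableChangeDataFeed property above turns on. Reading with `.option("readChangeFeed", "true")` plus `startingVersion` / `endingVersion` (or SQL `table_changes(...)` on Databricks) returns the changed rows. Each row carries a `_change_type` column (`insert`, `update_preimage`, `update_postimage`, `delete`), plus `_commit_version` and `_commit_timestamp`.

The sketch below assumes those rows have already been collected as dicts. It splits them into an upsert list and a list of keys to delete in the DB. `split_for_sink` is a hypothetical helper, and the DB write itself is left out.

```python
# Metadata columns added by the change data feed.
CDF_COLUMNS = {"_change_type", "_commit_version", "_commit_timestamp"}


def split_for_sink(change_rows, key):
    """Collapse change-feed rows into final upserts and delete keys per key."""
    latest = {}
    # Process commits in order so the last change to a key wins.
    for row in sorted(change_rows, key=lambda r: r["_commit_version"]):
        change = row["_change_type"]
        if change == "update_preimage":
            continue  # old values; not needed for the sink
        data = {c: v for c, v in row.items() if c not in CDF_COLUMNS}
        latest[row[key]] = ("delete" if change == "delete" else "upsert", data)
    upserts = [data for action, data in latest.values() if action == "upsert"]
    delete_keys = [k for k, (action, _) in latest.items() if action == "delete"]
    return upserts, delete_keys


changes = [
    {"registration_number": "A-1", "industry": None, "_change_type": "update_preimage", "_commit_version": 76},
    {"registration_number": "A-1", "industry": "Retail", "_change_type": "update_postimage", "_commit_version": 76},
    {"registration_number": "C-3", "industry": "IT", "_change_type": "delete", "_commit_version": 76},
    {"registration_number": "D-4", "industry": "IT", "_change_type": "insert", "_commit_version": 76},
]
upserts, deletes = split_for_sink(changes, "registration_number")
print(upserts)  # [{'registration_number': 'A-1', 'industry': 'Retail'}, {'registration_number': 'D-4', 'industry': 'IT'}]
print(deletes)  # ['C-3']
```

Collapsing per key keeps the DB writes idempotent when the version range spans several commits that touch the same row.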

 

Conclusion

The more I learn about Delta Lake, the more there is to it.

It's fascinating, but I've realized just how much there is to study.

Learning the APIs for handling massive data easily will take a long time on its own, and I'm also curious about how they actually work under the hood.

 

Right now I only extract, transform, and sink data, but someday I'd like to try producing meaningful data myself.

I hope this helps anyone studying Delta Lake.

 
