๋๋ ๋ฐฑ์๋ ์์ง๋์ด์ด์ง๋ง, Databricks ํ์ฉํ ๋ฐ์ดํฐ ์์ง๋์ด๋ง ์ ๋ฌด๋ ๊ฒธํ๊ณ ์๋ค.
์ถํ ๊ฐ๋ฐ ์ปค๋ฆฌ์ด๋ฅผ ๋ฐ์ดํฐ ์ชฝ์ผ๋ก ์ ํฅํ๊ณ ์ถ๊ธฐ๋ ํ์ฌ, ํ์ฌ ํ์ฌ์ ์ ์ฌ ์ดํ ์ง์์ ์ธ ๋ฉด๋ด์์ ๋ฐ์ดํฐ ์ ๋ฌด๋ฅผ ํ๊ณ ์ถ๋ค๊ณ ์ ๊ทน์ ์ผ๋ก ์ดํ์ ํ๊ณ , ์ฑํฐ ๋ฆฌ๋๋ถ์ ์ด๋ฅผ ํ์พํ ๋ฐ์๋ค์ฌ์ฃผ์ จ๋ค. ์ํ๋ ์ ๋ฌด๋ฅผ ๊ฒธํ๊ฒ ๋์ด ์ ๋ง ํ๋ณตํ๋ค.
Databricks๋ฅผ ์ฌ์ฉํ๋ฉฐ ์ฌ์ฉํ๋ API๋ Apache Spark๋ฅผ ์ฌ์ฉํ๋ค.
์ฌ์ค ์ ๊ทน์ ์ธ ๋ฐ์ดํฐ ์์ง๋์ด๋ง๋ณด๋ค๋ ๋ฐ์ดํฐ ํ๋ซํผ, ๋ฐ์ดํฐ ๋ถ์ํ์์ ๋ง๋ค์ด์ฃผ์ Raw Data๋ฅผ ์๋น์ค์ ๋ง๊ฒ ๊ฐ๊ณตํ์ฌ Delta table์ ์ ์ฌํ ํ, ์ด๋ฅผ DB Sink ํ๋ ์ฉ๋์ ๊ฐ๋ฐ์ ์งํํ๊ณ ์๋ค.
์ฒ์ ํ์ดํ๋ผ์ธ์ ๊ฐ๋ฐํ์ ๋๋, ์๋ฌด๊ฒ๋ ๋ชจ๋ฅด๋ ์ํ์์ ๊ณต๋ถํ๋ฉฐ ์งํํ์๊ธฐ์ ๋ชจ๋ ๋ฐ์ดํฐ๋ฅผ DB์ upsert ์น๋ ๋ฐฉ์์ผ๋ก ๊ฐ๋ฐ์ ํ๋ค.
๊ฐ๋ฐ์ ์๋ฃํ๊ณ ๋์ ์ง๋ฉดํ ๋ฌธ์ ๋ ์ค์ ๋ก ๋ณ๊ฒฝ์ด ๋ ๋ฐ์ดํฐ๊ฐ ์๋ ๋ชจ๋ ๋ฐ์ดํฐ๊ฐ update๋ ๋ค๋ ๊ฒ์ ์์๊ณ ,
sink ๋ ๋ฐ์ดํฐ๋ค์ ์ ํฉ์ฑ์ด ๊ทธ๋ฆฌ ์ข์ง ์๊ฒ ๋ค๋ ๋จ์ ๊ณผ, sink ๋๋ ๋ฐ์ดํฐ์ ์์ด ๋ง๋ค๋ ์ ์ด ๋ฌธ์ ์๋ค.
์ด๋ฅผ ์ด๋ป๊ฒ ๊ฐ์ ํ ์ ์์๊น๋ผ๋ ์๊ฐ์ ํ๊ณ ,
์ฒ์ ์๊ฐํ ๋ฐฉ์์ ์ค์ ๋ก ๋ณ๊ฒฝ ๋ ๋ฐ์ดํฐ๋ค์ ๋ชจ์๋๋ Delta table์ ๋ฐ๋ก ์์ฑํด์ ์ ์ฌํด ๋๊ณ , ์ํ ์นผ๋ผ์ ๋์ด ํด๊ฒฐํ ์ ์๊ฒ ๋ค๋ ์๊ฐ์ ํ์ง๋ง, spark api๋ ๋ถ๋ช ์ด๋ฐ ๋ฌธ์ ์ ๋์ํ ์ ์๋ ๊ธฐ๋ฅ์ ์ ๊ณตํ๋ฆฌ๋ผ ์๊ฐํ๋ค.
์ด๋ ๋ , ํ์ฌ ๊ฐ์ ํ๋ก์ ํธ๋ฅผ ์งํํ๋ฉฐ ํ์ ํ๋ ๋ฐ์ดํฐ ์ฌ์ด์ธํฐ์คํธ๋ถ๊ป์ DB sink์ ์ค์ ๋ณ๊ฒฝ ๋ ๋ฐ์ดํฐ๋ง์ ํํฐ ํ ์ ์๊ฒ ๋ชจ์๋ฅผ ์ค์ผ ์ ์๋ ๊ธฐ๋ฅ์ ๋ํด ๊ณต์ ๋ฅผ ํด์ฃผ์๋ ํ์์ ์ด๋๋ฅผ ํด์ฃผ์ จ๊ณ , ์ด๋ delta lake์ ๋ํด ๋ ์ ์๊ฒ ๋์๋ค.
๊ทธ๋ ๊ฒ ์๊ฒ ๋ ๊ธฐ๋ฅ์ ํ ๋๋ก ์ค์ ์ ๋ฐ์ดํธ๊ฐ ๋ ๋ฐ์ดํฐ๋ค์ ํํด sink๋ฅผ ์งํํ ์ ์๊ฒ ์์ ๊ฐ๋ฐํ์๊ณ , ํ์ฌ ์ด์ ํ๊ฒฝ์์ ์ ์์ ์ผ๋ก db sink๋๋ ๋ฐฐ์น๋ก ์ด์ํ๊ฒ ๋์๋ค.
๊ทธ๋์ ๊ทธ ํด๊ฒฐ ๋ฐฉ๋ฒ์ ํ๋ฒ ๊ณต์ ํด๋ณด๋ คํ๋ค.
๋ฐฐ์น ํ๋ก์ธ์ค ์ค๋ช
1. ๋ฐ์ดํฐ ๋ถ์ํ์์ ์๋ Master Delta Table ์ ๋งค์ผ ์ค์ ์ ์ ๋ฐ์ดํธ๋๋ Job ์ด ์คํ๋๋ค.
2. ๋๋ ๋งค์ผ ํด๋น Job ํด๋น Master Delta Table์์ ํ์ํ ๋ฐ์ดํฐ๋ฅผ ์ถ์ถ, ๊ฐ๊ณตํ์ฌ ๋ฐฑ์๋ ์๋น์ค์ ์ฌ์ฉํ DB ํ ์ด๋ธ ์คํค๋ง์ ๋์ผํ ํํ์ Delta Table ์ ์๋๋ค.
3. ์์ธ Delta Table์์, ์ค๋ ์ ๋ฐ์ดํธ๊ฐ ๋ ๋ฐ์ดํฐ๋ฅผ ๋ชจ๋ DB์ sink ํ๋ค.
๋ด๊ฐ ์งํํ๋ ๋จ๊ณ๋ 2, 3 ๋จ๊ณ์ด๋ค.
๋ฌธ์ ๊ฐ ์์๋๋ ๋ถ๋ถ์ 2๋ฒ์ด๋ค.
๋๋ Delta Table์ ๊ฐ๊ณต๋ ๋ฐ์ดํฐ๋ฅผ ์์ ๋, ์๋์ ๊ฐ์ ํจ์๋ก ์ฌ์ฉํ๋ ํธ์ด์๋ค.
def write_to_delta(table: str, df: DataFrame):
try:
(
df
.write
.format("delta")
.mode("overwrite")
.option("mergeSchema", "true")
.saveAsTable(table)
)
except Exception as e:
print(f"delta table insert ์ค ์์ธ๊ฐ ๋ฐ์ํ์ต๋๋ค: {e}")
๋ฌธ์ ๋ ๋ชจ๋ ๋ฐ์ดํฐ๋ฅผ overwrite ํ๋ค๋ ๊ฒ์ด๋ค. ์ ๋ฐ์ดํธ๋์ง ์์ ๋ฐ์ดํฐ๊น์ง ๋ชจ์กฐ๋ฆฌ ๋ฎ์ด ์์์ง๋ค.
ํ์ฌ, 3๋ฒ ๋จ๊ณ์์๋ ์ด ๋ชจ๋ ๋ฐ์ดํฐ๋ฅผ ์ฝ์ด DB์ sink ํ๊ฒ ๋๋ค. (updated_at ์นผ๋ผ ๊ธฐ์ค์ผ๋ก ๋งค์ผ ๊ฐ์ ธ์ sink)
์ด ๋ถ๋ถ์ ์ค์ ์ ๋ฐ์ดํธ๊ฐ ์ผ์ด๋ ํ๋ง delta table์ merge ๋๊ฒ ๊ฐ์ ํด์ผ ํ ๊ฒ์ด๋ค.
Detla Lake์ Versioning
ํ๋ฅญํ์ ๋ถ๋ค์ด ๋ง๋ค์ด ์ฃผ์ Delta Lake์์๋ Table Protocol Versioning์ด๋ผ๋ ๊ธฐ๋ฅ์ด ์๋ค.
Delta Table์ ํธ๋์ญ์ ๋ก๊ทธ์๋ ๊ฐ ๋ฒ์ ์ด ๊ธฐ๋ก๋๋ฉฐ, ์ด๋ฅผ ์๋์ ๊ฐ์ ์ฟผ๋ฆฌ๋ก ํ์ธํ ์ ์๋ค.
describe history data_strategy.product.test_delta;
๊ทธ๋ผ ์๋์ ๊ฐ์ด, ๋ฒ์ ์ ํ์ธํ ์ ์๋ค. (์์ธํ ์ ๋ณด๋ ์๋ตํ๋ค.)
version timestamp userId userName operation
77 2024-11-29T 23:06:08.000 4627685283376877 user OPTIMIZE
76 2024-11-29T 23:05:52.000 4627685283376877 user MERGE
75 2024-11-28T 23:06:21.000 4627685283376877 user OPTIMIZE
74 2024-11-28T 23:05:59.000 4627685283376877 user MERGE
์ด ๋ฒ์ ๋์ ์ฌ์ฉํ๋ ค๋ฉด, delta table ์์ฑ ํ ์๋์ ๊ฐ์ ์ฟผ๋ฆฌ๋ฅผ ์คํํด ์ฃผ๋ฉด ๋๋ค.
ALTER TABLE data_strategy.product.test_delta SET TBLPROPERTIES (delta.enableChangeDataFeed = true);
Versioning ํ์ฉํ Delta Table Merge
์ด์ Detla Table์ versioning๊น์ง ์ฌ์ฉ ์๋ฃ ํ์ผ๋, ๋ณ๊ฒฝ ๋ณธ๋ง merge ํ๋ฉด ๋๋ค.
ํต์ฌ์ ๊ฐ๊ณต๋ ํ์ฌ ๋ฐ์ดํฐ์ ๊ธฐ์กด delta์ ์์ธ ๋ฐ์ดํฐ (์ค๋ ์) ๊ณผ์ ๋น๊ต๊ฐ ๊ด๊ฑด์ด๋ค.
test_source_df = test_df
test_source_view = source_df.createOrReplaceTempView('source_view')
test_source_view_columns = test_source_view.columns
test_source_view_condition = ""
for column in test_source_view_columns[:-2]:
# ์ปฌ๋ผ ํ์
๊ฐ์ ธ์ค๊ธฐ
column_type = dict(test_source_df.dtypes)[column]
# ์ปฌ๋ผ ํ์
์ ๋ฐ๋ผ ๋ค๋ฅด๊ฒ ์ฒ๋ฆฌ
if column_type.startswith("map"): # ์ปฌ๋ผ ํ์
์ด MAP์ธ ๊ฒฝ์ฐ
test_source_view_condition += f"CAST(coalesce(target.{column}, map()) AS STRING) <> CAST(coalesce(source.{column}, map()) AS STRING) OR\n"
elif column_type.startswith("timestamp"): # ์ปฌ๋ผ ํ์
์ด timestamp์ธ ๊ฒฝ์ฐ
test_source_view_condition += f"ifnull(target.{column}, current_timestamp()) <> ifnull(source.{column}, current_timestamp()) OR\n"
else: # ์ปฌ๋ผ ํ์
์ด MAP์ด ์๋ ๊ฒฝ์ฐ
test_source_view_condition += f"(target.{column} IS DISTINCT FROM source.{column}) OR\n" # NULL ๋น๊ต๋ฅผ `IS DISTINCT FROM`์ผ๋ก ์ฒ๋ฆฌ
test_source_view_condition = test_source_view_condition[:-3]
print(test_source_view_condition)
์์ ๊ฐ์ด, test_df (๊ฐ๊ณต๋ dataframe)์ ๊ธฐ๋ฐ์ผ๋ก ์์ view๋ฅผ ์์ฑํ๊ณ ,
target๊ณผ ๋น๊ตํ๋ฉฐ, ๊ฐ ์นผ๋ผ์ ์ํํ๋ฉฐ ๋ณ๊ฒฝ ๋ณธ์ ๊ฐ์งํ๊ธฐ ์ํ String์ ๋ง๋ ๋ค.
(target.id IS DISTINCT FROM source.id) OR
(target.registration_number IS DISTINCT FROM source.registration_number) OR
(target.public_id IS DISTINCT FROM source.public_id) OR
(target.name IS DISTINCT FROM source.name) OR
(target.industry IS DISTINCT FROM source.industry)
์์ ๊ฐ์ด print ๋ฌธ์ ์ฐํ ๊ฒฐ๊ณผ๋ฅผ ํ์ธํ ์ ์๋ค.
์ด๋ฅผ ํตํด, ์ค์ Delta์ Merge๋ฅผ ์งํํด ๋ณด์.
spark.sql("""
MERGE INTO {0} AS target
USING test_source_view AS source
ON 1=1
AND target.registration_number = source.registration_number
WHEN MATCHED AND ({1})
THEN
UPDATE SET
target.registration_number = source.registration_number,
target.name = source.name,
target.industry = source.industry,
target.updated_at = source.updated_at
WHEN NOT MATCHED THEN
INSERT *
WHEN NOT MATCHED BY SOURCE THEN
DELETE""".format(test_delta_table_name, test_source_view_condition)).show()
์์ ๊ฐ์ ํํ๋ก ๋ณ๊ฒฝ ๋ณธ์ ํํด์ Case When ๋ฌธ์ ์ฌ์ฉํด merge๋ฅผ ์งํํ ์ ์๋ค.
๊ทธ๋ ๊ฒ ๊ฒฐ๊ณผ๋ ์๋์ ๊ฐ์ด ์ถ๋ ฅ๋๋ค.
+-----------------+----------------+----------------+-----------------+
|num_affected_rows|num_updated_rows|num_deleted_rows|num_inserted_rows|
+-----------------+----------------+----------------+-----------------+
| 10404| 9728| 190| 486|
+-----------------+----------------+----------------+-----------------+
๊ทธ๋ผ ์ค์ ๋ก ์ ๋ฐ์ดํธ๋ ๋ฐ์ดํฐ๋ง delta table์ ๋ฐ์์ด ๋ ํ ๊ณ , ์ด๋ฅผ ์ฝ๊ณ sink๋ง ์งํํ๋ฉด ๋๋ค.
์ต์ ๋ฒ์ ๊ณผ ๊ทธ ์ด์ ๋ฒ์ ์ ์กฐํํ๋ฉฐ delete ํ ๋์๊ณผ upsert ํ ์ฟผ๋ฆฌ๋ฅผ ๋ง๋ค๋ฉด ๋ ๊ธ์์ฒจํ์ผ ๊ฒ์ด๋ค.
๊ฒฐ๋ก
Delta Lake์ ์ธ๊ณ๋ ์๋ฉด ์ ์๋ก ๋ฌด๊ถ๋ฌด์งํ๋ค.
์๋ฉด ์ ์๋ก ์ ๊ธฐํ์ง๋ง, ์ ๋ง ๊ณต๋ถํ ๊ฒ ๋ง๋ค๋ ๊ฒ์ ๊นจ๋ฌ์๋ค.
๋ฐฉ๋ํ ๋ฐ์ดํฐ๋ฅผ ์ฝ๊ฒ ๋ค๋ฃจ๊ธฐ ์ํ API์ ๋ํ ๊ณต๋ถ๋ง์ ํ๊ธฐ์๋ ์ค๋ ๊ฑธ๋ฆฌ๊ฒ ์ง๋ง, ์ค์ ๋ก ์ด๋ป๊ฒ ๋์ํ๋์ง๋ ๊ถ๊ธํด์ง๋ค.
ํ์ฌ๋ ๋ฐ์ดํฐ ์ถ์ถ ๋ฐ ๊ฐ๊ณต ๋ฐ ์ฑํฌ๋ง ํ๊ณ ์์ง๋ง, ์ธ์ ๊ฐ ์๋ฏธ ์๋ ๋ฐ์ดํฐ๋ฅผ ๋ง๋ค์ด๋ด๋ ์ญํ ๋ ํ๋ฒ ํด๋ณด๊ณ ์ถ๋ค.
Delta Lake๋ฅผ ๊ณต๋ถํ๋ ๋ถ๋ค๊ป ๋ง์ ๋์์ด ๋๊ธธ ๋ฐ๋๋๋ค.