Commit 1c7acf03 by chenyuanjie

fix

parent 67cc3704
......@@ -8,8 +8,8 @@ description: 利润率数据增量同步 + 去重整合 — 一站式 PG → Hiv
2) Spark 读当日 sqoop 增量 + Hive 历史所有分区,按 (asin, price) 去重
排序键:updated_time desc 保留最新一行
3) 覆盖当日分区为整合后全量快照;写入校验通过后删除所有 < 今日 的历史分区
4) 整合后的当日数据写入 Doris dwd.dwd_asin_profit_rate_latest
Doris UNIQUE KEY(site_name, asin, price) + sequence_col=asin_crawl_date 自动取最新
4) 当日 sqoop 增量(不含历史回灌)写入 Doris dwd.dwd_asin_profit_rate_latest
Doris UNIQUE KEY(site_name, asin, price) + sequence_col=update_time 自动取最新
执行示例: spark-submit dim_asin_profit_rate_info.py us 2026-05-15
"""
import os
......@@ -87,7 +87,8 @@ class DimAsinProfitRateInfo(object):
WHERE site_name = '{self.site_name}' AND date_info = '{self.date_info}'
"""
print(f"sql_today =\n{sql_today}")
self.df_today = self.spark.sql(sqlQuery=sql_today).repartition(40, 'asin', 'price')
# cache:save_data 会 DROP PARTITION 当日分区,write_to_doris 还要复用此 df,必须先物化
self.df_today = self.spark.sql(sqlQuery=sql_today).repartition(40, 'asin', 'price').cache()
sql_history = f"""
SELECT asin, price, category, ocean_profit, air_profit,
......@@ -181,10 +182,10 @@ class DimAsinProfitRateInfo(object):
self.df_history.unpersist()
def write_to_doris(self):
"""整合后的当日数据写 Doris dwd_asin_profit_rate_latest
Doris UNIQUE KEY(site_name, asin, price) + sequence_col=asin_crawl_date 自动按抓取时间取最新
"""当日 sqoop 增量数据写 Doris dwd_asin_profit_rate_latest(不回灌历史)
Doris UNIQUE KEY(site_name, asin, price) + sequence_col=update_time 自动按更新时间取最新
"""
df_to_doris = self.df_save.select(
df_to_doris = self.df_today.select(
F.lit(self.site_name).alias('site_name'),
F.col('asin'),
F.round(F.col('price'), 2).cast('decimal(20,2)').alias('price'),
......@@ -194,7 +195,7 @@ class DimAsinProfitRateInfo(object):
F.to_timestamp(F.col('updated_time')).alias('update_time'),
).cache()
count = df_to_doris.count()
print(f"写入 Doris 数据量:{count:,}")
print(f"写入 Doris 增量数据量:{count:,}")
df_to_doris.show(10, truncate=False)
table_columns = "site_name, asin, price, ocean_profit, air_profit, asin_crawl_date, update_time"
......@@ -205,6 +206,7 @@ class DimAsinProfitRateInfo(object):
table_columns=table_columns,
)
df_to_doris.unpersist()
self.df_today.unpersist()
self.df_save.unpersist()
print("success!")
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment