Commit e37eeca5 by chenyuanjie

月流程更新parent_asin_latest表

parent cc57a012
......@@ -474,20 +474,13 @@ class DimAsinDetail(object):
# 处理parent_asin下最新变体信息
def handle_latest_variation_info(self):
if self.date_type in ['month', 'month_week', 'month_aba_me'] and self.date_info >= '2024-06':
max_report_sql = f"""
SELECT MAX(date_info) as table_date_info FROM {self.doris_db}.{self.parent_asin_latest_detail_table}
"""
df_date_info = DorisHelper.spark_import_with_sql(self.spark, query=max_report_sql)
table_date_info = df_date_info.take(1)[0]['table_date_info']
print("doris中记录最新的日期为:", table_date_info)
if self.date_info >= table_date_info:
df_asin_variat = self.df_asin_detail.filter("parent_asin is not null").select("parent_asin", "variat_list_change", "created_time")
latest_asin_window = Window.partitionBy('parent_asin').orderBy(
F.desc_nulls_last("created_time")
)
df_asin_variat = df_asin_variat.withColumn("p_rank", F.row_number().over(window=latest_asin_window))
df_asin_variat = df_asin_variat.filter("p_rank = 1").drop("p_rank")
df_asin_variat =df_asin_variat.filter(F.size("variat_list_change") > 0). \
df_asin_variat = df_asin_variat.filter(F.size("variat_list_change") > 0). \
select("parent_asin", "created_time", F.explode("variat_list_change").alias("variant_attribute")). \
select("parent_asin", "created_time", F.col("variant_attribute")[0].alias("asin"),
F.col("variant_attribute")[1].alias("color"), F.col("variant_attribute")[3].alias("size"),
......@@ -499,14 +492,15 @@ class DimAsinDetail(object):
)
print("导出父ASIN最新变体信息到doris:")
df_doris = df_asin_variat_agg.select(
"parent_asin", F.lit(self.date_info).alias("date_info"), "asin_crawl_date", "variation_info", "attr_info",
"parent_asin",
F.lit(self.date_info).alias("date_info"),
# Doris 新表 asin_crawl_date 是 DATETIME,需 string → timestamp 显式转
F.to_timestamp(F.col("asin_crawl_date")).alias("asin_crawl_date"),
"variation_info", "attr_info",
F.current_timestamp().alias("updated_at"))
table_columns = "parent_asin, date_info, asin_crawl_date, variation_info, attr_info, updated_at"
DorisHelper.spark_export_with_columns(df_save=df_doris, db_name=self.doris_db, table_name=self.parent_asin_latest_detail_table, table_columns=table_columns)
else:
print("不用导出旧数据到doris中")
pass
else:
pass
# 字段标准化及存储
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment