Commit f7096a8c by fangxingjun

Merge branch 'developer' of 47.106.101.75:abel_cjy/Amazon-Selection-Data into developer

parents e521edcf e37eeca5
...@@ -474,20 +474,13 @@ class DimAsinDetail(object): ...@@ -474,20 +474,13 @@ class DimAsinDetail(object):
# 处理parent_asin下最新变体信息 # 处理parent_asin下最新变体信息
def handle_latest_variation_info(self): def handle_latest_variation_info(self):
if self.date_type in ['month', 'month_week', 'month_aba_me'] and self.date_info >= '2024-06': if self.date_type in ['month', 'month_week', 'month_aba_me'] and self.date_info >= '2024-06':
max_report_sql = f"""
SELECT MAX(date_info) as table_date_info FROM {self.doris_db}.{self.parent_asin_latest_detail_table}
"""
df_date_info = DorisHelper.spark_import_with_sql(self.spark, query=max_report_sql)
table_date_info = df_date_info.take(1)[0]['table_date_info']
print("doris中记录最新的日期为:", table_date_info)
if self.date_info >= table_date_info:
df_asin_variat = self.df_asin_detail.filter("parent_asin is not null").select("parent_asin", "variat_list_change", "created_time") df_asin_variat = self.df_asin_detail.filter("parent_asin is not null").select("parent_asin", "variat_list_change", "created_time")
latest_asin_window = Window.partitionBy('parent_asin').orderBy( latest_asin_window = Window.partitionBy('parent_asin').orderBy(
F.desc_nulls_last("created_time") F.desc_nulls_last("created_time")
) )
df_asin_variat = df_asin_variat.withColumn("p_rank", F.row_number().over(window=latest_asin_window)) df_asin_variat = df_asin_variat.withColumn("p_rank", F.row_number().over(window=latest_asin_window))
df_asin_variat = df_asin_variat.filter("p_rank = 1").drop("p_rank") df_asin_variat = df_asin_variat.filter("p_rank = 1").drop("p_rank")
df_asin_variat =df_asin_variat.filter(F.size("variat_list_change") > 0). \ df_asin_variat = df_asin_variat.filter(F.size("variat_list_change") > 0). \
select("parent_asin", "created_time", F.explode("variat_list_change").alias("variant_attribute")). \ select("parent_asin", "created_time", F.explode("variat_list_change").alias("variant_attribute")). \
select("parent_asin", "created_time", F.col("variant_attribute")[0].alias("asin"), select("parent_asin", "created_time", F.col("variant_attribute")[0].alias("asin"),
F.col("variant_attribute")[1].alias("color"), F.col("variant_attribute")[3].alias("size"), F.col("variant_attribute")[1].alias("color"), F.col("variant_attribute")[3].alias("size"),
...@@ -499,14 +492,15 @@ class DimAsinDetail(object): ...@@ -499,14 +492,15 @@ class DimAsinDetail(object):
) )
print("导出父ASIN最新变体信息到doris:") print("导出父ASIN最新变体信息到doris:")
df_doris = df_asin_variat_agg.select( df_doris = df_asin_variat_agg.select(
"parent_asin", F.lit(self.date_info).alias("date_info"), "asin_crawl_date", "variation_info", "attr_info", "parent_asin",
F.lit(self.date_info).alias("date_info"),
# Doris 新表 asin_crawl_date 是 DATETIME,需 string → timestamp 显式转
F.to_timestamp(F.col("asin_crawl_date")).alias("asin_crawl_date"),
"variation_info", "attr_info",
F.current_timestamp().alias("updated_at")) F.current_timestamp().alias("updated_at"))
table_columns = "parent_asin, date_info, asin_crawl_date, variation_info, attr_info, updated_at" table_columns = "parent_asin, date_info, asin_crawl_date, variation_info, attr_info, updated_at"
DorisHelper.spark_export_with_columns(df_save=df_doris, db_name=self.doris_db, table_name=self.parent_asin_latest_detail_table, table_columns=table_columns) DorisHelper.spark_export_with_columns(df_save=df_doris, db_name=self.doris_db, table_name=self.parent_asin_latest_detail_table, table_columns=table_columns)
else: else:
print("不用导出旧数据到doris中")
pass
else:
pass pass
# 字段标准化及存储 # 字段标准化及存储
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment