Commit 9e362c9c by chenyuanjie

fix

parent 4f3cf1f3
...@@ -504,7 +504,8 @@ class DimAsinDetail(object): ...@@ -504,7 +504,8 @@ class DimAsinDetail(object):
F.first("created_time").alias("asin_crawl_date"), F.first("created_time").alias("asin_crawl_date"),
F.concat_ws(',', F.collect_list("asin")).alias("variation_info"), F.concat_ws(',', F.collect_list("asin")).alias("variation_info"),
F.to_json(F.collect_list(F.struct(F.col("color"), F.col("size"), F.col("style")))).alias("attr_info") F.to_json(F.collect_list(F.struct(F.col("color"), F.col("size"), F.col("style")))).alias("attr_info")
) ).repartition(100).persist(StorageLevel.DISK_ONLY)
print(f"父ASIN变体聚合数量:{df_asin_variat_agg.count()}")
print("导出父ASIN最新变体信息到doris:") print("导出父ASIN最新变体信息到doris:")
df_doris = df_asin_variat_agg.select( df_doris = df_asin_variat_agg.select(
"parent_asin", "parent_asin",
...@@ -515,6 +516,7 @@ class DimAsinDetail(object): ...@@ -515,6 +516,7 @@ class DimAsinDetail(object):
F.current_timestamp().alias("updated_at")) F.current_timestamp().alias("updated_at"))
table_columns = "parent_asin, date_info, asin_crawl_date, variation_info, attr_info, updated_at" table_columns = "parent_asin, date_info, asin_crawl_date, variation_info, attr_info, updated_at"
DorisHelper.spark_export_with_columns(df_save=df_doris, db_name=self.doris_db, table_name=self.parent_asin_latest_detail_table, table_columns=table_columns) DorisHelper.spark_export_with_columns(df_save=df_doris, db_name=self.doris_db, table_name=self.parent_asin_latest_detail_table, table_columns=table_columns)
df_asin_variat_agg.unpersist()
else: else:
pass pass
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment