Commit ca996d62 by chenyuanjie

fix

parent 53d3100f
...@@ -435,8 +435,9 @@ class KafkaFlowAsinDetail(Templates): ...@@ -435,8 +435,9 @@ class KafkaFlowAsinDetail(Templates):
) )
print("导出父ASIN最新变体信息到doris:") print("导出父ASIN最新变体信息到doris:")
df_doris = df_asin_variat_agg.select( df_doris = df_asin_variat_agg.select(
"parent_asin", F.lit(self.date_info).alias("date_info"), "asin_crawl_date", "variation_info", "attr_info") "parent_asin", F.lit(self.date_info).alias("date_info"), "asin_crawl_date", "variation_info", "attr_info",
table_columns = "parent_asin, date_info, asin_crawl_date, variation_info, attr_info" F.current_timestamp().alias("updated_at"))
table_columns = "parent_asin, date_info, asin_crawl_date, variation_info, attr_info, updated_at"
DorisHelper.spark_export_with_columns(df_save=df_doris, db_name=self.doris_db, table_name=self.parent_asin_latest_detail_table, table_columns=table_columns) DorisHelper.spark_export_with_columns(df_save=df_doris, db_name=self.doris_db, table_name=self.parent_asin_latest_detail_table, table_columns=table_columns)
df_doris.unpersist() df_doris.unpersist()
return df return df
......
...@@ -434,8 +434,9 @@ class KafkaRankAsinDetail(Templates): ...@@ -434,8 +434,9 @@ class KafkaRankAsinDetail(Templates):
) )
print("导出父ASIN最新变体信息到doris:") print("导出父ASIN最新变体信息到doris:")
df_doris = df_asin_variat_agg.select( df_doris = df_asin_variat_agg.select(
"parent_asin", F.lit(self.date_info).alias("date_info"), "asin_crawl_date", "variation_info", "attr_info") "parent_asin", F.lit(self.date_info).alias("date_info"), "asin_crawl_date", "variation_info", "attr_info",
table_columns = "parent_asin, date_info, asin_crawl_date, variation_info, attr_info" F.current_timestamp().alias("updated_at"))
table_columns = "parent_asin, date_info, asin_crawl_date, variation_info, attr_info, updated_at"
DorisHelper.spark_export_with_columns(df_save=df_doris, db_name=self.doris_db, table_name=self.parent_asin_latest_detail_table, table_columns=table_columns) DorisHelper.spark_export_with_columns(df_save=df_doris, db_name=self.doris_db, table_name=self.parent_asin_latest_detail_table, table_columns=table_columns)
df_doris.unpersist() df_doris.unpersist()
return df return df
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment