Commit 1589eba9 by chenyuanjie

榜单asin导出Doris

parent d9e2d8b9
......@@ -272,13 +272,16 @@ class DwdNsrBsrKeepaAsin(Templates):
content = f"整合asin完成(爬虫无需抓取)--数量: {self.df_save.count()}"
CommonUtil().send_wx_msg(users=users, title=title, content=content)
if self.date_type == 'day':
# 写入 Doris dwd.dwd_asin_source_flag(按 date_info 日分区,dynamic_partition 自动清理过期分区)
print(f"写入 Doris dwd.dwd_asin_source_flag, date_info={self.date_info}")
if self.date_type in ('day', 'month'):
# 写入 Doris dwd.dwd_asin_source_flag(LIST 三元组分区 site_name + date_type + date_info,全保留)
# day 分区:date_info=yyyy-MM-dd
# month 分区:date_info=yyyy-MM
print(f"写入 Doris dwd.dwd_asin_source_flag, date_type={self.date_type}, date_info={self.date_info}")
df_to_doris = self.df_save_asin_cate.select(
F.col("date_info").cast("date").alias("date_info"),
"asin",
"site_name",
F.col("site_name"),
F.lit(self.date_type).alias('date_type'),
F.col("date_info"),
F.col("asin"),
F.to_json(
F.expr("transform(split(asin_cate_flag, ','), x -> cast(x as int))")
).alias("asin_cate_flag"),
......@@ -287,7 +290,7 @@ class DwdNsrBsrKeepaAsin(Templates):
F.col("nsr_latest_date").cast("date").alias("nsr_latest_date"),
"nsr_30day_count",
)
table_columns = "date_info, asin, site_name, asin_cate_flag, bsr_latest_date, bsr_30day_count, nsr_latest_date, nsr_30day_count"
table_columns = "site_name, date_type, date_info, asin, asin_cate_flag, bsr_latest_date, bsr_30day_count, nsr_latest_date, nsr_30day_count"
DorisHelper.spark_export_with_columns(
df_save=df_to_doris,
db_name='dwd',
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment