Commit 0c315139 by chenyuanjie

每日榜单asin指标写入Doris

parent 957e48f8
......@@ -19,6 +19,7 @@ from utils.common_util import CommonUtil
from datetime import datetime, timedelta
from utils.hdfs_utils import HdfsUtils
from utils.secure_db_client import get_remote_engine
from utils.DorisHelper import DorisHelper
# 数字/虚拟类目排除列表 — 筛选条件用
EXCLUDE_CATEGORIES = (
......@@ -264,33 +265,28 @@ class DwdNsrBsrKeepaAsin(Templates):
CommonUtil().send_wx_msg(users=users, title=title, content=content)
# Daily runs additionally refresh the rolling 30day partition and export
# the per-ASIN source flags to Doris; other date_types stop before this.
if self.date_type == 'day':
    # Refresh the 30day partition: stage a copy of today's day partition
    # in a 30day_copy dir, then swap it with the live 30day path.
    hdfs_day = CommonUtil.build_hdfs_path(self.db_save_cate, {"site_name": self.site_name, "date_type": self.date_type, "date_info": self.date_info})
    hdfs_30day = CommonUtil.build_hdfs_path(self.db_save_cate, {"site_name": self.site_name, "date_type": "30day", "date_info": "1970-01"})
    hdfs_30day_copy = CommonUtil.build_hdfs_path(self.db_save_cate, {"site_name": self.site_name, "date_type": "30day_copy", "date_info": "1970-01"})
    # Abort with a WeChat alert if today's source partition was never written.
    if not HdfsUtils.path_exist(hdfs_day):
        print(f"源目录不存在: {hdfs_day}")
        wx_users = ['fangxingjun', 'chenyuanjie']
        wx_msg = f"{hdfs_day} 目录数据不存在,请检查!"
        CommonUtil.send_wx_msg(wx_users, "复制数据至30day分区", wx_msg)
        sys.exit(1)
    # Ensure the staging dir exists and is empty before copying into it.
    if HdfsUtils.path_exist(hdfs_30day_copy):
        print(f"中间目录已存在,先清空: {hdfs_30day_copy}")
        HdfsUtils.delete_file_in_folder(hdfs_30day_copy)
    else:
        print(f"中间目录不存在,创建: {hdfs_30day_copy}")
        HdfsUtils.create_if_not_exist(hdfs_30day_copy)
    # NOTE(review): the exit status of os.system is ignored — if the
    # `hdfs dfs -cp` fails, an empty/partial staging dir would still be
    # swapped into the live 30day path below. Consider checking the
    # return code (or the file count printed next) before exchanging.
    os.system(f"hdfs dfs -cp {hdfs_day}/* {hdfs_30day_copy}/")
    files = HdfsUtils.read_list(hdfs_30day_copy)
    print(f"中间目录文件数: {len(files) if files else 0}")
    # Create the live 30day dir on first run so the swap has a target.
    if not HdfsUtils.path_exist(hdfs_30day):
        print(f"目标目录不存在,创建: {hdfs_30day}")
        HdfsUtils.create_if_not_exist(hdfs_30day)
    # Swap staging and live paths — presumably a rename-based exchange;
    # confirm atomicity against HdfsUtils.exchange_path.
    HdfsUtils.exchange_path(hdfs_30day_copy, hdfs_30day)
    print(f"交换完成! 30day与30day_copy已互换")
    # 4. Repair Hive metadata so the swapped partitions become queryable.
    CommonUtil.hive_cmd_exec(f"set hive.msck.path.validation=ignore; msck repair table big_data_selection.{self.db_save_cate};")
    # Export to Doris selection.dim_asin_source_flag (partitioned daily by
    # date_info; dynamic_partition is expected to purge expired partitions
    # on the Doris side — TODO confirm table properties).
    print(f"写入 Doris selection.dim_asin_source_flag, date_info={self.date_info}")
    df_to_doris = self.df_save_asin_cate.select(
        F.col("date_info").cast("date").alias("date_info"),
        "asin",
        "site_name",
        # asin_cate_flag is a comma-separated string; convert it to a
        # JSON array of ints for the Doris column.
        F.to_json(
            F.expr("transform(split(asin_cate_flag, ','), x -> cast(x as int))")
        ).alias("asin_cate_flag"),
        F.col("bsr_latest_date").cast("date").alias("bsr_latest_date"),
        "bsr_30day_count",
        F.col("nsr_latest_date").cast("date").alias("nsr_latest_date"),
        "nsr_30day_count",
    )
    # Column list must stay in the same order as the select() above.
    table_columns = "date_info, asin, site_name, asin_cate_flag, bsr_latest_date, bsr_30day_count, nsr_latest_date, nsr_30day_count"
    DorisHelper.spark_export_with_columns(
        df_save=df_to_doris,
        db_name='selection',
        table_name='dim_asin_source_flag',
        table_columns=table_columns,
    )
    print("Doris selection.dim_asin_source_flag 写入完毕")
# def save_data(self):
# pass
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment