Commit 422564cd by chenyuanjie

流量选品近3个月每日刷新

parent 95b97877
"""
author: CT
description: 流量选品月物化表刷新脚本——批量刷新近 3 月 selection.{site}_flow_asin_month_{yyyy_mm}
【场景】
月物化表(由 dwt_flow_asin_month.py 每月跑一次)依赖的上游(利润率/keepa/asin_type/分类规则/user_mask/...)
在月度计算完毕后仍会持续更新,需要定时刷新近 3 月物化数据让最新关联结果落到 selection 表
【流程】
1) MySQL workflow_everyday.MAX(report_date) WHERE date_type='month' AND page='流量选品' → 取最大月份
2) 从最大月份向前推 3 月(含当月)→ 得到 3 个 date_info
3) 站点列表(SITES)枚举 × 近 3 月,循环执行 INSERT OVERWRITE
4) INSERT OVERWRITE SQL 复用 dwt_flow_asin_month.build_insert_overwrite_sql
【无入参】
不接收 sys.argv,所有逻辑动态推断;海豚定时调度直接执行 spark-submit 即可
执行示例:
spark-submit dwt_flow_asin_month_refresh.py
"""
import os
import sys
from datetime import datetime
from dateutil.relativedelta import relativedelta
sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from utils.db_util import DBUtil
from doris_handle.dwt_flow_asin_month import (
_exec_doris_sql,
build_insert_overwrite_sql,
)
# Sites whose monthly tables are refreshed. Only 'us' is refreshed for now;
# to cover additional sites later, just add them to this list.
SITES = ['us']
def get_recent_3_months():
    """Return the three most recent refresh months, earliest first.

    Reads ``MAX(report_date)`` from the MySQL table ``workflow_everyday``
    (rows with ``site_name = 'us'``, ``date_type = 'month'`` and
    ``page = '流量选品'``) and walks back two months from that value, so the
    window holds three months including the latest one.

    Per the original author's note, the same lookup pattern is used in
    export_keepa_asin_del.py / export_need_profit_rate.py.

    Returns:
        list[str]: three ``'yyyy-MM'`` strings sorted from earliest to latest.

    Raises:
        ValueError: if the query yields no month (MAX is NULL), or if the
            value cannot be parsed with the ``'%Y-%m'`` format.
    """
    # The us-site MySQL is used as the authoritative source (original author's
    # note: the MySQL data of the three sites is identical).
    sql_max_month = (
        "select MAX(report_date) as date_info from workflow_everyday "
        "where site_name = 'us' and date_type = 'month' and page = '流量选品'"
    )
    print(f"sql_max_month = {sql_max_month}")

    spark = SparkUtil.get_spark_session("DwtFlowAsinMonthRefresh: probe_max_month")
    mysql_con = DBUtil.get_connection_info('mysql', 'us')
    rows = SparkUtil.read_jdbc_query(
        session=spark,
        url=mysql_con['url'],
        pwd=mysql_con['pwd'],
        username=mysql_con['username'],
        query=sql_max_month,
    ).collect()
    # An aggregate without GROUP BY yields a single row; its value is NULL
    # (None) when no record matches the filter.
    max_date_info = rows[0]['date_info']
    print(f"workflow_everyday 最大 report_date: {max_date_info}")

    # Explicit raise instead of `assert`: assertions are removed when Python
    # runs with -O, which would let a None value reach strptime below and
    # surface as a misleading parse error.
    if max_date_info is None:
        raise ValueError("workflow_everyday 流量选品月度记录为空, 无法推断近 3 月窗口")

    base_dt = datetime.strptime(str(max_date_info), '%Y-%m')
    # Offsets 2, 1, 0 months back -> oldest month first, latest month last.
    months = [
        (base_dt - relativedelta(months=offset)).strftime('%Y-%m')
        for offset in range(2, -1, -1)
    ]
    print(f"近 3 月窗口: {months}")
    return months
def refresh_one_month(site_name, date_info):
    """Refresh one site's materialized table for a single month.

    Builds the target table name ``{site_name}_flow_asin_month_{yyyy_mm}``,
    obtains the INSERT OVERWRITE statement from ``build_insert_overwrite_sql``
    and runs it through ``_exec_doris_sql``. No table creation happens here;
    per the original author's note the table is already created by the
    monthly main job.

    Args:
        site_name: site prefix used in the table name (e.g. ``'us'``).
        date_info: month in ``'yyyy-MM'`` form; dashes become underscores in
            the table name.
    """
    table_name = "{}_flow_asin_month_{}".format(site_name, date_info.replace('-', '_'))
    qualified_name = f"selection.{table_name}"

    print(f"\n========== 刷新 site={site_name}, date_info={date_info}, table={qualified_name} ==========")
    print(f"[执行] Doris INSERT OVERWRITE {qualified_name}")

    overwrite_sql = build_insert_overwrite_sql(site_name, table_name, date_info)
    # _exec_doris_sql is handed a list of statements; here it is a single one.
    _exec_doris_sql([overwrite_sql])

    print(f"[完成] {qualified_name} 刷新成功")
def main():
    """Refresh every (site, month) pair and report the outcome.

    Resolves the month window via ``get_recent_3_months``, then calls
    ``refresh_one_month`` for each site in ``SITES`` and each month, site by
    site. A failure of one pair is logged and recorded but does not stop the
    remaining pairs. After the summary is printed, the process exits with
    status 1 if any pair failed so that the scheduler can detect the failure;
    otherwise ``success!`` is printed.
    """
    months = get_recent_3_months()
    print(f"\n站点列表: {SITES}")
    print(f"刷新月份: {months}")
    print(f"总刷新任务数: {len(SITES) * len(months)}")

    succeeded = []
    failures = []
    # Same visiting order as a nested loop: all months of a site, then the next site.
    tasks = [(site, month) for site in SITES for month in months]
    for site, month in tasks:
        try:
            refresh_one_month(site, month)
        except Exception as exc:
            # Keep going: one failed pair must not block the others.
            print(f"[失败] site={site}, date_info={month}, 原因: {exc}")
            failures.append(f"{site}/{month}: {exc}")
        else:
            succeeded.append(f"{site}/{month}")

    print("\n========== 刷新结果汇总 ==========")
    print(f"成功 {len(succeeded)} 个: {succeeded}")

    if not failures:
        print("success!")
        return

    print(f"失败 {len(failures)} 个:")
    for entry in failures:
        print(f" - {entry}")
    # Non-zero exit code so the scheduler registers the run as failed.
    sys.exit(1)


if __name__ == "__main__":
    main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment