Commit c25353de by chenyuanjie

每天刷新利润率趋势图

parent 03fa9a45
"""
author: CT
description: 利润率趋势图刷新脚本——定时刷新中间表近 3 月数据 + Spark 聚合写入趋势物理表
【背景】
利润率会持续变化(同 asin+price 的最新利润率覆盖旧值),需要定时把变化反映到趋势图。
历史月份(< 近 3 月)保持不动,认为已稳定;近 3 月每天刷新一次。
【流程】
Step 0) 动态获取近 3 月(MySQL workflow_everyday.MAX(report_date) 向前推 3 月,含当月)
Step 1) 站点循环 × 月份循环:
INSERT INTO dwt.dwt_asin_profit_rate_history (含 update_time)
数据源: dwt.{site}_flow_asin_month INNER JOIN dwd.dwd_asin_profit_rate_latest
Doris MOW + sequence_col=update_time 自动按更新时间覆盖旧版本,无需 DELETE
Step 2) Spark 端聚合并写入物理表 selection.us_asin_profit_rate_trend
Spark 读 dwt_asin_profit_rate_history(us) → sort by (asin, date_info)
→ groupBy asin → collect_list (Spark 保证有序) → 写 Doris (INSERT OVERWRITE)
避开 Doris 端 GROUP BY + collect 内存爆炸问题
【无入参】
不接收 sys.argv,所有逻辑动态推断;海豚定时调度直接执行 spark-submit 即可
执行示例:
spark-submit dws_asin_profit_rate_trend_refresh.py
"""
import os
import sys
from datetime import datetime
from dateutil.relativedelta import relativedelta
sys.path.append(os.path.dirname(sys.path[0]))
from pyspark.sql import functions as F, Window
from utils.spark_util import SparkUtil
from utils.db_util import DBUtil
from utils.DorisHelper import DorisHelper
# 当前仅刷新 us;后续扩展只需在此处加站点
SITES = ['us']
# 中间表 / 趋势表常量
STG_DB = "dwt"
STG_TABLE = "dwt_asin_profit_rate_history"
TREND_DB = "selection"
TREND_TABLE = "us_asin_profit_rate_trend"
def _doris_connect(use_type='selection'):
"""统一 pymysql 连接, database='selection' 让 UDF / selection 库可见"""
import pymysql
conn_info = DorisHelper.get_connection_info(use_type)
return pymysql.connect(
host=conn_info['ip'],
port=conn_info['jdbc_port'],
user=conn_info['user'],
password=conn_info['pwd'],
database='selection',
charset='utf8mb4',
autocommit=True,
)
def _exec_doris_sql(sql_list, use_type='selection'):
"""通过 pymysql 走 Doris jdbc_port 执行 DDL / DML 等"""
conn = _doris_connect(use_type)
try:
cur = conn.cursor()
for sql in sql_list:
print(f"[Doris SQL] {sql[:250]}{'...' if len(sql) > 250 else ''}")
cur.execute(sql)
cur.close()
finally:
conn.close()
def get_recent_3_months(spark):
"""通过 MySQL workflow_everyday 拿最大 report_date 向前推 3 月(含当月)"""
sql_max_month = (
"select MAX(report_date) as date_info from workflow_everyday "
"where site_name = 'us' and date_type = 'month' and page = '流量选品'"
)
print(f"sql_max_month = {sql_max_month}")
mysql_con = DBUtil.get_connection_info('mysql', 'us')
max_date_info = SparkUtil.read_jdbc_query(
session=spark, url=mysql_con['url'],
pwd=mysql_con['pwd'], username=mysql_con['username'], query=sql_max_month,
).collect()[0]['date_info']
print(f"workflow_everyday 最大 report_date: {max_date_info}")
assert max_date_info is not None, "workflow_everyday 流量选品月度记录为空, 无法推断近 3 月窗口"
base_dt = datetime.strptime(str(max_date_info), '%Y-%m')
months = [(base_dt - relativedelta(months=i)).strftime('%Y-%m') for i in range(2, -1, -1)]
print(f"近 3 月窗口: {months}")
return months
def refresh_one_month(site_name, date_info):
"""单站点 + 单月份刷新:
INNER JOIN 月维度 × 利润率,只写有利润率的 asin
Doris MOW + sequence_col=update_time 自动按 pr.update_time 覆盖旧版本,无需 DELETE
"""
print(f"\n========== 刷新 site={site_name}, date_info={date_info} ==========")
insert_sql = f"""
INSERT INTO `dwt`.`dwt_asin_profit_rate_history`
(site_name, asin, date_info, price, ocean_profit, air_profit, update_time)
SELECT
'{site_name}' AS site_name,
m.asin,
m.date_info,
m.price,
pr.ocean_profit,
pr.air_profit,
pr.update_time
FROM `dwt`.`{site_name}_flow_asin_month` m
INNER JOIN `dwd`.`dwd_asin_profit_rate_latest` pr
ON m.asin = pr.asin
AND m.price = pr.price
AND pr.site_name = '{site_name}'
WHERE m.date_info = '{date_info}'
AND m.price > 0
"""
_exec_doris_sql([insert_sql])
print(f"[完成] site={site_name}, date_info={date_info}")
def spark_aggregate_and_export(spark, site_name='us'):
"""Spark 端聚合 dwt.dwt_asin_profit_rate_history → 物理表 selection.us_asin_profit_rate_trend
避开 Doris MV REFRESH 时 GROUP BY + COLLECT_LIST 内存爆炸的问题。
数据流:
1) Spark 读 dwt.dwt_asin_profit_rate_history (Doris 端按 site_name 过滤)
2) sort by (asin, date_info ASC) 让组内有序
3) groupBy asin + collect_list (Spark 保证按输入顺序聚合)
4) 写 Doris selection.us_asin_profit_rate_trend (INSERT OVERWRITE 整表)
"""
print(f"\n========== Spark 聚合趋势数据 site={site_name} → {TREND_DB}.{TREND_TABLE} ==========")
# 1. Spark 读中间表(用 connector 并行读,避免 JDBC 单分区 OOM)
# spark_import_with_connector 按 Doris tablet 切分并行,不支持 WHERE 下推,Spark 端 filter
table_identifier = f"{STG_DB}.{STG_TABLE}"
read_fields = "site_name,asin,date_info,price,ocean_profit,air_profit"
print(f"[Spark 读 Doris connector] table={table_identifier}, fields={read_fields}")
df = DorisHelper.spark_import_with_connector(spark, table_identifier, read_fields) \
.filter(F.col('site_name') == site_name) \
.select('asin', 'date_info', 'price', 'ocean_profit', 'air_profit') \
.repartition(40, 'asin')
# 2. groupBy 内按 date_info 排序 (Window + collect_list 保证有序对齐)
win = Window.partitionBy('asin').orderBy(F.col('date_info').asc())
df_sorted = df.withColumn('rn', F.row_number().over(win))
# 3. groupBy + collect_list (Spark 保证 collect_list 按输入顺序;sortWithinPartitions 进一步加固)
df_agg = df_sorted.sortWithinPartitions('asin', 'rn').groupBy('asin').agg(
F.collect_list('date_info').alias('date_info_arr'),
F.collect_list('price').alias('price_arr'),
F.collect_list('ocean_profit').alias('ocean_profit_arr'),
F.collect_list('air_profit').alias('air_profit_arr'),
).cache()
cnt = df_agg.count()
print(f"Spark 聚合后 asin 数: {cnt:,}")
df_agg.show(5, truncate=False)
# 4. INSERT OVERWRITE 写 Doris (Doris connector 需要 ARRAY 字段转 JSON 字符串)
# Doris StreamLoad ARRAY 接收 JSON 数组字符串格式
df_save = df_agg.select(
F.col('asin'),
F.to_json(F.col('date_info_arr')).alias('date_info_arr'),
F.to_json(F.col('price_arr')).alias('price_arr'),
F.to_json(F.col('ocean_profit_arr')).alias('ocean_profit_arr'),
F.to_json(F.col('air_profit_arr')).alias('air_profit_arr'),
)
table_columns = "asin, date_info_arr, price_arr, ocean_profit_arr, air_profit_arr"
DorisHelper.spark_export_with_columns(
df_save=df_save,
db_name=TREND_DB,
table_name=TREND_TABLE,
table_columns=table_columns,
)
df_agg.unpersist()
print(f"[完成] 趋势物理表 {TREND_DB}.{TREND_TABLE} 已更新, 总 asin 数 {cnt:,}")
def main():
spark = SparkUtil.get_spark_session("DwsAsinProfitRateTrendRefresh")
# Step 0: 动态推近 3 月
months = get_recent_3_months(spark)
print(f"\n站点列表: {SITES}")
print(f"刷新月份: {months}")
print(f"总刷新任务数: {len(SITES) * len(months)}")
# Step 1: 站点 × 月份 循环刷新中间表
success_list = []
failed_list = []
for site_name in SITES:
for date_info in months:
try:
refresh_one_month(site_name, date_info)
success_list.append(f"{site_name}/{date_info}")
except Exception as e:
print(f"[失败] site={site_name}, date_info={date_info}, 原因: {e}")
failed_list.append(f"{site_name}/{date_info}: {e}")
print("\n========== 中间表刷新结果汇总 ==========")
print(f"成功 {len(success_list)} 个: {success_list}")
if failed_list:
print(f"失败 {len(failed_list)} 个:")
for item in failed_list:
print(f" - {item}")
sys.exit(1)
# Step 2: Spark 聚合并写入趋势物理表
try:
for site_name in SITES:
spark_aggregate_and_export(spark, site_name=site_name)
except Exception as e:
print(f"[失败] Spark 聚合趋势失败: {e}")
sys.exit(1)
print("\nsuccess!")
if __name__ == "__main__":
main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment