Commit 1777e202 by chenyuanjie

流量选品月导出Doris-流程控制

parent 30b05aea
...@@ -2,24 +2,30 @@ ...@@ -2,24 +2,30 @@
author: CT author: CT
description: 同步 Hive dwt_flow_asin 月维度数据到 Doris dwt.{site}_flow_asin_month description: 同步 Hive dwt_flow_asin 月维度数据到 Doris dwt.{site}_flow_asin_month
流程: 流程:
1) 在 Doris 建当月物化表 selection.{site}_flow_asin_month_{yyyy_mm}(IF NOT EXISTS) Step 0) 建 selection 月物化表 selection.{site}_flow_asin_month_{yyyy_mm}[_test]
2) Spark 读 Hive dwt_flow_asin 月数据,规范化后写入 dwt.{site}_flow_asin_month Step 1~3) Spark 读 Hive → 规范化 → 写 Doris dwt.{site}_flow_asin_month
3) Doris 端执行 INSERT OVERWRITE 把 dwt 主表 + 关联 JOIN 物化到 selection 月表 Step 4) Doris INSERT OVERWRITE 物化到 selection 月物化表
Step 5) 更新 MySQL workflow_everyday 流程记录表(仅 formal 模式)
支持 us / uk / de 三站点 支持 us / uk / de 三站点
支持 formal / test 模式:
- formal:selection 表名无后缀,更新流程记录表
- test :selection 表名加 _test 后缀,不更新流程记录表(dwt 主表不变)
执行示例: 执行示例:
spark-submit dwt_flow_asin_month.py us 2026-05 spark-submit dwt_flow_asin_month.py us 2026-05 # 默认 formal
spark-submit dwt_flow_asin_month.py uk 2026-05 spark-submit dwt_flow_asin_month.py us 2026-05 formal
spark-submit dwt_flow_asin_month.py de 2026-05 spark-submit dwt_flow_asin_month.py us 2026-05 test # test 模式
""" """
import os import os
import sys import sys
sys.path.append(os.path.dirname(sys.path[0])) sys.path.append(os.path.dirname(sys.path[0]))
import pandas as pd
from pyspark.sql import functions as F from pyspark.sql import functions as F
from utils.spark_util import SparkUtil from utils.spark_util import SparkUtil
from utils.DorisHelper import DorisHelper from utils.DorisHelper import DorisHelper
from utils.db_util import DBUtil, DbTypes
DORIS_DB = "dwt" DORIS_DB = "dwt"
...@@ -50,9 +56,9 @@ def _exec_doris_sql(sql_list, use_type='selection'): ...@@ -50,9 +56,9 @@ def _exec_doris_sql(sql_list, use_type='selection'):
conn.close() conn.close()
def build_create_table_sql(site_name, date_info_underscore, date_info): def build_create_table_sql(table_name, date_info):
"""构建 selection.{site}_flow_asin_month_{yyyy_mm} 建表语句(与 DDL 一致)""" """构建 selection.{table_name} 建表语句(与 DDL 一致);
table_name = f"{site_name}_flow_asin_month_{date_info_underscore}" table_name 由外层拼接:{site}_flow_asin_month_{yyyy_mm}[_test]"""
return f""" return f"""
CREATE TABLE IF NOT EXISTS `selection`.`{table_name}` CREATE TABLE IF NOT EXISTS `selection`.`{table_name}`
( (
...@@ -203,9 +209,9 @@ PROPERTIES ( ...@@ -203,9 +209,9 @@ PROPERTIES (
""" """
def build_insert_overwrite_sql(site_name, date_info_underscore, date_info): def build_insert_overwrite_sql(site_name, table_name, date_info):
"""构建 INSERT OVERWRITE 到 selection.{site}_flow_asin_month_{yyyy_mm} 的 SQL""" """构建 INSERT OVERWRITE 到 selection.{table_name} 的 SQL;
table_name = f"{site_name}_flow_asin_month_{date_info_underscore}" table_name 由外层拼接:{site}_flow_asin_month_{yyyy_mm}[_test]"""
return f""" return f"""
INSERT OVERWRITE TABLE `selection`.`{table_name}` INSERT OVERWRITE TABLE `selection`.`{table_name}`
SELECT SELECT
...@@ -383,20 +389,54 @@ WHERE f.date_info = '{date_info}' ...@@ -383,20 +389,54 @@ WHERE f.date_info = '{date_info}'
""" """
def main(site_name, date_info): def modify_mission_record_status(site_name, date_info, result_type):
"""流程记录表更新:仅 month + formal 模式才入库 mysql workflow_everyday
参考 export_es/es_flow_asin.py modify_mission_record_status"""
if result_type != 'formal':
print(f"[Step 5] result_type={result_type},跳过流程记录表更新")
return
record_table = 'workflow_everyday'
record_table_name_field = f'{site_name}_flow_asin_last_month'
record_type = 'month'
cur_date = date_info
engine_mysql = DBUtil.get_db_engine(db_type=DbTypes.mysql.name, site_name='us')
select_sql = (
f"select id from {record_table} where site_name='{site_name}' and date_type='month' "
f"and report_date='{cur_date}' and page='流量选品' and status_val=14 and is_end='是'"
)
df_is_finished = pd.read_sql(select_sql, engine_mysql)
if df_is_finished.empty:
replace_sql = f"""
replace into {record_table} (site_name, report_date, status, status_val, table_name, date_type, page, is_end, remark, export_db_type)
VALUES ('{site_name}', '{cur_date}', '流量选品计算完毕', 14, '{record_table_name_field}', '{record_type}', '流量选品', '是', '流量选品计算完毕', 'doris')
"""
DBUtil.exec_sql('mysql', 'us', replace_sql)
print(f"[Step 5] 流程记录表 workflow_everyday 已写入:{site_name} {cur_date}")
else:
print(f"[Step 5] 流程记录表已存在该记录,跳过")
def main(site_name, date_info, result_type='formal'):
assert site_name in SUPPORTED_SITES, f"不支持的站点:{site_name},仅支持 us/uk/de" assert site_name in SUPPORTED_SITES, f"不支持的站点:{site_name},仅支持 us/uk/de"
assert result_type in ('formal', 'test'), f"不支持的 result_type:{result_type},仅支持 formal/test"
doris_table = f"{site_name}_flow_asin_month" doris_table = f"{site_name}_flow_asin_month" # dwt 主表,不区分 test/formal
date_info_underscore = date_info.replace('-', '_') date_info_underscore = date_info.replace('-', '_')
selection_table = f"{site_name}_flow_asin_month_{date_info_underscore}" # selection 月物化表:test 模式加 _test 后缀
env_suffix = '_test' if result_type == 'test' else ''
selection_table = f"{site_name}_flow_asin_month_{date_info_underscore}{env_suffix}"
print(f"启动:site={site_name}, date_info={date_info}, result_type={result_type}")
print(f" dwt 主表:dwt.{doris_table}(不区分 test/formal)")
print(f" selection 物化表:selection.{selection_table}")
spark = SparkUtil.get_spark_session( spark = SparkUtil.get_spark_session(
f"DwtFlowAsinMonth: {site_name} {date_info}" f"DwtFlowAsinMonth: {site_name} {date_info} {result_type}"
) )
# ===== Step 0:Doris 端建 selection 月物化表(IF NOT EXISTS)===== # ===== Step 0:Doris 端建 selection 月物化表(IF NOT EXISTS) =====
print(f"[Step 0] Doris 建表 selection.{selection_table}") print(f"[Step 0] Doris 建表 selection.{selection_table}")
_exec_doris_sql([build_create_table_sql(site_name, date_info_underscore, date_info)]) _exec_doris_sql([build_create_table_sql(selection_table, date_info)])
# ===== Step 1:读 Hive dwt_flow_asin 月数据 ===== # ===== Step 1:读 Hive dwt_flow_asin 月数据 =====
sql = f""" sql = f"""
...@@ -631,7 +671,10 @@ def main(site_name, date_info): ...@@ -631,7 +671,10 @@ def main(site_name, date_info):
# ===== Step 4:Doris 端 INSERT OVERWRITE 到 selection 月物化表 ===== # ===== Step 4:Doris 端 INSERT OVERWRITE 到 selection 月物化表 =====
print(f"[Step 4] Doris INSERT OVERWRITE selection.{selection_table}") print(f"[Step 4] Doris INSERT OVERWRITE selection.{selection_table}")
_exec_doris_sql([build_insert_overwrite_sql(site_name, date_info_underscore, date_info)]) _exec_doris_sql([build_insert_overwrite_sql(site_name, selection_table, date_info)])
# ===== Step 5:流程记录表更新(仅 formal 模式)=====
modify_mission_record_status(site_name, date_info, result_type)
print("success!") print("success!")
...@@ -639,4 +682,5 @@ def main(site_name, date_info): ...@@ -639,4 +682,5 @@ def main(site_name, date_info):
if __name__ == "__main__": if __name__ == "__main__":
site_name = sys.argv[1] site_name = sys.argv[1]
date_info = sys.argv[2] date_info = sys.argv[2]
main(site_name, date_info) result_type = sys.argv[3] if len(sys.argv) > 3 else 'formal'
main(site_name, date_info, result_type)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment