Commit ef36ef57 by chenyuanjie

用户自定义选品模式

parent f67c2f62
"""
@Author : CT
@Description : 根据用户筛选模式日志批量计算选品结果,写入 Doris selection.user_selection_pattern
- 串行逐 filter_id 执行,减小数据库压力
- 支持断点续算:latest_computed_month 作为水位线,跳过已算月份
- Doris AGGREGATE KEY(filter_id, asin) + MIN(date_info) 自动保留首次入选月份
"""
import os
import sys
from datetime import datetime
sys.path.append(os.path.dirname(sys.path[0]))
from utils.DorisHelper import DorisHelper
from utils.db_util import DBUtil, DbTypes
import pymysql
import pandas as pd
# ===== Configuration =====
# MySQL table holding the user filter-pattern log (one row per filter_id).
MYSQL_FILTER_TABLE = 'flow_increment_filter_sql' # MySQL filter-pattern log table
# Doris database that holds the result table; the monthly source tables are
# also looked up and read from this database.
DORIS_RESULT_DB = 'selection'
# Doris table that receives (filter_id, asin, date_info) rows.
DORIS_RESULT_TABLE = 'user_selection_pattern'
# Sites whose monthly tables are discovered at start-up.
SUPPORTED_SITES = ('us', 'uk', 'de')
# ===== Connection factories =====
def _get_doris_conn():
    """Open a pymysql connection to the Doris result database.

    Connection details come from ``DorisHelper.get_connection_info('selection')``.
    The connection is created with ``autocommit=True`` and ``utf8mb4`` charset,
    and uses ``DORIS_RESULT_DB`` as the default database.

    Returns:
        The connection object returned by ``pymysql.connect``.
    """
    conn_info = DorisHelper.get_connection_info('selection')
    connect_kwargs = {
        'host': conn_info['ip'],
        'port': conn_info['jdbc_port'],
        'user': conn_info['user'],
        'password': conn_info['pwd'],
        'database': DORIS_RESULT_DB,
        'charset': 'utf8mb4',
        'autocommit': True,
    }
    return pymysql.connect(**connect_kwargs)
def _get_mysql_engine():
    """Return the MySQL engine for the 'us' site obtained from ``DBUtil``.

    NOTE(review): the result is used with ``pd.read_sql`` and
    ``.raw_connection()``, so it is presumably a SQLAlchemy engine - confirm
    against ``DBUtil.get_db_engine``.
    """
    mysql_type_name = DbTypes.mysql.name
    return DBUtil.get_db_engine(db_type=mysql_type_name, site_name='us')
# ===== Utility functions =====
def _next_month(month_str):
"""'2026-05' → '2026-06'"""
y, m = int(month_str[:4]), int(month_str[5:])
m += 1
if m > 12:
y, m = y + 1, 1
return f'{y}-{m:02d}'
def _get_available_months(doris_cur, site):
    """Return the months that have a monthly table for ``site``, ascending.

    Queries ``information_schema.TABLES`` in schema ``DORIS_RESULT_DB`` for
    tables named ``{site}_flow_asin_month_YYYY_MM``. Tables ending in
    ``_test`` are excluded in SQL; any remaining name whose suffix is not
    exactly ``YYYY_MM`` (digits, month 01-12) is ignored.

    Args:
        doris_cur: Open DB-API cursor on the Doris connection.
        site: Site code used as the table-name prefix (e.g. 'us').

    Returns:
        Sorted list of 'YYYY-MM' strings; empty if no monthly table exists.
    """
    prefix = f'{site}_flow_asin_month_'
    # '%%' becomes a literal '%' after the driver's parameter formatting.
    # The backslash is doubled so Python emits the same '\_' to SQL without
    # relying on an invalid escape sequence (a SyntaxWarning on Python 3.12+).
    doris_cur.execute("""
        SELECT TABLE_NAME FROM information_schema.TABLES
        WHERE TABLE_SCHEMA = %s
          AND TABLE_NAME LIKE %s
          AND TABLE_NAME NOT LIKE '%%\\_test'
        ORDER BY TABLE_NAME
    """, (DORIS_RESULT_DB, f'{prefix}%'))

    months = []
    for (table_name,) in doris_cur.fetchall():
        # '_' is a single-character wildcard in LIKE, so the SQL filter can
        # match names that do not literally start with the prefix.
        if not table_name.startswith(prefix):
            continue
        suffix = table_name[len(prefix):]  # expected form: '2026_05'
        year, sep, mon = suffix[:4], suffix[4:5], suffix[5:]
        is_month_suffix = (
            len(suffix) == 7
            and sep == '_'
            and year.isdecimal()
            and mon.isdecimal()
            and 1 <= int(mon) <= 12
        )
        if is_month_suffix:
            months.append(f'{year}-{mon}')
    # Callers treat the last element as the newest month; 'YYYY-MM' strings
    # sort chronologically, so make the ordering explicit.
    return sorted(months)
def _update_mysql_log(mysql_conn, filter_id, latest_month, status, msg):
    """Record the outcome of a run for one filter in the MySQL log table.

    Always updates ``last_run_status``, ``last_run_at`` (current local time)
    and ``last_run_msg`` (truncated to 500 characters). The watermark column
    ``latest_computed_month`` is updated only when ``latest_month`` is not
    ``None``, so a run that failed before completing any month leaves the
    existing watermark untouched.

    Args:
        mysql_conn: DB-API connection to MySQL; the update is committed here.
        filter_id: Key of the row in ``MYSQL_FILTER_TABLE`` to update.
        latest_month: Last successfully computed month ('YYYY-MM') or ``None``.
        status: Run status string (callers pass 'success' or 'failed').
        msg: Human-readable run message; ``None`` is stored as ''.

    Raises:
        Exception: Whatever the driver raises on execute/commit; the
            transaction is rolled back and the cursor closed before re-raising.
    """
    now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    assignments = ['last_run_status = %s', 'last_run_at = %s', 'last_run_msg = %s']
    params = [status, now, str(msg or '')[:500]]
    if latest_month is not None:
        # Normal case: advance the watermark together with the status.
        assignments.insert(0, 'latest_computed_month = %s')
        params.insert(0, latest_month)
    params.append(filter_id)

    sql = (
        f"UPDATE `{MYSQL_FILTER_TABLE}` "
        f"SET {', '.join(assignments)} "
        "WHERE filter_id = %s"
    )

    cur = mysql_conn.cursor()
    try:
        cur.execute(sql, tuple(params))
        mysql_conn.commit()
    except Exception:
        # The connection is reused for later filters; do not leave a
        # half-finished transaction behind.
        mysql_conn.rollback()
        raise
    finally:
        cur.close()
# ===== Core computation =====
def _compute_one_filter(row, doris_cur, mysql_conn, months_by_site):
    """Compute all pending months for one filter and record the outcome.

    For each pending month, rows of the site's monthly table that satisfy the
    filter's ``where_sql`` are inserted into the Doris result table as
    ``(filter_id, asin, date_info=<month>)``. Processing stops at the first
    failing month; the MySQL log is then updated with status 'failed' and the
    last month that succeeded in this run (or no watermark change if none
    did). When every month succeeds the log is updated with 'success'.

    Args:
        row: Mapping-like record with keys ``filter_id``, ``site``,
            ``base_month``, ``where_sql`` and ``latest_computed_month``.
        doris_cur: Cursor used to run the INSERT ... SELECT on Doris.
        mysql_conn: MySQL connection passed to ``_update_mysql_log``.
        months_by_site: Dict mapping site code to its list of available
            'YYYY-MM' months (ascending).

    Returns:
        None. Nothing is written when the site has no monthly tables or when
        no month is pending.
    """
    filter_id = row['filter_id']
    site = row['site']
    base_month = row['base_month']
    where_sql = row['where_sql']
    watermark = row['latest_computed_month']  # falsy means never computed

    site_months = months_by_site.get(site, [])
    if not site_months:
        print(f" [SKIP] 站点 {site} 无可用月表")
        return

    # Resume right after the watermark, or start at the filter's base month.
    if watermark:
        first_month = _next_month(watermark)
    else:
        first_month = base_month
    final_month = site_months[-1]
    pending = [m for m in site_months if first_month <= m <= final_month]
    if not pending:
        print(f" [SKIP] 无新月份需计算(latest_computed={watermark})")
        return

    print(f" 计算范围:{pending[0]} ~ {pending[-1]},共 {len(pending)} 个月")

    newest_done = None
    for month in pending:
        source_table = f'{site}_flow_asin_month_{month.replace("-", "_")}'
        insert_sql = f"""
            INSERT INTO `{DORIS_RESULT_DB}`.`{DORIS_RESULT_TABLE}`
            (filter_id, asin, date_info)
            SELECT
            {filter_id} AS filter_id,
            asin,
            '{month}' AS date_info
            FROM `{DORIS_RESULT_DB}`.`{source_table}`
            WHERE {where_sql}
        """
        try:
            doris_cur.execute(insert_sql)
        except Exception as exc:
            reason = str(exc)[:200]
            print(f" [FAIL] {month} 错误:{reason}")
            # newest_done is None when the very first month failed, which
            # leaves the stored watermark unchanged.
            _update_mysql_log(mysql_conn, filter_id, newest_done, 'failed', reason)
            return
        else:
            print(f" [OK] {month}")
            newest_done = month

    summary = f'计算完成,共处理 {len(pending)} 个月({pending[0]} ~ {pending[-1]})'
    _update_mysql_log(mysql_conn, filter_id, newest_done, 'success', summary)
# ===== Entry point =====
def main():
    """Compute selection results for every active filter pattern, serially.

    Steps:
      1. Open the Doris connection and the MySQL engine/raw connection.
      2. Preload the available months of every site in ``SUPPORTED_SITES``.
      3. Read all rows with ``status = 'active'`` from ``MYSQL_FILTER_TABLE``.
      4. Run ``_compute_one_filter`` for each row; an unexpected exception in
         one filter is printed and does not stop the remaining filters.

    Every resource that was opened is closed on exit, even if a later step
    (or another close call) raises.
    """
    # Function-scope import keeps this block self-contained.
    from contextlib import ExitStack

    with ExitStack() as stack:
        # Register each close as soon as the resource exists, so a failure
        # while opening a later resource cannot leak an earlier one.
        doris_conn = _get_doris_conn()
        stack.callback(doris_conn.close)
        engine = _get_mysql_engine()
        mysql_conn = engine.raw_connection()
        stack.callback(mysql_conn.close)
        doris_cur = doris_conn.cursor()
        stack.callback(doris_cur.close)

        # Preload the available months for each site.
        months_by_site = {
            site: _get_available_months(doris_cur, site) for site in SUPPORTED_SITES
        }
        for site, months in months_by_site.items():
            rng = f"{months[0]} ~ {months[-1]}" if months else "无"
            print(f"[站点 {site}] 可用月份:{rng}(共 {len(months)} 个)")

        # Load every active filter pattern.
        df = pd.read_sql(
            f"""SELECT filter_id, site, base_month, where_sql, latest_computed_month
            FROM `{MYSQL_FILTER_TABLE}`
            WHERE status = 'active'
            ORDER BY filter_id""",
            engine
        )
        total = len(df)
        print(f"\n共 {total} 条 active 筛选模式,开始串行计算...\n")

        # Count by position rather than by index label so the progress
        # display does not depend on the DataFrame's index.
        for position, (_, row) in enumerate(df.iterrows(), start=1):
            print(f"[{position}/{total}] filter_id={row['filter_id']} site={row['site']}")
            try:
                _compute_one_filter(row, doris_cur, mysql_conn, months_by_site)
            except Exception as e:
                # Best effort: one broken filter must not abort the batch.
                print(f" [ERROR] 未预期异常:{e}")
        print("\n全部完成")
# Run the batch only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment