Commit a858f39d by chenyuanjie

用户自定义选品模式-增加刷新机制

parent ad94b0f3
...@@ -4,8 +4,8 @@ ...@@ -4,8 +4,8 @@
- 串行逐 filter_id 执行,减小数据库压力 - 串行逐 filter_id 执行,减小数据库压力
- 支持断点续算:latest_computed_month 作为水位线,跳过已算月份 - 支持断点续算:latest_computed_month 作为水位线,跳过已算月份
- Doris AGGREGATE KEY(filter_id, asin) + MIN(date_info) 自动保留首次入选月份 - Doris AGGREGATE KEY(filter_id, asin) + MIN(date_info) 自动保留首次入选月份
- us 站点利润率/Keepa 字段每日刷新:近 REFRESH_MONTHS_COUNT 月 DELETE + 重算 - us 站点利润率/Keepa 字段每日刷新:TRUNCATE 分区(p{filter_id}) + 全量重算
- 水位线规则:刷新/补算成功→维护到最新月份;失败→维护到失败的前一个月 - 水位线规则:成功→维护到最新月份;失败→维护到失败的前一个月
""" """
import os import os
import re import re
...@@ -22,9 +22,8 @@ import pandas as pd ...@@ -22,9 +22,8 @@ import pandas as pd
MYSQL_FILTER_TABLE = 'flow_increment_filter_sql' # MySQL 筛选模式日志表 MYSQL_FILTER_TABLE = 'flow_increment_filter_sql' # MySQL 筛选模式日志表
DORIS_RESULT_DB = 'selection' DORIS_RESULT_DB = 'selection'
DORIS_RESULT_TABLE = 'user_selection_pattern' DORIS_RESULT_TABLE = 'user_selection_pattern'
SUPPORTED_SITES = ('us', 'uk', 'de') SUPPORTED_SITES = ('us', 'uk', 'de')
REFRESH_MONTHS_COUNT = 3 REFRESH_FIELDS = {
REFRESH_FIELDS = {
'ocean_profit', 'air_profit', 'ocean_profit', 'air_profit',
'launch_time', 'launch_time_type', 'launch_time', 'launch_time_type',
'tracking_since', 'tracking_since_type', 'tracking_since', 'tracking_since_type',
...@@ -123,44 +122,43 @@ def _compute_one_filter(row, doris_cur, mysql_conn, months_by_site): ...@@ -123,44 +122,43 @@ def _compute_one_filter(row, doris_cur, mysql_conn, months_by_site):
print(f" [SKIP] 站点 {site} 无可用月表") print(f" [SKIP] 站点 {site} 无可用月表")
return return
# 近 N 月刷新(where_sql 涉及每日刷新字段时) is_refresh = _needs_refresh(where_sql) and site == 'us'
needs_refresh = _needs_refresh(where_sql)
refresh_months = (
set(m for m in available[-REFRESH_MONTHS_COUNT:] if m >= base_month)
if (needs_refresh and site == 'us') else set()
)
# 水位线补算,跳过已被刷新覆盖的月份 if is_refresh:
start_month = _next_month(latest_done) if latest_done else base_month # 刷新模式:TRUNCATE 分区 + 从 base_month 全量重算
new_months = [m for m in available if start_month <= m <= available[-1] and m not in refresh_months] doris_cur.execute(f"SELECT auto_partition_name('list', {filter_id})")
partition_name = doris_cur.fetchone()[0]
truncate_sql = (f"TRUNCATE TABLE `{DORIS_RESULT_DB}`.`{DORIS_RESULT_TABLE}` "
f"PARTITION ({partition_name})")
try:
doris_cur.execute(truncate_sql)
print(f" [TRUNCATE] 已清空分区 {partition_name}")
except Exception as e:
err_str = str(e)
if 'does not exist' in err_str:
# 首次计算时分区尚未创建,跳过 TRUNCATE 直接 INSERT
print(f" [TRUNCATE SKIP] 分区 {partition_name} 不存在(首次计算),跳过清空")
else:
err = err_str[:200]
print(f" [TRUNCATE FAIL] {err}")
_update_mysql_log(mysql_conn, filter_id, latest_done, 'failed', f'TRUNCATE失败: {err}')
return
all_months = [m for m in available if m >= base_month]
mode_label = '刷新重算'
last_ok_month = None
else:
# 补算模式:水位线增量
start_month = _next_month(latest_done) if latest_done else base_month
all_months = [m for m in available if start_month <= m <= available[-1]]
mode_label = '新增'
last_ok_month = None
all_months = sorted(refresh_months | set(new_months))
if not all_months: if not all_months:
print(f" [SKIP] 无需计算(latest_computed={latest_done},refresh={needs_refresh})") print(f" [SKIP] 无需计算(latest_computed={latest_done},refresh={is_refresh})")
return return
refresh_sorted = sorted(refresh_months) print(f" [{mode_label}] 计算范围:{all_months[0]} ~ {all_months[-1]},共 {len(all_months)} 个月")
print(f" 计算范围:{all_months[0]} ~ {all_months[-1]},共 {len(all_months)} 个月"
+ (f"(其中刷新:{refresh_sorted[0]} ~ {refresh_sorted[-1]})" if refresh_months else ""))
# Step 1:批量 DELETE 刷新月份旧数据
if refresh_months:
month_in = "','".join(refresh_sorted)
delete_sql = (f"DELETE FROM `{DORIS_RESULT_DB}`.`{DORIS_RESULT_TABLE}` "
f"WHERE filter_id = {filter_id} AND date_info IN ('{month_in}')")
try:
doris_cur.execute(delete_sql)
print(f" [DELETE] 已清除近 {len(refresh_months)} 月旧数据")
except Exception as e:
err = str(e)[:200]
print(f" [DELETE FAIL] {err}")
_update_mysql_log(mysql_conn, filter_id, latest_done, 'failed', f'刷新DELETE失败: {err}')
return
# Step 2:逐月 INSERT
# DELETE 成功后将水位线退至刷新窗口前一月,防止窗口滑移导致已删月份永久丢失
pre_refresh = [m for m in available if m < refresh_sorted[0]] if refresh_months else []
last_ok_month = pre_refresh[-1] if pre_refresh else None
for month in all_months: for month in all_months:
table = f'{site}_flow_asin_month_{month.replace("-", "_")}' table = f'{site}_flow_asin_month_{month.replace("-", "_")}'
sql = f""" sql = f"""
...@@ -175,7 +173,7 @@ def _compute_one_filter(row, doris_cur, mysql_conn, months_by_site): ...@@ -175,7 +173,7 @@ def _compute_one_filter(row, doris_cur, mysql_conn, months_by_site):
""" """
try: try:
doris_cur.execute(sql) doris_cur.execute(sql)
print(f" [OK] {month}" + (" [刷新]" if month in refresh_months else "")) print(f" [OK] {month}")
last_ok_month = month last_ok_month = month
except Exception as e: except Exception as e:
err = str(e)[:200] err = str(e)[:200]
...@@ -185,7 +183,7 @@ def _compute_one_filter(row, doris_cur, mysql_conn, months_by_site): ...@@ -185,7 +183,7 @@ def _compute_one_filter(row, doris_cur, mysql_conn, months_by_site):
_update_mysql_log( _update_mysql_log(
mysql_conn, filter_id, last_ok_month, 'success', mysql_conn, filter_id, last_ok_month, 'success',
f'完成:新增 {len(new_months)} 月,刷新 {len(refresh_months)} 月({all_months[0]} ~ {all_months[-1]})' f'完成:{mode_label} {len(all_months)} 个月({all_months[0]} ~ {all_months[-1]})'
) )
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment