Commit ad94b0f3 by chenyuanjie

fix

parent ef36ef57
......@@ -4,8 +4,11 @@
- 串行逐 filter_id 执行,减小数据库压力
- 支持断点续算:latest_computed_month 作为水位线,跳过已算月份
- Doris AGGREGATE KEY(filter_id, asin) + MIN(date_info) 自动保留首次入选月份
- us 站点利润率/Keepa 字段每日刷新:近 REFRESH_MONTHS_COUNT 月 DELETE + 重算
- 水位线规则:刷新/补算成功→维护到最新月份;失败→维护到失败的前一个月
"""
import os
import re
import sys
from datetime import datetime
......@@ -20,6 +23,12 @@ MYSQL_FILTER_TABLE = 'flow_increment_filter_sql' # MySQL 筛选模式日志表
# Doris database and table targeted by the DELETE / INSERT statements below.
DORIS_RESULT_DB = 'selection'
DORIS_RESULT_TABLE = 'user_selection_pattern'
SUPPORTED_SITES = ('us', 'uk', 'de')

# Size of the refresh window: this many of the most recent available months
# are selected for DELETE + recompute when a filter needs refreshing.
REFRESH_MONTHS_COUNT = 3

# Field names that mark a filter as needing refresh when its where_sql
# references any of them (matched as whole words by _needs_refresh).
REFRESH_FIELDS = {
    'ocean_profit',
    'air_profit',
    'launch_time',
    'launch_time_type',
    'tracking_since',
    'tracking_since_type',
}
# ===== 连接工厂 =====
......@@ -71,6 +80,10 @@ def _get_available_months(doris_cur, site):
return months
def _needs_refresh(where_sql):
    """Report whether ``where_sql`` references any field in REFRESH_FIELDS.

    Every field name is regex-escaped and searched for as a whole word
    (word boundary on both sides), so a field name that only appears as
    part of a longer identifier is not counted as a match.

    Args:
        where_sql: The filter's WHERE clause text to inspect.

    Returns:
        True as soon as one refresh field is found, otherwise False.
    """
    for column in REFRESH_FIELDS:
        whole_word = rf'\b{re.escape(column)}\b'
        if re.search(whole_word, where_sql) is not None:
            return True
    return False
def _update_mysql_log(mysql_conn, filter_id, latest_month, status, msg):
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
if latest_month is not None:
......@@ -110,19 +123,45 @@ def _compute_one_filter(row, doris_cur, mysql_conn, months_by_site):
print(f" [SKIP] 站点 {site} 无可用月表")
return
# 断点续算:从上次水位线的下一个月开始
# 近 N 月刷新(where_sql 涉及每日刷新字段时)
needs_refresh = _needs_refresh(where_sql)
refresh_months = (
set(m for m in available[-REFRESH_MONTHS_COUNT:] if m >= base_month)
if (needs_refresh and site == 'us') else set()
)
# 水位线补算,跳过已被刷新覆盖的月份
start_month = _next_month(latest_done) if latest_done else base_month
end_month = available[-1]
months = [m for m in available if start_month <= m <= end_month]
new_months = [m for m in available if start_month <= m <= available[-1] and m not in refresh_months]
if not months:
print(f" [SKIP] 无新月份需计算(latest_computed={latest_done})")
all_months = sorted(refresh_months | set(new_months))
if not all_months:
print(f" [SKIP] 无需计算(latest_computed={latest_done},refresh={needs_refresh})")
return
print(f" 计算范围:{months[0]} ~ {months[-1]},共 {len(months)} 个月")
refresh_sorted = sorted(refresh_months)
print(f" 计算范围:{all_months[0]} ~ {all_months[-1]},共 {len(all_months)} 个月"
+ (f"(其中刷新:{refresh_sorted[0]} ~ {refresh_sorted[-1]})" if refresh_months else ""))
# Step 1:批量 DELETE 刷新月份旧数据
if refresh_months:
month_in = "','".join(refresh_sorted)
delete_sql = (f"DELETE FROM `{DORIS_RESULT_DB}`.`{DORIS_RESULT_TABLE}` "
f"WHERE filter_id = {filter_id} AND date_info IN ('{month_in}')")
try:
doris_cur.execute(delete_sql)
print(f" [DELETE] 已清除近 {len(refresh_months)} 月旧数据")
except Exception as e:
err = str(e)[:200]
print(f" [DELETE FAIL] {err}")
_update_mysql_log(mysql_conn, filter_id, latest_done, 'failed', f'刷新DELETE失败: {err}')
return
last_ok_month = None
for month in months:
# Step 2:逐月 INSERT
# DELETE 成功后将水位线退至刷新窗口前一月,防止窗口滑移导致已删月份永久丢失
pre_refresh = [m for m in available if m < refresh_sorted[0]] if refresh_months else []
last_ok_month = pre_refresh[-1] if pre_refresh else None
for month in all_months:
table = f'{site}_flow_asin_month_{month.replace("-", "_")}'
sql = f"""
INSERT INTO `{DORIS_RESULT_DB}`.`{DORIS_RESULT_TABLE}`
......@@ -136,7 +175,7 @@ def _compute_one_filter(row, doris_cur, mysql_conn, months_by_site):
"""
try:
doris_cur.execute(sql)
print(f" [OK] {month}")
print(f" [OK] {month}" + (" [刷新]" if month in refresh_months else ""))
last_ok_month = month
except Exception as e:
err = str(e)[:200]
......@@ -146,7 +185,7 @@ def _compute_one_filter(row, doris_cur, mysql_conn, months_by_site):
_update_mysql_log(
mysql_conn, filter_id, last_ok_month, 'success',
f'计算完成,共处理 {len(months)} 个月({months[0]} ~ {months[-1]})'
f'完成:新增 {len(new_months)} 月,刷新 {len(refresh_months)} 月({all_months[0]} ~ {all_months[-1]})'
)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment