搜索词周/月报表下载

parent 64417b68
# -*- coding: utf-8 -*-
"""
ABA Report 自动化下载框架
流程:
1. kill 残留 Chrome → 启动浏览器(复用系统用户数据/登录态)
2. 访问数据页 → 判断登录状态(未登录会跳转登录页,停止执行并通知)
3. 选择日期范围、站点等筛选条件 → 点击查询
4. 点击下载按钮 → 弹窗出现 → 点击弹窗中的确认按钮
5. 等待状态变为 Open → 点击 Open 按钮 → 打开新标签页
6. 新标签页中找到最新一条下载记录 → 获取下载链接(而非弹窗保存)
定时策略(使用 APScheduler):
- 每周日 23:00 执行周报下载,失败则推迟一天重试,最多重试 MAX_RETRY_DAYS 天
- 登录失效也算失败,推迟到明天重试(等用户手动登录后自动继续)
- 后续可轻松添加每月任务,只需加一行 scheduler.add_job(...)
"""
import time
import subprocess
from datetime import datetime, timedelta, date
import requests
from loguru import logger
from DrissionPage import ChromiumPage, ChromiumOptions
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from config import (
WEEKLY_DAY_OF_WEEK, WEEKLY_HOUR, WEEKLY_MINUTE,
MONTHLY_DAY, MONTHLY_HOUR, MONTHLY_MINUTE,
MAX_RETRY_DAYS, RETRY_INTERVAL_MINUTES, DATA_URL, SITES, SITE_PICKER_MAP, SITE_COUNTRY_NAME, SITE_DM_COUNTRY
)
from secure_db_client import get_remote_engine
import re
import os
from urllib.parse import urlparse, parse_qs, unquote
# ======================== Chrome 进程管理 ========================
def is_chrome_running():
"""
检测系统中是否有 chrome.exe 进程在运行
作用:在启动自动化浏览器之前检测,避免用户数据目录被占用导致启动失败
:return: True=有 Chrome 在跑, False=没有
"""
try:
# tasklist: Windows 查看进程列表的命令
# /FI "IMAGENAME eq chrome.exe": 只过滤 chrome.exe 进程
# /NH: 不显示表头
output = subprocess.check_output(
'tasklist /FI "IMAGENAME eq chrome.exe" /NH',
shell=True, stderr=subprocess.DEVNULL
).decode('gbk', errors='ignore') # Windows 中文系统用 gbk 编码
return 'chrome.exe' in output.lower()
except Exception:
return False
def kill_chrome():
"""
强制关闭所有 Chrome 进程,并确认进程完全退出
作用:释放用户数据目录的锁,让自动化可以正常启动浏览器
"""
try:
# taskkill: Windows 杀进程的命令
# /F: 强制终止 /IM: 按进程名匹配 /T: 同时终止子进程
subprocess.call('taskkill /F /IM chrome.exe /T', shell=True,
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
# 等待进程完全退出,反复检测直到确认没有 chrome 进程
# Chrome 有时会自动重启(崩溃恢复机制),所以需要循环检测
for i in range(10):
time.sleep(2)
if not is_chrome_running():
logger.info("Chrome 进程已完全退出")
return
logger.debug(f"Chrome 仍在运行,再次尝试关闭(第 {i + 1} 次)")
subprocess.call('taskkill /F /IM chrome.exe /T', shell=True,
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
logger.warning("多次尝试后 Chrome 仍未完全退出")
except Exception as e:
logger.error(f"关闭 Chrome 失败: {e}")
def ensure_chrome_closed():
"""
确保 Chrome 已关闭,在运行就 kill
作用:自动化执行前的前置检查,保证浏览器能正常启动
"""
if is_chrome_running():
logger.info("检测到 Chrome 正在运行,强制关闭")
kill_chrome()
# ======================== 日期计算 ========================
def get_last_week_range():
"""
从数据库 date_20_to_30 表查询上一周的日期范围(周日~周六为一周)
返回: (week_start, week_end, date_info)
- week_start: 上周日日期,date 对象
- week_end: 上周六日期,date 对象
- date_info: year_week 字符串,如 "2026-11"
"""
sql = """
SELECT
MIN(date) AS week_start,
MAX(date) AS week_end,
year_week AS date_info
FROM date_20_to_30
WHERE year_week = (
SELECT year_week
FROM date_20_to_30
WHERE date = (
SELECT MAX(date) FROM date_20_to_30 WHERE date < (
SELECT MIN(date) FROM date_20_to_30 WHERE year_week = (
SELECT year_week FROM date_20_to_30 WHERE date = CURDATE()
)
)
)
)
"""
try:
engine = get_remote_engine(site_name='us', db_type='mysql')
date_df = engine.read_sql(sql)
if date_df.empty:
logger.error("从数据库查询上周日期范围失败:无结果")
return None, None, None
week_start = date_df['week_start'].iloc[0]
week_end = date_df['week_end'].iloc[0]
date_info = str(date_df['date_info'].iloc[0])
# 如果返回的是字符串,转为 date 对象
if isinstance(week_start, str):
week_start = date.fromisoformat(week_start)
if isinstance(week_end, str):
week_end = date.fromisoformat(week_end)
# 如果是 pandas Timestamp,转为 date 对象
if hasattr(week_start, 'date'):
week_start = week_start.date()
if hasattr(week_end, 'date'):
week_end = week_end.date()
logger.info(f"从数据库获取上周范围: {week_start} ~ {week_end} (date_info: {date_info})")
return week_start, week_end, date_info
except Exception as e:
logger.error(f"查询上周日期范围异常: {e}")
return None, None, None
def get_last_month_range():
"""
从数据库 date_20_to_30 表查询上一个月的日期范围
返回: (month_start, month_end, date_info)
- month_start: 上月第一天,date 对象
- month_end: 上月最后一天,date 对象
- date_info: year_month 字符串,如 "2026-02"
"""
sql = """
SELECT
MIN(date) AS month_start,
MAX(date) AS month_end,
`year_month` AS date_info
FROM date_20_to_30
WHERE `year_month` = (
SELECT `year_month`
FROM date_20_to_30
WHERE date = (
SELECT MAX(date) FROM date_20_to_30 WHERE date < (
SELECT MIN(date) FROM date_20_to_30 WHERE `year_month` = (
SELECT `year_month` FROM date_20_to_30 WHERE date = CURDATE()
)
)
)
)
"""
try:
engine = get_remote_engine(site_name='us', db_type='mysql')
date_df = engine.read_sql(sql)
if date_df.empty:
logger.error("从数据库查询上月日期范围失败:无结果")
return None, None, None
month_start = date_df['month_start'].iloc[0]
month_end = date_df['month_end'].iloc[0]
date_info = str(date_df['date_info'].iloc[0])
if isinstance(month_start, str):
month_start = date.fromisoformat(month_start)
if isinstance(month_end, str):
month_end = date.fromisoformat(month_end)
if hasattr(month_start, 'date'):
month_start = month_start.date()
if hasattr(month_end, 'date'):
month_end = month_end.date()
logger.info(f"从数据库获取上月范围: {month_start} ~ {month_end} (date_info: {date_info})")
return month_start, month_end, date_info
except Exception as e:
logger.error(f"查询上月日期范围异常: {e}")
return None, None, None
# ======================== 浏览器 ========================
def create_browser():
"""
启动浏览器,使用系统默认用户数据目录
作用:打开一个带有用户书签、登录态、插件的 Chrome,而不是全新空白的浏览器
前提:Chrome 必须是关闭状态,否则用户数据目录会被占用导致启动失败
:return: ChromiumPage 页面对象,失败返回 None
"""
co = ChromiumOptions()
# 指定 Chrome 浏览器路径
# co.set_browser_path(r"C:\Users\Administrator\AppData\Local\google\Chrome\Application\chrome.exe")
# co.set_local_port(9333)
# co.set_user_data_path(r"C:\Users\Administrator\AppData\Local\Google\Chrome\User Data")
# 使用系统 Chrome 的默认用户数据目录(包含登录态、书签、插件等)
co.use_system_user_path()
# Win7 兼容性参数
# co.set_argument('--no-sandbox')
# co.set_argument('--disable-gpu')
# 启动时窗口最大化
co.set_argument('--start-maximized')
# 禁用自动翻译,强制英文界面,避免页面元素被翻译后选择器匹配不到
# co.set_argument('--disable-features=Translate')
# co.set_argument('--lang=en-US')
try:
page = ChromiumPage(co)
logger.info("浏览器启动成功(使用系统用户数据)")
return page
except Exception as e:
logger.error(f"浏览器启动失败: {e}")
return None
# ======================== 登录状态 ========================
def check_login_status(page):
"""
判断是否已登录(数据页是否正确打开)
主要依据:URL 中包含 brand-analytics
辅助验证:页面标题包含 Brand Analytics、h1 标签包含 Top search terms
:return: True=已登录, False=未登录(被重定向到了登录页)
"""
# 主要判断:URL 是否还在数据页
if "brand-analytics" not in page.url:
logger.warning(f"URL 不包含 brand-analytics,当前 URL: {page.url}")
return False
# 辅助验证:页面标题
if "Brand Analytics" not in (page.title or ""):
logger.warning(f"页面标题不包含 Brand Analytics,当前标题: {page.title}")
return False
# # 辅助验证:h1 标签内容
# h1 = page.ele('tag:h1', timeout=3)
# if not h1 or "Top search terms" not in (h1.text or ""):
# logger.warning("页面 h1 标签不包含 Top search terms")
# return False
return True
# ======================== 语言切换 ========================
def ensure_english_language(page):
"""
确保页面语言为英文,避免中文/其他语言导致元素选择器匹配失败。
通过右上角语言图标判断当前语言,如果不是 EN 则悬浮菜单切换到 English。
:return: True=已是英文或切换成功, False=切换失败
"""
locale_wrapper = page.ele('css:.locale-icon-wrapper', timeout=5)
if not locale_wrapper:
logger.warning("未找到语言图标,跳过语言检查")
return True
current_lang = locale_wrapper.text.strip()
if current_lang.startswith('EN'):
logger.info(f"当前语言已是英文: {current_lang}")
return True
logger.info(f"当前语言为 {current_lang},需要切换到英文")
# 悬浮到语言按钮,触发下拉菜单
locale_trigger = page.ele('css:div[data-test-tag="nav-locale-button"] .flyoutTriggerWrap', timeout=5)
if not locale_trigger:
logger.error("未找到语言切换按钮")
return False
locale_trigger.hover()
time.sleep(1)
# 点击 English 选项(data-test-tag 以 locale-list-item-en_ 开头)
en_option = page.ele('css:a[data-test-tag^="locale-list-item-en_"]', timeout=5)
if not en_option:
logger.error("未找到 English 语言选项")
return False
en_option.click()
time.sleep(3)
page.wait.doc_loaded()
# 验证切换结果
locale_wrapper = page.ele('css:.locale-icon-wrapper', timeout=5)
if locale_wrapper and locale_wrapper.text.strip().startswith('EN'):
logger.info("语言已成功切换到英文")
return True
else:
logger.error(f"语言切换后验证失败,当前: {locale_wrapper.text.strip() if locale_wrapper else '未知'}")
return False
# ======================== 步骤1: 打开数据页 ========================
def navigate_to_data_page(page):
"""
打开数据页面(只调用一次)
作用:导航到报告下载的页面,如果没有登录状态会被重定向到登录页
"""
page.get(DATA_URL)
page.wait.doc_loaded()
logger.info(f"已打开数据页: {DATA_URL}")
# ======================== 步骤1.5: 选择国家 ========================
def select_country(page, site):
"""
通过 countryPicker 下拉框切换到目标国家
:param site: 站点代码,如 "US", "UK", "DE" 等
:return: True=成功, False=失败
"""
picker_value = SITE_PICKER_MAP.get(site)
if not picker_value:
logger.error(f"[{site}] 未在 SITE_PICKER_MAP 中找到对应的国家代码")
return False
country_picker = page.ele('#countryPicker', timeout=10)
if not country_picker:
logger.error(f"[{site}] 未找到 countryPicker 下拉框")
return False
# 如果当前已是目标国家,跳过
if country_picker.attr('value') == picker_value:
logger.info(f"[{site}] 已是目标国家: {picker_value}")
return True
# 展开下拉框
country_picker.shadow_root.ele('css:.select-header').click()
time.sleep(1)
# 点击目标国家选项(kat-option 在 light DOM 中)
target_option = page.ele(f'css:#countryPicker kat-option[value="{picker_value}"]', timeout=5)
if not target_option:
logger.error(f"[{site}] 未找到国家选项: {picker_value}")
page.ele('tag:body').click()
time.sleep(1)
return False
target_option.click()
time.sleep(3) # 切换国家后等待页面数据刷新
page.wait.doc_loaded()
# 验证切换结果
if country_picker.attr('value') != picker_value:
logger.error(f"[{site}] 国家切换失败,当前值: {country_picker.attr('value')}")
return False
logger.info(f"[{site}] 已切换到国家: {picker_value}")
return True
# ======================== 步骤2: 选择筛选条件并查询 ========================
def select_filters_and_query(page, week_start, week_end):
"""
选择日期范围、站点等筛选条件,然后点击查询
下拉框 value 对应的是周六(week_end)的日期,格式 YYYY-MM-DD
操作方式:模拟点击展开下拉框 → 点击目标选项(不用 JS 赋值)
:return: True=筛选成功, False=筛选失败
"""
target_value = week_end.isoformat() # 如 "2026-03-14"
logger.info(f"设置筛选条件,目标周: {target_value}")
# 1. 确认 Reporting range 是 weekly,不是则点击切换
reporting_range = page.ele('#reporting-range', timeout=10)
if reporting_range.attr('value') != 'weekly':
# 通过 shadow_root 点击 header 展开下拉框
reporting_range.shadow_root.ele('css:.select-header').click()
time.sleep(1)
# 通过 shadow_root 点击 Weekly 选项(kat-option 在 shadow DOM 内)
weekly_option = reporting_range.shadow_root.ele('css:kat-option[value="weekly"]', timeout=5)
if not weekly_option:
logger.error("未找到 Weekly 选项")
return False
weekly_option.click()
time.sleep(2) # 切换后 Select week 的选项列表会刷新,多等一下
# 验证
if reporting_range.attr('value') != 'weekly':
logger.error(f"Reporting range 切换失败,当前值: {reporting_range.attr('value')}")
return False
logger.info("Reporting range 已切换为 weekly")
else:
logger.info("Reporting range 已是 weekly")
# 2. 选择目标周(Select week 下拉框)
week_dropdown = page.ele('#weekly-week', timeout=10)
current_value = week_dropdown.attr('value')
if current_value != target_value:
# 通过 shadow_root 点击 header 展开下拉框
week_dropdown.shadow_root.ele('css:.select-header').click()
time.sleep(1)
# 通过 shadow_root 查找目标周选项
target_option = week_dropdown.shadow_root.ele(f'css:kat-option[value="{target_value}"]', timeout=5)
if not target_option:
logger.error(f"未找到目标周选项: {target_value},下拉框中可能没有该周数据")
# 点击空白处收起下拉框,避免影响后续操作
page.ele('tag:body').click()
time.sleep(1)
return False
target_option.click()
time.sleep(1)
logger.info(f"Select week 已切换为 {target_value}(原值: {current_value})")
else:
logger.info(f"Select week 已是目标周: {target_value}")
# 3. 二次验证:确认下拉框的 value 确实变了
final_value = page.ele('#weekly-week').attr('value')
if final_value != target_value:
logger.error(f"选择验证失败!期望: {target_value},实际: {final_value}")
return False
# 4. 点击 Apply 按钮
apply_btn = page.ele('css:kat-button[data-test-id="RequiredFilterApplyButton"][label="Apply"]', timeout=10)
apply_btn.click()
logger.info("已点击 Apply 按钮")
# 5. 等待页面加载完成
page.wait.doc_loaded()
time.sleep(3) # 额外等待数据渲染
logger.info("筛选条件已应用,页面加载完成")
return True
# ======================== 步骤2-B: 月报筛选条件 ========================
def select_filters_and_query_monthly(page, month_end):
"""
月报筛选:Reporting range 切为 monthly → 选年份 → 选月份 → 点 Apply
:param month_end: 上月最后一天,date 对象(如 2026-02-28)
:return: True=筛选成功, False=筛选失败
"""
target_year = str(month_end.year) # 如 "2026"
target_value = month_end.isoformat() # 如 "2026-02-28"
month_id = f"{target_year}-month" # 如 "2026-month"
logger.info(f"设置月报筛选条件,目标: {target_year} / {target_value}")
# 1. 切换 Reporting range 为 monthly
reporting_range = page.ele('#reporting-range', timeout=10)
if reporting_range.attr('value') != 'monthly':
reporting_range.shadow_root.ele('css:.select-header').click()
time.sleep(1)
monthly_option = reporting_range.shadow_root.ele('css:kat-option[value="monthly"]', timeout=5)
if not monthly_option:
logger.error("未找到 Monthly 选项")
return False
monthly_option.click()
time.sleep(2)
if reporting_range.attr('value') != 'monthly':
logger.error(f"Reporting range 切换失败,当前值: {reporting_range.attr('value')}")
return False
logger.info("Reporting range 已切换为 monthly")
else:
logger.info("Reporting range 已是 monthly")
# 2. 选择年份(Select year)
year_dropdown = page.ele('#monthly-year', timeout=10)
if year_dropdown.attr('value') != target_year:
year_dropdown.shadow_root.ele('css:.select-header').click()
time.sleep(1)
year_option = year_dropdown.shadow_root.ele(f'css:kat-option[value="{target_year}"]', timeout=5)
if not year_option:
logger.error(f"未找到年份选项: {target_year}")
page.ele('tag:body').click()
time.sleep(1)
return False
year_option.click()
time.sleep(2) # 等待 Select month 下拉框加载
logger.info(f"Select year 已切换为 {target_year}")
else:
logger.info(f"Select year 已是 {target_year}")
# 3. 选择月份(Select month,ID 为 #{年份}-month)
month_dropdown = page.ele(f'#{month_id}', timeout=10)
if not month_dropdown:
logger.error(f"未找到 Select month 下拉框: #{month_id}")
return False
if month_dropdown.attr('value') != target_value:
month_dropdown.shadow_root.ele('css:.select-header').click()
time.sleep(1)
month_option = month_dropdown.shadow_root.ele(f'css:kat-option[value="{target_value}"]', timeout=5)
if not month_option:
logger.error(f"未找到月份选项: {target_value}")
page.ele('tag:body').click()
time.sleep(1)
return False
month_option.click()
time.sleep(1)
logger.info(f"Select month 已切换为 {target_value}")
else:
logger.info(f"Select month 已是 {target_value}")
# 4. 二次验证
final_value = page.ele(f'#{month_id}').attr('value')
if final_value != target_value:
logger.error(f"月份选择验证失败!期望: {target_value},实际: {final_value}")
return False
# 5. 点击 Apply
apply_btn = page.ele('css:kat-button[data-test-id="RequiredFilterApplyButton"][label="Apply"]', timeout=10)
apply_btn.click()
logger.info("已点击 Apply 按钮")
# 6. 等待页面加载完成
page.wait.doc_loaded()
time.sleep(3)
logger.info("月报筛选条件已应用,页面加载完成")
return True
# ======================== 步骤3: 批量提交下载请求 ========================
def submit_download_request(page, site, date_type, week_start, week_end, wait_timeout=60):
"""
提交单站点的下载请求(不打开 Download Manager):
1. 切换国家
2. 选择筛选条件
3. 点 Generate Download → 弹窗中再点 Generate Download
4. 等待按钮变为 Open Download Manager(表示已进入后端队列)
5. 关闭弹窗(关键:不跳转 Manager,准备处理下一个站点)
:param page: 数据页的页面对象
:param site: 站点代码,如 "US","UK"
:param date_type: 'week' 或 'month'
:param week_start: 周期开始日期
:param week_end: 周期结束日期
:param wait_timeout: 等待按钮变为 Open Download Manager 的超时时间(秒)
:return: True=提交成功, False=失败
"""
logger.info(f"[{site}] === 提交下载请求 ===")
# 1. 切换国家
if not select_country(page, site):
logger.error(f"[{site}] 国家切换失败")
return False
# 2. 选筛选条件
if date_type == 'month':
filter_ok = select_filters_and_query_monthly(page, week_end)
else:
filter_ok = select_filters_and_query(page, week_start, week_end)
if not filter_ok:
logger.error(f"[{site}] 筛选条件设置失败")
return False
# 等待 Apply 后数据渲染
time.sleep(2)
# 3. 点击页面上的 Generate Download 按钮
generate_btn = page.ele('#GenerateDownloadButton', timeout=10)
if not generate_btn:
logger.error(f"[{site}] 未找到 Generate Download 按钮")
return False
generate_btn.click()
logger.info(f"[{site}] 已点击 Generate Download 按钮")
time.sleep(3)
# 4. 弹窗出现,定位弹窗按钮
modal_btn = page.ele('#downloadModalGenerateDownloadButton', timeout=10)
if not modal_btn:
logger.error(f"[{site}] 未找到弹窗中的 Generate Download 按钮")
return False
# 等弹窗动画结束
time.sleep(2)
current_label = modal_btn.attr('label')
logger.info(f"[{site}] 弹窗按钮 label: {current_label}")
# 5. 点击弹窗 Generate Download(如果不是已就绪状态)
if current_label != 'Open Download Manager':
modal_btn.click()
logger.info(f"[{site}] 已点击弹窗 Generate Download")
time.sleep(2)
# 6. 等待按钮变为 Open Download Manager(表示已进队列)
waited = 0
interval = 3
submitted = False
while waited < wait_timeout:
time.sleep(interval)
waited += interval
btn = page.ele('#downloadModalGenerateDownloadButton', timeout=2)
if btn and btn.attr('label') == 'Open Download Manager':
logger.info(f"[{site}] 按钮已变为 Open Download Manager,提交成功")
submitted = True
break
logger.debug(f"[{site}] 等待中,已 {waited}/{wait_timeout}s")
if not submitted:
logger.error(f"[{site}] 等待 Open Download Manager 超时,提交失败")
return False
else:
logger.info(f"[{site}] 弹窗按钮已是 Open Download Manager(可能上次提交残留),视为成功")
# 7. 关闭弹窗,准备处理下一个站点
try:
close_btn = page.ele('#downloadModalCloseButton', timeout=3)
if close_btn:
close_btn.click()
logger.debug(f"[{site}] 已关闭弹窗")
time.sleep(1)
except Exception as e:
logger.debug(f"[{site}] 关闭弹窗异常(可忽略): {e}")
return True
def open_download_manager(page):
"""
通过独立路由 /brand-analytics/download-manager 直接打开 Download Manager 标签页
跳过"再点 Generate → 弹窗 → 等 label 变 Open Download Manager → 点击跳转"的繁琐流程
:param page: 数据页的页面对象
:return: Download Manager 标签页对象,失败返回 None
"""
# 从 DATA_URL 提取域名,拼接 Download Manager 完整 URL
# 例: https://sellercentral.amazon.com/brand-analytics/download-manager
parsed = urlparse(DATA_URL)
manager_url = f"{parsed.scheme}://{parsed.netloc}/brand-analytics/download-manager"
try:
# 新开标签页访问 Download Manager
new_tab = page.new_tab(manager_url)
new_tab.wait.doc_loaded()
time.sleep(2) # 额外等待表格 DOM 渲染
logger.info(f"[open_manager] 已打开 Download Manager: {manager_url}")
return new_tab
except Exception as e:
logger.error(f"[open_manager] 打开 Download Manager 失败: {e}")
return None
# ======================== 步骤4: 在 Download Manager 批量等待并获取下载链接 ========================
def _download_from_row(tab, row, timeout=30):
"""
从指定行触发下载并拦截 report/download 请求,提取 preSignedUrl(内部辅助函数)
:param tab: Download Manager 标签页
:param row: 已匹配到的目标行元素
:param timeout: 等待网络响应的超时时间(秒)
:return: 下载链接 URL 或 None
"""
# 1. 启动网络监听
tab.listen.start('report/download')
# 2. 找下载按钮
download_btn = row.ele('css:.css-1oickjg', timeout=5)
if not download_btn:
logger.error("未找到 Download 按钮元素(.css-1oickjg)")
tab.listen.stop()
return None
# 3. 记录当前标签页,点击下载
tabs_before = tab.browser.tab_ids
download_btn.click()
logger.debug("已点击 Download 按钮,等待拦截 report/download 响应...")
# 4. 等待响应
resp = tab.listen.wait(timeout=timeout)
tab.listen.stop()
download_url = None
if not resp:
logger.error("未拦截到 report/download 请求")
else:
try:
download_url = resp.response.body.get('preSignedUrl')
if not download_url:
logger.error(f"响应体中未提取到 preSignedUrl: {resp.response.body}")
except Exception as e:
logger.error(f"提取 preSignedUrl 异常: {e}")
# 5. 关闭下载触发的新标签页(浏览器有时会因为 download 链接弹一个新 tab)
time.sleep(2)
tabs_after = tab.browser.tab_ids
new_tabs = [t for t in tabs_after if t not in tabs_before]
for t in new_tabs:
try:
tab.browser.get_tab(t).close()
logger.debug("已关闭下载触发的新标签页")
except Exception:
pass
return download_url
def get_download_urls_batch(tab, pending_tasks, max_wait_minutes=20, poll_interval=5):
"""
批量等待已提交的下载任务在 Download Manager 中变为 Ready,并依次取下载链接
工作原理:
- 已提交 N 个站点,前 N 行就是本次新提交的记录(按时间倒序,最新在最上)
- 不刷新页面(Manager 页面会自动更新 Action 列状态 Pending → Download)
- 每 poll_interval 秒重新读 DOM,扫描前 N 行
- 对每个 pending site 用 country + timeframe + dates 在前 N 行找匹配
- 找到且 Action 含 'Download' → 触发下载,从 pending 移除
- 找到但 Action 还是 Pending → 跳过,下轮再来
- 没找到匹配 → 当成"还没出现",下轮再来
- pending 空 或 超过 max_wait_minutes → 退出
:param tab: Download Manager 标签页
:param pending_tasks: 待处理任务列表,每项为 dict:
{'site': 'US', 'date_type': 'week', 'period_start': date, 'period_end': date}
:param max_wait_minutes: 总等待上限(分钟)
:param poll_interval: 轮询间隔(秒)
:return: dict {site: download_url 或 None}
"""
N = len(pending_tasks) # 扫描范围(固定不变)
results = {task['site']: None for task in pending_tasks}
pending = list(pending_tasks) # 副本,处理过的会被 remove
max_seconds = max_wait_minutes * 60
T0 = time.time()
logger.info(
f"开始批量等待 {N} 个站点 Action 就绪,扫描范围: 前 {N} 行,"
f"上限 {max_wait_minutes} 分钟,每 {poll_interval}s 检查一次"
)
# 首次进入循环前,等表格 DOM 稳定 + 新提交的 N 条全部出现
time.sleep(8)
while pending and (time.time() - T0) < max_seconds:
# 1. 读取当前所有行(不刷新页面,Manager 自动更新)
try:
rows = tab.eles('css:div[role="rowgroup"] div[role="row"]', timeout=10)
except Exception as e:
logger.warning(f"读取行元素异常: {e}")
rows = []
if not rows:
logger.warning("未读到任何数据行,等下轮重试")
else:
# 2. 只扫前 N 行(本次新提交的)
target_rows = rows[:N]
# 3. 遍历每个 pending site,在前 N 行里找属于自己的那一行
for task in pending[:]: # 用副本遍历方便 remove
site = task['site']
expected_country = SITE_DM_COUNTRY.get(site, site)
expected_time_frame = 'Monthly' if task['date_type'] == 'month' else 'Weekly'
ps = task['period_start']
pe = task['period_end']
start_formats = {
ps.strftime('%d/%m/%Y'),
ps.strftime('%m/%d/%Y'),
ps.strftime('%Y-%m-%d'),
}
end_formats = {
pe.strftime('%d/%m/%Y'),
pe.strftime('%m/%d/%Y'),
pe.strftime('%Y-%m-%d'),
}
matched_row = None
matched_action = None
for row in target_rows:
try:
cells = row.eles('css:div')
except Exception:
continue
if len(cells) < 8:
continue
try:
row_country = cells[1].text.strip()
row_time_frame = cells[2].text.strip()
row_start = cells[3].text.strip()
row_end = cells[4].text.strip()
row_action = cells[7].text.strip()
except Exception:
continue
if (row_country == expected_country and
row_time_frame == expected_time_frame and
row_start in start_formats and
row_end in end_formats):
matched_row = row
matched_action = row_action
break # country 是唯一识别,找到自己的行就停
if matched_row is None:
logger.debug(f"[{site}] 前 {N} 行无匹配,下轮再扫")
continue
# 4. 检查 Action 列状态
if 'Download' in matched_action:
logger.info(f"[{site}] Action 已就绪 ('{matched_action}'),触发下载")
url = _download_from_row(tab, matched_row)
if url:
results[site] = url
logger.success(f"[{site}] 下载链接获取成功: {url}")
else:
logger.error(f"[{site}] 下载触发失败,放弃该站点(外层会重试)")
pending.remove(task)
else:
logger.debug(f"[{site}] 状态: {matched_action},下轮再来")
# 5. 判断退出
if not pending:
logger.success(f"所有站点已完成,总耗时 {int(time.time() - T0)}s")
break
# 6. 睡眠 poll_interval,进入下一轮
elapsed = int(time.time() - T0)
logger.info(
f"剩余 pending: {[t['site'] for t in pending]},已等待 {elapsed}s / {max_seconds}s"
)
time.sleep(poll_interval)
# 兜底逻辑:如果实测发现 Manager 页面不会自动更新 Action 列状态,
# 解开下面三行注释,改为每轮刷新页面.
# try:
# tab.refresh()
# tab.wait.doc_loaded()
# time.sleep(2)
# except Exception as e:
# logger.warning(f"刷新失败: {e}")
# 循环结束,标记未完成的任务
if pending:
for task in pending:
logger.error(f"[{task['site']}] 等待 {max_wait_minutes} 分钟超时,未获取下载链接")
return results
# ======================== 消息通知 ========================
def send_notification(event, **kwargs):
"""
通用消息通知函数,根据事件类型组装消息
:param event: 事件类型
:param kwargs: 事件相关参数
事件类型:
- login_failed: 登录失败(需要人工介入)
- task_summary: 最终任务汇总(所有重试结束后只发一次)
"""
period_type = "周报" if kwargs.get('date_type') == 'week' else "月报"
date_info = kwargs.get('date_info', '')
week_start = kwargs.get('week_start', '')
week_end = kwargs.get('week_end', '')
now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
if event == 'login_failed':
sites = kwargs.get('sites', [])
msg = (
f"【ABA {period_type} - 登录失败】\n"
f"站点: {', '.join(sites) if sites else '未知'}\n"
f"周期: {date_info} ({week_start} ~ {week_end})\n"
f"原因: 未检测到登录状态,请手动登录后重试\n"
f"时间: {now}"
)
elif event == 'task_summary':
success_sites = kwargs.get('success_sites', [])
failed_sites = kwargs.get('failed_sites', [])
attempt = kwargs.get('attempt', 0)
status = "全部成功" if not failed_sites else "部分失败"
retry_info = f"(第 {attempt + 1} 次执行)" if attempt > 0 else ""
msg = (
f"【ABA {period_type} - 任务汇总】{retry_info}\n"
f"状态: {status}\n"
f"周期: {date_info} ({week_start} ~ {week_end})\n"
f"成功: {', '.join(success_sites) if success_sites else '无'}\n"
f"失败: {', '.join(failed_sites) if failed_sites else '无'}\n"
f"时间: {now}"
)
else:
msg = f"【ABA 通知】未知事件: {event},参数: {kwargs}"
logger.info(f"通知消息:\n{msg}")
user = ['hejiangming', 'fangxingjun']
title = f"ABA {period_type} 周期 {date_info} 通知"
send_wx_msg(user, title, content=msg)
def send_wx_msg(users: list, title: str, content: str, msgtype: str = "textcard"):
"""
:param users: 填写需要推送的微信用户名list
:param title: 推送的标题(如果msgtype采用markdown形式,则不附带标题)
:param content: 推送的主体内容
:param msgtype: 推送的消息类型(textcard:默认卡片类型;markdown:markdaown结构)
"""
if users is not None:
accounts = ",".join(users)
# 排除users_list=[''] 无需发送
if bool(accounts):
host = "http://120.79.147.190:8080"
url = f'{host}/soundasia_selection/dolphinScheduler/sendMessage'
data = {
'account': accounts,
'title': title,
'content': content,
'msgtype': msgtype
}
try:
requests.post(url=url, data=data, timeout=15)
except:
pass
return True
# ======================== 工具函数 ========================
def extract_filename(url):
"""
从预签名 URL 中提取文件名
优先从 response-content-disposition 参数中提取 filename
"""
parsed = urlparse(url)
params = parse_qs(parsed.query)
disposition = params.get("response-content-disposition", [""])[0]
match = re.search(r"filename[*]?=['\"]?(?:UTF-8'')?([^;\"']+)", unquote(disposition))
if match:
return match.group(1)
return os.path.basename(parsed.path)
# ======================== 保存下载记录到数据库 ========================
def save_download_record(site, date_type, week_start, week_end, date_info, download_url):
"""
将下载记录保存到 MySQL
:param site: 站点代码,如 "DE", "UK"
:param date_type: 日期类型,如 "week", "month"
:param week_start: 目标周开始日期
:param week_end: 目标周结束日期
:param date_info: 周期标识,周报如 "2026-11",月报如 "2026-02"
:param download_url: 预签名下载链接
:return: True=保存成功, False=失败
"""
try:
engine = get_remote_engine(site_name='us', db_type='mysql')
site_name = site.lower()
country_name = SITE_COUNTRY_NAME.get(site, site)
date_start = week_start.isoformat()
date_end = week_end.isoformat()
file_name = extract_filename(download_url)
hex_url = download_url.encode().hex()
state = 1
final_sql = (
f"INSERT INTO aba_report_download "
f"(site_name, date_type, date_info, country_name, date_start, date_end, "
f"file_name, download_url, state, created_time, updated_time) VALUES "
f"('{site_name}', '{date_type}', '{date_info}', '{country_name}', "
f"'{date_start}', '{date_end}', '{file_name}', UNHEX('{hex_url}'), "
f"{state}, NOW(), NOW()) "
f"ON DUPLICATE KEY UPDATE "
f"download_url=UNHEX('{hex_url}'), file_name='{file_name}', "
f"state={state}, updated_time=NOW()"
)
with engine.begin() as conn:
conn.execute(final_sql)
logger.info(f"[{site}] 下载记录已保存到数据库")
return True
except Exception as e:
logger.error(f"[{site}] 保存数据库失败: {e}")
return False
def get_already_downloaded_sites(sites, date_type, date_info):
"""
查询 aba_report_download 表,返回该周期内已有下载记录的站点列表
只判断记录是否存在,不看 state(state 是流转状态: 1未开始/2下载中/3下载完成/4同步完成,会变化)
记录存在 = 当时已成功拿到下载链接并写库 = 不需要重复下载
用途:任务计划程序在 2/3/4 号多次触发月报时,已下载成功的周期直接跳过,避免重复下载
:param sites: 站点代码列表,如 ["US"] 或 ["UK", "DE"]
:param date_type: 日期类型,"week" 或 "month"
:param date_info: 周期标识,周报如 "2026-11",月报如 "2026-02"
:return: 已有记录的站点列表(大写),如 ["US"];查询失败返回 [](照常执行,宁可重复也不漏)
"""
site_names = ','.join(f"'{s.lower()}'" for s in sites)
sql = f"""
SELECT site_name FROM aba_report_download
WHERE date_type = '{date_type}'
AND date_info = '{date_info}'
AND site_name IN ({site_names})
"""
try:
engine = get_remote_engine(site_name='us', db_type='mysql')
df = engine.read_sql(sql)
return [s.upper() for s in df['site_name'].tolist()] if not df.empty else []
except Exception as e:
logger.error(f"查询已下载站点异常: {e}")
return []
# ======================== 推送接口 ========================
def push_to_api(download_infos, date_type, date_info, week_start):
"""
将下载成功的站点信息推送到接口
:param download_infos: 下载信息列表,每项包含 site, download_url, file_name
:param date_type: 日期类型,"week" 或 "month"
:param date_info: 周期标识,如 "2026-11"
:param week_start: 起始日期
"""
api_url = "https://selection.yswg.com.cn/soundasia_selection/bigfile/importByUrl"
type_map = {"week": "1", "month": "2"}
payload = []
for info in download_infos:
payload.append({
"fileUrl": info['download_url'],
"fileName": info['file_name'],
"site": info['site'],
"type": type_map.get(date_type, "1"),
"dateInfo": str(date_info),
"startDate": str(week_start),
})
try:
import json
headers = {"Content-Type": "application/json"}
resp = requests.post(api_url, data=json.dumps(payload), headers=headers, timeout=30)
logger.info(f"接口推送响应{resp.text}")
if resp.status_code == 200:
logger.success(f"接口推送成功,共 {len(payload)} 条记录")
else:
logger.error(f"接口推送失败,状态码: {resp.status_code},响应: {resp.text}")
except Exception as e:
logger.error(f"接口推送异常: {e}")
# ======================== 主流程 ========================
def run_weekly_download(sites, date_type, week_start, week_end, date_info):
"""
打开数据页(一次) → 检查登录(一次) → 三阶段执行:
阶段 1: 遍历 sites 批量提交下载请求(不打开 Manager)
阶段 2: 打开 Download Manager,批量等待所有 Action 就绪,依次取链接
阶段 3: 保存数据库 + 返回结果
:param sites: 站点代码列表,如 ["US"] 或 ["UK", "DE", "ES", "FR", "IT", "NL"]
:param date_type: 日期类型,"week" 或 "month"
:param week_start: 目标周开始日期
:param week_end: 目标周结束日期
:param date_info: 周期标识,周报如 "2026-11",月报如 "2026-02"
:return: (success_sites, failed_sites, success_download_infos) 或 False(浏览器/登录失败)
"""
# 打开数据页,失败则关闭浏览器重试,最多 3 次
page = None
max_open_retries = 3
for open_attempt in range(1, max_open_retries + 1):
ensure_chrome_closed()
page = create_browser()
if not page:
logger.error(f"浏览器启动失败(第 {open_attempt}/{max_open_retries} 次)")
continue
navigate_to_data_page(page)
if check_login_status(page):
logger.info(f"数据页访问成功(第 {open_attempt} 次)")
break
logger.warning(f"数据页访问失败(第 {open_attempt}/{max_open_retries} 次),关闭浏览器重试...")
try:
page.quit()
except Exception:
pass
page = None
time.sleep(5)
if not page:
logger.error("多次重试后仍无法访问数据页,判定为登录失效")
send_notification('login_failed', date_type=date_type, date_info=date_info, week_start=week_start,
week_end=week_end, sites=sites)
return False
try:
# 确保页面语言为英文(只一次)
if not ensure_english_language(page):
logger.error("语言切换失败,可能导致后续元素定位异常")
return False
# ============ 阶段 1: 批量提交所有站点的下载请求 ============
logger.info("=" * 60)
logger.info(f"阶段 1: 批量提交 {len(sites)} 个站点的下载请求")
logger.info(f"目标周期: {week_start} ~ {week_end} ({date_type})")
logger.info("=" * 60)
submitted_sites = []
failed_submit_sites = []
for site in sites:
try:
if submit_download_request(page, site, date_type, week_start, week_end):
submitted_sites.append(site)
logger.success(f"[{site}] 提交成功")
else:
failed_submit_sites.append(site)
logger.error(f"[{site}] 提交失败")
except Exception as e:
logger.error(f"[{site}] 提交异常: {e}")
failed_submit_sites.append(site)
if not submitted_sites:
logger.error("所有站点提交均失败,流程终止")
return [], list(sites), []
logger.info(f"提交成功 {len(submitted_sites)} 个: {submitted_sites}")
if failed_submit_sites:
logger.warning(f"提交失败 {len(failed_submit_sites)} 个: {failed_submit_sites}")
# ============ 阶段 2: 打开 Download Manager 批量等待 ============
logger.info("=" * 60)
logger.info(f"阶段 2: 打开 Download Manager,批量等待 {len(submitted_sites)} 个站点 Action 就绪")
logger.info("=" * 60)
manager_tab = open_download_manager(page)
if not manager_tab:
logger.error("未能打开 Download Manager,所有已提交站点视为失败")
return [], list(sites), []
pending_tasks = [
{
'site': site,
'date_type': date_type,
'period_start': week_start,
'period_end': week_end,
}
for site in submitted_sites
]
results = get_download_urls_batch(
manager_tab, pending_tasks, max_wait_minutes=20, poll_interval=5
)
# 关闭 Manager 标签页
try:
manager_tab.close()
except Exception:
pass
# ============ 阶段 3: 处理结果(存库 + 整理返回) ============
logger.info("=" * 60)
logger.info("阶段 3: 处理下载结果")
logger.info("=" * 60)
success_sites = []
failed_sites = list(failed_submit_sites) # 提交失败的算作失败
success_download_infos = []
for site, url in results.items():
if url:
success_sites.append(site)
save_download_record(site, date_type, week_start, week_end, date_info, url)
success_download_infos.append({
'site': site.lower(),
'download_url': url,
'file_name': extract_filename(url),
})
else:
failed_sites.append(site)
if failed_sites:
logger.warning(f"以下站点下载失败: {', '.join(failed_sites)}")
else:
logger.success(f"所有站点下载完成: {', '.join(sites)}")
return success_sites, failed_sites, success_download_infos
finally:
try:
page.quit()
logger.info("浏览器已关闭")
except Exception:
pass
# ======================== 带重试的任务包装 ========================
def weekly_task_with_retry(sites, date_type='week'):
"""
定时任务入口(由 APScheduler 触发,或手动测试调用)
执行逻辑:
1. 首次执行 run_weekly_download()
2. 成功 → 结束,等下次触发
3. 失败(登录失效/数据未更新等) → 推迟一天再试,最多重试 MAX_RETRY_DAYS 天
:param sites: 站点代码列表,如 ["US"] 或 ["UK", "DE", "ES", "FR", "IT"]
:param date_type: 日期类型,周任务传 "week",月任务传 "month"
注意:重试期间会阻塞当前任务,但不会影响调度器中的其他任务
因为 APScheduler 默认每个任务在独立线程中执行
"""
weekday_names = ['周一', '周二', '周三', '周四', '周五', '周六', '周日']
# 首次执行时从数据库获取目标周期,重试时复用同一个周期
if date_type == 'month':
week_start, week_end, date_info = get_last_month_range()
else:
week_start, week_end, date_info = get_last_week_range()
if not week_start or not week_end:
logger.error(f"获取{'上月' if date_type == 'month' else '上周'}日期范围失败,终止执行")
return
# 检查本周期是否已有下载记录,已成功的站点跳过
# 场景:任务计划程序 2/3/4 号多次触发月报,3 号成功后 4 号触发时直接退出
done_sites = get_already_downloaded_sites(sites, date_type, date_info)
remaining_sites = [s for s in sites if s not in done_sites] # 待下载站点,重试时只包含失败的
if not remaining_sites:
logger.success(f"周期 {date_info} 所有站点已有下载记录({', '.join(done_sites)}),本次跳过执行")
return
if done_sites:
logger.info(f"周期 {date_info} 已下载站点: {', '.join(done_sites)},本次只处理: {', '.join(remaining_sites)}")
logger.info(f"待下载站点: {', '.join(remaining_sites)}")
all_success_sites = [] # 累计所有成功的站点
all_download_infos = [] # 累计所有成功站点的下载信息,用于推送接口
for attempt in range(MAX_RETRY_DAYS + 1):
today_name = weekday_names[datetime.now().weekday()]
if attempt == 0:
logger.info(f"首次执行(当前: {today_name}),站点: {', '.join(remaining_sites)}")
else:
logger.info(f"第 {attempt} 次重试(当前: {today_name}),站点: {', '.join(remaining_sites)}")
result = run_weekly_download(remaining_sites, date_type, week_start, week_end, date_info)
# run_weekly_download 返回 (success_sites, failed_sites, success_download_infos) 或 False(浏览器/登录失败)
if result is False:
# 登录/浏览器失败,login_failed 通知已在内部发送,这里不重复发汇总
failed_sites = list(remaining_sites)
else:
success_sites, failed_sites, success_download_infos = result
all_success_sites.extend(success_sites)
all_download_infos.extend(success_download_infos)
remaining_sites = failed_sites # 下次只重试失败的站点
# 发送任务汇总通知(每次执行都发,不管成功失败)
send_notification('task_summary', date_type=date_type, date_info=date_info,
week_start=week_start, week_end=week_end,
success_sites=all_success_sites, failed_sites=failed_sites, attempt=attempt)
# 全部成功 → 推送接口 → 结束
if not failed_sites:
logger.success("所有站点数据下载成功")
push_to_api(all_download_infos, date_type, date_info, week_start)
return
# 最后一次机会也失败了 → 推送已成功的部分 → 放弃
if attempt >= MAX_RETRY_DAYS:
logger.error(f"已重试 {MAX_RETRY_DAYS} 次仍失败,放弃本周期")
if all_download_infos:
push_to_api(all_download_infos, date_type, date_info, week_start)
return
# 还有机会,等待指定间隔后重试
sleep_seconds = RETRY_INTERVAL_MINUTES * 60
next_retry_time = datetime.now() + timedelta(minutes=RETRY_INTERVAL_MINUTES)
logger.warning(
f"本次失败站点: {', '.join(remaining_sites)},"
f"将在 {next_retry_time.strftime('%Y-%m-%d %H:%M')} 重试"
f"(已尝试 {attempt + 1}/{MAX_RETRY_DAYS + 1} 次)"
)
time.sleep(sleep_seconds)
# ======================== 调度器 ========================
def start(sites):
"""
启动 APScheduler 定时调度器
:param sites: 站点代码列表,如 ["US"] 或 ["UK", "DE", "ES", "FR", "IT"]
APScheduler 核心概念:
- BlockingScheduler: 阻塞式调度器,启动后主线程就一直等待,适合脚本独占运行
- CronTrigger: 类似 Linux crontab 的触发器,可以指定 星期几/几号/几点 执行
- add_job(函数, 触发器): 注册一个定时任务
CronTrigger 常用参数:
- day_of_week: 星期几触发,'sun'=周日, 'mon'=周一, '0'=周一, '6'=周日
- day: 每月几号触发,1=1号, 'last'=最后一天
- hour/minute: 几点几分触发
- month: 几月触发,不写就是每月
添加新任务示例:
# 每月1号 09:00 执行月报下载
# scheduler.add_job(
# monthly_task_with_retry, # 要执行的函数
# CronTrigger(day=1, hour=9, minute=0), # 每月1号 09:00
# id='monthly_report', # 任务唯一ID,用于管理(暂停/删除等)
# name='月报下载',
# args=[sites]
# )
"""
scheduler = BlockingScheduler()
# --- 每周任务:周日 23:00 执行周报下载 ---
scheduler.add_job(
weekly_task_with_retry,
CronTrigger(day_of_week=WEEKLY_DAY_OF_WEEK, hour=WEEKLY_HOUR, minute=WEEKLY_MINUTE),
args=[sites, 'week'],
id='weekly_aba_report',
name='ABA周报下载',
misfire_grace_time=None
)
# --- 每月任务:每月1号 23:00 执行月报下载 ---
scheduler.add_job(
weekly_task_with_retry,
CronTrigger(day=MONTHLY_DAY, hour=MONTHLY_HOUR, minute=MONTHLY_MINUTE),
args=[sites, 'month'],
id='monthly_aba_report',
name='ABA月报下载',
misfire_grace_time=None
)
logger.info(f"定时调度器启动,站点: {', '.join(sites)},已注册任务:")
for job in scheduler.get_jobs():
logger.info(f" - {job.name} (ID: {job.id}, 触发器: {job.trigger})")
# 启动调度器(阻塞主线程,程序会一直运行等待触发)
scheduler.start()
def manual_download(sites, date_type, start_date_str, end_date_str, date_info):
"""
手动指定日期范围导出,用于补导历史漏掉的周期
:param sites: 站点代码列表,如 ["US"] 或 ["UK", "DE"]
:param date_type: 日期类型,如 "week", "month"
:param start_date_str: 开始日期,格式 'YYYY-MM-DD',比如 '2026-03-01'
:param end_date_str: 结束日期,格式 'YYYY-MM-DD',比如 '2026-03-07'
:param date_info: 周期标识,周报如 "2026-11",月报如 "2026-02"
用法示例(在 __main__ 里调用):
manual_download(sites, 'week', '2026-03-01', '2026-03-07', '2026-10')
manual_download(sites, 'month', '2026-02-01', '2026-02-28', '2026-02')
"""
week_start = date.fromisoformat(start_date_str)
week_end = date.fromisoformat(end_date_str)
logger.info(
f"手动导出模式({date_type}),站点: {', '.join(sites)},目标周期: {week_start} ~ {week_end} (date_info: {date_info})")
remaining_sites = list(sites) # 待下载站点,重试时只包含失败的
all_success_sites = [] # 累计所有成功的站点
all_download_infos = [] # 累计所有成功站点的下载信息,用于推送接口
result = run_weekly_download(remaining_sites, date_type, week_start, week_end, date_info)
if result is False:
# 登录/浏览器失败,login_failed 通知已在内部发送
failed_sites = list(sites)
else:
success_sites, failed_sites, success_download_infos = result
all_success_sites.extend(success_sites)
all_download_infos.extend(success_download_infos)
# 发送任务汇总通知
send_notification('task_summary', date_type=date_type, date_info=date_info,
week_start=week_start, week_end=week_end,
success_sites=all_success_sites, failed_sites=failed_sites, attempt=0)
# 推送已成功的部分到接口
if all_download_infos:
push_to_api(all_download_infos, date_type, date_info, week_start)
if not failed_sites:
logger.success("所有站点数据下载成功")
else:
logger.warning(f"手动导出部分失败,失败站点: {', '.join(failed_sites)}")
if __name__ == "__main__":
logger.add(
"logs/aba_report_{time:YYYY-MM-DD}.log",
rotation="1 day",
retention="30 days",
encoding="utf-8",
level="INFO"
)
# 站点列表从 config.py 中读取,部署时改 config.py 即可
import sys
# python aba_report.py week → 执行周报
# python aba_report.py month → 执行月报
# python aba_report.py → 不带参数,走下面原有逻辑
if len(sys.argv) > 1:
task_type = sys.argv[1]
if task_type == 'week':
weekly_task_with_retry(SITES, 'week')
elif task_type == 'month':
weekly_task_with_retry(SITES, 'month')
else:
logger.error(f"未知的任务类型: {task_type},请使用 week 或 month")
else:
# --- 手动补导历史数据(指定日期范围) ---
# manual_download(SITES, 'week', '2026-03-08', '2026-03-14', '2026-11')
# manual_download(SITES, 'month', '2026-02-01', '2026-02-28', '2026-02')
# --- 手动执行当前周期(带重试) ---
weekly_task_with_retry(SITES, 'week')
# weekly_task_with_retry(SITES, 'month')
# --- 启动定时调度(每周日自动执行) ---
# start(SITES)
# -*- coding: utf-8 -*-
"""
ABA Report 配置文件
部署时只需修改 DATA_URL 和 SITES
"""
# ======================== 定时任务配置 ========================
# 每周任务:周日 23:00 执行(day_of_week: 'sun'=周日, 'mon'=周一, 'tue', 'wed', 'thu', 'fri', 'sat')
WEEKLY_DAY_OF_WEEK = 'sun'
WEEKLY_HOUR = 18
WEEKLY_MINUTE = 0
# 每月任务:每月1号 22:00 执行(比周任务 23:00 早一小时,避免冲突)
MONTHLY_DAY = 1
MONTHLY_HOUR = 22
MONTHLY_MINUTE = 0
# 失败后最多重试次数(周和月通用)
MAX_RETRY_DAYS = 5
# 每次重试间隔(分钟)
RETRY_INTERVAL_MINUTES = 30
# ======================== 部署配置========================
# 电脑A:
DATA_URL = "https://sellercentral.amazon.com/brand-analytics/dashboard/top-search-terms"
SITES = ["US"]
# 电脑B:
# DATA_URL = "https://sellercentral.amazon.co.uk/brand-analytics/dashboard/top-search-terms"
# SITES = ["UK", "DE", "ES", "FR", "IT"]
# ======================== 站点映射配置 ========================
# 站点代码 → countryPicker 下拉框的 value(小写,用于页面切换国家)
SITE_PICKER_MAP = {
"US": "us",
"UK": "gb",
"DE": "de",
"ES": "es",
"FR": "fr",
"IT": "it",
"NL": "nl",
}
# 站点代码 → 国家英文全称(用于存数据库 country_name 字段)
SITE_COUNTRY_NAME = {
"US": "United States",
"UK": "United Kingdom",
"DE": "Germany",
"ES": "Spain",
"FR": "France",
"IT": "Italy",
"NL": "Netherlands",
}
# 站点代码 → Download Manager 页面 Country 列显示的国家缩写(大写)
SITE_DM_COUNTRY = {
"US": "US",
"UK": "GB",
"DE": "DE",
"ES": "ES",
"FR": "FR",
"IT": "IT",
"NL": "NL",
}
import json
import pandas as pd
import numpy as np
import orjson, requests, time
from typing import List
# -------- 映射字典 --------
site_name_db_dict = {
"us": "selection",
"uk": "selection_uk",
"de": "selection_de",
"es": "selection_es",
"fr": "selection_fr",
"it": "selection_it",
}
db_type_alias_map = {
"mysql": "mysql", # 阿里云mysql
"postgresql_14": "postgresql_14", # pg14爬虫库-内网
"postgresql_14_outer": "postgresql_14_outer", # pg14爬虫库-外网
"postgresql_15": "postgresql_15", # pg15正式库-内网
"postgresql_15_outer": "postgresql_15_outer", # pg15正式库-外网
"postgresql_cluster": "postgresql_cluster", # pg集群-内网
"postgresql_cluster_outer": "postgresql_cluster_outer", # pg集群-外网
"doris": "doris", # doris集群-内网
}
DEFAULT_SERVERS = [
# "http://192.168.200.210:7777", # 内网
# "http://192.168.10.217:7777", # 内网-h7
# "http://61.145.136.61:7777", # 外网
# "http://61.145.136.61:7779", # 外网
"http://61.145.136.61:7780", # 外网
]
# ---------------------------
def df_to_json_records(df: pd.DataFrame) -> list:
"""保证 DataFrame 可安全序列化为 JSON records(处理 NaN / ±Inf)"""
df_clean = df.copy()
# 1️⃣ 替换 ±Inf -> NaN
num_cols = df_clean.select_dtypes(include=[np.number]).columns
if len(num_cols):
df_clean[num_cols] = df_clean[num_cols].replace([np.inf, -np.inf], np.nan)
# 2️⃣ 替换 NaN -> None(注意:有时 astype(object) 不彻底,需用 applymap)
df_clean = df_clean.applymap(lambda x: None if pd.isna(x) else x)
# 3️⃣ 转为 dict records
return df_clean.to_dict("records")
def clean_json_field_for_orjson(v):
"""清洗单个 JSON 字段的值,使其符合 orjson 要求并避免空字典入库"""
if v is None or pd.isna(v):
return None
# 1️⃣ 如果是空字典对象,返回 None
if isinstance(v, dict) and not v:
return None
# 2️⃣ 如果是空字符串或仅为 "{}",返回 None
if isinstance(v, str):
stripped = v.strip()
if not stripped or stripped == "{}":
return None
try:
parsed = json.loads(stripped)
if isinstance(parsed, dict) and not parsed:
return None
return json.dumps(parsed, ensure_ascii=False)
except Exception:
return v # 非 JSON 字符串则原样保留
return v
def fully_clean_for_orjson(df: pd.DataFrame) -> pd.DataFrame:
"""全面清洗 DataFrame 以符合 orjson 要求"""
df = df.replace([np.inf, -np.inf], np.nan)
df = df.applymap(lambda x: None if pd.isna(x) else x)
# 找出所有可能为 JSON 字符串的字段
json_like_cols = [col for col in df.columns if col.endswith('_json')]
# 针对每个 JSON-like 字段,应用清洗函数
for col in json_like_cols:
df[col] = df[col].apply(clean_json_field_for_orjson)
return df
class RemoteTransaction:
def __init__(self, db: str, database: str,
session: requests.Session, urls: List[str]):
self.db = db
self.database = database
self.session = session
self.urls = urls
self.sql_queue = []
# def execute(self, sql: str):
# self.sql_queue.append(sql)
def execute(self, sql: str, params=None):
"""
params 可取:
• None → 纯文本 SQL
• dict → 单条参数化 e.g. {"id":1,"name":"a"}
• list/tuple → 批量 executemany
- list[dict] ↔ INSERT .. VALUES (:id,:name)
- list[tuple] ↔ INSERT .. VALUES (%s,%s)
"""
self.sql_queue.append({"sql": sql, "params": params})
def __enter__(self): return self
def __exit__(self, exc_type, exc, tb):
for url in self.urls:
try:
self.session.post(
url + "/transaction",
json={"db_type": self.db,
"sql_list": self.sql_queue,
"database": self.database}, # site_name not needed on server, kept for clarity
timeout=3000,
).raise_for_status()
return
except Exception as e:
print(f"[WARN] 事务失败 {url}: {e}")
raise RuntimeError("All servers failed for transaction")
class RemoteEngine:
def __init__(self, db: str, database: str,
server_urls: List[str], retries: int = 2):
"""
:param db: db_type--数据库类型
:param database: 数据库名称
:param server_urls:
:param retries:
"""
self.db = db
self.database = database
self.urls = [u.rstrip("/") for u in server_urls]
self.session = requests.Session()
self.retries = retries
def _request(self, endpoint: str, payload):
for url in self.urls:
for _ in range(self.retries):
try:
json_bytes = orjson.dumps(payload)
r = self.session.post(f"{url}/{endpoint}",
data=json_bytes,
headers={"Content-Type": "application/json"},
timeout=3000)
# r = self.session.post(f"{url}/{endpoint}",
# json=payload, timeout=10)
r.raise_for_status()
return r.json()
except Exception as e:
print(f"[WARN] {endpoint} fail @ {url}: {e}")
time.sleep(1)
raise RuntimeError(f"All servers failed for {endpoint}")
# def _request(self, endpoint: str, payload):
# # 用 orjson,“allow_nan” 会把 NaN/Inf 写成 null
# # json_bytes = orjson.dumps(payload,
# # option=orjson.OPT_NON_STR_KEYS | orjson.OPT_NAIVE_UTC | orjson.OPT_OMIT_MICROSECOND | orjson.OPT_ALLOW_INF_AND_NAN)
# json_bytes = orjson.dumps(
# payload,
# option=orjson.OPT_NON_STR_KEYS | orjson.OPT_NAIVE_UTC | orjson.OPT_ALLOW_INF_AND_NAN
# )
#
# headers = {"Content-Type": "application/json"}
#
# for url in self.urls:
# for _ in range(self.retries):
# try:
# r = self.session.post(f"{url}/{endpoint}",
# data=json_bytes, headers=headers,
# timeout=15)
# r.raise_for_status()
# return r.json()
# except Exception as e:
# print(f"[WARN] {endpoint} fail @ {url}: {e}")
# time.sleep(1)
# raise RuntimeError(f"All servers failed for {endpoint}")
# ---------- 公共 API ----------
def read_sql(self, sql: str) -> pd.DataFrame:
data = self._request("query",
{"db_type": self.db,
"sql": sql,
"database": self.database})
return pd.DataFrame(data["result"])
def to_sql(self, df: pd.DataFrame, table: str, if_exists="append"):
return self._request("insert",
{"db_type": self.db,
"table": table,
"if_exists": if_exists,
"data": fully_clean_for_orjson(df=df).to_dict("records"),
# "data": df_to_json_records(df), # ← 清洗后的 records
"database": self.database})
def read_then_update(
self,
select_sql: str,
update_table: str,
set_values: dict,
where_keys: List[str],
error_if_empty: bool = False,
):
"""
动态生成 UPDATE:把 select_sql 读到的行,按 where_keys 精准更新 set_values
返回 (DataFrame, rows_updated)
"""
payload = {
"db_type": self.db,
"database": self.database,
"select_sql": select_sql,
"update_table": update_table,
"set_values": set_values,
"where_keys": where_keys,
"error_if_empty": error_if_empty,
}
resp = self._request("read_then_update", payload)
df = pd.DataFrame(resp["read_result"])
rows_updated = resp.get("rows_updated", 0)
return df
def begin(self):
return RemoteTransaction(self.db, self.database,
self.session, self.urls)
# ---------------------------------
# db -- 数据库类型
# database -- 站点
def get_remote_engine(site_name: str, db_type: str, database: str = None,
servers: List[str] = None) -> RemoteEngine:
"""
:param site_name: 站点
:param db_type: 数据库类型
:param database: 数据库名称-默认为None, 否则通过站点来匹配
:param servers: 服务器url地址
:return: 数据库连接对象
"""
if not database:
database = site_name_db_dict[site_name]
if site_name not in site_name_db_dict:
raise ValueError(f"Unknown site_name: {site_name}")
if db_type not in db_type_alias_map:
raise ValueError(f"Unknown db_type: {db_type}")
# print(f"db_type: {db_type_alias_map[db_type]}, database: {database}")
return RemoteEngine(
db=db_type_alias_map[db_type],
database=database,
server_urls=servers or DEFAULT_SERVERS,
)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment