Commit 4c8ae1db by Peng

update threading_spider scripts

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
parent 42358832
...@@ -4,8 +4,8 @@ from datetime import datetime, timedelta ...@@ -4,8 +4,8 @@ from datetime import datetime, timedelta
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录 sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
from multiprocessing import Pool from multiprocessing import Pool
from amazon_spider.merchantwords_search_term_pg6 import search_temp_merchantwords from amazon_spider.merchant_serch_term_pg15 import search_temp_pg
from threading_spider.db_connectivity import connect_db from utils.db_connect import BaseUtils
import time import time
import pandas as pd import pandas as pd
import random import random
...@@ -14,12 +14,14 @@ from amazon_spider.VPS_IP import pppoe_ip ...@@ -14,12 +14,14 @@ from amazon_spider.VPS_IP import pppoe_ip
def select_sate_mysql(site, num=None): def select_sate_mysql(site, num=None):
db_class = connect_db(site) db_class = BaseUtils(site)
engine_pg6 = db_class.pg_21_db() # pg engine_pg6 = db_class.pg_connect_6() # pg
if num == 3: if num == 3:
sql_read = f'SELECT id FROM {site}_merchantwords_search_term_month_syn_2024 where state in (1,2) LIMIT 1' sql_read = f'SELECT id FROM {site}_merchantwords_search_term_month_syn_2026 where state in (1,2) LIMIT 1'
print(sql_read) print(sql_read)
df = pd.read_sql(sql_read, con=engine_pg6) # df = pd.read_sql(sql_read, con=engine_pg6)
df = engine_pg6.read_sql(sql_read)
if not df.empty:
id_tuple = list(df.id) id_tuple = list(df.id)
if id_tuple: if id_tuple:
id_tuple = [1] id_tuple = [1]
...@@ -30,6 +32,8 @@ def select_sate_mysql(site, num=None): ...@@ -30,6 +32,8 @@ def select_sate_mysql(site, num=None):
return False return False
else: else:
return True return True
else:
return False
def get_ip_address(): def get_ip_address():
...@@ -41,7 +45,7 @@ def get_ip_address(): ...@@ -41,7 +45,7 @@ def get_ip_address():
def long_time_task(site, proxy_name): def long_time_task(site, proxy_name):
spider_us = search_temp_merchantwords(site_name=site, read_size=270, proxy_name=proxy_name) spider_us = search_temp_pg(site_name=site, read_size=270, proxy_name=proxy_name)
spider_us.run_pol() spider_us.run_pol()
......
...@@ -33,6 +33,7 @@ def db_engine(site_name, db_type): ...@@ -33,6 +33,7 @@ def db_engine(site_name, db_type):
def select_search_term_state(month, site): def select_search_term_state(month, site):
# Bug1修复:return 1 移到 for 循环外,确保5次重试都能执行
for i in range(5): for i in range(5):
try: try:
sql_read = f"SELECT id FROM {site}_search_term_month_syn where state in (1,2) and date_info='2026-{month}' LIMIT 1" sql_read = f"SELECT id FROM {site}_search_term_month_syn where state in (1,2) and date_info='2026-{month}' LIMIT 1"
...@@ -45,7 +46,8 @@ def select_search_term_state(month, site): ...@@ -45,7 +46,8 @@ def select_search_term_state(month, site):
return id_tuple return id_tuple
except Exception as e: except Exception as e:
print(e, '报错11。', sql_read) print(e, '报错11。', sql_read)
return 1 time.sleep(3)
return 1 # 5次全部失败才返回1
def db_cursor_connect_update(sql, site): def db_cursor_connect_update(sql, site):
...@@ -64,22 +66,41 @@ def select_sate_mysql(site, num=None, month=None, week=None): ...@@ -64,22 +66,41 @@ def select_sate_mysql(site, num=None, month=None, week=None):
db_class = connect_db(site) db_class = connect_db(site)
print('month::', month) print('month::', month)
if num == 1: if num == 1:
# Bug2修复:先抢锁,再在锁内查DB状态,避免多台机器同时通过判断
redis_client = BaseUtils().redis_db()
lock_key = "ALL站点-asin同步-pg-api_lock"
lock = redis_client.lock(lock_key, timeout=60)
if not lock.acquire(blocking=False):
print(f'{site} 站点:其他机器已抢到锁,跳过调度通知')
return False
try:
sql_select_ = f"select status_val from workflow_progress where date_info='2026-{week}' and date_type='week' and page='反查搜索词' and site_name='{site}'" sql_select_ = f"select status_val from workflow_progress where date_info='2026-{week}' and date_type='week' and page='反查搜索词' and site_name='{site}'"
print(sql_select_) print(sql_select_)
engine_us_mysql = db_engine('us', 'mysql') engine_us_mysql = db_engine('us', 'mysql')
df = engine_us_mysql.read_sql(sql_select_) df = engine_us_mysql.read_sql(sql_select_)
if int(df.status_val[0]) in (1, 2): # Bug3修复:df为空时不再抛 IndexError
redis_client = BaseUtils().redis_db() if df.empty or int(df.status_val[0]) not in (1, 2):
lock_key = "ALL站点-asin同步-pg-api_lock" print('5555555555555555555555555555555555')
lock = redis_client.lock(lock_key, timeout=15) # 10秒超时 lock.release()
return False
except Exception as e:
print(f'查询 workflow_progress 失败: {e}')
lock.release()
return False
update_workflow_progress = f"update workflow_progress set status_val=3,status='抓取结束' where page='反查搜索词' and date_info='2026-{week}' and site_name='{site}' and date_type='week'" update_workflow_progress = f"update workflow_progress set status_val=3,status='抓取结束' where page='反查搜索词' and date_info='2026-{week}' and site_name='{site}' and date_type='week'"
print('update_workflow_progress: 修改状态3 ', update_workflow_progress) print('update_workflow_progress: 修改状态3 ', update_workflow_progress)
db_cursor_connect_update(update_workflow_progress, site) db_cursor_connect_update(update_workflow_progress, site)
lock.release()
account = 'pengyanbing' account = 'pengyanbing'
title = site + '站点 搜索词' title = site + '站点 搜索词'
content = f'{month} 月 搜索词 已结束,请确认下一步流程!时间:' content = f'{month} 月 搜索词 已结束,请确认下一步流程!时间:'
db_class.send_mg(account, title, content) db_class.send_mg(account, title, content)
ii = 0 ii = 0
id_tuple = None
for i in range(11): for i in range(11):
id_tuple = select_search_term_state(month, site) id_tuple = select_search_term_state(month, site)
time.sleep(200) time.sleep(200)
...@@ -102,14 +123,7 @@ def select_sate_mysql(site, num=None, month=None, week=None): ...@@ -102,14 +123,7 @@ def select_sate_mysql(site, num=None, month=None, week=None):
title = site + '站点 搜索词' title = site + '站点 搜索词'
content = f'{month} 月 搜索词 已结束,成功调度 ALL站点-asin同步-pg-api' content = f'{month} 月 搜索词 已结束,成功调度 ALL站点-asin同步-pg-api'
db_class.send_mg(account, title, content) db_class.send_mg(account, title, content)
if lock.locked():
lock.release()
return True return True
if lock.locked():
lock.release()
else:
print('5555555555555555555555555555555555')
return False
if num == 3: if num == 3:
# 搜索词多进程已经抓完。最后执行单进程抓取。 # 搜索词多进程已经抓完。最后执行单进程抓取。
...@@ -157,8 +171,8 @@ def long_time_task(site, proxy_name, month, data_chunk): ...@@ -157,8 +171,8 @@ def long_time_task(site, proxy_name, month, data_chunk):
if __name__ == '__main__': if __name__ == '__main__':
pppoe_ip() pppoe_ip()
site_list = ['us', 'de', 'uk'] # site_list = ['us', 'de', 'uk']
#site_list = ['us'] site_list = ['de']
num_processes = 5 num_processes = 5
month = int(sys.argv[1]) month = int(sys.argv[1])
week = int(sys.argv[2]) week = int(sys.argv[2])
...@@ -167,15 +181,15 @@ if __name__ == '__main__': ...@@ -167,15 +181,15 @@ if __name__ == '__main__':
month = '0' + str(month) month = '0' + str(month)
if week < 10: if week < 10:
week = '0' + str(week) week = '0' + str(week)
print(month, week) print('抓取月:',month)
for site in site_list: for site in site_list:
while True: while True:
current_time = datetime.now() current_time = datetime.now()
five_minutes_later = current_time + timedelta(minutes=1) one_minute_later = current_time + timedelta(minutes=1) # Bug4修复:变量名与实际值对齐
print('后五分钟时间', five_minutes_later) print('后一分钟时间', one_minute_later)
# 主进程统一读取数据 # 主进程统一读取数据
all_data = read_data_from_pg(site, month, read_size=1500) all_data = read_data_from_pg(site, month, read_size=1800)
if not all_data: if not all_data:
print('没有数据,检查是否全部完成') print('没有数据,检查是否全部完成')
if select_sate_mysql(site, num=3, month=month, week=week) == False: if select_sate_mysql(site, num=3, month=month, week=week) == False:
...@@ -199,7 +213,7 @@ if __name__ == '__main__': ...@@ -199,7 +213,7 @@ if __name__ == '__main__':
print('所有进程运行完毕!') print('所有进程运行完毕!')
current_time = datetime.now() current_time = datetime.now()
if current_time > five_minutes_later: if current_time > one_minute_later:
pppoe_ip() pppoe_ip()
else: else:
time.sleep(random.uniform(150, 220)) time.sleep(random.uniform(150, 220))
......
...@@ -117,10 +117,10 @@ def get_ip_address(): ...@@ -117,10 +117,10 @@ def get_ip_address():
def long_time_task(site, proxy_name, week): def long_time_task(site, proxy_name, week):
print("当前 抓取 站点 ", site) print("当前 抓取 站点 ", site)
if site == 'us': if site == 'us':
spider_us = search_temp_pg(site_name=site, read_size=300, proxy_name=proxy_name, week=week) spider_us = search_temp_pg(site_name=site, read_size=180, proxy_name=proxy_name, week=week)
spider_us.run_pol() spider_us.run_pol()
else: else:
spider_mysql = search_temp_mysql(site_name=site, read_size=300, proxy_name=proxy_name, spider_mysql = search_temp_mysql(site_name=site, read_size=180, proxy_name=proxy_name,
week=week) week=week)
spider_mysql.run_pol() spider_mysql.run_pol()
print('结束') print('结束')
...@@ -137,7 +137,7 @@ def start_run(week): ...@@ -137,7 +137,7 @@ def start_run(week):
# 计算后五分钟的时间 # 计算后五分钟的时间
five_minutes_later = current_time + timedelta(minutes=1) five_minutes_later = current_time + timedelta(minutes=1)
print('后五分钟时间', five_minutes_later) print('后五分钟时间', five_minutes_later)
p = Pool(3) p = Pool(5)
for i in range(3): for i in range(3):
p.apply_async(long_time_task, args=(site, proxy_name, week)) p.apply_async(long_time_task, args=(site, proxy_name, week))
print('等待所有子进程运行完成') print('等待所有子进程运行完成')
......
...@@ -37,7 +37,7 @@ def select_sate_mysql(site, num=None, page=None, week=None, table_name=None): ...@@ -37,7 +37,7 @@ def select_sate_mysql(site, num=None, page=None, week=None, table_name=None):
week = week week = week
if num == 2: if num == 2:
print('查询店铺产品。 feedback 抓取状态') print('查询店铺产品。 feedback 抓取状态')
sql_read = f"SELECT status_val FROM workflow_progress where page='{page}' and date_info='2025-{week}' and site_name='{site}' and date_type='week'" sql_read = f"SELECT status_val FROM workflow_progress where page='{page}' and date_info='2026-{week}' and site_name='{site}' and date_type='week'"
print(sql_read) print(sql_read)
engine_pg = db_engine('us', 'mysql') engine_pg = db_engine('us', 'mysql')
df = engine_pg.read_sql(sql_read) df = engine_pg.read_sql(sql_read)
...@@ -54,12 +54,12 @@ def feedback_products(site, proxy_name, week): ...@@ -54,12 +54,12 @@ def feedback_products(site, proxy_name, week):
print('抓取店铺') print('抓取店铺')
state = select_sate_mysql(num=2, page='店铺Feedback', site=site, week=week) state = select_sate_mysql(num=2, page='店铺Feedback', site=site, week=week)
if state: if state:
account_asin_us = async_account_feedback(site, read_size=150, proxy_name=proxy_name, week=week) account_asin_us = async_account_feedback(site, read_size=250, proxy_name=proxy_name, week=week)
account_asin_us.run() account_asin_us.run()
else: else:
state = select_sate_mysql(num=2, page='店铺产品', site=site, week=week) state = select_sate_mysql(num=2, page='店铺产品', site=site, week=week)
if state: if state:
products_asin_us = async_account_name_products(site, read_size=200, proxy_name=proxy_name, week=week) products_asin_us = async_account_name_products(site, read_size=250, proxy_name=proxy_name, week=week)
products_asin_us.run() products_asin_us.run()
...@@ -69,7 +69,7 @@ def long_time_task(site, proxy_name, week): ...@@ -69,7 +69,7 @@ def long_time_task(site, proxy_name, week):
if __name__ == '__main__': if __name__ == '__main__':
pppoe_ip() pppoe_ip()
site_list = ['us', 'de', 'uk', 'fr', 'es', 'it'] site_list = ['us','de','uk', 'fr', 'es', 'it']
week = int(sys.argv[1]) week = int(sys.argv[1])
proxy_name = None proxy_name = None
for site in site_list: for site in site_list:
...@@ -83,7 +83,6 @@ if __name__ == '__main__': ...@@ -83,7 +83,6 @@ if __name__ == '__main__':
process = multiprocessing.Process(target=long_time_task, args=(site, proxy_name, week)) process = multiprocessing.Process(target=long_time_task, args=(site, proxy_name, week))
processes.append(process) processes.append(process)
process.start() process.start()
time.sleep(2)
# 等待所有子进程执行完毕 # 等待所有子进程执行完毕
for process in processes: for process in processes:
process.join() process.join()
......
import sys
import os import os
import sys
from datetime import datetime, timedelta from datetime import datetime, timedelta
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录 sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
import time, random
import multiprocessing import multiprocessing
from amazon_spider.asin_detail_mysql import async_asin_mysql import time
from amazon_spider.asin_detail_pg import async_asin_pg import random
from amazon_spider.day_asin_spider import async_asin_pg
from utils.db_connect import BaseUtils
from threading_spider.post_to_dolphin import DolphinschedulerHelper
from amazon_spider.VPS_IP import pppoe_ip from amazon_spider.VPS_IP import pppoe_ip
from threading_spider.update_spdier_state import select_sate_mysql
def long_time_task(site, proxy_name, week): def db_cursor_connect_update(sql, site):
print("当前 抓取 站点 ", site, proxy_name) for i in range(3):
if site == 'us': try:
print("开始=============== us asin 抓取 =======================", proxy_name) engine_us_mysql = BaseUtils(site_name='us').mysql_connect()
async_asin_us = async_asin_pg(site, proxy_name=proxy_name, week=week) print('更新sql:', sql)
async_asin_us.run() with engine_us_mysql.begin() as conn:
conn.execute(sql)
break
except:
print(site, 'db_cursor_connect 报错:', sql)
def read_data_from_pg(site, next_date, read_size=1500):
"""主进程:走代理PG read_then_update(含FOR UPDATE)读取数据"""
try:
date_info = next_date.replace('-', '_')
engine_pg = BaseUtils(site_name=site).pg_connect()
table = f'{site}_all_syn_st_day_{date_info}'
sql_read = f"SELECT asin, id, date_info, asin_is_variation, data_type, volume, weight_str FROM {table} WHERE state=1 FOR UPDATE SKIP LOCKED LIMIT {read_size}"
print(sql_read)
df_read = engine_pg.read_then_update(
select_sql=sql_read,
update_table=table,
set_values={"state": 2},
where_keys=["id"],
)
df_read.drop_duplicates(['asin'], inplace=True)
if df_read.shape[0] > 0:
df_read['volume'] = df_read['volume'].fillna('null')
df_read['weight_str'] = df_read['weight_str'].fillna('null')
asin_list = list(
df_read.id.astype("U") + '|' + df_read.asin + '|' + df_read.date_info + '|' + df_read.asin_is_variation.astype(
"U") + '|' + df_read.data_type.astype("U") + '|' + df_read.volume.astype(
"U") + '|' + df_read.weight_str.astype("U"))
print(f'读取 {len(asin_list)} 条ASIN')
return asin_list
else:
print('没有 state=1 的数据了')
return []
except Exception as e:
print(f'读取数据失败: {e}')
return []
def long_time_task(site, next_date, data_chunk):
print(f"当前进程 {os.getpid()} 分配到 {len(data_chunk)} 条数据")
async_asin_us = async_asin_pg(site_name=site, next_date=next_date)
async_asin_us.run_with_data(data_chunk)
def update_mysql_day_asin_state(site_name, next_date, num_state):
redis_client = BaseUtils().redis_db()
engine_mysql = BaseUtils(site_name='us').mysql_connect()
lock_key_state = f"{next_date}_day_asin_lock_state"
lock_state = redis_client.lock(lock_key_state, timeout=55) # 10秒超时
if num_state == 1:
select_day_status_val = f"select status_val from workflow_progress where date_type='day' and page='ASIN详情' and site_name='us' and date_info='{next_date}' and kafka_flow_state=1 and spider_state=1"
print('开始抓取 查询状态1 ::', select_day_status_val)
df_status_val = engine_mysql.read_sql(select_day_status_val)
status_val = df_status_val.loc[0, "status_val"] if not df_status_val.empty else None
if status_val == 1:
if lock_state.acquire(blocking=True):
sql_select_ = f"select status_val from workflow_progress where date_type='day' and page='ASIN详情' and site_name='{site_name}' and date_info='{next_date}' and kafka_flow_state=1 and spider_state=1"
print('sql_select 2222222:',sql_select_)
df_status_dict = engine_mysql.read_sql(sql_select_)
if not df_status_dict.empty:
print('查询kafka是否开启', df_status_dict.status_val[0])
if df_status_dict.status_val[0] in (1, 2):
update_month_spider_state = f"update workflow_progress set spider_state=2,status_val=2 WHERE site_name='{site_name}' and date_type='day' and date_info='{next_date}' and page='ASIN详情'"
print('spider_state=2,status_val=2 修改状态::',update_month_spider_state)
db_cursor_connect_update(update_month_spider_state, site_name)
DolphinschedulerHelper.start_process_instance_common(
project_name="big_data_selection",
process_df_name='ALL-实时day-插件+流量选品',
startParams={
"site_name": f"{site_name}",
"date_type": "day",
"date_info": f'{next_date}'
}
)
n = 0
while True:
try:
sql_read = f"select id from workflow_progress where date_type='day' and page='ASIN详情' and site_name='{site_name}' and date_info='{next_date}' and kafka_flow_state=3 and spider_state=2"
print('等待es启动::',sql_read)
df_report_date = engine_mysql.read_sql(sql_read)
if not df_report_date.empty:
print('抓取 day asin')
break
else:
n += 1
time.sleep(120)
if n > 25:
break
except:
time.sleep(10)
if num_state == 3:
print('抓取完成 抓取完成')
if lock_state.acquire(blocking=True):
sql_read = f"select id from workflow_progress where date_type='day' and page='ASIN详情' and site_name='{site_name}' and date_info='{next_date}' and kafka_flow_state=3 and spider_state=2"
print(sql_read, '抓取完成 修改状态 333333444444')
df_report_id = engine_mysql.read_sql(sql_read)
if not df_report_id.empty:
update_month_spider_state = f"update workflow_progress set spider_state=3,status_val=3 WHERE site_name='{site_name}' and date_type='day' and date_info='{next_date}' and page='ASIN详情'"
print(update_month_spider_state)
db_cursor_connect_update(update_month_spider_state, site_name)
if lock_state.locked():
lock_state.release()
def select_day_asin_id(site, next_date):
date_info = next_date.replace('-', '_')
engine_pg_14 = BaseUtils(site_name=site).pg_connect()
sql_read = f'SELECT id FROM {site}_all_syn_st_day_{date_info} where state in (1,2) LIMIT 1'
print('查询 {} 是否有asin需要抓取 ::'.format(date_info), sql_read)
df = engine_pg_14.read_sql(sql_read)
if not df.empty:
id_tuple = [1]
else:
id_tuple = None
if id_tuple is None:
update_mysql_day_asin_state(site, next_date, 3)
return False
else:
print("sql_read:", sql_read)
update_mysql_day_asin_state(site, next_date, 1)
return True
def check_workflow_status(site_name, date_info):
"""
检查 workflow_progress 表中 status_val 状态
:param site_name: 站点名称,如 'us'
:param date_info: 日期信息,如 '2026-02-26'
:return: True-有值可继续,False-无值跳过
"""
while True:
try:
engine_mysql = BaseUtils(site_name='us').mysql_connect()
sql_read = f"select status_val from workflow_progress where date_type='day' and page='ASIN详情' and site_name='{site_name}' and date_info='{date_info}'"
print(f'check_workflow_status SQL: {sql_read}')
df = engine_mysql.read_sql(sql_read)
if df.empty:
return False # 没记录,跳过
else: else:
print("开始=============== asin 抓取 =======================") return True # 有记录 可以往下抓取数据
async_asin_es = async_asin_mysql(site, proxy_name=proxy_name, week=week) except:
async_asin_es.run() time.sleep(10)
def start_run(week): def start_day_spider():
pppoe_ip() pppoe_ip()
site_list = ['de', 'uk'] site_list = ['us']
proxy_name = None # 动态生成:前3天到今天,共4个日期
today = datetime.now()
date_list = [(today - timedelta(days=i)).strftime('%Y-%m-%d') for i in range(4, -1, -1)]
print(f'本次日ASIN任务日期范围: {date_list}')
#本次日ASIN任务日期范围: ['2026-03-23', '2026-03-24', '2026-03-25', '2026-03-26']
for site in site_list: for site in site_list:
if select_sate_mysql(site=site, week=week): for next_date in date_list:
if not check_workflow_status(site, next_date):
print(f'跳过 {next_date},继续下一个日期')
continue
num_processes = 5
while True: while True:
if select_day_asin_id(site, next_date):
current_time = datetime.now() current_time = datetime.now()
# 计算后五分钟的时间 date_info = next_date.replace('-', '_')
five_minutes_later = current_time + timedelta(minutes=2) five_minutes_later = current_time + timedelta(minutes=2)
print('后五分钟时间', five_minutes_later) print('后五分钟时间', five_minutes_later)
# 主进程统一读取数据
all_data = read_data_from_pg(site, next_date, read_size=1500)
if not all_data:
print('读取数据为空,等待重试')
time.sleep(30)
continue
# 平均分配给子进程
chunks = [all_data[i::num_processes] for i in range(num_processes)]
processes = [] processes = []
for _ in range(4): for idx in range(num_processes):
process = multiprocessing.Process(target=long_time_task, args=(site, proxy_name, week)) if chunks[idx]:
process = multiprocessing.Process(target=long_time_task, args=(site, date_info, chunks[idx]))
processes.append(process) processes.append(process)
process.start() process.start()
# 等待所有子进程执行完毕
for process in processes: for process in processes:
process.join() process.join()
print('所有进程运行完毕!开始切换ip') print('所有进程运行完毕!开始切换ip')
current_time = datetime.now() current_time = datetime.now()
if current_time > five_minutes_later: if current_time > five_minutes_later:
pppoe_ip() pppoe_ip()
else: else:
time.sleep(random.uniform(150, 300)) time.sleep(random.uniform(150, 220))
pppoe_ip() pppoe_ip()
if select_sate_mysql(site=site, num=3, week=week) == False: else:
break break
return None
if __name__ == '__main__': if __name__ == '__main__':
try: print('抓取 日 asin')
week = int(sys.argv[1]) start_day_spider()
except: \ No newline at end of file
week = None
start_run(week)
import sys print('抓取 月 asin')
import os import os
import sys
from datetime import datetime, timedelta from datetime import datetime, timedelta
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录 sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
...@@ -8,41 +9,115 @@ import multiprocessing ...@@ -8,41 +9,115 @@ import multiprocessing
from amazon_spider.asin_detail_pg import async_asin_pg from amazon_spider.asin_detail_pg import async_asin_pg
from amazon_spider.VPS_IP import pppoe_ip from amazon_spider.VPS_IP import pppoe_ip
from threading_spider.update_spdier_state import select_sate_mysql from threading_spider.update_spdier_state import select_sate_mysql
from amazon_save_db.save_asin_detail_pg import Save_asin_detail # 主进程读数据用
from threading_spider.Poll_site_spider import start_day_spider
def read_data_from_pg(save_detail_reader):
"""主进程:复用 Save_asin_detail.read_db_data(),保留 minid_maxid 分段读取"""
asin_6field = save_detail_reader.read_db_data()
if not asin_6field:
return []
# read_db_data() 执行后 save_detail_reader.df_read 里有 id 列
# 组装成7字段格式:id|asin|date_info|is_variation|data_type|volume|weight_str
df = save_detail_reader.df_read
all_data = list(
df.id.astype("U") + '|' + df.asin + '|' + df.date_info + '|' +
df.asin_is_variation.astype("U") + '|' + df.data_type.astype("U") + '|' +
df.volume.astype("U") + '|' + df.weight_str.astype("U"))
print(f'主进程读取 {len(all_data)} 条ASIN')
return all_data
def long_time_task(site, proxy_name, month, spider_int): def long_time_task(site, month, spider_int, data_chunk):
print("当前 抓取 站点 ", site, proxy_name) """子进程:接收分发数据,不自己读数据库"""
print(f"开始=============== {site} asin 抓取 =======================", proxy_name) print(f"当前进程 {os.getpid()} 分配到 {len(data_chunk)} 条数据")
async_asin_us = async_asin_pg(site, proxy_name=proxy_name, month=month, spider_int=spider_int) print(f"开始=============== {site} asin 抓取 =======================")
async_asin_us.run() async_asin_us = async_asin_pg(site_name=site, month=month, spider_int=spider_int, skip_read_init=True)
async_asin_us.run_with_data(data_chunk)
if __name__ == '__main__': if __name__ == '__main__':
pppoe_ip() pppoe_ip()
site_list = ['us','de','uk'] site_list = ['uk','de']
month = int(sys.argv[1]) month = int(sys.argv[1])
spider_int = int(sys.argv[2]) spider_int = 0
week = int(sys.argv[3]) week = int(sys.argv[2])
if int(month) < 10: if int(month) < 10:
month = '0' + str(month) month = '0' + str(month)
if week < 10: if week < 10:
week = '0' + str(week) week = '0' + str(week)
proxy_name = None num_processes = 5
day_spider_ran_date = None # 记录当天是否已触发日任务,防止重复执行
for site in site_list: for site in site_list:
if select_sate_mysql(site=site, week=week, spider_int=spider_int, month=month): if select_sate_mysql(site=site, week=week, spider_int=spider_int, month=month):
# 主进程创建 reader(保留 minid_maxid 分段机制)
reader = Save_asin_detail(site_name=site, month=month, spider_int=spider_int)
reader.read_size = 1500 # 原来180,主进程一次读1500条分给子进程
empty_count = 0
while True: while True:
# ====== 凌晨2点切换日ASIN任务 ======
_today = datetime.now().strftime('%Y-%m-%d')
if 2 <= datetime.now().hour <= 5 and day_spider_ran_date != _today:
day_spider_ran_date = _today
print(f'==== {datetime.now()} 凌晨2点,执行日ASIN抓取 ====')
try:
start_day_spider()
except Exception as e:
print(f'日ASIN任务异常: {e}')
print(f'==== {datetime.now()} 日ASIN任务完成,继续月度任务 ====')
# ====================================
current_time = datetime.now() current_time = datetime.now()
# 计算后五分钟的时间
five_minutes_later = current_time + timedelta(minutes=2) five_minutes_later = current_time + timedelta(minutes=2)
print('后五分钟时间', five_minutes_later) print('后五分钟时间', five_minutes_later)
# ====== 主进程统一读取数据(带 minid_maxid 分段 + FOR UPDATE SKIP LOCKED)======
all_data = read_data_from_pg(reader)
if not all_data:
empty_count += 1
# 分段区间内读空 → 清掉分段限制,下次走 read_db_data2() 全表扫
if reader.minid_maxid_list:
print(f'分段区间无数据,清除分段限制,尝试无分段读取')
reader.minid_maxid_list = []
continue # 立即重试,不sleep
# 无分段也读空 → 可能数据被其他机器抢完了
if empty_count >= 3:
print(f'连续{empty_count}次读空,检查是否全部完成')
if select_sate_mysql(site=site, num=3, month=month, week=week,
spider_int=spider_int) == False:
break # 全部完成,退出
# 还没完 → 其他机器还在处理,sleep退让减少空转
time.sleep(60)
empty_count = 0
continue
time.sleep(10)
continue
# ====== 读到数据,重置空计数 ======
empty_count = 0
# 动态调整进程数:数据少时减少进程避免浪费
if len(all_data) < 300:
actual_processes = max(1, len(all_data) // 60)
else:
actual_processes = num_processes
# 平均分配给子进程
chunks = [all_data[i::actual_processes] for i in range(actual_processes)]
processes = [] processes = []
for _ in range(4): for idx in range(actual_processes):
process = multiprocessing.Process(target=long_time_task, args=(site, proxy_name, month, spider_int)) if chunks[idx]:
process = multiprocessing.Process(
target=long_time_task,
args=(site, month, spider_int, chunks[idx]))
processes.append(process) processes.append(process)
process.start() process.start()
# 等待所有子进程执行完毕
for process in processes: for process in processes:
process.join() process.join()
print('所有进程运行完毕!开始切换ip') print('所有进程运行完毕!开始切换ip')
current_time = datetime.now() current_time = datetime.now()
if current_time > five_minutes_later: if current_time > five_minutes_later:
...@@ -50,5 +125,7 @@ if __name__ == '__main__': ...@@ -50,5 +125,7 @@ if __name__ == '__main__':
else: else:
time.sleep(random.uniform(150, 220)) time.sleep(random.uniform(150, 220))
pppoe_ip() pppoe_ip()
if select_sate_mysql(site=site, num=3, month=month, week=week, spider_int=spider_int) == False:
if select_sate_mysql(site=site, num=3, month=month, week=week,
spider_int=spider_int) == False:
break break
import os
import sys
from datetime import datetime, timedelta
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
import multiprocessing
from amazon_spider.sp_serch_term_spider import search_temp_pg
import time
import random
from amazon_spider.VPS_IP import pppoe_ip
from utils.db_connect import BaseUtils
import redis
from amazon_params.params import REDIS_CONN, DORIS_ADV_DIRECT_CONN
import pymysql
def read_data_with_lock(site, read_size=500):
"""主进程:Redis分布式锁 + 直连Doris读取数据 + 更新state=2"""
try:
r = redis.Redis(
host=REDIS_CONN['redis_host'],
port=REDIS_CONN['redis_port'],
password=REDIS_CONN['redis_pwd'],
db=REDIS_CONN['redis_db']
)
except Exception as e:
print(f'Redis连接失败: {e}')
return []
lock_key = f'lock:sp_search_term_read:{site}'
lock_value = f'{os.getpid()}_{time.time()}'
lock_acquired = False
# 加锁,120秒过期
for _ in range(60):
try:
if r.set(lock_key, lock_value, nx=True, ex=120):
lock_acquired = True
break
except Exception as e:
print(f'Redis加锁异常: {e}')
return []
print('等待其他VPS释放锁...')
time.sleep(2)
else:
print('等待锁超时,跳过本轮')
return []
conn = None
try:
conn = pymysql.connect(**DORIS_ADV_DIRECT_CONN)
cursor = conn.cursor()
table = f'{site}_sp_search_term_syn'
# SELECT
sql_select = f'SELECT id, search_term, url, time_batch FROM {table} WHERE state=1 ORDER BY id ASC LIMIT {read_size}'
print(sql_select)
cursor.execute(sql_select)
rows = cursor.fetchall()
if not rows:
print('没有 state=1 的数据了')
return []
# UPDATE state=2
ids = [str(row[0]) for row in rows]
print(len(ids), '#' * 100)
id_str = ','.join(ids)
sql_update = f'UPDATE {table} SET state=2 WHERE id IN ({id_str})'
cursor.execute(sql_update)
conn.commit()
print(f'直连Doris读取 {len(rows)} 条,已更新state=2')
# 组装成 search_term_list 格式
search_term_list = [f'{row[0]}|-|{row[1]}|-|{row[2]}|-|{row[3]}' for row in rows]
return search_term_list
except Exception as e:
print(f'读取数据失败: {e}')
return []
finally:
# 关闭数据库连接
if conn:
try:
conn.close()
except:
pass
# 释放锁(只释放自己的锁)
if lock_acquired:
try:
if r.get(lock_key) == lock_value.encode():
r.delete(lock_key)
except Exception as e:
print(f'释放Redis锁失败(不影响主流程): {e}')
def long_time_task(site, data_chunk):
"""子进程:用分配到的数据抓取"""
print(f"当前进程 {os.getpid()} 分配到 {len(data_chunk)} 条数据")
spider_us = search_temp_pg(site_name=site, read_size=100)
spider_us.run_pol_with_data(data_chunk)
if __name__ == '__main__':
pppoe_ip()
site_list = ['us']
num_processes = 5
start_time = datetime.now()
for site in site_list:
while True:
try:
current_time = datetime.now()
five_minutes_later = current_time + timedelta(minutes=1)
print('后五分钟时间', five_minutes_later)
# 主进程统一读取数据
all_data = read_data_with_lock(site, read_size=1500)
if not all_data:
print('没有数据,等待60秒后重新检查...')
time.sleep(60)
continue
# 平均分配给子进程
chunks = [all_data[i::num_processes] for i in range(num_processes)]
processes = []
for idx in range(num_processes):
if chunks[idx]:
process = multiprocessing.Process(target=long_time_task, args=(site, chunks[idx]))
processes.append(process)
process.start()
for process in processes:
process.join()
print('所有进程运行完毕!')
current_time = datetime.now()
if current_time > five_minutes_later:
pppoe_ip()
else:
time.sleep(random.uniform(150, 220))
pppoe_ip()
except Exception as e:
print(f'[主循环异常] 等待30秒后重试: {e}')
import traceback
traceback.print_exc()
time.sleep(30)
continue
...@@ -56,6 +56,7 @@ class DolphinschedulerHelper(object): ...@@ -56,6 +56,7 @@ class DolphinschedulerHelper(object):
return cls._project_df_map[project_code] return cls._project_df_map[project_code]
@classmethod @classmethod
def start_process_instance_common(cls, project_name: str, def start_process_instance_common(cls, project_name: str,
process_df_name: str, process_df_name: str,
...@@ -123,63 +124,83 @@ class DolphinschedulerHelper(object): ...@@ -123,63 +124,83 @@ class DolphinschedulerHelper(object):
data=req_params data=req_params
) )
resp_json = json.loads(resp.content.decode("utf-8")) resp_json = json.loads(resp.content.decode("utf-8"))
if bool(resp_json['success']): resp_state = bool(resp_json['success'])
title = f"【海豚调度】调度api触发提示"
if resp_state:
DolphinschedulerHelper.send_startup_state_to_oa(project_name, process_df_name, resp_state)
return True return True
else: else:
DolphinschedulerHelper.send_startup_state_to_oa(project_name, process_df_name, resp_state)
raise Exception(f"任务【{project_name}/{process_df_name}】调度失败!") raise Exception(f"任务【{project_name}/{process_df_name}】调度失败!")
@classmethod @classmethod
def start_process_instance(cls,site_name: str, date_info: str, table_list: str, flag: str, warning_Type: str = "NONE"): def send_wx_msg(cls, users: list, title: str, content: str, msgtype: str = "textcard"):
""" """
启动一个海豚流程 通过选品wx消息推送接口,推送消息到oa
:param project_name: 项目名 :param users: 填写需要推送的微信用户名list
:param process_df_name: 流程名 :param title: 推送的标题(如果msgtype采用markdown形式,则不附带标题)
:param startParams: 启动全局参数 :param content: 推送的主体内容
:param warning_Type: 警告类型 NONE ALL :param msgtype: 推送的消息类型(textcard:默认卡片类型;markdown:markdaown结构)
:return:
""" """
project_name = "big_data_selection" if users is not None:
process_df_name = "pg爬虫库同步pg生产库" accounts = ",".join(users)
startParams = { # 排除users_list=[''] 无需发送
"site_name": site_name, if bool(accounts):
"date_info": date_info, host = "http://120.79.147.190:8080"
"table_list": table_list, url = f'{host}/soundasia_selection/dolphinScheduler/sendMessage'
"flag": flag data = {
'account': accounts,
'title': title,
'content': content,
'msgtype': msgtype
} }
try:
requests.post(url=url, data=data, timeout=15)
except:
pass
return True
@classmethod
def send_startup_state_to_oa(cls, project_name: str, process_df_name: str, resp_state: bool):
"""
根据api触发海豚oa消息推送(推送人由维护在海豚调度任务中的wx_user决定)
:param project_name:项目名称
:param process_df_name:流程名称
:param resp_state:任务调度启动状态
:return
"""
wx_user_list = DolphinschedulerHelper.get_process_df_manger(project_name, process_df_name)
title = f"【海豚调度】调度api触发提示"
if resp_state:
msg = f"项目【{project_name}】,流程【{process_df_name}】api任务触发成功!"
else:
msg = f"项目【{project_name}】,流程【{process_df_name}】api任务触发异常,请查看日志!"
if bool(wx_user_list):
DolphinschedulerHelper.send_wx_msg(wx_user_list, title, msg)
@classmethod
def get_process_df_manger(cls, project_name: str, process_df_name: str):
"""
获取海豚流程定义的全局变量对应的wx_user
:param project_name:
:param process_df_name:
:return:
"""
project_map = cls.get_project_map() project_map = cls.get_project_map()
project_code = project_map.get(project_name) project_code = project_map.get(project_name)
process_df_map: Dict = cls.get_project_df_map(project_code) process_df_map: Dict = cls.get_project_df_map(project_code)
process_df_code = process_df_map.get(process_df_name) process_df_code = process_df_map.get(process_df_name)
url = f"{cls._ip_port}/dolphinscheduler/projects/{project_code}/process-definition/{process_df_code}/view-variables"
url = f"{cls._ip_port}/dolphinscheduler/projects/{project_code}/executors/start-process-instance" resp = requests.get(
startParams['_sender'] = "api"
req_params = {
"processDefinitionCode": process_df_code,
"scheduleTime": "",
# 失败策略
"failureStrategy": "CONTINUE",
"warningType": warning_Type,
# 告警组 2 是http推送报警
"warningGroupId": "2",
"execType": "",
"startNodeList": "",
"taskDependType": "TASK_POST",
"runMode": "RUN_MODE_SERIAL",
"processInstancePriority": "MEDIUM",
"startParams": json.dumps(startParams),
"expectedParallelismNumber": "",
"dryRun": "0",
}
print(req_params)
resp = requests.post(
url, url,
headers=cls.get_http_header(), headers=cls.get_http_header(),
data=req_params
) )
resp_json = json.loads(resp.content.decode("utf-8")) resp_json = json.loads(resp.content.decode("utf-8"))
if bool(resp_json['success']): if bool(resp_json['success']):
return True globalParams: list = resp_json['data']['globalParams']
else: paramMap = {param['prop']: param['value'] for param in globalParams}
raise Exception(f"任务【{project_name}/{process_df_name}】调度失败!") wx_user = paramMap.get("wx_user") or ""
\ No newline at end of file return wx_user.split(",")
return None
...@@ -31,7 +31,7 @@ def db_cursor_connect_update(sql, site): ...@@ -31,7 +31,7 @@ def db_cursor_connect_update(sql, site):
def db_cursor_connect_msyql_read(select_state1_sql): def db_cursor_connect_msyql_read(select_state1_sql):
for i in range(3): for i in range(10):
try: try:
engine_us_mysql = db_engine_us('us','mysql') engine_us_mysql = db_engine_us('us','mysql')
print(select_state1_sql) print(select_state1_sql)
...@@ -58,30 +58,9 @@ def select_sate_mysql(site=None, num=None, page=None, month=None, week=None, spi ...@@ -58,30 +58,9 @@ def select_sate_mysql(site=None, num=None, page=None, month=None, week=None, spi
print('当前抓取月::', month) print('当前抓取月::', month)
if int(week) < 10: if int(week) < 10:
week = f'0{int(week)}' week = f'0{int(week)}'
year_week = f'2025-{week}' year_week = f'2026-{week}'
year_month = f'2025-{month}' year_month = f'2026-{month}'
if num is None: if num is None:
if site in ['fr', 'es', 'it']:
print('查询 周工作进度 工作进度 状态')
select_state1_sql = f"select status_val from workflow_progress where site_name='{site}' and date_info='{year_week}' and date_type='week' and page='反查搜索词'"
print(select_state1_sql)
df = db_cursor_connect_msyql_read(select_state1_sql)
if not df.empty:
if df.status_val[0] == 3:
print(f"{site} 站点 {week} 周 搜索词 已完成 执行下一步")
select_sate_sql = f"select status_val from workflow_progress where site_name='{site}' and date_info='{year_week}' and date_type='week' and page='ASIN详情'"
print(select_sate_sql)
df1 = db_cursor_connect_msyql_read(select_state1_sql)
if df1.status_val[0] != 3:
update_workflow_progress = f"update workflow_progress set status_val=2,status='抓取中' where page='ASIN详情' and date_info='2025-{week}' and site_name='{site}' and date_type='week' and status_val in(1,0)"
db_cursor_connect_update(update_workflow_progress, site)
print(f"{site} 站点 未全部完成 数据 抓取")
return True
else:
return False
else:
return False
else:
print('查询 工作进度 工作进度 状态') print('查询 工作进度 工作进度 状态')
# 定义锁的键 # 定义锁的键
redis_client = BaseUtils().redis_db() redis_client = BaseUtils().redis_db()
...@@ -109,8 +88,8 @@ def select_sate_mysql(site=None, num=None, page=None, month=None, week=None, spi ...@@ -109,8 +88,8 @@ def select_sate_mysql(site=None, num=None, page=None, month=None, week=None, spi
startParams={ startParams={
"site_name": f"{site}", "site_name": f"{site}",
"date_type": "month", "date_type": "month",
"date_info": f'2025-{month}' "date_info": f'2026-{month}'
} },warning_Type="ALL"
) )
print('发送消息通知') print('发送消息通知')
account = 'pengyanbing,chenyuanjie,hezhe,wangrui4,fangxingjun,zhouyuchen' account = 'pengyanbing,chenyuanjie,hezhe,wangrui4,fangxingjun,zhouyuchen'
...@@ -145,9 +124,11 @@ def select_sate_mysql(site=None, num=None, page=None, month=None, week=None, spi ...@@ -145,9 +124,11 @@ def select_sate_mysql(site=None, num=None, page=None, month=None, week=None, spi
except: except:
time.sleep(10) time.sleep(10)
if num == 1: if num == 1:
if site in ['fr', 'es', 'it']: redis_client = BaseUtils().redis_db()
print(f' {site} _all_syn_st 完成') lock_key_state = f"{year_week}_{site}_lock_state"
else: lock_state = redis_client.lock(lock_key_state, timeout=55) # 10秒超时
try:
if lock_state.acquire(blocking=True):
sql_select_ = f"select status_val from workflow_progress where date_info='{year_week}' and date_type='week' and page='ASIN详情' and site_name='{site}'" sql_select_ = f"select status_val from workflow_progress where date_info='{year_week}' and date_type='week' and page='ASIN详情' and site_name='{site}'"
print(sql_select_) print(sql_select_)
df_status_dict = db_cursor_connect_msyql_read(sql_select_) df_status_dict = db_cursor_connect_msyql_read(sql_select_)
...@@ -158,12 +139,11 @@ def select_sate_mysql(site=None, num=None, page=None, month=None, week=None, spi ...@@ -158,12 +139,11 @@ def select_sate_mysql(site=None, num=None, page=None, month=None, week=None, spi
db_cursor_connect_update(update_workflow_progress, site) db_cursor_connect_update(update_workflow_progress, site)
db_class.send_mg('pengyanbing', '修改进度表', update_workflow_progress) db_class.send_mg('pengyanbing', '修改进度表', update_workflow_progress)
ii = 0 ii = 0
for i in range(10): for i in range(12):
time.sleep(180) time.sleep(180)
ii += 1 ii += 1
if ii > 10: if ii > 10:
break break
# SELECT * from workflow_progress WHERE site_name='us' and page='asin详情' and date_type='month' and status_val=1 and status='月ASIN导出完成 and date_info='
update_month_asin_state = f"update workflow_progress set status_val=3,status='月ASIN抓取完成' WHERE site_name='{site}' and page='asin详情' and date_type='month' and status_val=1 and status='月ASIN导出完成' and date_info='{year_month}'" update_month_asin_state = f"update workflow_progress set status_val=3,status='月ASIN抓取完成' WHERE site_name='{site}' and page='asin详情' and date_type='month' and status_val=1 and status='月ASIN导出完成' and date_info='{year_month}'"
print(update_month_asin_state) print(update_month_asin_state)
db_cursor_connect_update(update_month_asin_state, site) db_cursor_connect_update(update_month_asin_state, site)
...@@ -176,15 +156,21 @@ def select_sate_mysql(site=None, num=None, page=None, month=None, week=None, spi ...@@ -176,15 +156,21 @@ def select_sate_mysql(site=None, num=None, page=None, month=None, week=None, spi
startParams={ startParams={
"site_name": f"{site}", "site_name": f"{site}",
"date_type": "month", "date_type": "month",
"date_info": f'2025-{month}' "date_info": f'2026-{month}'
} }
) )
time_strftime = time.strftime("%Y-%m-%d %X", time.localtime()) time_strftime = time.strftime("%Y-%m-%d %X", time.localtime())
# print('发送消息通知')
account = 'pengyanbing,chenyuanjie,hezhe,wangrui4,fangxingjun,chenjianyun,zhouyuchen' account = 'pengyanbing,chenyuanjie,hezhe,wangrui4,fangxingjun,chenjianyun,zhouyuchen'
title = site + '站点 asin 详情' title = site + '站点 asin 详情'
content = str(month) + ' 月 asin 详情 已结束,请确认下一步流程!时间:' + time_strftime content = str(month) + ' 月 asin 详情 已结束,请确认下一步流程!时间:' + time_strftime
db_class.send_mg(account, title, content) db_class.send_mg(account, title, content)
except LockError:
print("获取锁失败")
except Exception as e:
print(f"错误: {e}")
finally:
if lock_state.locked():
lock_state.release()
if num == 2: if num == 2:
sql_read = f"SELECT status_val FROM workflow_progress where page='{page}' and date_info='{year_month}' and site_name='{site}' and date_type='month'" sql_read = f"SELECT status_val FROM workflow_progress where page='{page}' and date_info='{year_month}' and site_name='{site}' and date_type='month'"
print(sql_read) print(sql_read)
...@@ -198,7 +184,7 @@ def select_sate_mysql(site=None, num=None, page=None, month=None, week=None, spi ...@@ -198,7 +184,7 @@ def select_sate_mysql(site=None, num=None, page=None, month=None, week=None, spi
if num == 3: if num == 3:
for i in range(5): for i in range(5):
try: try:
sql_read = f'SELECT id FROM {site}_all_syn_st_month_2025_{month} where state in (1,2) LIMIT 1' sql_read = f'SELECT id FROM {site}_all_syn_st_month_2026_{month} where state in (1,2) LIMIT 1'
print("sql_read:", sql_read) print("sql_read:", sql_read)
df = db_cursor_connect_pg_read(site,sql_read) df = db_cursor_connect_pg_read(site,sql_read)
if not df.empty: if not df.empty:
...@@ -206,22 +192,25 @@ def select_sate_mysql(site=None, num=None, page=None, month=None, week=None, spi ...@@ -206,22 +192,25 @@ def select_sate_mysql(site=None, num=None, page=None, month=None, week=None, spi
else: else:
id_tuple = None id_tuple = None
if id_tuple is None: if id_tuple is None:
if site in ['us', 'de', 'uk']:
select_sate_mysql(site, num=1, month=month, week=week, spider_int=spider_int) select_sate_mysql(site, num=1, month=month, week=week, spider_int=spider_int)
else:
select_sate_mysql(site, num=1, week=week)
return False return False
else: else:
return True return True
except Exception as e: except Exception as e:
print('查询状态 1 2 报错',e) print('查询状态 1 2 报错',e)
# if __name__ == '__main__': # if __name__ == '__main__':
# from post_to_dolphin import DolphinschedulerHelper
#
# process_df_name = '内部asin-评分评论数计算'
# # DolphinschedulerHelper.start_process_instance(
# DolphinschedulerHelper.start_process_instance_common( # DolphinschedulerHelper.start_process_instance_common(
# project_name="big_data_selection", # project_name="big_data_selection",
# process_df_name='ALL站点-启动30day/月流程', # process_df_name=process_df_name,
# startParams={ # startParams={
# "site_name": "uk", # "site_name": 'us',
# "date_type": "month", # "date_type": 'day',
# "date_info": '2025-10' # "date_info": '2026-04-01'
# } # },
# warning_Type="ALL"
# ) # )
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment