Commit 1a7ede51 by fangxingjun

no message

parent 9975fa69
import os
import sys
import time
import traceback
sys.path.append(os.path.dirname(sys.path[0]))
from utils.secure_db_client import get_remote_engine
def update_workflow_manager(site_name, date_type, date_info):
print(f"当前执行的参数: {site_name, date_type, date_info}")
if date_type == "month":
while True:
try:
site_name_pri_dict = {
"us": 2,
"uk": 4,
"de": 6,
}
engine_mysql = get_remote_engine(
site_name='us',
db_type='mysql'
)
priority = site_name_pri_dict[site_name]
update_sql_workflow_bg = f"""update selection.workflow_manager
set bg_dol_state=3, kafka_state=1, finished_count=COALESCE(finished_count, 0) + 1
WHERE workflow_name='月全流程' and site_name='{site_name}' and date_type='{date_type}' and date_info='{date_info}' and priority={priority}"""
engine_mysql.execute(update_sql_workflow_bg)
print(f"成功:workflow_manager进度表---更新月全流程的进度表, 以流量选品为准: {update_sql_workflow_bg}")
break
except Exception as e:
print(f"失败:workflow_manager进度表----更新月全流程的进度表, 以流量选品为准: {update_sql_workflow_bg}, {e}, 报错信息: {traceback.format_exc()}")
time.sleep(300)
continue
if __name__ == '__main__':
# site_name = 'us'
# date_type = 'month'
# date_info = '2026-01'
site_name = sys.argv[1] # 参数1:站点
date_type = sys.argv[2] # 参数2:类型:week/4_week/month/quarter/day
date_info = sys.argv[3] # 参数3:年-周/年-月/年-季/年-月-日, 比如: 2022-1
if site_name in ["us", "uk", "de"] and date_type == "month":
update_workflow_manager(site_name, date_type, date_info)
...@@ -408,8 +408,12 @@ class Templates(object): ...@@ -408,8 +408,12 @@ class Templates(object):
# sql = f""" # sql = f"""
# SELECT * FROM selection.workflow_progress WHERE site_name='{self.site_name}' AND date_type='{self.date_type}' AND date_info='{self.date_info}' AND page='{self.spider_type}' ORDER BY updated_at DESC LIMIT 1; # SELECT * FROM selection.workflow_progress WHERE site_name='{self.site_name}' AND date_type='{self.date_type}' AND date_info='{self.date_info}' AND page='{self.spider_type}' ORDER BY updated_at DESC LIMIT 1;
# """ # """
sql = f"""select * from selection.workflow_manager WHERE workflow_name='月全流程' and site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}' and bg_name='{self.site_name}_all_cal';""" sql = f"""select * from selection.workflow_manager WHERE workflow_name='月全流程' and site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}' and bg_name='{self.site_name}_all_cal';"""
print(f"判断爬虫'{self.spider_type}'是否结束, sql: {sql}") if self.date_type == "day":
sql = f"""select * from selection.workflow_manager WHERE workflow_name='asin详情-天' and site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';"""
print(f"判断爬虫'{self.spider_type}'是否结束, sql: {sql}")
df = pd.read_sql(sql, con=self.engine_mysql) df = pd.read_sql(sql, con=self.engine_mysql)
if df.shape[0] == 0: if df.shape[0] == 0:
wx_users = ['fangxingjun', 'chenyuanjie'] wx_users = ['fangxingjun', 'chenyuanjie']
...@@ -690,8 +694,11 @@ class Templates(object): ...@@ -690,8 +694,11 @@ class Templates(object):
wx_msg = f"站点: {self.site_name}, {self.date_type}, {self.date_info} asin详情实时消费数据到redis准备工作已完成,可以开启详情爬取!" wx_msg = f"站点: {self.site_name}, {self.date_type}, {self.date_info} asin详情实时消费数据到redis准备工作已完成,可以开启详情爬取!"
else: else:
pass pass
sql = f"-- UPDATE selection.workflow_progress SET {kafka_field}=3, updated_at=CURRENT_TIMESTAMP where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}' and page='asin详情'" # sql = f"UPDATE selection.workflow_progress SET {kafka_field}=3, updated_at=CURRENT_TIMESTAMP where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}' and page='asin详情'"
sql = f"UPDATE selection.workflow_manager SET {kafka_field}=3, updated_at=CURRENT_TIMESTAMP where workflow_name='月全流程' and site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}' and bg_name='{self.site_name}_all_cal'" sql = f"UPDATE selection.workflow_manager SET {kafka_field}=3, updated_at=CURRENT_TIMESTAMP where workflow_name='月全流程' and site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}' and bg_name='{self.site_name}_all_cal'"
if self.date_type == 'day':
sql = f"UPDATE selection.workflow_manager SET {kafka_field}=3, updated_at=CURRENT_TIMESTAMP where workflow_name='asin详情-天' and site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'"
for retry in range(5): for retry in range(5):
try: try:
DBUtil.exec_sql('mysql', 'us', sql) DBUtil.exec_sql('mysql', 'us', sql)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment