Commit 9975fa69 by fangxingjun

no message

parent 258f7942
......@@ -423,7 +423,7 @@ class Templates(object):
# if spider_state == 3 and status_val == 3:
# 新流程
spider_state = int(df.iloc[0]['spider_state'])
spider_is_ready = int(df.iloc[0]['spider_is_ready'])
spider_is_ready = df.iloc[0]['spider_is_ready']
if spider_state == 3 and spider_is_ready == 'no':
# 爬虫已完成,停止消费
print(f"爬虫'{self.spider_type}'已爬取完毕({self.site_name} {self.date_type} {self.date_info}), 退出kafka消费")
......@@ -675,11 +675,13 @@ class Templates(object):
script_name = sys.argv[0].split("/")[-1].split(".")[0]
if self.consumer_type == 'latest' and self.test_flag == 'normal' and script_name in ['kafka_flow_asin_detail', 'kafka_asin_detail', 'kafka_rank_asin_detail']:
if script_name == 'kafka_flow_asin_detail':
kafka_field = 'kafka_flow_state'
# kafka_field = 'kafka_flow_state'
kafka_field = 'kafka_state'
wx_users = ['chenyuanjie', 'pengyanbing']
wx_msg = f"站点: {self.site_name} 日期类型: {self.date_type} {self.date_info} asin详情实时消费数据到es准备工作已完成,可以开启详情爬取!"
elif script_name == 'kafka_rank_asin_detail':
kafka_field = 'kafka_flow_state'
# kafka_field = 'kafka_flow_state'
kafka_field = 'kafka_state'
wx_users = ['chenyuanjie', 'pengyanbing']
wx_msg = f"站点: {self.site_name} 日期类型:{self.date_type}, {self.date_info} 榜单asin详情实时消费数据到es准备工作已完成,可以开启详情爬取!"
elif script_name == 'kafka_asin_detail':
......@@ -688,14 +690,16 @@ class Templates(object):
wx_msg = f"站点: {self.site_name}, {self.date_type}, {self.date_info} asin详情实时消费数据到redis准备工作已完成,可以开启详情爬取!"
else:
pass
sql = f"UPDATE selection.workflow_progress SET {kafka_field}=3, updated_at=CURRENT_TIMESTAMP where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}' and page='asin详情'"
sql = f"-- UPDATE selection.workflow_progress SET {kafka_field}=3, updated_at=CURRENT_TIMESTAMP where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}' and page='asin详情'"
sql = f"UPDATE selection.workflow_manager SET {kafka_field}=3, updated_at=CURRENT_TIMESTAMP where workflow_name='月全流程' and site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}' and bg_name='{self.site_name}_all_cal'"
for retry in range(5):
try:
DBUtil.exec_sql('mysql', 'us', sql)
CommonUtil.send_wx_msg(wx_users, f"asin详情kafka消费", wx_msg)
break
except Exception as e:
print(f"UPDATE workflow_progress 失败(第{retry + 1}次),等待10s重试", e, traceback.format_exc())
# print(f"UPDATE workflow_progress 失败(第{retry + 1}次),等待10s重试", e, traceback.format_exc())
print(f"UPDATE workflow_manager 失败(第{retry + 1}次),等待10s重试", e, traceback.format_exc())
if retry == 4:
CommonUtil.send_wx_msg(wx_users, f"\u26A0asin详情kafka实时消费\u26A0",
f"站点: {self.site_name} asin详情实时消费准备失败,请等待处理!")
......
Markdown is supported
0% Try again or attach a new file
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment