Commit 258f7942 by fangxingjun

no message

parent d8909e46
...@@ -405,9 +405,10 @@ class Templates(object): ...@@ -405,9 +405,10 @@ class Templates(object):
retry_count = 0 retry_count = 0
while retry_count < max_retries: while retry_count < max_retries:
try: try:
sql = f""" # sql = f"""
SELECT * FROM selection.workflow_progress WHERE site_name='{self.site_name}' AND date_type='{self.date_type}' AND date_info='{self.date_info}' AND page='{self.spider_type}' ORDER BY updated_at DESC LIMIT 1; # SELECT * FROM selection.workflow_progress WHERE site_name='{self.site_name}' AND date_type='{self.date_type}' AND date_info='{self.date_info}' AND page='{self.spider_type}' ORDER BY updated_at DESC LIMIT 1;
""" # """
sql = f"""select * from selection.workflow_manager WHERE workflow_name='月全流程' and site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}' and bg_name='{self.site_name}_all_cal';"""
print(f"判断爬虫'{self.spider_type}'是否结束, sql: {sql}") print(f"判断爬虫'{self.spider_type}'是否结束, sql: {sql}")
df = pd.read_sql(sql, con=self.engine_mysql) df = pd.read_sql(sql, con=self.engine_mysql)
if df.shape[0] == 0: if df.shape[0] == 0:
...@@ -416,9 +417,14 @@ class Templates(object): ...@@ -416,9 +417,14 @@ class Templates(object):
CommonUtil.send_wx_msg(wx_users, f"实时消费kafka任务异常: {self.site_name} {self.date_type} {self.date_info}", wx_msg) CommonUtil.send_wx_msg(wx_users, f"实时消费kafka任务异常: {self.site_name} {self.date_type} {self.date_info}", wx_msg)
sys.exit(1) sys.exit(1)
# 旧流程
# spider_state = int(df.iloc[0]['spider_state'])
# status_val = int(df.iloc[0]['status_val'])
# if spider_state == 3 and status_val == 3:
# 新流程
spider_state = int(df.iloc[0]['spider_state']) spider_state = int(df.iloc[0]['spider_state'])
status_val = int(df.iloc[0]['status_val']) spider_is_ready = int(df.iloc[0]['spider_is_ready'])
if spider_state == 3 and status_val == 3: if spider_state == 3 and spider_is_ready == 'no':
# 爬虫已完成,停止消费 # 爬虫已完成,停止消费
print(f"爬虫'{self.spider_type}'已爬取完毕({self.site_name} {self.date_type} {self.date_info}), 退出kafka消费") print(f"爬虫'{self.spider_type}'已爬取完毕({self.site_name} {self.date_type} {self.date_info}), 退出kafka消费")
if self.consumer_type == "latest": if self.consumer_type == "latest":
...@@ -427,7 +433,8 @@ class Templates(object): ...@@ -427,7 +433,8 @@ class Templates(object):
self.kafka_stream_stop() self.kafka_stream_stop()
else: else:
# 爬虫还在进行中 # 爬虫还在进行中
print(f"爬虫'{self.spider_type}'还在爬取中(spider_state={spider_state}, status_val={status_val}), 继续消费") # print(f"爬虫'{self.spider_type}'还在爬取中(spider_state={spider_state}, status_val={status_val}), 继续消费")
print(f"爬虫'{self.spider_type}'还在爬取中(spider_state={spider_state}, spider_is_ready={spider_is_ready}), 继续消费")
break break
except Exception as e: except Exception as e:
......
Markdown is supported
0% Try again or attach a new file
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment