Commit a2b6cfa6 by chenyuanjie

Traffic product selection - real-time consumption - stop logic update

parent 3dd677b9
@@ -354,41 +354,81 @@ class Templates(object):
         except Exception as e:
             print(e, traceback.format_exc())
 
+    # def kafka_consumption_is_finished(self):
+    #     while True:
+    #         try:
+    #             # if self.site_name == 'us':
+    #             #     # sql = f"SELECT * from workflow_progress WHERE site_name='{self.site_name}' and page='{self.spider_type}' ORDER BY created_at desc LIMIT 1;"
+    #             #     sql = f"""
+    #             #             SELECT * from workflow_progress WHERE site_name='{self.site_name}' and page='{self.spider_type}'
+    #             #             and date_info in
+    #             #             -- (SELECT MAX(year_week) as date_info from date_20_to_30 WHERE `year_month` = '2024-02' and week_day =1
+    #             #             (SELECT year_week as date_info from date_20_to_30 WHERE `year_month` = '{self.date_info}' and week_day =1
+    #             #             )
+    #             #             ORDER BY created_at desc LIMIT 1;
+    #             #
+    #             #             """
+    #             # else:
+    #             #     sql = f"SELECT * from selection.workflow_progress WHERE site_name='{self.site_name}' and date_info='{self.date_info}' and page='{self.spider_type}' ORDER BY created_at desc LIMIT 1;"
+    #             sql = f"SELECT * from selection.workflow_progress WHERE site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}' and page='{self.spider_type}' and spider_state=3;"
+    #             print(f"判断爬虫'{self.spider_type}'是否结束, sql: {sql}")
+    #             df = pd.read_sql(sql, con=self.engine_mysql)
+    #             if df.shape[0]:
+    #                 status_val = list(df.status_val)[0]
+    #                 if int(status_val) == 3:
+    #                     print(f"spider_type:{self.spider_type}已经爬取完毕, 退出kafka消费和停止程序")
+    #                     if self.consumer_type == "latest":
+    #                         if HdfsUtils.delete_hdfs_file_with_checkpoint(self.check_path):
+    #                             print("实时消费正常完成,删除对应的检查点文件")
+    #                     self.kafka_stream_stop()
+    #             else:
+    #                 print(f"spider_type:{self.spider_type}还在爬取中, 继续下一个批次数据消费")
+    #             break
+    #         except Exception as e:
+    #             print(f"判断判断爬虫'{self.spider_type}'是否结束---出现异常, 等待20s", e, traceback.format_exc())
+    #             time.sleep(20)
+    #             self.engine_mysql = DBUtil.get_db_engine(db_type=DbTypes.mysql.name, site_name="us")
+    #             continue
     def kafka_consumption_is_finished(self):
-        while True:
+        """判断爬虫是否已结束,若已结束则停止kafka消费并退出程序"""
+        max_retries = 10  # 最大重试次数,避免数据库异常时无限循环
+        retry_count = 0
+        while retry_count < max_retries:
             try:
-                # if self.site_name == 'us':
-                #     # sql = f"SELECT * from workflow_progress WHERE site_name='{self.site_name}' and page='{self.spider_type}' ORDER BY created_at desc LIMIT 1;"
-                #     sql = f"""
-                #             SELECT * from workflow_progress WHERE site_name='{self.site_name}' and page='{self.spider_type}'
-                #             and date_info in
-                #             -- (SELECT MAX(year_week) as date_info from date_20_to_30 WHERE `year_month` = '2024-02' and week_day =1
-                #             (SELECT year_week as date_info from date_20_to_30 WHERE `year_month` = '{self.date_info}' and week_day =1
-                #             )
-                #             ORDER BY created_at desc LIMIT 1;
-                #
-                #             """
-                # else:
-                #     sql = f"SELECT * from selection.workflow_progress WHERE site_name='{self.site_name}' and date_info='{self.date_info}' and page='{self.spider_type}' ORDER BY created_at desc LIMIT 1;"
-                sql = f"SELECT * from selection.workflow_progress WHERE site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}' and page='{self.spider_type}' and spider_state=3;"
-                print(f"判断爬虫'{self.spider_type}'是否结束, sql: {sql}")
+                sql = f"""
+                    SELECT * FROM selection.workflow_progress WHERE site_name='{self.site_name}' AND date_type='{self.date_type}' AND date_info='{self.date_info}' AND page='{self.spider_type}' ORDER BY updated_at DESC LIMIT 1;
+                """
+                print(f"判断爬虫'{self.spider_type}'是否结束, sql: {sql}")
                 df = pd.read_sql(sql, con=self.engine_mysql)
-                if df.shape[0]:
-                    status_val = list(df.status_val)[0]
-                    if int(status_val) == 3:
-                        print(f"spider_type:{self.spider_type}已经爬取完毕, 退出kafka消费和停止程序")
-                        if self.consumer_type == "latest":
-                            if HdfsUtils.delete_hdfs_file_with_checkpoint(self.check_path):
-                                print("实时消费正常完成,删除对应的检查点文件")
-                        self.kafka_stream_stop()
-                else:
-                    print(f"spider_type:{self.spider_type}还在爬取中, 继续下一个批次数据消费")
+                if df.shape[0] == 0:
+                    wx_users = ['fangxingjun', 'chenyuanjie']
+                    wx_msg = f"未找到爬虫'{self.spider_type}'的进度记录, 检查asin导入流程"
+                    CommonUtil.send_wx_msg(wx_users, f"实时消费kafka任务异常: {self.site_name} {self.date_type} {self.date_info}", wx_msg)
+                    sys.exit(1)
+
+                spider_state = int(df.iloc[0]['spider_state'])
+                status_val = int(df.iloc[0]['status_val'])
+                if spider_state == 3 and status_val == 3:
+                    # 爬虫已完成,停止消费
+                    print(f"爬虫'{self.spider_type}'已爬取完毕({self.site_name} {self.date_type} {self.date_info}), 退出kafka消费")
+                    if self.consumer_type == "latest":
+                        # if HdfsUtils.delete_hdfs_file_with_checkpoint(self.check_path):
+                        #     print("实时消费正常完成, 删除对应的检查点文件")
+                        self.kafka_stream_stop()
+                else:
+                    # 爬虫还在进行中
+                    print(f"爬虫'{self.spider_type}'还在爬取中(spider_state={spider_state}, status_val={status_val}), 继续消费")
                 break
             except Exception as e:
-                print(f"判断判断爬虫'{self.spider_type}'是否结束---出现异常, 等待20s", e, traceback.format_exc())
+                retry_count += 1
+                print(f"判断爬虫'{self.spider_type}'是否结束---出现异常(第{retry_count}次重试), 等待20s", e, traceback.format_exc())
                 time.sleep(20)
                 self.engine_mysql = DBUtil.get_db_engine(db_type=DbTypes.mysql.name, site_name="us")
-                continue
+
+        if retry_count >= max_retries:
+            print(f"判断爬虫'{self.spider_type}'是否结束---重试{max_retries} 次后仍失败, 跳过检查继续消费")
 
     def kafka_stream(self, processing_time):
         kafka_df = self.create_kafka_df_object(consumer_type="latest", topic_name=self.topic_name, schema=self.schema)
...
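For context, the sketch below is a minimal, self-contained illustration of the stop-check pattern this commit moves to: poll the latest workflow_progress row for the current site/date/spider, stop the Kafka stream once both spider_state and status_val reach 3, and give up after a bounded number of retries instead of looping forever on database errors. The helpers fetch_progress and stop_stream are hypothetical stand-ins for the pandas/MySQL query and self.kafka_stream_stop(); they are not part of the project's code.

import time


def check_spider_finished(fetch_progress, stop_stream, max_retries=10, retry_wait=20):
    """Return True if the spider has finished and the stream was asked to stop."""
    retry_count = 0
    while retry_count < max_retries:
        try:
            row = fetch_progress()  # stand-in for the pandas query on selection.workflow_progress
            if row is None:
                # The real job sends a WeChat alert (CommonUtil.send_wx_msg) and exits here.
                print("no workflow_progress record found, aborting")
                raise SystemExit(1)
            if int(row["spider_state"]) == 3 and int(row["status_val"]) == 3:
                stop_stream()  # spider finished, stop consuming
                return True
            return False       # spider still running, keep consuming this batch
        except Exception as e:
            retry_count += 1
            print(f"progress check failed ({retry_count}/{max_retries}): {e}")
            time.sleep(retry_wait)
    # After max_retries consecutive failures the check is skipped so consumption is not blocked.
    return False


if __name__ == "__main__":
    finished = check_spider_finished(
        fetch_progress=lambda: {"spider_state": 3, "status_val": 3},
        stop_stream=lambda: print("stopping kafka stream"),
        retry_wait=0,
    )
    print("finished:", finished)

Bounding the retry budget outside the try block means a flaky MySQL connection can delay the check by at most max_retries * retry_wait seconds before consumption resumes, which mirrors the intent of the new max_retries guard in the commit.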