Commit ca8be90a by chenyuanjie

实时任务增加程序停止判断

parent d059daa2
...@@ -93,6 +93,8 @@ class Templates(object):
        # 测试标识
        self.test_flag = 'normal'
        self.beginning_offsets_dict = {}  # history消费时, 初始的偏移量
        # 记录最后一次收到非空批次的时间,用于无新数据超时检测
        self.last_data_time = time.time()
        # redis连接对象--用来锁定--解决并发
        self.client = get_redis_h14()
...@@ -453,11 +455,25 @@ class Templates(object):
        self.query.awaitTermination()
def handle_kafka_stream_templates(self, kafka_df, epoch_id): def handle_kafka_stream_templates(self, kafka_df, epoch_id):
if self.spider_type == 'asin详情' and kafka_df.count() > 0: has_data = self.spider_type == 'asin详情' and kafka_df.count() > 0
if has_data:
kafka_df = self.deduplication_kafka_data(kafka_df, "asin", "asinUpdateTime") kafka_df = self.deduplication_kafka_data(kafka_df, "asin", "asinUpdateTime")
self.handle_kafka_stream(kafka_df, epoch_id) self.handle_kafka_stream(kafka_df, epoch_id)
if has_data:
# 处理完成后更新时间戳,避免长批次处理耗时误触发超时
self.last_data_time = time.time()
if self.test_flag == 'normal': if self.test_flag == 'normal':
self.kafka_consumption_is_finished() self.kafka_consumption_is_finished()
# 仅当前批次无新数据时才做超时检测
# 若距上次有效数据已超过 30 分钟,说明爬虫可能已完成但状态表尚未更新
# 进入轮询,每 2 分钟重新检查一次,直到状态更新后 kafka_consumption_is_finished() 内部 exit(0)
if not has_data:
elapsed = time.time() - self.last_data_time
if elapsed > 30 * 60:
print(f"[超时检测] 已 {elapsed / 60:.1f} 分钟无新数据,进入状态轮询(每2分钟检查一次),等待爬虫状态更新")
while True:
time.sleep(120)
self.kafka_consumption_is_finished()
def handle_kafka_stream(self, kafka_df, epoch_id):
    """Process one (possibly deduplicated) Kafka micro-batch.

    No-op placeholder in this base template — presumably overridden by
    concrete task subclasses (TODO: confirm against the repository).

    :param kafka_df: the micro-batch DataFrame read from Kafka
    :param epoch_id: the streaming epoch/batch id supplied by Spark
    :return: None
    """
    pass
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment