Commit a4290282 by fangxingjun

Merge branch 'developer' of 47.106.101.75:abel_cjy/Amazon-Selection-Data into developer

parents 1a7ede51 6f48ffd3
...@@ -94,7 +94,11 @@ class Templates(object): ...@@ -94,7 +94,11 @@ class Templates(object):
self.test_flag = 'normal' self.test_flag = 'normal'
self.beginning_offsets_dict = {} # history消费时, 初始的偏移量 self.beginning_offsets_dict = {} # history消费时, 初始的偏移量
# 记录最后一次收到非空批次的时间,用于无新数据超时检测 # 记录最后一次收到非空批次的时间,用于无新数据超时检测
self.last_data_time = time.time() # self.last_data_time = time.time()
# 后台状态监控线程标记(防止重复启动)
self._state_monitor_started = False
# kafka_stream_stop 防重入标记
self._stopping = False
# redis连接对象--用来锁定--解决并发 # redis连接对象--用来锁定--解决并发
self.client = get_redis_h14() self.client = get_redis_h14()
...@@ -345,6 +349,9 @@ class Templates(object): ...@@ -345,6 +349,9 @@ class Templates(object):
pass pass
def kafka_stream_stop(self): def kafka_stream_stop(self):
if self._stopping:
return
self._stopping = True
import threading, os import threading, os
self.start_process_instance() # 开启海豚调度 self.start_process_instance() # 开启海豚调度
...@@ -357,12 +364,32 @@ class Templates(object): ...@@ -357,12 +364,32 @@ class Templates(object):
except Exception as e: except Exception as e:
print(e, traceback.format_exc()) print(e, traceback.format_exc())
finally: finally:
os._exit(0) # 强制退出 JVM,不被 try/except 拦截 os._exit(0) # 强制退出 JVM,不被 try/catch 拦截
t = threading.Thread(target=_do_stop, daemon=False) t = threading.Thread(target=_do_stop, daemon=False)
t.start() t.start()
# foreachBatch 回调从此处正常返回,不阻塞等待 stop 完成 # foreachBatch 回调从此处正常返回,不阻塞等待 stop 完成
def _start_state_monitor_thread(self, interval=900):
"""启动后台守护线程,独立轮询爬虫状态。
解决 Kafka 无新数据时 forEachBatch 不触发、状态检查永远不执行的问题。"""
if self._state_monitor_started:
return
self._state_monitor_started = True
import threading
def _monitor():
while True:
time.sleep(interval)
try:
self.kafka_consumption_is_finished()
except Exception as e:
print(f"[状态监控] 检查异常: {e}", traceback.format_exc())
t = threading.Thread(target=_monitor, daemon=True)
t.start()
print(f"[状态监控] 后台监控线程已启动,每 {interval}s 检查一次爬虫状态")
# def kafka_consumption_is_finished(self): # def kafka_consumption_is_finished(self):
# while True: # while True:
# try: # try:
...@@ -474,25 +501,21 @@ class Templates(object): ...@@ -474,25 +501,21 @@ class Templates(object):
self.query.awaitTermination() self.query.awaitTermination()
def handle_kafka_stream_templates(self, kafka_df, epoch_id): def handle_kafka_stream_templates(self, kafka_df, epoch_id):
has_data = self.spider_type == 'asin详情' and kafka_df.count() > 0 if self.spider_type == 'asin详情' and kafka_df.count() > 0:
if has_data:
kafka_df = self.deduplication_kafka_data(kafka_df, "asin", "asinUpdateTime") kafka_df = self.deduplication_kafka_data(kafka_df, "asin", "asinUpdateTime")
self.handle_kafka_stream(kafka_df, epoch_id) self.handle_kafka_stream(kafka_df, epoch_id)
if has_data:
# 处理完成后更新时间戳,避免长批次处理耗时误触发超时
self.last_data_time = time.time()
if self.test_flag == 'normal': if self.test_flag == 'normal':
self.kafka_consumption_is_finished() # 启动守护线程,判断是否抓取结束
# 仅当前批次无新数据时才做超时检测 self._start_state_monitor_thread()
# 若距上次有效数据已超过 30 分钟,说明爬虫可能已完成但状态表尚未更新 # self.kafka_consumption_is_finished()
# 进入轮询,每 2 分钟重新检查一次,直到状态更新后 kafka_consumption_is_finished() 内部 exit(0)
if not has_data: # 以下超时轮询逻辑已由 _start_state_monitor_thread() 后台线程替代,注释保留备查
elapsed = time.time() - self.last_data_time # if not has_data:
if elapsed > 30 * 60: # elapsed = time.time() - self.last_data_time
print(f"[超时检测] 已 {elapsed / 60:.1f} 分钟无新数据,进入状态轮询(每2分钟检查一次),等待爬虫状态更新") # if elapsed > 30 * 60:
while True: # while True:
time.sleep(120) # time.sleep(120)
self.kafka_consumption_is_finished() # self.kafka_consumption_is_finished()
def handle_kafka_stream(self, kafka_df, epoch_id): def handle_kafka_stream(self, kafka_df, epoch_id):
pass pass
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment