Commit 44e5900a by chenyuanjie

kafka实时任务-线程死锁问题

parent 506ee903
...@@ -345,15 +345,23 @@ class Templates(object): ...@@ -345,15 +345,23 @@ class Templates(object):
pass pass
def kafka_stream_stop(self): def kafka_stream_stop(self):
try: import threading, os
self.start_process_instance() # 开启海豚调度 self.start_process_instance() # 开启海豚调度
def _do_stop():
try:
if self.query is not None: if self.query is not None:
self.query.stop() # 退出实时消费(移除awaitTermination避免线程冲突) self.query.stop() # 在子线程中调用,避免 foreachBatch 回调内死锁
if self.spark is not None: if self.spark is not None:
self.spark.stop() self.spark.stop()
exit(0) # 退出程序
except Exception as e: except Exception as e:
print(e, traceback.format_exc()) print(e, traceback.format_exc())
finally:
os._exit(0) # 强制退出 JVM,不被 try/except 拦截
t = threading.Thread(target=_do_stop, daemon=False)
t.start()
# foreachBatch 回调从此处正常返回,不阻塞等待 stop 完成
# def kafka_consumption_is_finished(self): # def kafka_consumption_is_finished(self):
# while True: # while True:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment