Commit 66c45f86 by fangxingjun

Merge branch 'developer' of 47.106.101.75:abel_cjy/Amazon-Selection-Data into developer

parents 72ac75ce 44e5900a
......@@ -345,15 +345,23 @@ class Templates(object):
pass
def kafka_stream_stop(self):
try:
import threading, os
self.start_process_instance() # 开启海豚调度
def _do_stop():
try:
if self.query is not None:
self.query.stop() # 退出实时消费(移除awaitTermination避免线程冲突)
self.query.stop() # 在子线程中调用,避免 foreachBatch 回调内死锁
if self.spark is not None:
self.spark.stop()
exit(0) # 退出程序
except Exception as e:
print(e, traceback.format_exc())
finally:
os._exit(0) # 强制退出 JVM,不被 try/except 拦截
t = threading.Thread(target=_do_stop, daemon=False)
t.start()
# foreachBatch 回调从此处正常返回,不阻塞等待 stop 完成
# def kafka_consumption_is_finished(self):
# while True:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment