Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
Amazon-Selection-Data
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
abel_cjy
Amazon-Selection-Data
Commits
6f48ffd3
Commit
6f48ffd3
authored
May 08, 2026
by
chenyuanjie
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
实时任务停止,独立线程判断
parent
9975fa69
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
41 additions
and
18 deletions
+41
-18
templates.py
Pyspark_job/utils/templates.py
+41
-18
No files found.
Pyspark_job/utils/templates.py
View file @
6f48ffd3
...
...
@@ -94,7 +94,11 @@ class Templates(object):
self
.
test_flag
=
'normal'
self
.
beginning_offsets_dict
=
{}
# history消费时, 初始的偏移量
# 记录最后一次收到非空批次的时间,用于无新数据超时检测
self
.
last_data_time
=
time
.
time
()
# self.last_data_time = time.time()
# 后台状态监控线程标记(防止重复启动)
self
.
_state_monitor_started
=
False
# kafka_stream_stop 防重入标记
self
.
_stopping
=
False
# redis连接对象--用来锁定--解决并发
self
.
client
=
get_redis_h14
()
...
...
@@ -345,6 +349,9 @@ class Templates(object):
pass
def
kafka_stream_stop
(
self
):
if
self
.
_stopping
:
return
self
.
_stopping
=
True
import
threading
,
os
self
.
start_process_instance
()
# 开启海豚调度
...
...
@@ -357,12 +364,32 @@ class Templates(object):
except
Exception
as
e
:
print
(
e
,
traceback
.
format_exc
())
finally
:
os
.
_exit
(
0
)
# 强制退出 JVM,不被 try/
except
拦截
os
.
_exit
(
0
)
# 强制退出 JVM,不被 try/
catch
拦截
t
=
threading
.
Thread
(
target
=
_do_stop
,
daemon
=
False
)
t
.
start
()
# foreachBatch 回调从此处正常返回,不阻塞等待 stop 完成
def _start_state_monitor_thread(self, interval=900):
    """Start a background daemon thread that polls the crawler state.

    When Kafka delivers no new data the foreachBatch callback is not
    triggered, so a state check placed inside it would never run. This
    thread calls ``self.kafka_consumption_is_finished()`` periodically
    instead, independent of batch arrival.

    Calling this method again after a monitor thread has been started is
    a no-op.

    :param interval: Seconds to sleep before each state check
        (default 900).
    """
    if self._state_monitor_started:
        return

    import threading

    def _monitor():
        while True:
            time.sleep(interval)
            try:
                self.kafka_consumption_is_finished()
            except Exception as e:
                # A failed check must not kill the monitor; log it and
                # try again after the next sleep.
                print(f"[状态监控] 检查异常: {e}", traceback.format_exc())

    # Daemon thread: it never blocks interpreter shutdown.
    t = threading.Thread(target=_monitor, daemon=True)
    t.start()
    # Mark as started only once the thread is actually running, so that a
    # failed start() can be retried by a later call.
    self._state_monitor_started = True
    print(f"[状态监控] 后台监控线程已启动,每 {interval}s 检查一次爬虫状态")
# def kafka_consumption_is_finished(self):
# while True:
# try:
...
...
@@ -470,25 +497,21 @@ class Templates(object):
self
.
query
.
awaitTermination
()
def
handle_kafka_stream_templates
(
self
,
kafka_df
,
epoch_id
):
has_data
=
self
.
spider_type
==
'asin详情'
and
kafka_df
.
count
()
>
0
if
has_data
:
if
self
.
spider_type
==
'asin详情'
and
kafka_df
.
count
()
>
0
:
kafka_df
=
self
.
deduplication_kafka_data
(
kafka_df
,
"asin"
,
"asinUpdateTime"
)
self
.
handle_kafka_stream
(
kafka_df
,
epoch_id
)
if
has_data
:
# 处理完成后更新时间戳,避免长批次处理耗时误触发超时
self
.
last_data_time
=
time
.
time
()
if
self
.
test_flag
==
'normal'
:
self
.
kafka_consumption_is_finished
()
# 仅当前批次无新数据时才做超时检测
#
若距上次有效数据已超过 30 分钟,说明爬虫可能已完成但状态表尚未更新
# 进入轮询,每 2 分钟重新检查一次,直到状态更新后 kafka_consumption_is_finished() 内部 exit(0)
if
not
has_data
:
elapsed
=
time
.
time
()
-
self
.
last_data_time
if
elapsed
>
30
*
60
:
print
(
f
"[超时检测] 已 {elapsed / 60:.1f} 分钟无新数据,进入状态轮询(每2分钟检查一次),等待爬虫状态更新"
)
while
True
:
time
.
sleep
(
120
)
self
.
kafka_consumption_is_finished
()
# 启动守护线程,判断是否抓取结束
self
.
_start_state_monitor_thread
()
#
self.kafka_consumption_is_finished()
# 以下超时轮询逻辑已由 _start_state_monitor_thread() 后台线程替代,注释保留备查
# if not has_data:
# elapsed = time.time() - self.last_data_time
# if elapsed > 30 * 60:
#
while True:
#
time.sleep(120)
#
self.kafka_consumption_is_finished()
def
handle_kafka_stream
(
self
,
kafka_df
,
epoch_id
):
pass
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment