Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
A
Amazon-Selection-Data
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
abel_cjy
Amazon-Selection-Data
Commits
67c72130
Commit
67c72130
authored
Mar 20, 2026
by
吴济苍
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
【需求】新增海豚任务监控机器人
parent
96c4c74c
Show whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
90 additions
and
14 deletions
+90
-14
dolphinscheduler_task_monitor.py
Pyspark_job/script/dolphinscheduler_task_monitor.py
+46
-0
DolphinschedulerHelper.py
Pyspark_job/utils/DolphinschedulerHelper.py
+23
-0
common_util.py
Pyspark_job/utils/common_util.py
+21
-14
No files found.
Pyspark_job/script/dolphinscheduler_task_monitor.py
0 → 100644
View file @
67c72130
import
os
import
sys
sys
.
path
.
append
(
os
.
path
.
dirname
(
sys
.
path
[
0
]))
# 上级目录
from
utils.DolphinschedulerHelper
import
DolphinschedulerHelper
from
utils.redis_utils
import
RedisUtils
from
utils.common_util
import
CommonUtil
if
__name__
==
'__main__'
:
# 监听半小时之内报错的任务
client
=
RedisUtils
.
getClient
()
redis_key
=
"dolphinscheduler_task_monitor:lateTime"
lateTime
=
client
.
get
(
redis_key
)
lateTime
=
lateTime
or
"2026-03-20 00:00:00"
import
json
print
(
lateTime
)
req_params
=
{
"pageNo"
:
1
,
"pageSize"
:
100
,
"stateType"
:
"FAILURE"
,
"startDate"
:
lateTime
}
project_name
=
"big_data_selection"
errList
=
DolphinschedulerHelper
.
list_projects_task
(
project_name
=
project_name
,
req_params
=
req_params
)
errMsg
=
[]
for
it
in
errList
:
task_name
=
"-"
.
join
(
it
[
'name'
]
.
split
(
"-"
)[:
-
2
])
paramMap
=
DolphinschedulerHelper
.
view_process_instance_variables
(
project_name
,
it
[
"id"
])
errMsg
.
append
(
f
"""任务[{task_name}]执行失败,启动参数为:{json.dumps(paramMap)}"""
)
from
datetime
import
datetime
now
=
datetime
.
now
()
formatted_time
=
now
.
strftime
(
"
%
Y-
%
m-
%
d
%
H:
%
M:
%
S"
)
if
len
(
errMsg
)
>
0
:
all_msg
=
"
\n
"
.
join
(
errMsg
)
msg
=
f
"""截止到日期{lateTime}到{formatted_time},海豚任务报错详情如下
\n
{all_msg}"""
CommonUtil
.
send_msg_robot
(
msg
)
client
.
set
(
redis_key
,
formatted_time
)
Pyspark_job/utils/DolphinschedulerHelper.py
View file @
67c72130
...
@@ -498,3 +498,26 @@ class DolphinschedulerHelper(object):
...
@@ -498,3 +498,26 @@ class DolphinschedulerHelper(object):
return
resp_json
[
'msg'
]
return
resp_json
[
'msg'
]
else
:
else
:
raise
Exception
(
f
"任务停止失败"
)
raise
Exception
(
f
"任务停止失败"
)
@classmethod
def
list_projects_task
(
cls
,
project_name
:
str
=
_def_project_name
,
req_params
=
None
):
"""
根据当前运行脚本判断是否是海豚上正在运行的任务并获取任务参数
:param project_name: 默认是 big_data_selection
"""
if
req_params
is
None
:
req_params
=
{}
project_map
=
cls
.
get_project_map
()
project_code
=
project_map
.
get
(
project_name
)
url
=
f
"{cls._ip_port}/dolphinscheduler/projects/{project_code}/process-instances"
resp
=
requests
.
get
(
url
,
headers
=
cls
.
get_http_header
(),
params
=
req_params
)
resp_json
=
json
.
loads
(
resp
.
content
.
decode
(
"utf-8"
))
if
bool
(
resp_json
[
'success'
]):
return
resp_json
[
'data'
][
'totalList'
]
else
:
return
None
Pyspark_job/utils/common_util.py
View file @
67c72130
...
@@ -1884,17 +1884,24 @@ outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
...
@@ -1884,17 +1884,24 @@ outputformat 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
df_latest_asin_detail_with_parent
=
df_latest_asin_detail_with_parent
.
withColumnRenamed
(
f
"new_{column}"
,
f
"{column}"
)
df_latest_asin_detail_with_parent
=
df_latest_asin_detail_with_parent
.
withColumnRenamed
(
f
"new_{column}"
,
f
"{column}"
)
return
df_asin_detail
,
df_latest_asin_detail_with_parent
return
df_asin_detail
,
df_latest_asin_detail_with_parent
@staticmethod
def
send_msg_robot
(
msg
:
str
):
webhook_url
=
"https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=c519c702-6164-45c8-98b5-a87e52150f19"
headers
=
{
"Content-Type"
:
"application/json"
}
data
=
{
"msgtype"
:
"text"
,
"text"
:
{
"content"
:
msg
}
}
resp
=
requests
.
post
(
webhook_url
,
json
=
data
,
headers
=
headers
)
if
resp
.
status_code
==
200
:
result
=
resp
.
json
()
if
result
.
get
(
"errcode"
)
==
0
:
print
(
"发送成功"
)
else
:
print
(
"发送失败:"
,
result
)
else
:
print
(
"HTTP错误:"
,
resp
.
status_code
,
resp
.
text
)
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment