Commit e97a3b65 by 吴济苍

[优化]新增海豚监听任务定时重启

parent f6f075d7
import os
import sys
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
from utils.DolphinschedulerHelper import DolphinschedulerHelper
if __name__ == '__main__':
projectName = "listener"
process_df_name = "keyword_pcp竞价监听"
DolphinschedulerHelper.restart_listener(projectName, process_df_name, startParams={"wx_user": "wujicang"})
...@@ -8,7 +8,6 @@ import requests ...@@ -8,7 +8,6 @@ import requests
sys.path.append(os.path.dirname(sys.path[0])) sys.path.append(os.path.dirname(sys.path[0]))
class DolphinschedulerHelper(object): class DolphinschedulerHelper(object):
_admin_token = "2a761f0d17baac7ac6ac4a23fe6f33df" _admin_token = "2a761f0d17baac7ac6ac4a23fe6f33df"
_ip_port = "http://hadoop12:12345" _ip_port = "http://hadoop12:12345"
...@@ -451,3 +450,51 @@ class DolphinschedulerHelper(object): ...@@ -451,3 +450,51 @@ class DolphinschedulerHelper(object):
msg = f"项目【{project_name}】,流程【{process_df_name}】api任务触发异常,请查看日志!" msg = f"项目【{project_name}】,流程【{process_df_name}】api任务触发异常,请查看日志!"
if bool(wx_user_list): if bool(wx_user_list):
CommonUtil.send_wx_msg(wx_user_list, title, msg) CommonUtil.send_wx_msg(wx_user_list, title, msg)
@classmethod
def restart_listener(cls, project_name: str, process_df_name: str, startParams: Dict):
"""
重启海豚任务
:param project_name:项目名称
:param process_df_name:流程名
:return:
"""
process_id = DolphinschedulerHelper.get_first_process_instance(project_name, process_df_name)
tasks = DolphinschedulerHelper.get_process_instance_tasks(project_name, process_id)
for task in tasks:
state = task['state']
if state == 'RUNNING_EXECUTION':
DolphinschedulerHelper.stop_process(project_name, process_id)
# 重启任务
DolphinschedulerHelper.start_process_instance(
project_name,
process_df_name,
startParams=startParams,
warning_Type="ALL"
)
@classmethod
def stop_process(cls, project_name: str, process_instance_id: str):
"""
终止海豚任务进程
:param project_name:项目名称
:param process_instance_id:示例id
:return:
"""
project_map = cls.get_project_map()
project_code = project_map.get(project_name)
url = f"{cls._ip_port}/dolphinscheduler/projects/{project_code}/executors/execute"
req_params = {
"processInstanceId": process_instance_id,
"executeType": 'STOP',
}
resp = requests.post(
url,
headers=cls.get_http_header(),
data=req_params
)
resp_json = json.loads(resp.content.decode("utf-8"))
if bool(resp_json['success']):
return resp_json['msg']
else:
raise Exception(f"任务停止失败")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment