1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import json
import os
import sys
from typing import Dict
import requests
# Append the parent directory of sys.path[0] to the module search path.
# NOTE(review): presumably so that sibling packages one level above this
# script can be imported when it is launched directly — confirm.
sys.path.append(os.path.dirname(sys.path[0]))
class DolphinschedulerHelper(object):
    """Small client for the DolphinScheduler REST API.

    All state is kept on the class: the API token, the server address and two
    lazily filled lookup caches (project name -> project code, and per project
    code, process-definition name -> process-definition code).
    """

    # NOTE(review): the token and server address are hard-coded in source;
    # consider loading them from configuration or the environment.
    _admin_token = "2a761f0d17baac7ac6ac4a23fe6f33df"
    _ip_port = "http://113.100.143.162:12345"
    # Seconds to wait on each HTTP call so a stalled server cannot block the
    # caller forever (requests applies no timeout unless one is given).
    _request_timeout = 60
    # Number of projects requested per page when listing projects.
    _page_size = 100
    # Cache: project name -> project code.
    _project_map = {}
    # Cache: project code -> {process-definition name -> process-definition code}.
    _project_df_map = {}

    @classmethod
    def get_http_header(cls):
        """Return the HTTP headers (the API token) sent with every request."""
        return {
            "token": cls._admin_token,
        }

    @classmethod
    def _get_data(cls, url, params=None):
        """GET ``url`` and return the ``data`` member of the decoded JSON body.

        :param url: Full request URL.
        :param params: Optional query-string parameters.
        :return: The value stored under ``data`` in the response.
        :raises Exception: If the response carries no ``data`` (for example an
            error payload), so that callers never cache a broken result.
        """
        resp = requests.get(
            url,
            headers=cls.get_http_header(),
            params=params,
            timeout=cls._request_timeout,
        )
        payload = json.loads(resp.content.decode("utf-8"))
        data = payload.get("data") if isinstance(payload, dict) else None
        if data is None:
            raise Exception(
                f"Unexpected DolphinScheduler response from {url}: {payload!r}"
            )
        return data

    @classmethod
    def get_project_map(cls):
        """Return the cached mapping of project name to project code.

        On first use the project list is fetched page by page, so projects
        beyond the first page are not missed. The cache is only filled after
        every page was parsed successfully.

        :return: Dict mapping project name to project code.
        :raises Exception: If the server response cannot be interpreted.
        """
        if not cls._project_map:
            url = f"{cls._ip_port}/dolphinscheduler/projects"
            projects = {}
            page_no = 1
            while True:
                data = cls._get_data(
                    url,
                    params={
                        "pageNo": page_no,
                        "pageSize": cls._page_size,
                    },
                )
                items = data.get("totalList") or []
                known_before = len(projects)
                for item in items:
                    projects[item["name"]] = item["code"]
                # Stop on a short page, or when a page added nothing new
                # (guards against a server that ignores ``pageNo``).
                if len(items) < cls._page_size or len(projects) == known_before:
                    break
                page_no += 1
            cls._project_map.update(projects)
        return cls._project_map

    @classmethod
    def get_project_df_map(cls, project_code):
        """Return the cached process-definition map of one project.

        :param project_code: Code of the project whose definitions are listed.
        :return: Dict mapping process-definition name to its code.
        :raises Exception: If the server response cannot be interpreted.
        """
        cached = cls._project_df_map.get(project_code)
        if cached is None:
            url = (
                f"{cls._ip_port}/dolphinscheduler/projects/{project_code}"
                "/process-definition/simple-list"
            )
            data = cls._get_data(url)
            # Build the mapping completely before caching it, so a malformed
            # response cannot leave an empty entry behind.
            cached = {item["name"]: item["code"] for item in data}
            cls._project_df_map[project_code] = cached
        return cached

    @classmethod
    def start_process_instance(cls, site_name: str, date_info: str, table_list: str, flag: str, warning_Type: str = "NONE"):
        """Start one run of the fixed DolphinScheduler workflow.

        The project and workflow names are fixed inside this method; the
        arguments are forwarded to the workflow as global start parameters.

        :param site_name: Value passed as the ``site_name`` start parameter.
        :param date_info: Value passed as the ``date_info`` start parameter.
        :param table_list: Value passed as the ``table_list`` start parameter.
        :param flag: Value passed as the ``flag`` start parameter.
        :param warning_Type: Warning type, e.g. ``NONE`` or ``ALL``.
        :return: ``True`` when the server reports success.
        :raises LookupError: If the project or the workflow is not known to
            the server.
        :raises Exception: If the server does not report success.
        """
        project_name = "big_data_selection"
        process_df_name = "pg爬虫库同步pg生产库"

        project_code = cls.get_project_map().get(project_name)
        if project_code is None:
            raise LookupError(
                f"DolphinScheduler project not found: {project_name}"
            )

        process_df_code = cls.get_project_df_map(project_code).get(process_df_name)
        if process_df_code is None:
            raise LookupError(
                f"DolphinScheduler process definition not found: "
                f"{project_name}/{process_df_name}"
            )

        startParams = {
            "site_name": site_name,
            "date_info": date_info,
            "table_list": table_list,
            "flag": flag,
            "_sender": "api",
        }
        url = f"{cls._ip_port}/dolphinscheduler/projects/{project_code}/executors/start-process-instance"
        req_params = {
            "processDefinitionCode": process_df_code,
            "scheduleTime": "",
            # Failure strategy
            "failureStrategy": "CONTINUE",
            "warningType": warning_Type,
            # Alert group 2 is the HTTP push alert
            "warningGroupId": "2",
            "execType": "",
            "startNodeList": "",
            "taskDependType": "TASK_POST",
            "runMode": "RUN_MODE_SERIAL",
            "processInstancePriority": "MEDIUM",
            "startParams": json.dumps(startParams),
            "expectedParallelismNumber": "",
            "dryRun": "0",
        }
        print(req_params)
        resp = requests.post(
            url,
            headers=cls.get_http_header(),
            data=req_params,
            timeout=cls._request_timeout,
        )
        resp_json = json.loads(resp.content.decode("utf-8"))
        # A payload without ``success`` is treated as a failure rather than
        # surfacing as an unrelated KeyError.
        if resp_json.get("success"):
            return True
        raise Exception(f"任务【{project_name}/{process_df_name}】调度失败!")