import json
import os
import sys
from typing import Dict

import requests

sys.path.append(os.path.dirname(sys.path[0]))


class DolphinschedulerHelper(object):
    """Small client for the DolphinScheduler REST API.

    Resolves project / process-definition names to their codes (cached on the
    class for the lifetime of the process) and starts process instances.
    """

    # NOTE(review): the API token and server address are hard-coded secrets.
    # They can now be overridden through environment variables; the literal
    # fallbacks are kept only for backward compatibility and should be moved
    # out of source control.
    _admin_token = os.environ.get("DOLPHINSCHEDULER_TOKEN") or "2a761f0d17baac7ac6ac4a23fe6f33df"
    _ip_port = os.environ.get("DOLPHINSCHEDULER_URL") or "http://113.100.143.162:12345"
    # Per-request timeout in seconds; requests waits indefinitely by default,
    # which would let an unresponsive server block the caller forever.
    _timeout = 60
    # Page size used when listing projects (all pages are fetched).
    _page_size = 100
    # Cache: project name -> project code.
    _project_map = {}
    # Cache: project code -> {process definition name -> process definition code}.
    _project_df_map = {}

    @classmethod
    def get_http_header(cls):
        """Return the HTTP headers that authenticate a request with the admin token."""
        return {
            "token": cls._admin_token,
        }

    @classmethod
    def get_project_map(cls):
        """Return a mapping of project name -> project code.

        The first call walks every page of ``/dolphinscheduler/projects`` and
        caches the result on the class; later calls return the cached dict.
        """
        if not cls._project_map:
            url = f"{cls._ip_port}/dolphinscheduler/projects"
            page_size = cls._page_size
            fetched = {}
            page_no = 1
            while True:
                resp = requests.get(
                    url,
                    headers=cls.get_http_header(),
                    params={
                        "pageNo": page_no,
                        "pageSize": page_size,
                    },
                    timeout=cls._timeout,
                )
                data = json.loads(resp.content.decode("utf-8"))['data']
                items = data['totalList'] or []
                for item in items:
                    fetched[item['name']] = item['code']
                # Stop on a short page, or when the server reports this was the last page.
                total_page = data.get('totalPage')
                if len(items) < page_size or (total_page is not None and page_no >= total_page):
                    break
                page_no += 1
            # Fill the shared cache only after every page succeeded, so a failure
            # midway does not leave a partial map that later calls would trust.
            cls._project_map.update(fetched)
        return cls._project_map

    @classmethod
    def get_project_df_map(cls, project_code):
        """Return a mapping of process-definition name -> code for one project.

        :param project_code: project code, as returned by ``get_project_map``
        :return: dict cached per project code on the class
        """
        if cls._project_df_map.get(project_code) is None:
            url = f"{cls._ip_port}/dolphinscheduler/projects/{project_code}/process-definition/simple-list"
            resp = requests.get(
                url,
                headers=cls.get_http_header(),
                timeout=cls._timeout,
            )
            resp_json = json.loads(resp.content.decode("utf-8"))
            # Build the mapping first and cache it only on success; caching an
            # empty dict up front would permanently poison the cache on error.
            cls._project_df_map[project_code] = {
                item['name']: item['code'] for item in resp_json['data']
            }
        return cls._project_df_map[project_code]

    @classmethod
    def start_process_instance(cls, site_name: str, date_info: str, table_list: str, flag: str,
                               warning_Type: str = "NONE") -> bool:
        """Start the "pg爬虫库同步pg生产库" workflow of project "big_data_selection".

        The four positional arguments are forwarded to the workflow as global
        start parameters of the same names.

        :param site_name: start parameter ``site_name``
        :param date_info: start parameter ``date_info``
        :param table_list: start parameter ``table_list``
        :param flag: start parameter ``flag``
        :param warning_Type: value sent as ``warningType`` (e.g. NONE, ALL)
        :return: True when the server reports success
        :raises ValueError: if the project or workflow name cannot be resolved
        :raises Exception: if the server does not report success
        """
        project_name = "big_data_selection"
        process_df_name = "pg爬虫库同步pg生产库"
        startParams = {
            "site_name": site_name,
            "date_info": date_info,
            "table_list": table_list,
            "flag": flag,
            "_sender": "api",
        }

        project_code = cls.get_project_map().get(project_name)
        if project_code is None:
            raise ValueError(f"DolphinScheduler project not found: {project_name}")
        process_df_map: Dict = cls.get_project_df_map(project_code)
        process_df_code = process_df_map.get(process_df_name)
        if process_df_code is None:
            raise ValueError(
                f"DolphinScheduler process definition not found: {project_name}/{process_df_name}"
            )

        url = f"{cls._ip_port}/dolphinscheduler/projects/{project_code}/executors/start-process-instance"
        req_params = {
            "processDefinitionCode": process_df_code,
            "scheduleTime": "",
            # Failure strategy.
            "failureStrategy": "CONTINUE",
            "warningType": warning_Type,
            # Alert group 2 is the HTTP-push alert group.
            "warningGroupId": "2",
            "execType": "",
            "startNodeList": "",
            "taskDependType": "TASK_POST",
            "runMode": "RUN_MODE_SERIAL",
            "processInstancePriority": "MEDIUM",
            "startParams": json.dumps(startParams),
            "expectedParallelismNumber": "",
            "dryRun": "0",
        }
        print(req_params)
        resp = requests.post(
            url,
            headers=cls.get_http_header(),
            data=req_params,
            timeout=cls._timeout,
        )
        resp_json = json.loads(resp.content.decode("utf-8"))
        # .get: an error payload without a "success" key is a failure, not a KeyError.
        if bool(resp_json.get('success')):
            return True
        raise Exception(f"任务【{project_name}/{process_df_name}】调度失败!")