import json
import os
import re
import sys
import time
from typing import Dict, List, Optional

import requests

# Put the parent of the script directory on the import path.
# NOTE(review): presumably this is what makes the function-level
# `utils.common_util` imports below resolvable -- confirm.
sys.path.append(os.path.dirname(sys.path[0]))


class DolphinschedulerHelper(object):
    """Class-level helpers that talk to a DolphinScheduler REST API.

    All methods are classmethods; project and process-definition
    name -> code lookups are cached in class-level dictionaries for the
    lifetime of the process.
    """

    # NOTE(review): the API token is hard-coded in source control; consider
    # loading it from configuration or the environment.
    _admin_token = "2a761f0d17baac7ac6ac4a23fe6f33df"
    _ip_port = "http://hadoop12:12345"
    # Cache: project name -> project code.
    _project_map = {}
    # Cache: project code -> {process definition name -> definition code}.
    _project_df_map = {}
    _def_project_name = "big_data_selection"

    # Strips whitespace and the interpreter prefix from a task's raw script so
    # it can be compared with the whitespace-stripped ``sys.argv`` command line.
    _script_noise_re = re.compile(
        r"\s|/opt/module/anaconda3/envs/pyspark/bin/python3\.8|python3|python"
    )
    _whitespace_re = re.compile(r"\s")

    @classmethod
    def get_http_header(cls):
        """Return the HTTP headers (API token) sent with every request."""
        return {
            "token": cls._admin_token,
            # "Content-Type": "application/x-www-form-urlencoded",
        }

    @classmethod
    def _get_json(cls, url: str, params: Optional[Dict] = None):
        """Issue an authenticated GET and return the UTF-8 decoded JSON body."""
        resp = requests.get(url, headers=cls.get_http_header(), params=params)
        return json.loads(resp.content.decode("utf-8"))

    @classmethod
    def _post_json(cls, url: str, data: Dict):
        """Issue an authenticated form POST and return the decoded JSON body."""
        resp = requests.post(url, headers=cls.get_http_header(), data=data)
        return json.loads(resp.content.decode("utf-8"))

    @classmethod
    def _resolve_codes(cls, project_name: str, process_df_name: str):
        """Return ``(project_code, process_df_code)`` for the given names.

        Either element is ``None`` when the name is not present in the
        corresponding cached map.
        """
        project_code = cls.get_project_map().get(project_name)
        process_df_map: Dict = cls.get_project_df_map(project_code)
        return project_code, process_df_map.get(process_df_name)

    @classmethod
    def get_project_map(cls):
        """Return the cached ``{project name: project code}`` map.

        The map is fetched from the server on first use only.
        """
        if not cls._project_map:
            url = f"{cls._ip_port}/dolphinscheduler/projects"
            # NOTE(review): only the first page of 10 projects is requested;
            # projects beyond that will not appear in the map.
            resp_json = cls._get_json(url, params={"pageNo": 1, "pageSize": 10})
            cls._project_map.update(
                {item['name']: item['code'] for item in resp_json['data']['totalList']}
            )
        return cls._project_map

    @classmethod
    def get_project_df_map(cls, project_code):
        """Return the cached ``{definition name: definition code}`` map of a project.

        :param project_code: project code as returned by ``get_project_map``
        """
        if cls._project_df_map.get(project_code) is None:
            url = f"{cls._ip_port}/dolphinscheduler/projects/{project_code}/process-definition/simple-list"
            resp_json = cls._get_json(url)
            # Build the map first and store it afterwards, so a malformed
            # response does not leave an empty map behind in the cache.
            cls._project_df_map[project_code] = {
                item['name']: item['code'] for item in resp_json['data']
            }
        return cls._project_df_map[project_code]

    @classmethod
    def queryProcessInstanceStateById(cls, project_name: str, process_instance_id: int, ):
        """Return the ``state`` field of a process instance.

        :param project_name: project name
        :param process_instance_id: process instance id
        :return: the state string reported by the server
        :raises Exception: when the server reports the request as unsuccessful
        """
        project_code = cls.get_project_map().get(project_name)
        url = f"{cls._ip_port}/dolphinscheduler/projects/{project_code}/process-instances/{process_instance_id}"
        resp_json = cls._get_json(url)
        if not bool(resp_json['success']):
            raise Exception(f"任务【{project_name}/{process_instance_id}】获取状态失败!")
        return resp_json['data']['state']

    @classmethod
    def watch_success_state(cls,
                            project_name: str,
                            process_instance_id: int,
                            sleep: int = 5
                            ):
        """Poll a process instance until it succeeds or fails.

        :param project_name: project name
        :param process_instance_id: process instance id
        :param sleep: seconds to wait between two polls
        :return: ``True`` on state ``SUCCESS``, ``False`` on state ``FAILURE``
        """
        while True:
            state = cls.queryProcessInstanceStateById(
                project_name=project_name,
                process_instance_id=process_instance_id
            )
            if state == 'SUCCESS':
                return True
            if state == 'FAILURE':
                return False
            # Sleep for every other state, not just RUNNING_EXECUTION;
            # otherwise an unexpected state makes this loop hammer the server
            # without any pause.
            # NOTE(review): states other than SUCCESS/FAILURE are polled
            # forever, as before -- confirm whether states such as STOP or
            # KILL should end the wait.
            time.sleep(sleep)

    @classmethod
    def get_process_df_manger(cls, project_name: str, process_df_name: str):
        """Return the ``wx_user`` global parameter of a process definition as a list.

        :param project_name: project name
        :param process_df_name: process definition name
        :return: list of the non-blank, comma-separated ``wx_user`` entries
            (empty when the parameter is missing or empty), or ``None`` when
            the server reports the request as unsuccessful
        """
        project_code, process_df_code = cls._resolve_codes(project_name, process_df_name)
        url = f"{cls._ip_port}/dolphinscheduler/projects/{project_code}/process-definition/{process_df_code}/view-variables"
        resp_json = cls._get_json(url)
        if not bool(resp_json['success']):
            return None
        global_params: list = resp_json['data']['globalParams']
        param_map = {param['prop']: param['value'] for param in global_params}
        wx_user = param_map.get("wx_user") or ""
        # "".split(",") yields [""], which is truthy and would make callers
        # try to notify an empty user name; drop blank entries instead.
        return [user.strip() for user in wx_user.split(",") if user.strip()]

    @classmethod
    def get_first_process_instance(cls, project_name: str, process_df_name: str, ):
        """Return the id of the first process instance listed for a definition.

        :param project_name: project name
        :param process_df_name: process definition name
        :return: ``id`` of the first entry in the returned instance list
        :raises Exception: when the request is unsuccessful or no instance is listed
        """
        # NOTE(review): presumably gives the scheduler time to create the
        # instance after a start request -- confirm.
        time.sleep(2)
        project_code, process_df_code = cls._resolve_codes(project_name, process_df_name)
        url = f"{cls._ip_port}/dolphinscheduler/projects/{project_code}/process-instances"
        req_params = {
            "pageNo": 1,
            "pageSize": 5,
            "processDefineCode": process_df_code,
            "startDate": "",
            "endDate": "",
            "host": "",
        }
        resp_json = cls._get_json(url, params=req_params)
        if bool(resp_json['success']):
            instances = resp_json['data']['totalList']
            if len(instances) > 0:
                return instances[0]['id']
        # Raise on both failure paths instead of silently returning None,
        # which callers would pass on as an instance id.
        raise Exception(f"任务【{project_name}/{process_df_name}】获取id失败!")

    @classmethod
    def check_process_instance(cls,
                               project_name: str,
                               process_df_name: str,
                               startDate: str = ""
                               ):
        """Return the ``total`` count of process instances listed for a definition.

        :param project_name: project name
        :param process_df_name: process definition name
        :param startDate: value sent as the ``startDate`` query parameter
        :return: ``total`` from the response, or ``0`` when the request is unsuccessful
        """
        project_code, process_df_code = cls._resolve_codes(project_name, process_df_name)
        url = f"{cls._ip_port}/dolphinscheduler/projects/{project_code}/process-instances"
        req_params = {
            "pageNo": 1,
            "pageSize": 10,
            "processDefineCode": process_df_code,
            # "stateType": "RUNNING_EXECUTION",
            "startDate": startDate,
            "endDate": "",
            "host": "",
        }
        print(req_params)
        resp_json = cls._get_json(url, params=req_params)
        print(json.dumps(resp_json))
        if bool(resp_json['success']):
            return resp_json['data']['total']
        return 0

    @classmethod
    def start_check_process_df(cls, project_name: str, process_df_name: str, ):
        """Call the ``executors/start-check`` endpoint for a process definition.

        :param project_name: project name
        :param process_df_name: process definition name
        :return: ``True`` when the server reports success, otherwise ``False``
        """
        project_code, process_df_code = cls._resolve_codes(project_name, process_df_name)
        url = f"{cls._ip_port}/dolphinscheduler/projects/{project_code}/executors/start-check"
        resp_json = cls._post_json(url, {"processDefinitionCode": process_df_code})
        if bool(resp_json['success']):
            print(resp_json)
            return True
        return False

    @classmethod
    def start_process_instance(cls,
                               project_name: str,
                               process_df_name: str,
                               startParams: Dict,
                               # warning type
                               warning_Type: str = "NONE"
                               ):
        """Start a process instance of the given process definition.

        The start result is also pushed through ``send_startup_state_to_oa``.

        :param project_name: project name
        :param process_df_name: process definition name
        :param startParams: global start parameters (not modified)
        :param warning_Type: warning type, e.g. ``NONE`` or ``ALL``
        :return: ``True`` when the instance was started
        :raises Exception: when the server reports the start as unsuccessful
        """
        project_code, process_df_code = cls._resolve_codes(project_name, process_df_name)
        url = f"{cls._ip_port}/dolphinscheduler/projects/{project_code}/executors/start-process-instance"

        # Work on a copy so the caller's dictionary is not mutated.
        launch_params = dict(startParams)
        launch_params['_sender'] = "api"

        # Sample request payload:
        # processDefinitionCode: 9651368013984
        # scheduleTime:
        # failureStrategy: CONTINUE
        # warningType: ALL
        # warningGroupId: 5
        # execType:
        # startNodeList:
        # taskDependType: TASK_POST
        # runMode: RUN_MODE_SERIAL
        # processInstancePriority: MEDIUM
        # workerGroup: default
        # environmentCode: 5769107604288
        # startParams: {"site_name":"us","date_type":"week","wx_user":"wujicang"}
        # expectedParallelismNumber:
        # dryRun: 0
        req_params = {
            "processDefinitionCode": process_df_code,
            "scheduleTime": "",
            # failure strategy
            "failureStrategy": "CONTINUE",
            "warningType": warning_Type,
            # alert group; 2 is the HTTP push alert
            "warningGroupId": "2",
            "execType": "",
            "startNodeList": "",
            "taskDependType": "TASK_POST",
            "runMode": "RUN_MODE_SERIAL",
            "processInstancePriority": "MEDIUM",
            "workerGroup": "h5",
            # environment group
            "environmentCode": "10337211525600",
            "startParams": json.dumps(launch_params),
            "expectedParallelismNumber": "",
            "dryRun": "0",
        }
        print(req_params)
        resp_json = cls._post_json(url, req_params)
        resp_state = bool(resp_json['success'])
        # The notification is sent for both outcomes, before the result is reported.
        cls.send_startup_state_to_oa(project_name, process_df_name, resp_state)
        if not resp_state:
            raise Exception(f"任务【{project_name}/{process_df_name}】调度失败!")
        return True

    @classmethod
    def start_and_watch_process_instance(cls, project_name: str, process_df_name: str, startParams: Dict, sleep=600):
        """Start a process instance and block until it succeeds or fails.

        :param project_name: project name
        :param process_df_name: process definition name
        :param startParams: global start parameters
        :param sleep: seconds to wait between two state polls
        :return: ``True`` when the instance ended in ``SUCCESS``, ``False`` on ``FAILURE``
        """
        cls.start_process_instance(
            project_name=project_name,
            process_df_name=process_df_name,
            startParams=startParams,
            warning_Type="ALL"
        )
        print(f"任务【{project_name}/{process_df_name}】启动成功!")
        instance_id = cls.get_first_process_instance(
            project_name=project_name,
            process_df_name=process_df_name,
        )
        return cls.watch_success_state(
            project_name=project_name,
            process_instance_id=instance_id,
            sleep=sleep
        )

    @classmethod
    def view_process_instance_variables(cls, project_name: str, process_instance_id: str):
        """Return the global parameters of a process instance.

        :param project_name: project name
        :param process_instance_id: process instance id
        :return: ``{prop: value}`` built from the instance's ``globalParams``
        :raises Exception: when the server reports the request as unsuccessful
        """
        project_code = cls.get_project_map().get(project_name)
        url = f"{cls._ip_port}/dolphinscheduler/projects/{project_code}/process-instances/{process_instance_id}/view-variables"
        resp_json = cls._get_json(url)
        if not bool(resp_json['success']):
            raise Exception(f"【{project_name}/{process_instance_id}】获取执行参数失败!")
        global_params: list = resp_json['data']['globalParams']
        return {param['prop']: param['value'] for param in global_params}

    @classmethod
    def get_process_instance_tasks(cls, project_name: str, process_instance_id: str):
        """Return the task list of a process instance.

        :param project_name: project name
        :param process_instance_id: process instance id
        :return: the ``taskList`` field of the response
        :raises Exception: when the server reports the request as unsuccessful
        """
        project_code = cls.get_project_map().get(project_name)
        url = f"{cls._ip_port}/dolphinscheduler/projects/{project_code}/process-instances/{process_instance_id}/tasks"
        resp_json = cls._get_json(url)
        if not bool(resp_json['success']):
            raise Exception(f"【{project_name}/{process_instance_id}】失败!")
        return resp_json['data']['taskList']

    @classmethod
    def get_running_process_task(cls, project_name: str = _def_project_name):
        """Find the running scheduler task that matches the current command line.

        The current script (``sys.argv``) is compared against the raw script of
        every task of the running process instances, after substituting the
        instance's global parameters into the raw script.

        :param project_name: project name, defaults to ``big_data_selection``
        :return: dict describing the matching task and its parameters, or
            ``None`` when running in test mode, when the request is
            unsuccessful or when no task matches
        """
        from utils.common_util import CommonUtil

        # Test environment: the last command-line argument is 'test'.
        if CommonUtil.get_sys_arg(len(sys.argv) - 1, '') == 'test':
            return None

        # Look through the running instances to find which task this script is.
        current_script = " ".join(sys.argv)
        current_script_name = sys.argv[0]
        # Loop-invariant side of the comparison below.
        current_compact = cls._whitespace_re.sub("", current_script)

        project_code = cls.get_project_map().get(project_name)
        url = f"{cls._ip_port}/dolphinscheduler/projects/{project_code}/process-instances"
        req_params = {
            "pageNo": 1,
            "pageSize": 10,
            "stateType": 'RUNNING_EXECUTION'
        }
        resp_json = cls._get_json(url, params=req_params)
        if not bool(resp_json['success']):
            return None

        for item in resp_json['data']['totalList']:
            p_instance_id = item.get("id")
            for task in cls.get_process_instance_tasks(project_name, p_instance_id):
                task_params = json.loads(task['taskParams'])
                raw_script: str = task_params.get('rawScript')
                if raw_script is None or current_script_name not in raw_script:
                    continue

                # Substitute ${name} placeholders with the instance's global parameters.
                param_map = cls.view_process_instance_variables(project_name, p_instance_id)
                for key, value in param_map.items():
                    raw_script = raw_script.replace("${" + key + "}", value)

                if cls._script_noise_re.sub("", raw_script) == current_compact:
                    return {
                        "task_id": task['id'],
                        "task_name": task['name'],
                        "process_df_name": item['name'],
                        "process_df_id": item['id'],
                        "param": param_map,
                        "current_script": raw_script,
                    }
        return None

    @classmethod
    def send_startup_state_to_oa(cls, project_name: str, process_df_name: str, resp_state: bool):
        """Push an OA message reporting whether an API-triggered start succeeded.

        The recipients are the ``wx_user`` entries maintained on the process
        definition; nothing is sent when there are none.

        :param project_name: project name
        :param process_df_name: process definition name
        :param resp_state: whether the start request succeeded
        """
        from utils.common_util import CommonUtil

        wx_user_list: Optional[List[str]] = cls.get_process_df_manger(project_name, process_df_name)
        title = "【海豚调度】调度api触发提示"
        if resp_state:
            msg = f"项目【{project_name}】,流程【{process_df_name}】api任务触发成功!"
        else:
            msg = f"项目【{project_name}】,流程【{process_df_name}】api任务触发异常,请查看日志!"
        if bool(wx_user_list):
            CommonUtil.send_wx_msg(wx_user_list, title, msg)