# post_to_dolphin.py (3.63 KB)
import json
import os
import sys
from typing import Dict
import requests

# Append the parent directory of sys.path[0] (the directory of the running
# script when this file is executed as a script) to the import search path.
# NOTE(review): presumably so project packages one level up can be imported
# from here — confirm against the repository layout.
sys.path.append(os.path.dirname(sys.path[0]))


class DolphinschedulerHelper(object):
    """Small client for the DolphinScheduler REST API.

    All methods are classmethods; project and process-definition lookups are
    cached in class-level dictionaries that are shared by every caller for
    the lifetime of the process.
    """

    # NOTE(review): the API token and endpoint are hard-coded in source.
    # Consider loading them from configuration or the environment instead.
    _admin_token = "2a761f0d17baac7ac6ac4a23fe6f33df"
    _ip_port = "http://113.100.143.162:12345"
    # Seconds to wait for the server before giving up on a request.
    _timeout = 30
    # Cache: project name -> project code.
    _project_map = {}
    # Cache: project code -> {process definition name -> definition code}.
    _project_df_map = {}

    @classmethod
    def get_http_header(cls) -> Dict:
        """Return the HTTP headers that authenticate a request."""
        return {
            "token": cls._admin_token,
        }

    @classmethod
    def _load_data(cls, resp, url):
        """Decode a JSON response body and return its ``data`` member.

        :param resp: response object returned by ``requests``
        :param url: URL that was requested (used only in the error message)
        :return: the value stored under ``data`` in the response body
        :raises RuntimeError: if the body carries no ``data`` (error reply)
        """
        payload = json.loads(resp.content.decode("utf-8"))
        data = payload.get("data")
        if data is None:
            raise RuntimeError(
                f"DolphinScheduler request to {url} returned no data: "
                f"{payload.get('msg', payload)}"
            )
        return data

    @classmethod
    def get_project_map(cls) -> Dict:
        """Return the cached mapping of project name to project code.

        On first use the project list is fetched page by page so that
        projects beyond the first page are included as well.

        :return: dict mapping project name to project code
        :raises RuntimeError: if the server reply carries no ``data``
        """
        if not cls._project_map:
            url = f"{cls._ip_port}/dolphinscheduler/projects"
            projects = {}
            page_no = 1
            while True:
                resp = requests.get(
                    url,
                    headers=cls.get_http_header(),
                    params={
                        "pageNo": page_no,
                        "pageSize": 10,
                    },
                    timeout=cls._timeout,
                )
                data = cls._load_data(resp, url)
                items = data.get("totalList") or []
                for item in items:
                    projects[item["name"]] = item["code"]

                # A missing ``totalPage`` is treated as "no further pages".
                total_pages = int(data.get("totalPage") or 0)
                if not items or page_no >= total_pages:
                    break
                page_no += 1

            # Update in place, and only after every page parsed cleanly, so
            # the shared cache never holds a partial result.
            cls._project_map.update(projects)

        return cls._project_map

    @classmethod
    def get_project_df_map(cls, project_code) -> Dict:
        """Return the cached process definitions of one project.

        :param project_code: code of the project to look up
        :return: dict mapping process definition name to definition code
        :raises RuntimeError: if the server reply carries no ``data``
        """
        if cls._project_df_map.get(project_code) is None:
            url = f"{cls._ip_port}/dolphinscheduler/projects/{project_code}/process-definition/simple-list"
            resp = requests.get(
                url,
                headers=cls.get_http_header(),
                timeout=cls._timeout,
            )
            data = cls._load_data(resp, url)
            # Build the mapping completely before caching it, so a malformed
            # reply cannot leave an empty entry behind.
            cls._project_df_map[project_code] = {
                item["name"]: item["code"] for item in data
            }

        return cls._project_df_map[project_code]

    @classmethod
    def start_process_instance(cls, site_name: str, date_info: str, table_list: str, flag: str, warning_Type: str = "NONE") -> bool:
        """
        Start the "pg爬虫库同步pg生产库" process of project "big_data_selection".

        :param site_name: value passed to the process as start parameter ``site_name``
        :param date_info: value passed to the process as start parameter ``date_info``
        :param table_list: value passed to the process as start parameter ``table_list``
        :param flag: value passed to the process as start parameter ``flag``
        :param warning_Type: warning type sent to the scheduler, e.g. NONE or ALL
        :return: True when the scheduler reports success
        :raises LookupError: if the project or the process definition is not found
        :raises Exception: if the scheduler does not report success
        """
        project_name = "big_data_selection"
        process_df_name = "pg爬虫库同步pg生产库"
        startParams = {
            "site_name": site_name,
            "date_info": date_info,
            "table_list": table_list,
            "flag": flag,
            "_sender": "api",
        }

        project_code = cls.get_project_map().get(project_name)
        if project_code is None:
            # Fail early rather than requesting ".../projects/None/...".
            raise LookupError(f"Project '{project_name}' not found")

        process_df_map: Dict = cls.get_project_df_map(project_code)
        process_df_code = process_df_map.get(process_df_name)
        if process_df_code is None:
            raise LookupError(
                f"Process definition '{process_df_name}' not found in project '{project_name}'"
            )

        url = f"{cls._ip_port}/dolphinscheduler/projects/{project_code}/executors/start-process-instance"

        req_params = {
            "processDefinitionCode": process_df_code,
            "scheduleTime": "",
            # Failure strategy
            "failureStrategy": "CONTINUE",
            "warningType": warning_Type,
            # Alert group 2 is the HTTP push alert
            "warningGroupId": "2",
            "execType": "",
            "startNodeList": "",
            "taskDependType": "TASK_POST",
            "runMode": "RUN_MODE_SERIAL",
            "processInstancePriority": "MEDIUM",
            "startParams": json.dumps(startParams),
            "expectedParallelismNumber": "",
            "dryRun": "0",
        }
        print(req_params)
        resp = requests.post(
            url,
            headers=cls.get_http_header(),
            data=req_params,
            timeout=cls._timeout,
        )
        resp_json = json.loads(resp.content.decode("utf-8"))
        if not resp_json.get("success"):
            raise Exception(f"任务【{project_name}/{process_df_name}】调度失败!")
        return True