# re-run.py
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.DolphinschedulerHelper import DolphinschedulerHelper
from utils.common_util import CommonUtil
from utils.spark_util import SparkUtil


def build_date_sql(start_date, end_date):
    """Return the Spark SQL that lists the distinct weeks to re-run.

    Selects ``year_week`` from ``dim_date_20_to_30`` in the half-open range
    ``[start_date, end_date)`` (string comparison) and aliases it to
    ``date_info``.

    :param start_date: inclusive lower bound, compared against ``year_week``.
    :param end_date: exclusive upper bound, compared against ``year_week``.
    :raises ValueError: if either bound contains a single quote, which would
        break out of the SQL string literal the value is embedded in.
    """
    for value in (start_date, end_date):
        if "'" in str(value):
            raise ValueError(f"invalid year_week bound: {value!r}")
    # No trailing ';': spark.sql() runs exactly one statement, and some Spark
    # versions' parsers reject a trailing semicolon.
    return (
        "select distinct year_week as date_info from dim_date_20_to_30 "
        f"where year_week >= '{start_date}' and year_week < '{end_date}'"
    )


def build_start_params(date_info, site_name="us", date_type="week"):
    """Return the DolphinScheduler start parameters for one re-run instance.

    :param date_info: the ``year_week`` value to re-run.
    :param site_name: site parameter passed to the workflow (default ``"us"``).
    :param date_type: date granularity passed to the workflow (default ``"week"``).
    """
    return {
        "site_name": site_name,
        "date_type": date_type,
        "date_info": date_info,
    }


if __name__ == '__main__':
    # Usage: re-run.py <start_year_week> <end_year_week>; the range is [start, end).
    start_date = CommonUtil.get_sys_arg(1, None)
    end_date = CommonUtil.get_sys_arg(2, None)
    if not start_date or not end_date:
        # Otherwise None would be interpolated into the SQL as 'None' and the
        # query would silently produce a wrong/empty date range.
        sys.exit(f"usage: {sys.argv[0]} <start_year_week> <end_year_week>")

    spark_session = SparkUtil.get_spark_session("re-run-aba")

    date_df = spark_session.sql(build_date_sql(start_date, end_date))
    # DataFrame.show() prints the table itself and returns None, so it must
    # not be wrapped in print().
    date_df.show()

    date_list = sorted(row["date_info"] for row in date_df.collect())
    print(date_list)

    if not date_list:
        # Stop before sending a misleading "completed" notification.
        sys.exit(f"no year_week found in ['{start_date}', '{end_date}'); nothing to re-run")

    # Weeks are submitted one at a time in ascending order; whether each call
    # waits for its instance to finish depends on
    # DolphinschedulerHelper.start_and_watch_process_instance (presumably it
    # does, per its name — confirm).
    for date_info in date_list:
        start_params = build_start_params(date_info)
        print(start_params)
        DolphinschedulerHelper.start_and_watch_process_instance(
            "big_data_selection",
            process_df_name='周重跑-新aba(四分位计算)流程',
            startParams=start_params,
        )

    CommonUtil.send_wx_msg(["wangrui4", "chenyuanjie", "zhouyuchen"], "【周重跑-新aba(四分位计算)流程】导出完成", "集群升级——启动!")