import os import sys sys.path.append(os.path.dirname(sys.path[0])) from utils.DolphinschedulerHelper import DolphinschedulerHelper from utils.common_util import CommonUtil from utils.spark_util import SparkUtil if __name__ == '__main__': start_date = CommonUtil.get_sys_arg(1, None) end_date = CommonUtil.get_sys_arg(2, None) spark_session = SparkUtil.get_spark_session("re-run-aba") sql = f""" select distinct year_week as date_info from dim_date_20_to_30 where year_week >= '{start_date}' and year_week < '{end_date}'; """ date_df = spark_session.sql(sql) print(date_df.show()) date_list = sorted([d.asDict().get("date_info") for d in date_df.collect()]) print(date_list) for date_info in date_list: startParams = { "site_name": "us", "date_type": "week", "date_info": date_info } print(startParams) DolphinschedulerHelper.start_and_watch_process_instance( "big_data_selection", process_df_name='周重跑-新aba(四分位计算)流程', startParams={ "site_name": "us", "date_type": "week", "date_info": date_info } ) CommonUtil.send_wx_msg(["wangrui4", "chenyuanjie", "zhouyuchen"], "【周重跑-新aba(四分位计算)流程】导出完成", "集群升级——启动!") pass