1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.DolphinschedulerHelper import DolphinschedulerHelper
from utils.common_util import CommonUtil
from utils.spark_util import SparkUtil
def main():
    """Re-run the weekly "new ABA (quartile calculation)" workflow per week.

    Command-line arguments (read through ``CommonUtil.get_sys_arg``):
        1: start_date -- inclusive lower bound for ``year_week``.
        2: end_date   -- exclusive upper bound for ``year_week``.

    Both bounds are embedded in the SQL as quoted string literals and
    compared against ``dim_date_20_to_30.year_week``. Every distinct week
    found is submitted, in ascending order, to the DolphinScheduler
    process definition for site ``us`` with date type ``week``. A WeChat
    notification is sent afterwards.
    """
    start_date = CommonUtil.get_sys_arg(1, None)
    end_date = CommonUtil.get_sys_arg(2, None)
    # Without both bounds the query would be filtered on the literal string
    # 'None', match nothing, and the "completed" notification would still be
    # sent. Fail fast instead of reporting a run that never happened.
    if not start_date or not end_date:
        sys.exit("usage: <script> <start_year_week> <end_year_week>")

    spark_session = SparkUtil.get_spark_session("re-run-aba")
    sql = f"""
select distinct year_week as date_info from dim_date_20_to_30 where year_week >= '{start_date}' and year_week < '{end_date}';
"""
    date_df = spark_session.sql(sql)
    # DataFrame.show() prints the table itself and returns None; wrapping it
    # in print() only added a stray "None" line to the output.
    date_df.show()

    date_list = sorted(row["date_info"] for row in date_df.collect())
    print(date_list)

    for date_info in date_list:
        # Build the parameters once so what is logged is exactly what is sent.
        start_params = {
            "site_name": "us",
            "date_type": "week",
            "date_info": date_info,
        }
        print(start_params)
        # NOTE(review): presumably blocks until the instance finishes, so the
        # weeks run one after another -- confirm against the helper.
        DolphinschedulerHelper.start_and_watch_process_instance(
            "big_data_selection",
            process_df_name='周重跑-新aba(四分位计算)流程',
            startParams=start_params,
        )

    # NOTE(review): assumed to be sent once, after every week has been
    # processed (the message says "completed") -- confirm it was not meant
    # to be sent per week.
    CommonUtil.send_wx_msg(
        ["wangrui4", "chenyuanjie", "zhouyuchen"],
        "【周重跑-新aba(四分位计算)流程】导出完成",
        "集群升级——启动!",
    )


if __name__ == '__main__':
    main()