aba_old_month.py
4.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import os
import sys, warnings
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
class AbaOldMonth():
    """Builders for the ``spark-submit`` command lines used by the ABA month_old rerun.

    Both public builders only format and return a shell command string; they
    do not execute anything themselves.
    """

    @staticmethod
    def _spark_submit_sh(target, driver_memory, executor_memory,
                         executor_cores, num_executors, queue):
        """Return a multi-line ``spark-submit`` command ending in *target*.

        The command starts and ends with a newline, and every line except the
        last ends with `` \\`` so the shell reads it as a single command.
        """
        parts = (
            "/opt/module/spark/bin/spark-submit",
            "--master yarn",
            f"--driver-memory {driver_memory}",
            f"--executor-memory {executor_memory}",
            f"--executor-cores {executor_cores}",
            f"--num-executors {num_executors}",
            f"--queue {queue}",
            target,
        )
        return "\n" + " \\\n".join(parts) + "\n"

    @staticmethod
    def spark_py_run_sh(db_level, py_name, site_name, date_type, date_info, *,
                        driver_memory="10g", executor_memory="10g",
                        executor_cores=4, num_executors=10, queue="spark"):
        """Return the ``spark-submit`` command for a PySpark job script.

        Args:
            db_level: Sub-directory of ``/opt/module/spark/demo/py_demo``
                that contains the script (e.g. ``"dwt"``).
            py_name: File name of the script inside that directory.
            site_name: First positional argument passed to the script.
            date_type: Second positional argument passed to the script.
            date_info: Third positional argument passed to the script.
            driver_memory: Value for ``--driver-memory``.
            executor_memory: Value for ``--executor-memory``.
            executor_cores: Value for ``--executor-cores``.
            num_executors: Value for ``--num-executors``.
            queue: Value for ``--queue``.

        Returns:
            The command as a string. With the default keyword values the
            result is the same command this method always produced.
        """
        target = (
            f"/opt/module/spark/demo/py_demo/{db_level}/{py_name} "
            f"{site_name} {date_type} {date_info}"
        )
        return AbaOldMonth._spark_submit_sh(
            target, driver_memory, executor_memory,
            executor_cores, num_executors, queue,
        )

    @staticmethod
    def spark_java_run_sh(db_level, table_name, site_name, date_type, date_info, *,
                          driver_memory="2g", executor_memory="10g",
                          executor_cores=3, num_executors=10, queue="spark"):
        """Return the ``spark-submit`` command for a job class in the Java jar.

        Args:
            db_level: Package segment under ``com.yswg.selection.sparkJob``.
            table_name: Name of the job class inside that package.
            site_name: First positional argument passed to the job.
            date_type: Second positional argument passed to the job.
            date_info: Third positional argument passed to the job.
            driver_memory: Value for ``--driver-memory``.
            executor_memory: Value for ``--executor-memory``.
            executor_cores: Value for ``--executor-cores``.
            num_executors: Value for ``--num-executors``.
            queue: Value for ``--queue``.

        Returns:
            The command as a string. With the default keyword values the
            result is the same command this method always produced.
        """
        target = (
            f"--class com.yswg.selection.sparkJob.{db_level}.{table_name} "
            "/opt/module/spark/demo/java_jar/big_data_selection-dev.jar "
            f"{site_name} {date_type} {date_info}"
        )
        return AbaOldMonth._spark_submit_sh(
            target, driver_memory, executor_memory,
            executor_cores, num_executors, queue,
        )
# Script that recomputes the ABA month_old data.
def _dwt_py_submit_sh(py_name, site_name, date_type, date_info):
    """Return the spark-submit command for a PySpark job under ``py_demo/dwt``.

    The resource settings (5g driver, 10g executors, 3 cores, 10 executors,
    queue ``spark``) are the ones this script has always used for its two
    DWT jobs.
    """
    parts = (
        "/opt/module/spark/bin/spark-submit",
        "--master yarn",
        "--driver-memory 5g",
        "--executor-memory 10g",
        "--executor-cores 3",
        "--num-executors 10",
        "--queue spark",
        f"/opt/module/spark/demo/py_demo/dwt/{py_name} {site_name} {date_type} {date_info}",
    )
    return "\n" + " \\\n".join(parts) + "\n"


def main():
    """Rerun the ABA month_old pipeline for a fixed list of months over SSH.

    For every month the ODS (only for '2023-03'), DIM, DWD, DWS and DWT
    commands are passed, in that order, to ``SSHUtil.exec_command_async``
    with ``ignore_err=False``. After the loop the last-365-day export is
    run once for the latest month in the list.

    If anything raises, the traceback is printed to stderr and a
    notification is sent through ``CommonUtil.send_wx_msg``. The SSH client
    is closed in every case.
    """
    import traceback  # local import: only needed for failure reporting

    site_name = 'us'
    date_type = 'month_old'
    months = [
        '2023-01',
        '2023-02',
        '2023-03',
    ]
    # months = ['2022-01']

    client = SSHUtil.get_ssh_client()
    try:
        # weeks = [
        #     '2023-01',
        #     '2023-02',
        #     '2023-03',
        #     '2023-04',
        #     '2023-05',
        #     '2023-06',
        #     '2023-07',
        #     '2023-08',
        #     '2023-09',
        #     '2023-10',
        #     '2023-11',
        #     '2023-12',
        #     '2023-13',
        #     '2023-14'
        # ]
        # # The weekly ods_brand_analytics data has to be loaded first
        # for week in weeks:
        #     ods_brand_analytics_sh = f"/mnt/run_shell/spark_shell/ods/spark_ods_brand_analytics.sh us week_old {week}"
        #     SSHUtil.exec_command_async(client, ods_brand_analytics_sh, ignore_err=False)

        for month in months:
            commands = []

            # NOTE(review): the reviewed copy of this file had lost its
            # indentation. It is assumed that only the ODS step is guarded
            # by this condition and that every later step runs for all
            # months - confirm against the original file.
            if month == '2023-03':
                commands.append(
                    f"/mnt/run_shell/spark_shell/ods/spark_ods_brand_analytics.sh {site_name} {date_type} {month}"
                )

            commands.extend([
                f"bash /mnt/run_shell/spark_shell/dim/spark_dim_st_detail.sh {site_name} {date_type} {month}",
                f"bash /mnt/run_shell/spark_shell/dwd/spark_dwd_st_asin_measure.sh {site_name} {date_type} {month} ",
                AbaOldMonth.spark_java_run_sh("dws", "Dws_st_num_stats", site_name, date_type, month),
                AbaOldMonth.spark_java_run_sh("dwd", "Dwd_st_volume_fba", site_name, date_type, month),
                _dwt_py_submit_sh("dwt_st_market.py", site_name, date_type, month),
                f"bash /mnt/run_shell/spark_shell/dwt/spark_dwt_aba_st_analytics.sh {site_name} {date_type} {month} ",
                _dwt_py_submit_sh("dwt_aba_last365.py", site_name, date_type, month),
            ])

            # Steps are submitted one after another, in pipeline order.
            for command in commands:
                SSHUtil.exec_command_async(client, command, ignore_err=False)

        # NOTE(review): assumed to run once after the loop (it uses the
        # latest month of the whole list, not the loop variable) - confirm.
        last_month = max(months)
        aba365_export_sh = (
            "\n/opt/module/anaconda3/envs/pyspark/bin/python3.8 "
            "/opt/module/spark/demo/py_demo/sqoop_py/dwt_aba_last365.py "
            f"{site_name} last365day {last_month}\n"
        )
        SSHUtil.exec_command_async(client, aba365_export_sh, ignore_err=False)
    except Exception:
        # `Exception` rather than a bare `except:` so that Ctrl-C and
        # SystemExit are no longer swallowed; the traceback is printed so the
        # cause of the failure is not lost.
        traceback.print_exc()
        CommonUtil.send_wx_msg(['wujicang', 'huangjian'], "异常", "AbaOldMonth 执行失败!")
        # NOTE(review): as before, the script still exits with status 0 after
        # a failure; re-raise here if the scheduler should see the failure.
    finally:
        # Close the SSH client on success and on failure alike.
        client.close()


if __name__ == '__main__':
    main()