import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))

from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil


class AbaOldMonth(object):

    @staticmethod
    def spark_py_run_sh(db_level, py_name, site_name, date_type, date_info):
        # Build the spark-submit command for a PySpark job in the given warehouse layer
        return f"""
/opt/module/spark/bin/spark-submit \\
--master yarn \\
--driver-memory 10g \\
--executor-memory 10g \\
--executor-cores 4 \\
--num-executors 10 \\
--queue spark \\
/opt/module/spark/demo/py_demo/{db_level}/{py_name} {site_name} {date_type} {date_info}
"""

    @staticmethod
    def spark_java_run_sh(db_level, table_name, site_name, date_type, date_info):
        # Build the spark-submit command for a Java Spark job packaged in big_data_selection-dev.jar
        return f"""
/opt/module/spark/bin/spark-submit \\
--master yarn \\
--driver-memory 2g \\
--executor-memory 10g \\
--executor-cores 3 \\
--num-executors 10 \\
--queue spark \\
--class com.yswg.selection.sparkJob.{db_level}.{table_name} /opt/module/spark/demo/java_jar/big_data_selection-dev.jar {site_name} {date_type} {date_info}
"""


# Script to recompute the aba month_old data
if __name__ == '__main__':
    client = SSHUtil.get_ssh_client()
    try:
        # weeks = [
        #     '2023-01',
        #     '2023-02',
        #     '2023-03',
        #     '2023-04',
        #     '2023-05',
        #     '2023-06',
        #     '2023-07',
        #     '2023-08',
        #     '2023-09',
        #     '2023-10',
        #     '2023-11',
        #     '2023-12',
        #     '2023-13',
        #     '2023-14'
        # ]
        # # First load the weekly ods_brand_analytics data
        # for week in weeks:
        #     ods_brand_analytics_sh = f"/mnt/run_shell/spark_shell/ods/spark_ods_brand_analytics.sh us week_old {week}"
        #     SSHUtil.exec_command_async(client, ods_brand_analytics_sh, ignore_err=False)

        months = [
            '2023-01',
            '2023-02',
            '2023-03'
        ]
        # months = ['2022-01']
        site_name = 'us'
        date_type = 'month_old'
        for month in months:
            # Only refresh the ODS layer for the latest month; earlier months are assumed to be loaded already
            if month == '2023-03':
                ods_brand_analytics_sh = f"/mnt/run_shell/spark_shell/ods/spark_ods_brand_analytics.sh {site_name} {date_type} {month}"
                SSHUtil.exec_command_async(client, ods_brand_analytics_sh, ignore_err=False)

            # DIM layer: search-term detail
            dim_st_sh = f"bash /mnt/run_shell/spark_shell/dim/spark_dim_st_detail.sh {site_name} {date_type} {month}"
            SSHUtil.exec_command_async(client, dim_st_sh, ignore_err=False)

            # DWD layer: search-term / ASIN measures
            dwd_st_measure_sh = f"bash /mnt/run_shell/spark_shell/dwd/spark_dwd_st_asin_measure.sh {site_name} {date_type} {month}"
            SSHUtil.exec_command_async(client, dwd_st_measure_sh, ignore_err=False)

            # DWS layer: search-term count statistics (Java Spark job)
            dws_st_num = AbaOldMonth.spark_java_run_sh("dws", "Dws_st_num_stats", site_name, date_type, month)
            SSHUtil.exec_command_async(client, dws_st_num, ignore_err=False)

            # DWD layer: search-term FBA volume (Java Spark job)
            dws_st_volume_num = AbaOldMonth.spark_java_run_sh("dwd", "Dwd_st_volume_fba", site_name, date_type, month)
            SSHUtil.exec_command_async(client, dws_st_volume_num, ignore_err=False)

            # DWT layer: search-term market analysis (PySpark job)
            dwt_st_market = f"""
/opt/module/spark/bin/spark-submit \\
--master yarn \\
--driver-memory 5g \\
--executor-memory 10g \\
--executor-cores 3 \\
--num-executors 10 \\
--queue spark \\
/opt/module/spark/demo/py_demo/dwt/dwt_st_market.py us month_old {month}
"""
            SSHUtil.exec_command_async(client, dwt_st_market, ignore_err=False)

            # DWT layer: ABA search-term analytics
            dwt_aba_sh = f"bash /mnt/run_shell/spark_shell/dwt/spark_dwt_aba_st_analytics.sh {site_name} {date_type} {month}"
            SSHUtil.exec_command_async(client, dwt_aba_sh, ignore_err=False)

            # DWT layer: ABA last-365-day aggregation (PySpark job)
            aba_last365_sh = f"""
/opt/module/spark/bin/spark-submit \\
--master yarn \\
--driver-memory 5g \\
--executor-memory 10g \\
--executor-cores 3 \\
--num-executors 10 \\
--queue spark \\
/opt/module/spark/demo/py_demo/dwt/dwt_aba_last365.py us month_old {month}
"""
            SSHUtil.exec_command_async(client, aba_last365_sh, ignore_err=False)

        # Export the last-365-day result for the most recent month
        last_month = max(months)
        aba365_export_sh = f"/opt/module/anaconda3/envs/pyspark/bin/python3.8 /opt/module/spark/demo/py_demo/sqoop_py/dwt_aba_last365.py us last365day {last_month}"
        SSHUtil.exec_command_async(client, aba365_export_sh, ignore_err=False)
    except Exception:
        # Notify on WeChat if any step fails
        CommonUtil.send_wx_msg(['wujicang', 'huangjian'], "Exception", "AbaOldMonth execution failed!")
    finally:
        client.close()