"""Workflow driver that runs the ABA ``month_old`` Spark jobs over SSH.

Usage (positional arguments, as read via ``CommonUtil.get_sys_arg``):
    <script> <site_name> <date_type> <date_info>

``date_type`` must be ``month`` or ``month_old``; both are normalized to
``month_old`` before the commands are built.
"""
import os
import sys
import traceback
import warnings

# Make the parent of the script directory importable so the project-local
# ``utils`` package resolves; this must run before the imports below.
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil


class WorkFlowAbaMonthOld(object):
    """Builders for the ``spark-submit`` command lines used by this workflow."""

    @staticmethod
    def spark_py_run_sh(db_level, py_name, site_name, date_type, date_info):
        """Return the shell command that submits a PySpark job to YARN.

        Args:
            db_level: Sub-directory under ``py_demo`` holding the script
                (e.g. ``"dwt"``).
            py_name: Script file name without the ``.py`` suffix.
            site_name: First argument passed through to the script.
            date_type: Second argument passed through to the script.
            date_info: Third argument passed through to the script.

        Returns:
            A multi-line shell command string; each line except the last
            ends with a backslash so the shell joins them into one command.
        """
        return f"""
/opt/module/spark/bin/spark-submit \\
--master yarn \\
--driver-memory 5g \\
--executor-memory 10g \\
--executor-cores 3 \\
--num-executors 10 \\
--queue spark \\
/opt/module/spark/demo/py_demo/{db_level}/{py_name}.py {site_name} {date_type} {date_info}
"""

    @staticmethod
    def spark_java_run_sh(db_level, table_name, site_name, date_type, date_info):
        """Return the shell command that submits a JVM Spark job to YARN.

        Args:
            db_level: Package segment under ``com.yswg.selection.sparkJob``
                (e.g. ``"dws"``).
            table_name: Main class name inside that package.
            site_name: First argument passed through to the job.
            date_type: Second argument passed through to the job.
            date_info: Third argument passed through to the job.

        Returns:
            A multi-line shell command string; each line except the last
            ends with a backslash so the shell joins them into one command.
        """
        # ``--class`` and the jar path stay on one line: there is no line
        # continuation between them, so splitting them would end the command.
        return f"""
/opt/module/spark/bin/spark-submit \\
--master yarn \\
--driver-memory 5g \\
--executor-memory 10g \\
--executor-cores 3 \\
--num-executors 10 \\
--queue spark \\
--class com.yswg.selection.sparkJob.{db_level}.{table_name} /opt/module/spark/demo/java_jar/big_data_selection-dev.jar {site_name} {date_type} {date_info}
"""


def _build_commands(site_name, date_type, date_info):
    """Return the workflow's shell commands in the order they must be run."""
    return [
        # NOTE(review): unlike the other shell scripts this one is invoked
        # without a leading ``bash`` -- kept as in the original; confirm the
        # script is executable on the remote host.
        f"/mnt/run_shell/spark_shell/ods/spark_ods_brand_analytics.sh {site_name} {date_type} {date_info}",
        f"bash /mnt/run_shell/spark_shell/dim/spark_dim_st_detail.sh {site_name} {date_type} {date_info}",
        f"bash /mnt/run_shell/spark_shell/dwd/spark_dwd_st_asin_measure.sh {site_name} {date_type} {date_info} ",
        WorkFlowAbaMonthOld.spark_java_run_sh("dws", "Dws_st_num_stats", site_name, date_type, date_info),
        WorkFlowAbaMonthOld.spark_java_run_sh("dwd", "Dwd_st_volume_fba", site_name, date_type, date_info),
        WorkFlowAbaMonthOld.spark_py_run_sh("dwt", "dwt_st_market", site_name, date_type, date_info),
        f"bash /mnt/run_shell/spark_shell/dwt/spark_dwt_aba_st_analytics.sh {site_name} {date_type} {date_info} ",
    ]


def main():
    """Validate the CLI arguments and run every workflow step over SSH.

    Raises:
        ValueError: If ``date_type`` is neither ``month`` nor ``month_old``.
    """
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)

    if date_type not in ('month', 'month_old'):
        raise ValueError("传输的日期类型不符合")
    # Both accepted spellings are normalized to 'month_old'.
    date_type = 'month_old'

    commands = _build_commands(site_name, date_type, date_info)

    client = SSHUtil.get_ssh_client()
    try:
        for command in commands:
            SSHUtil.exec_command_async(client, command, ignore_err=False)
    except Exception:
        # Catch Exception rather than using a bare ``except`` so Ctrl-C and
        # SystemExit still propagate, and keep the traceback in the job log
        # because the alert text carries no detail about the failure.
        traceback.print_exc()
        CommonUtil.send_wx_msg(['wujicang', 'huangjian'], "异常", "AbaMonthOld workflow脚本执行失败!")
        # NOTE(review): the failure is still swallowed after alerting, as in
        # the original, so the process exit status is unchanged; re-raise
        # here if the scheduler should see the run as failed.
    finally:
        # Single close on every path, including when the alert itself fails.
        client.close()


if __name__ == '__main__':
    main()