# workflow_aba_month_old.py — driver for the ABA "month_old" workflow.
import os
import sys, warnings

sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil


class WorkFlowAbaMonthOld(object):
    """Builders for the spark-submit shell commands used by this workflow.

    Both helpers only format a command string; nothing is executed here.
    The emitted text (including its indentation and trailing backslashes)
    is kept byte-identical to what the workflow has always produced.
    """

    @staticmethod
    def spark_py_run_sh(db_level, py_name, site_name, date_type, date_info):
        """Return the spark-submit command line for a PySpark job script.

        The script path is /opt/module/spark/demo/py_demo/<db_level>/<py_name>.py
        and receives site_name, date_type, date_info as positional arguments.
        """
        script = f"/opt/module/spark/demo/py_demo/{db_level}/{py_name}.py"
        command_lines = (
            "",
            "        /opt/module/spark/bin/spark-submit   \\",
            "        --master yarn  \\",
            "        --driver-memory 5g  \\",
            "        --executor-memory 10g  \\",
            "        --executor-cores 3  \\",
            "        --num-executors 10  \\",
            "        --queue spark  \\",
            f"        {script} {site_name} {date_type} {date_info}",
            "        ",
        )
        return "\n".join(command_lines)

    @staticmethod
    def spark_java_run_sh(db_level, table_name, site_name, date_type, date_info):
        """Return the spark-submit command line for a JVM job class.

        The main class is com.yswg.selection.sparkJob.<db_level>.<table_name>
        inside the big_data_selection-dev.jar; site_name, date_type and
        date_info are passed through as program arguments.
        """
        job_class = f"com.yswg.selection.sparkJob.{db_level}.{table_name}"
        jar_path = "/opt/module/spark/demo/java_jar/big_data_selection-dev.jar"
        command_lines = (
            "",
            "    /opt/module/spark/bin/spark-submit  \\",
            "    --master yarn \\",
            "    --driver-memory 5g \\",
            "    --executor-memory 10g \\",
            "    --executor-cores 3 \\",
            "    --num-executors 10 \\",
            "    --queue spark \\",
            f"    --class {job_class} {jar_path} {site_name} {date_type} {date_info}",
            "            ",
        )
        return "\n".join(command_lines)


if __name__ == '__main__':
    # Positional CLI arguments: <site_name> <date_type> <date_info>
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)

    # Only the monthly flow is supported; 'month' is normalized to 'month_old'.
    if date_type in ('month', 'month_old'):
        date_type = 'month_old'
    else:
        raise ValueError("传输的日期类型不符合")

    client = SSHUtil.get_ssh_client()
    try:
        # Pipeline steps run sequentially, each blocking until completion
        # (ignore_err=False makes a failing step raise and abort the flow).

        # 1) ODS: ingest brand-analytics data.
        # NOTE(review): 'bash ' prefix added for consistency with every other
        # shell step below; previously this line relied on the script having
        # the executable bit set — confirm on the target host.
        ods_brand_analytics_sh = f"bash /mnt/run_shell/spark_shell/ods/spark_ods_brand_analytics.sh {site_name} {date_type} {date_info}"
        SSHUtil.exec_command_async(client, ods_brand_analytics_sh, ignore_err=False)

        # 2) DIM: search-term detail dimension.
        dim_st_sh = f"bash /mnt/run_shell/spark_shell/dim/spark_dim_st_detail.sh {site_name} {date_type} {date_info}"
        SSHUtil.exec_command_async(client, dim_st_sh, ignore_err=False)

        # 3) DWD: search-term/ASIN measures.
        dwd_st_measure_sh = f"bash /mnt/run_shell/spark_shell/dwd/spark_dwd_st_asin_measure.sh {site_name} {date_type} {date_info} "
        SSHUtil.exec_command_async(client, dwd_st_measure_sh, ignore_err=False)

        # 4) DWS: search-term count statistics (JVM job).
        dws_st_num = WorkFlowAbaMonthOld.spark_java_run_sh("dws", "Dws_st_num_stats", site_name, date_type, date_info)
        SSHUtil.exec_command_async(client, dws_st_num, ignore_err=False)

        # 5) DWD: search-term FBA volume (JVM job; lives in the dwd package).
        dws_st_volume_num = WorkFlowAbaMonthOld.spark_java_run_sh("dwd", "Dwd_st_volume_fba", site_name, date_type,
                                                                  date_info)
        SSHUtil.exec_command_async(client, dws_st_volume_num, ignore_err=False)

        # 6) DWT: search-term market aggregation (PySpark job).
        dwt_st_market = WorkFlowAbaMonthOld.spark_py_run_sh("dwt", "dwt_st_market", site_name, date_type, date_info)
        SSHUtil.exec_command_async(client, dwt_st_market, ignore_err=False)

        # 7) DWT: final ABA search-term analytics.
        dwt_aba_sh = f"bash /mnt/run_shell/spark_shell/dwt/spark_dwt_aba_st_analytics.sh {site_name} {date_type} {date_info} "
        SSHUtil.exec_command_async(client, dwt_aba_sh, ignore_err=False)
    except Exception:
        # Notify maintainers, then re-raise so the process exits non-zero.
        # (The original bare `except:` swallowed the error — including
        # KeyboardInterrupt/SystemExit — and reported success to the scheduler.)
        CommonUtil.send_wx_msg(['wujicang', 'huangjian'], "异常", "AbaMonthOld workflow脚本执行失败!")
        raise
    finally:
        # Close the SSH client on both the success and failure paths.
        client.close()