aba_old_month.py

import os
import sys

# Make the project root importable so the utils package resolves when this script is run directly.
sys.path.append(os.path.dirname(sys.path[0]))
# Project-internal helpers: SSHUtil wraps an SSH client for running remote shell
# commands; CommonUtil.send_wx_msg sends WeChat alerts.
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil


class AbaOldMonth:
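    """Builders for the spark-submit command lines used by the ABA month_old recompute."""
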
    @staticmethod
    def spark_py_run_sh(db_level, py_name, site_name, date_type, date_info):
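        """Build a spark-submit command for a PySpark script under py_demo/<db_level>/."""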
        return f"""
/opt/module/spark/bin/spark-submit \\
--master yarn \\
--driver-memory 10g \\
--executor-memory 10g \\
--executor-cores 4 \\
--num-executors 10 \\
--queue spark \\
/opt/module/spark/demo/py_demo/{db_level}/{py_name} {site_name} {date_type} {date_info}
"""

    @staticmethod
    def spark_java_run_sh(db_level, table_name, site_name, date_type, date_info):
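        """Build a spark-submit command for a Java Spark job class in the big_data_selection jar."""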
        return f"""
/opt/module/spark/bin/spark-submit \\
--master yarn \\
--driver-memory 2g \\
--executor-memory 10g \\
--executor-cores 3 \\
--num-executors 10 \\
--queue spark \\
--class com.yswg.selection.sparkJob.{db_level}.{table_name} /opt/module/spark/demo/java_jar/big_data_selection-dev.jar {site_name} {date_type} {date_info}
"""


#  Script to recompute the aba month_old data
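#  Full pipeline per month (the first three layers run only for 2023-03):
#  ods_brand_analytics -> dim_st_detail -> dwd_st_asin_measure -> Dws_st_num_stats
#  -> Dwd_st_volume_fba -> dwt_st_market -> dwt_aba_st_analytics -> dwt_aba_last365,
#  followed by a one-off last365day export for the latest month.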
if __name__ == '__main__':
    client = SSHUtil.get_ssh_client()
    try:
        # weeks = [
        #     '2023-01',
        #     '2023-02',
        #     '2023-03',
        #     '2023-04',
        #     '2023-05',
        #     '2023-06',
        #     '2023-07',
        #     '2023-08',
        #     '2023-09',
        #     '2023-10',
        #     '2023-11',
        #     '2023-12',
        #     '2023-13',
        #     '2023-14'
        # ]
        # # First, the weekly ods_brand_analytics has to be generated
        # for week in weeks:
        #     ods_brand_analytics_sh = f"/mnt/run_shell/spark_shell/ods/spark_ods_brand_analytics.sh us week_old {week}"
        #     SSHUtil.exec_command_async(client, ods_brand_analytics_sh, ignore_err=False)

        months = [
            '2023-01',
            '2023-02',
            '2023-03'
        ]

        # months = ['2022-01']
        site_name = 'us'
        date_type = 'month_old'

        for month in months:
            if month == '2023-03':
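                # The ods/dim/dwd upstream layers are rebuilt only for 2023-03,
                # presumably because the earlier months were already processed.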
                ods_brand_analytics_sh = f"/mnt/run_shell/spark_shell/ods/spark_ods_brand_analytics.sh {site_name} {date_type} {month}"
                SSHUtil.exec_command_async(client, ods_brand_analytics_sh, ignore_err=False)

                dim_st_sh = f"bash /mnt/run_shell/spark_shell/dim/spark_dim_st_detail.sh {site_name} {date_type} {month}"
                SSHUtil.exec_command_async(client, dim_st_sh, ignore_err=False)

                dwd_st_measure_sh = f"bash /mnt/run_shell/spark_shell/dwd/spark_dwd_st_asin_measure.sh {site_name} {date_type} {month}"
                SSHUtil.exec_command_async(client, dwd_st_measure_sh, ignore_err=False)

            # Java Spark jobs: dws-layer st (search term) count stats, then dwd-layer st volume/FBA stats.
            dws_st_num = AbaOldMonth.spark_java_run_sh("dws", "Dws_st_num_stats", site_name, date_type, month)
            SSHUtil.exec_command_async(client, dws_st_num, ignore_err=False)

            dws_st_volume_num = AbaOldMonth.spark_java_run_sh("dwd", "Dwd_st_volume_fba", site_name, date_type, month)
            SSHUtil.exec_command_async(client, dws_st_volume_num, ignore_err=False)

            # dwt_st_market is submitted inline with its own resources (5g driver, 3 executor
            # cores) rather than via the spark_py_run_sh helper (10g, 4 cores).
            dwt_st_market = f"""
/opt/module/spark/bin/spark-submit \\
--master yarn \\
--driver-memory 5g \\
--executor-memory 10g \\
--executor-cores 3 \\
--num-executors 10 \\
--queue spark \\
/opt/module/spark/demo/py_demo/dwt/dwt_st_market.py {site_name} {date_type} {month}
"""
            SSHUtil.exec_command_async(client, dwt_st_market, ignore_err=False)

            dwt_aba_sh = f"bash /mnt/run_shell/spark_shell/dwt/spark_dwt_aba_st_analytics.sh {site_name} {date_type} {month}"
            SSHUtil.exec_command_async(client, dwt_aba_sh, ignore_err=False)

            aba_last365_sh = f"""
/opt/module/spark/bin/spark-submit \\
--master yarn \\
--driver-memory 5g \\
--executor-memory 10g \\
--executor-cores 3 \\
--num-executors 10 \\
--queue spark \\
/opt/module/spark/demo/py_demo/dwt/dwt_aba_last365.py {site_name} {date_type} {month}
"""
            SSHUtil.exec_command_async(client, aba_last365_sh, ignore_err=False)

        # Export the rolling last-365-day ABA result via the sqoop_py job, anchored at the latest recomputed month.
        last_month = max(months)
        aba365_export_sh = f"/opt/module/anaconda3/envs/pyspark/bin/python3.8 /opt/module/spark/demo/py_demo/sqoop_py/dwt_aba_last365.py {site_name} last365day {last_month}"
        SSHUtil.exec_command_async(client, aba365_export_sh, ignore_err=False)
    except Exception:
        # Alert the maintainers on WeChat if any step fails.
        CommonUtil.send_wx_msg(['wujicang', 'huangjian'], "Exception", "AbaOldMonth execution failed!")
    finally:
        client.close()