# aba_old_month.py
import os
import sys

# Make the parent directory importable so the sibling utils package resolves
# when this script is run directly.
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil


class AbaOldMonth:
    @staticmethod
    def spark_py_run_sh(db_level, py_name, site_name, date_type, date_info):
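        """Build the spark-submit command line for a PySpark job under py_demo/<db_level>."""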
        return f"""
    /opt/module/spark/bin/spark-submit   \\
    --master yarn  \\
    --driver-memory 10g  \\
    --executor-memory 10g  \\
    --executor-cores 4  \\
    --num-executors 10  \\
    --queue spark  \\
    /opt/module/spark/demo/py_demo/{db_level}/{py_name} {site_name} {date_type} {date_info}
    """

    @staticmethod
    def spark_java_run_sh(db_level, table_name, site_name, date_type, date_info):
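        """Build the spark-submit command line for a Java job class under com.yswg.selection.sparkJob.<db_level>."""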
        return f"""
/opt/module/spark/bin/spark-submit  \\
--master yarn \\
--driver-memory 2g \\
--executor-memory 10g \\
--executor-cores 3 \\
--num-executors 10 \\
--queue spark \\
--class com.yswg.selection.sparkJob.{db_level}.{table_name} /opt/module/spark/demo/java_jar/big_data_selection-dev.jar {site_name} {date_type} {date_info}
        """


# Script to recompute the ABA month_old data.
if __name__ == '__main__':
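    # All shell commands below run over a single SSH connection to the remote Spark host.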
    client = SSHUtil.get_ssh_client()
    try:
        # weeks = [
        #     '2023-01',
        #     '2023-02',
        #     '2023-03',
        #     '2023-04',
        #     '2023-05',
        #     '2023-06',
        #     '2023-07',
        #     '2023-08',
        #     '2023-09',
        #     '2023-10',
        #     '2023-11',
        #     '2023-12',
        #     '2023-13',
        #     '2023-14'
        # ]
        # # First, fetch the weekly ods_brand_analytics data
        # for week in weeks:
        #     ods_brand_analytics_sh = f"/mnt/run_shell/spark_shell/ods/spark_ods_brand_analytics.sh us week_old {week}"
        #     SSHUtil.exec_command_async(client, ods_brand_analytics_sh, ignore_err=False)

        months = [
            '2023-01',
            '2023-02',
            '2023-03'
        ]

        # months = ['2022-01']
        site_name = 'us'
        date_type = 'month_old'

        for month in months:
            # ODS/DIM/DWD layers only need rebuilding for the newest month;
            # earlier months are assumed to already have these layers in place.
            if month == '2023-03':
                ods_brand_analytics_sh = f"/mnt/run_shell/spark_shell/ods/spark_ods_brand_analytics.sh {site_name} {date_type} {month}"
                SSHUtil.exec_command_async(client, ods_brand_analytics_sh, ignore_err=False)

                dim_st_sh = f"bash /mnt/run_shell/spark_shell/dim/spark_dim_st_detail.sh {site_name} {date_type} {month}"
                SSHUtil.exec_command_async(client, dim_st_sh, ignore_err=False)

                dwd_st_measure_sh = f"bash /mnt/run_shell/spark_shell/dwd/spark_dwd_st_asin_measure.sh {site_name} {date_type} {month}"
                SSHUtil.exec_command_async(client, dwd_st_measure_sh, ignore_err=False)

            # DWS layer: search-term count statistics (Java Spark job).
            dws_st_num = AbaOldMonth.spark_java_run_sh("dws", "Dws_st_num_stats", site_name, date_type, month)
            SSHUtil.exec_command_async(client, dws_st_num, ignore_err=False)

            # DWD layer: search-term volume/FBA statistics (Java Spark job).
            dwd_st_volume_fba = AbaOldMonth.spark_java_run_sh("dwd", "Dwd_st_volume_fba", site_name, date_type, month)
            SSHUtil.exec_command_async(client, dwd_st_volume_fba, ignore_err=False)

            dwt_st_market = f"""
/opt/module/spark/bin/spark-submit   \\
--master yarn  \\
--driver-memory 5g  \\
--executor-memory 10g  \\
--executor-cores 3  \\
--num-executors 10  \\
--queue spark  \\
/opt/module/spark/demo/py_demo/dwt/dwt_st_market.py us month_old {month}
"""
            SSHUtil.exec_command_async(client, dwt_st_market, ignore_err=False)

            dwt_aba_sh = f"bash /mnt/run_shell/spark_shell/dwt/spark_dwt_aba_st_analytics.sh {site_name} {date_type} {month} "
            SSHUtil.exec_command_async(client, dwt_aba_sh, ignore_err=False)

            aba_last365_sh = f"""
/opt/module/spark/bin/spark-submit   \\
--master yarn  \\
--driver-memory 5g  \\
--executor-memory 10g  \\
--executor-cores 3  \\
--num-executors 10  \\
--queue spark  \\
/opt/module/spark/demo/py_demo/dwt/dwt_aba_last365.py us month_old {month}
"""
            SSHUtil.exec_command_async(client, aba_last365_sh, ignore_err=False)

        # After all months are processed, export the last-365-day results once,
        # keyed off the most recent month.
        last_month = max(months)
        aba365_export_sh = (
            "/opt/module/anaconda3/envs/pyspark/bin/python3.8 "
            f"/opt/module/spark/demo/py_demo/sqoop_py/dwt_aba_last365.py us last365day {last_month}"
        )
        SSHUtil.exec_command_async(client, aba365_export_sh, ignore_err=False)
    except Exception:
        # Notify maintainers on any failure; the connection is closed in finally.
        CommonUtil.send_wx_msg(['wujicang', 'huangjian'], "Exception", "AbaOldMonth execution failed!")
    finally:
        client.close()