export_dwd_feedback.py 4.64 KB
Newer Older
chenyuanjie committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))

from utils.db_util import DBUtil
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil, DateTypes

# Export the seller-feedback tables from Hive into MySQL or PostgreSQL:
#   * dwd_seller_asin_account_agg    -> {site_name}_seller_asin_account_agg     (delete period, then append)
#   * dwt_fd_category_agg            -> {site_name}_seller_category_agg         (delete period, then append)
#   * dwd_seller_asin_account_detail -> {site_name}_seller_asin_account_detail  (load a copy table, then swap names)
#
# Usage: <script> <site_name> <date_type> <date_info> ... <mysql|postgresql>
# The export target is always read from the LAST command-line argument.

# Message printed for each supported export target, keyed by the CLI flag.
_DB_TYPE_MESSAGES = {
    'mysql': "导出到mysql中",
    'postgresql': "导出到PG库中",
}

# Columns exported from dwd_seller_asin_account_agg.
_SELLER_AGG_COLS = (
    "account_id",
    "account_name",
    "asin_new_counts",
    "asin_counts",
    "asin_counts_exists",
    "counts_new_rate",
    "top_20_avg_price",
    "top_20_avg_rating",
    "top_20_avg_total_comments",
    "fb_variat_num",
    "fb_asin_total",
    "fb_variat_prop",
    "ym",
    "week",
    "date_info",
)

# Columns exported from dwt_fd_category_agg.
_FD_CATEGORY_AGG_COLS = (
    "fd_account_id",
    "bsr_cate_1_id",
    "fd_cate_asin_num",
    "fd_cate_new_asin_num",
    "fd_asin_num",
    "bsr_asin_num",
    "fd_cate_asin_per",
    "fd_cate_new_asin_per",
    "fd_market_per",
    "ym",
    "week",
    "date_info",
)

# Columns exported from dwd_seller_asin_account_detail.
_SELLER_DETAIL_COLS = (
    "account_id",
    "account_name",
    "asin",
    "launch_time",
    "days_diff",
    "is_asin_new",
)


def _resolve_db_type(sql_flag):
    """Return the database type named by the trailing CLI flag.

    Raises:
        ValueError: if *sql_flag* is not 'mysql' or 'postgresql'. Without this
            check an unknown (or omitted) flag left ``db_type`` unbound and the
            script crashed later with a NameError.
    """
    if sql_flag not in _DB_TYPE_MESSAGES:
        raise ValueError(
            f"unsupported export target {sql_flag!r}; "
            f"the last argument must be one of {sorted(_DB_TYPE_MESSAGES)}"
        )
    return sql_flag


def _sql_literal(value):
    """Render *value* as a single-quoted SQL string literal, doubling embedded quotes.

    date_info is written into WHERE clauses as text. Left unquoted, a value such
    as ``2023-12`` is parsed by the database as the expression 2023 - 12.
    NOTE: only quotes are escaped; this is intended for operator-supplied CLI
    values, not for arbitrary untrusted input.
    """
    return "'" + str(value).replace("'", "''") + "'"


def _delete_period(engine, table, date_info):
    """Delete rows of *table* for *date_info* so that re-running an export is idempotent."""
    DBUtil.engine_exec_sql(
        engine, f"delete from {table} where date_info = {_sql_literal(date_info)}"
    )


def _run_export(client, *, site_name, db_type, date_type, date_info, hive_tb, export_tb, cols):
    """Build the export shell command for one Hive partition and run it over *client*."""
    sh = CommonUtil.build_export_sh(
        site_name=site_name,
        db_type=db_type,
        hive_tb=hive_tb,
        export_tb=export_tb,
        col=list(cols),
        partition_dict={
            "site_name": site_name,
            "date_type": date_type,
            "date_info": date_info,
        },
    )
    # ignore_err=False: presumably raises when the export fails, so later steps are skipped
    # (TODO confirm in SSHUtil.exec_command_async).
    SSHUtil.exec_command_async(client, sh, ignore_err=False)


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    # The export target (mysql / postgresql) is always the last argument.
    sql_flag = CommonUtil.get_sys_arg(len(sys.argv) - 1, None)
    print(f"执行参数为{sys.argv}")

    # Fail fast instead of issuing SQL such as "date_info = None".
    if not (site_name and date_type and date_info):
        raise ValueError("site_name, date_type and date_info are required positional arguments")
    db_type = _resolve_db_type(sql_flag)
    print(_DB_TYPE_MESSAGES[db_type])

    export_seller_agg_table = f"{site_name}_seller_asin_account_agg"
    export_fd_cat_agg = f"{site_name}_seller_category_agg"
    # The detail table is loaded into a copy and then swapped in by renaming.
    export_seller_detail_table = f"{site_name}_seller_asin_account_detail_copy1"
    export_table_target = f"{site_name}_seller_asin_account_detail"

    common = {
        "site_name": site_name,
        "db_type": db_type,
        "date_type": date_type,
        "date_info": date_info,
    }

    engine = DBUtil.get_db_engine(db_type, site_name)
    try:
        client = SSHUtil.get_ssh_client()
        try:
            # 1) Seller agg table: remove this period's rows, then append the Hive partition.
            _delete_period(engine, export_seller_agg_table, date_info)
            _run_export(client, hive_tb="dwd_seller_asin_account_agg",
                        export_tb=export_seller_agg_table, cols=_SELLER_AGG_COLS, **common)

            # 2) dwt_fd_category_agg: same delete-then-append pattern.
            _delete_period(engine, export_fd_cat_agg, date_info)
            _run_export(client, hive_tb="dwt_fd_category_agg",
                        export_tb=export_fd_cat_agg, cols=_FD_CATEGORY_AGG_COLS, **common)

            # 3) Detail table: (re)create an empty copy shaped like the target and load it.
            for sql in (
                f"create table if not exists {export_seller_detail_table}  ( like {export_table_target} );",
                f"truncate table {export_seller_detail_table}; ",
            ):
                DBUtil.engine_exec_sql(engine, sql)
            _run_export(client, hive_tb="dwd_seller_asin_account_detail",
                        export_tb=export_seller_detail_table, cols=_SELLER_DETAIL_COLS, **common)
        finally:
            client.close()

        # Swap the freshly loaded copy into place; the previous data becomes the next run's copy table.
        # NOTE(review): these renames are issued as three separate statements. If one fails midway,
        # the target table may be left renamed to *_bak. Whether they share a transaction depends on
        # DBUtil.engine_exec_sql — confirm.
        for sql in (
            f"alter table {export_table_target} rename to {export_table_target}_bak ;",
            f"alter table {export_seller_detail_table} rename to {export_table_target} ;",
            f"alter table {export_table_target}_bak rename to {export_seller_detail_table} ;",
        ):
            DBUtil.engine_exec_sql(engine, sql)
    finally:
        # Release pooled DB connections even when a step above failed.
        engine.dispose()