# export_dwd_feedback.py 4.64 KB
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))

from utils.db_util import DBUtil
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil, DateTypes

def _resolve_db_type(sql_flag):
    """Map the CLI export-target flag to the db_type used by DBUtil / build_export_sh.

    :param sql_flag: the last command-line argument; expected 'mysql' or 'postgresql'.
    :return: 'mysql' or 'postgresql'.
    :raises ValueError: for any other value. Without this, ``db_type`` would be
        left unbound and the script would fail later with an unrelated NameError.
    """
    if sql_flag == 'mysql':
        print("导出到mysql中")
        return 'mysql'
    if sql_flag == 'postgresql':
        print("导出到PG库中")
        return 'postgresql'
    raise ValueError(
        f"unsupported export target {sql_flag!r}: "
        f"the last argument must be 'mysql' or 'postgresql'"
    )


def _quote_literal(value):
    """Render ``value`` as a single-quoted SQL string literal, doubling embedded quotes.

    The period value must be quoted: for values like '2023-15' (presumably the
    week/month date_info format - confirm), an unquoted ``date_info = 2023-15`` is
    evaluated by MySQL as arithmetic (2008) and matches nothing, and PostgreSQL
    rejects a varchar = integer comparison. A quoted literal also still compares
    correctly if the column happens to be numeric.
    """
    return "'" + str(value).replace("'", "''") + "'"


def _delete_period(engine, table, date_info):
    """Delete rows of ``table`` for the given date_info so a re-run does not duplicate the period."""
    sql = f"delete from {table} where date_info = {_quote_literal(date_info)}"
    DBUtil.engine_exec_sql(engine, sql)


def _export_partition(client, site_name, db_type, date_type, date_info, hive_tb, export_tb, cols):
    """Build the export shell command for one Hive partition and run it over ``client``.

    A fresh partition dict is built for every call, as the original inline calls did.
    ``ignore_err=False`` is passed exactly as before; presumably it makes a failed
    export raise - confirm against SSHUtil.exec_command_async.
    """
    sh = CommonUtil.build_export_sh(
        site_name=site_name,
        db_type=db_type,
        hive_tb=hive_tb,
        export_tb=export_tb,
        col=cols,
        partition_dict={
            "site_name": site_name,
            "date_type": date_type,
            "date_info": date_info
        }
    )
    SSHUtil.exec_command_async(client, sh, ignore_err=False)


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    # The export target (mysql / postgresql) is always the LAST argument.
    sql_flag = CommonUtil.get_sys_arg(len(sys.argv) - 1, None)
    print(f"执行参数为{sys.argv}")

    # Fail fast instead of running "date_info = None" style SQL against the target db.
    if site_name is None or date_type is None or date_info is None:
        raise ValueError(
            "usage: export_dwd_feedback.py <site_name> <date_type> <date_info> ... <mysql|postgresql>"
        )
    db_type = _resolve_db_type(sql_flag)

    # Get the database connection; disposed in the outer finally below.
    engine = DBUtil.get_db_engine(db_type, site_name)
    try:
        # agg table: delete the same period first so re-runs are idempotent, then export.
        export_seller_agg_table = f"{site_name}_seller_asin_account_agg"
        _delete_period(engine, export_seller_agg_table, date_info)

        client = SSHUtil.get_ssh_client()
        try:
            _export_partition(
                client, site_name, db_type, date_type, date_info,
                hive_tb="dwd_seller_asin_account_agg",
                export_tb=export_seller_agg_table,
                cols=[
                    "account_id",
                    "account_name",
                    "asin_new_counts",
                    "asin_counts",
                    "asin_counts_exists",
                    "counts_new_rate",
                    "top_20_avg_price",
                    "top_20_avg_rating",
                    "top_20_avg_total_comments",
                    "fb_variat_num",
                    "fb_asin_total",
                    "fb_variat_prop",
                    "ym",
                    "week",
                    "date_info"
                ],
            )

            # dwt_fd_category_agg: same delete-then-export pattern.
            export_fd_cat_agg = f"{site_name}_seller_category_agg"
            _delete_period(engine, export_fd_cat_agg, date_info)
            _export_partition(
                client, site_name, db_type, date_type, date_info,
                hive_tb="dwt_fd_category_agg",
                export_tb=export_fd_cat_agg,
                cols=[
                    "fd_account_id",
                    "bsr_cate_1_id",
                    "fd_cate_asin_num",
                    "fd_cate_new_asin_num",
                    "fd_asin_num",
                    "bsr_asin_num",
                    "fd_cate_asin_per",
                    "fd_cate_new_asin_per",
                    "fd_market_per",
                    "ym",
                    "week",
                    "date_info"
                ],
            )

            # detail table: load into an emptied copy table with the target's structure;
            # the names are swapped afterwards so the freshly loaded copy becomes the target.
            export_seller_detail_table = f"{site_name}_seller_asin_account_detail_copy1"
            export_table_target = f"{site_name}_seller_asin_account_detail"
            for sql in (
                f"create table if not exists {export_seller_detail_table}  ( like {export_table_target} );",
                f"truncate table {export_seller_detail_table}; ",
            ):
                DBUtil.engine_exec_sql(engine, sql)

            _export_partition(
                client, site_name, db_type, date_type, date_info,
                hive_tb="dwd_seller_asin_account_detail",
                export_tb=export_seller_detail_table,
                cols=[
                    "account_id",
                    "account_name",
                    "asin",
                    "launch_time",
                    "days_diff",
                    "is_asin_new"
                ],
            )
        finally:
            client.close()

        # Swap table names: target <-> copy, via a temporary "_bak" name.
        # NOTE(review): these are three separate statements, not one transaction. If one
        # fails midway, "{target}_bak" may be left behind and the next run's first rename
        # will fail until it is repaired manually.
        for sql in (
            f"alter table {export_table_target} rename to {export_table_target}_bak ;",
            f"alter table {export_seller_detail_table} rename to {export_table_target} ;",
            f"alter table {export_table_target}_bak rename to {export_seller_detail_table} ;",
        ):
            DBUtil.engine_exec_sql(engine, sql)
    finally:
        # Close the connection pool even if an export or a rename failed.
        engine.dispose()