dwt_bsr_asin_detail.py 4.23 KB
Newer Older
chenyuanjie committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.db_util import DBUtil, DbTypes
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil


def export_postgresql():
    """Placeholder for a single-instance PostgreSQL export; currently a no-op returning None."""


def export_postgresql_cluster():
    """Placeholder for a PostgreSQL-cluster export; currently a no-op returning None."""


if __name__ == '__main__':
    # Entry point. Positional CLI arguments (read via CommonUtil.get_sys_arg):
    #   1:    site_name
    #   2:    date_info, a day in %Y-%m-%d form (see the reformat_date calls)
    #   last: optional 'test' switch selecting the test PostgreSQL database
    # Flow: ensure the month's PG partition exists, export the Hive table
    # dwt_bsr_asin_detail into a fresh "_copy" staging table, swap the copy in
    # as the month's partition, then record the export in workflow_everyday.
    site_name = CommonUtil.get_sys_arg(1, None)
    date_info = CommonUtil.get_sys_arg(2, None)
    # The last CLI argument is used as the test switch. With no extra argument
    # this is date_info itself, which never equals 'test'.
    test_flag = CommonUtil.get_sys_arg(len(sys.argv) - 1, None)
    # Do not export during working hours; presumably judge_is_work_hours
    # stops or defers the run inside that window -- confirm in CommonUtil.
    date_type = "day"
    CommonUtil.judge_is_work_hours(site_name=site_name,
                                   date_type=date_type,
                                   date_info=date_info,
                                   principal='wujicang',
                                   priority=1,
                                   export_tools_type=1)

    if test_flag == 'test':
        db_type = DbTypes.postgresql_test.name
    else:
        db_type = DbTypes.postgresql_cluster.name
    print(f"导出到{db_type}中")

    engine = DBUtil.get_db_engine(db_type, site_name)
    # Month keys derived from the day: '%Y_%m' for the partition table suffix,
    # '%Y-%m' for the partition's lower bound.
    d_month_now = CommonUtil.reformat_date(date_info, "%Y-%m-%d", "%Y_%m", )
    rel_date_info = CommonUtil.reformat_date(date_info, "%Y-%m-%d", "%Y-%m", )
    # Exclusive upper bound of the partition; presumably the following month
    # in the same '%Y-%m' format -- confirm get_month_offset's output.
    next_month = CommonUtil.get_month_offset(rel_date_info, 1)
    # Tables: partitioned master, this month's partition, and the staging copy
    # that the Hive export writes into before being swapped in.
    export_master_tb = f"{site_name}_bsr_asin_detail"
    export_tb = f"{export_master_tb}_{d_month_now}"
    export_tb_copy = f"{export_master_tb}_{d_month_now}_copy"

    # Ensure this month's partition exists, then recreate an empty staging
    # table with the partition's columns, indexes and comments.
    # The `with` block closes the connection on exit; no explicit close().
    with engine.connect() as connection:
        sql = f"""
                create table if not exists {export_tb} partition of {export_master_tb} for values from ('{rel_date_info}') to ('{next_month}');
                drop table if exists {export_tb_copy};
                create table if not exists {export_tb_copy}
                (
                    like {export_tb} including indexes including comments
                );
                """
        print("================================执行sql================================")
        print(sql)
        # NOTE(review): executing a plain SQL string relies on the SQLAlchemy
        # 1.x API; 2.x would require sqlalchemy.text() -- confirm the version.
        connection.execute(sql)

    # Build the shell command that exports the Hive month partition
    # (site_name / date_info=YYYY-MM) into the staging table.
    sh = CommonUtil.build_export_sh(
        site_name=site_name,
        db_type=db_type,
        hive_tb="dwt_bsr_asin_detail",
        export_tb=export_tb_copy,
        col=[
            "asin",
            "title",
            "img_url",
            "ao_val",
            "rating",
            "total_comments",
            "bsr_orders",
            "bsr_orders_change",
            "price",
            "weight",
            "launch_time",
            "date_info",
            "brand_name",
            "buy_box_seller_type",
            "account_name",
            "volume",
            "img_type",
            "last_update_time",
            "asin_type",
            "asin_air_freight_gross_margin",
            "asin_ocean_freight_gross_margin",
            "asin_unlaunch_time",
            "seller_id",
            "seller_country_name",
            "category_first_id",
            "first_category_rank",
            "first_category_rank_date",
            "package_quantity",
            "asin_launch_time_type",
            "seller_country_type",
            "asin_bought_month",
        ],
        partition_dict={
            "site_name": site_name,
            "date_info": rel_date_info
        }
    )

    # Run the export over SSH; ignore_err=False lets failures propagate, so
    # close the client in `finally` to avoid leaking the connection.
    client = SSHUtil.get_ssh_client()
    try:
        SSHUtil.exec_command_async(client, sh, ignore_err=False)
    finally:
        client.close()

    # Swap the freshly loaded staging table in as this month's partition.
    # The copy was created with `including indexes`, presumably why index
    # copying is disabled here.
    DBUtil.exchange_pg_part_tb(
        engine,
        source_tb_name=export_tb_copy,
        part_master_tb=export_master_tb,
        part_target_tb=export_tb,
        part_val={
            "from": [rel_date_info],
            "to": [next_month]
        },
        cp_index_flag=False,
    )

    # Record the export status in workflow_everyday (MySQL `insert ignore`).
    # NOTE(review): table_name is recorded as '{site_name}_bsr_asin_rank'
    # although this script writes {site_name}_bsr_asin_detail -- confirm intent.
    sql = f""" insert ignore into workflow_everyday (site_name, report_date, status, status_val, table_name, date_type, page, is_end, remark)
               values ('{site_name}', '{date_info}', '导出PG数据库', 14, '{site_name}_bsr_asin_rank', 'day', 'BSR榜单', '是', 'BS榜单对应的TOP100ASIN')
                """
    CommonUtil.modify_export_workflow_status(
        update_workflow_sql=sql,
        site_name=site_name,
        date_type=date_type,
        date_info=date_info
    )
    print("success")