dwd_bsr_asin_rank.py 3.21 KB
Newer Older
chenyuanjie committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.db_util import DBUtil

if __name__ == '__main__':
    # Export one site/day slice of the Hive table `dwd_bsr_asin_rank` into a
    # per-day PostgreSQL table, attach that table as a partition of the master
    # table `{site_name}_bsr_asin_rank`, and record completion in the
    # `workflow_everyday` status table.
    #
    # CLI: argv[1] = site_name, argv[2] = date_info (YYYY-MM-DD);
    # an optional trailing 'test' argument redirects the export to the test DB.
    site_name = CommonUtil.get_sys_arg(1, None)
    date_info = CommonUtil.get_sys_arg(2, None)
    #  Read the last CLI argument; 'test' switches the target database below.
    test_flag = CommonUtil.get_sys_arg(len(sys.argv) - 1, None)
    print(f"执行参数为{sys.argv}")
    # Guard against exporting during working hours.
    # NOTE(review): presumably this call blocks/exits when inside work hours —
    # confirm the behavior in CommonUtil.judge_is_work_hours.
    date_type = "day"
    CommonUtil.judge_is_work_hours(site_name=site_name,
                                   date_type=date_type,
                                   date_info=date_info,
                                   principal='wujicang',
                                   priority=1,
                                   export_tools_type=1)

    # Select the target DB: 'test' flag → test PG instance, otherwise production PG.
    if test_flag == 'test':
        db_type = 'postgresql_test'
        print("导出到测试库中")
    else:
        db_type = "postgresql"
        print("导出到PG库中")

    # Day-stamped table name, e.g. us_bsr_asin_rank_2023_01_01 for 2023-01-01.
    suffix = CommonUtil.reformat_date(date_info, "%Y-%m-%d", "%Y_%m_%d", )
    export_master_tb = f"{site_name}_bsr_asin_rank"
    export_tb = f"{export_master_tb}_{suffix}"
    # Upper (exclusive) bound of the one-day partition range attached below.
    next_day = CommonUtil.get_day_offset(date_info, 1)

    engine = DBUtil.get_db_engine(db_type, site_name)
    with engine.connect() as connection:
        # Recreate the per-day table with the master table's column layout
        # (LIKE ... INCLUDING COMMENTS copies columns and column comments,
        # but not indexes — those are copied later by add_pg_part).
        sql = f"""
                drop table if exists {export_tb};
                create table if not exists {export_tb} 
                (
                like {export_master_tb}  including comments
                );
                """
        print("================================执行sql================================")
        print(sql)
        connection.execute(sql)

    # Build the export shell command (Hive -> PG) for the listed columns,
    # reading the last-30-day partition of dwd_bsr_asin_rank.
    sh = CommonUtil.build_export_sh(
        site_name=site_name,
        db_type=db_type,
        hive_tb="dwd_bsr_asin_rank",
        export_tb=export_tb,
        col=[
            "asin",
            "category_id",
            "bsr_rank",
            "is_1_day_flag",
            "is_7_day_flag",
            "is_30_day_flag",
            "bsr_count",
            "is_asin_new",
            "is_asin_bsr_new",
            "last_bsr_day",
            "date_info",
        ],
        partition_dict={
            "site_name": site_name,
            "date_type": "last30day",
            "date_info": date_info
        }
    )

    # Run the export over SSH; ignore_err=False means a failed export aborts the job.
    client = SSHUtil.get_ssh_client()
    SSHUtil.exec_command_async(client, sh, ignore_err=False)
    # Attach the freshly loaded table as the [date_info, next_day) partition of
    # the master table; cp_index_flag=True also copies the master's indexes.
    DBUtil.add_pg_part(
        engine,
        source_tb_name=export_tb,
        part_master_tb=export_master_tb,
        part_val={
            "from": [date_info],
            "to": [next_day]
        },
        cp_index_flag=True,
    )
    client.close()

    #  Mark this site/day as exported in the workflow status table.
    # NOTE(review): 'insert ignore' is MySQL syntax — the workflow table is
    # presumably in MySQL, not the PG engine above; confirm inside
    # CommonUtil.modify_export_workflow_status.
    sql = f""" insert ignore into workflow_everyday (site_name, report_date, status, status_val, table_name, date_type, page, is_end, remark)
               values ('{site_name}', '{date_info}', '导出PG数据库', 14, '{site_name}_bsr_asin_rank', 'day', 'BSR榜单', '是', 'BS榜单对应的TOP100ASIN')
                """
    CommonUtil.modify_export_workflow_status(
        update_workflow_sql=sql,
        site_name=site_name,
        date_type=date_type,
        date_info=date_info
    )
    print("success")