# dwt_merchantwords_merge.py
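# Exports one batch of the Hive table dwt_merchantwords_merge into the PostgreSQL
# cluster table merchantwords_st_detail_v2_2024 (de_merchantwords_st_detail_v2_2024
# for the 'de' site): the data is loaded into a *_copy table, the keyword_tsv
# generated column and the indexes are rebuilt there, and the copy is then swapped
# in as the live table.
#
# Usage:
#   python dwt_merchantwords_merge.py <site_name> <batch>
#   e.g. python dwt_merchantwords_merge.py us 2024-07-01  (the defaults)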
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # make the project root importable so the utils package resolves
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.db_util import DbTypes, DBUtil
import random


def export():
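    """
    Export one batch of dwt_merchantwords_merge from Hive to the PostgreSQL cluster.

    sys.argv[1]: site_name, default 'us' ('de' writes to a site-prefixed table).
    sys.argv[2]: batch partition to export, default '2024-07-01'.
    """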
    site_name = CommonUtil.get_sys_arg(1, 'us')
    batch = CommonUtil.get_sys_arg(2, '2024-07-01')
    db_type = DbTypes.postgresql_cluster.name

    export_tb = "merchantwords_st_detail_v2_2024"
    if site_name == 'de':
        export_tb = f"{site_name}_merchantwords_st_detail_v2_2024"

    hive_tb = "dwt_merchantwords_merge"

    export_tb_copy = f"{export_tb}_copy"
    # Create an empty copy of the target table (columns and comments only;
    # indexes and the generated column are rebuilt further down)
    sql = f"""
        drop table if exists {export_tb_copy};
        create table if not exists {export_tb_copy} (like {export_tb} including comments);
    """
    DBUtil.exec_sql(db_type, site_name, sql, dispose_flag=True)
    # print(sql)
    # Build the export shell command: Hive table -> PostgreSQL copy table
    sh = CommonUtil.build_export_sh(
        site_name=site_name,
        db_type=db_type,
        hive_tb=hive_tb,
        export_tb=export_tb_copy,
        col=[
            "keyword",
            "volume",
            "avg_3m",
            "avg_12m",
            "depth",
            "results_count",
            "sponsored_ads_count",
            "page_1_reviews",
            "appearance",
            "last_seen",
            "update_time",
            "site_name",
            "lang",
            "st_ao_val",
            "st_zr_flow_proportion",
            "asin_total_num",
            "asin_num",
            "self_asin_num",
            "self_asin_proportion",
            "st_sp_counts",
            "st_zr_counts",
            "st_monthly_sales",
            "listing_sales_avg",
            "reviews_avg",
            "rating_avg",
            "price_avg",
            "last_batch",
            "package_quantity"
        ],
        partition_dict={
            "site_name": site_name,
            "batch": batch,
        }
    )
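    # Execute the generated export script on the remote host over SSH. The actual
    # Hive -> PostgreSQL transfer is whatever CommonUtil.build_export_sh emits
    # (assumption: a Sqoop-style export job).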
    client = SSHUtil.get_ssh_client()
    SSHUtil.exec_command_async(client, sh, ignore_err=False)
    client.close()
    # print(sh)
    # Rebuild the GENERATED ALWAYS column: CREATE TABLE ... LIKE does not copy the
    # generation expression (that would need INCLUDING GENERATED), so drop and re-add it
    sql = f"""
        alter table {export_tb_copy} drop column if exists keyword_tsv;
        alter table {export_tb_copy} add column keyword_tsv tsvector generated always as (to_tsvector('english_amazonword', keyword)) STORED;
    """
    DBUtil.exec_sql(db_type, site_name, sql, dispose_flag=True)
    # print(sql)
    suffix_int = random.randint(1, 200)
    # Create indexes on the copy; the random suffix keeps the index names from
    # colliding with those already present on the live table
    sql = f"""
        create index {export_tb}_keyword_tsv_idx_{suffix_int} on {export_tb_copy} using gin (keyword_tsv);
        create index {export_tb}_volume_idx_{suffix_int} on {export_tb_copy} using btree (volume desc);
        create index {export_tb}_keyword_idx_{suffix_int} on {export_tb_copy} using btree (keyword);
        create index {export_tb}_last_batch_idx_{suffix_int} on {export_tb_copy} using btree (last_batch);
        create index {export_tb}_st_ao_val_idx_{suffix_int} on {export_tb_copy} using btree (st_ao_val);
        create index {export_tb}_asin_total_num_idx_{suffix_int} on {export_tb_copy} using btree (asin_total_num);
    """
    DBUtil.exec_sql(db_type, site_name, sql, dispose_flag=True)
    # print(sql)
    # # Update the suggested_bid column (kept for reference, currently disabled)
    # sql = f"""
    #     with tb_result as (
    #         select msd.keyword, spc.suggested_bid
    #         from {export_tb_copy} msd
    #                  left join st_pcp_current spc on spc.keyword = msd.keyword and spc.site_id = 4
    #     )
    #     update {export_tb_copy} tmp
    #     set suggested_bid = tb_result.suggested_bid
    #     from tb_result
    #     where tmp.keyword = tb_result.keyword
    # """
    # # DBUtil.exec_sql(db_type, site_name, sql, dispose_flag=True)
    # print(sql)
    # Swap table names so the copy becomes the live table
    DBUtil.exchange_tb(
        DBUtil.get_db_engine(db_type, site_name),
        source_tb_name=export_tb_copy,
        target_tb_name=export_tb,
        cp_index_flag=False,
    )
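    # Assumption: DBUtil.exchange_tb promotes the copy by swapping the relation names,
    # conceptually something like
    #   alter table merchantwords_st_detail_v2_2024 rename to merchantwords_st_detail_v2_2024_bak;
    #   alter table merchantwords_st_detail_v2_2024_copy rename to merchantwords_st_detail_v2_2024;
    # cp_index_flag=False presumably because the indexes were already created on the copy above.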


if __name__ == '__main__':
    export()