# dwt_aba_last365.py
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))

from utils.db_util import DBUtil
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil, DateTypes

# Entry point: export the dwt_aba_last365 Hive table (ABA search-term yearly /
# last-365-day aggregates) into PostgreSQL. Flow: create a `_copy` staging
# table, run the export shell script over SSH, swap the staging table in for
# the live one, convert two text columns back to INTEGER[], rebuild the
# full-text GIN index, and record completion in the MySQL workflow table.
if __name__ == '__main__':
    # Positional CLI args: target site, date granularity, concrete date value.
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    # Read the last CLI argument: the literal 'test' redirects the export to
    # the test database instead of the production PG cluster.
    test_flag = CommonUtil.get_sys_arg(len(sys.argv) - 1, None)
    print(f"执行参数为{sys.argv}")

    if test_flag == 'test':
        db_type = 'postgresql_test'
        print("导出到测试库中")
    else:
        db_type = "postgresql_cluster"
        print("导出到PG集群中")

    # date_info is assumed to be 'YYYY' or 'YYYY-MM...'; only the year part is
    # needed for the yearly table name. TODO confirm format against caller.
    year = date_info.split('-')[0]
    if date_type == DateTypes.year.name:
        export_tb_before = f"{site_name}_aba_last_year_{year}"
    elif date_type == DateTypes.month.name:
        export_tb_before = f"{site_name}_aba_last_365_day"
    else:
        raise Exception("DateType不合法")

    # The export writes into a `_copy` staging table so the live table stays
    # queryable until the final name swap below.
    export_tb_rel = f"{export_tb_before}_copy"
    engine = DBUtil.get_db_engine(db_type, site_name)

    # Create the staging table as a structural clone of the live table.
    # The two array-typed columns are widened to VARCHAR here because the
    # export ships them as comma-separated text; they are converted back to
    # INTEGER[] after the swap (see the ALTER ... USING statements below).
    with engine.connect() as connection:
        sql = f"""
            drop table if exists {export_tb_rel};
            create table if not exists {export_tb_rel} 
            (
                like {export_tb_before} including comments
            );
            ALTER TABLE {export_tb_rel} ALTER COLUMN st_movie_brand_label TYPE VARCHAR(20);
            ALTER TABLE {export_tb_rel} ALTER COLUMN total_appear_month TYPE VARCHAR(50);
        """
        print("================================执行sql================================")
        print(sql)
        # NOTE(review): multi-statement raw SQL string — presumably the
        # project pins SQLAlchemy 1.x, where Connection.execute accepts plain
        # strings; 2.x would require text(). Confirm before upgrading.
        connection.execute(sql)

    # Build the export shell script (Hive -> PG) with the full column list
    # and the Hive partition selector.
    sh = CommonUtil.build_export_sh(
        site_name=site_name,
        db_type=db_type,
        hive_tb="dwt_aba_last365",
        export_tb=export_tb_rel,
        col=[
            "id",
            "search_term",
            "category_id",
            "rank",
            "top_rank",
            "st_num1",
            "st_num2",
            "st_num3",
            "st_num4",
            "st_num5",
            "st_num6",
            "st_num7",
            "st_num8",
            "st_num9",
            "st_num10",
            "st_num11",
            "st_num12",
            "total_st_num",
            "bsr_orders1",
            "bsr_orders2",
            "bsr_orders3",
            "bsr_orders4",
            "bsr_orders5",
            "bsr_orders6",
            "bsr_orders7",
            "bsr_orders8",
            "bsr_orders9",
            "bsr_orders10",
            "bsr_orders11",
            "bsr_orders12",
            "market_cycle_type1",
            "market_cycle_type2",
            "market_cycle_type3",
            "market_cycle_type4",
            "market_cycle_type5",
            "market_cycle_type6",
            "market_cycle_type7",
            "market_cycle_type8",
            "market_cycle_type9",
            "market_cycle_type10",
            "market_cycle_type11",
            "market_cycle_type12",
            "search_volume1",
            "search_volume2",
            "search_volume3",
            "search_volume4",
            "search_volume5",
            "search_volume6",
            "search_volume7",
            "search_volume8",
            "search_volume9",
            "search_volume10",
            "search_volume11",
            "search_volume12",
            "st_ao_avg",
            "st_ao_val_rate",
            "supply_demand",
            "price_avg",
            "total_comments_avg",
            "rating_avg",
            "weight_avg",
            "volume_avg",
            "aadd_proportion",
            "sp_proportion",
            "fbm_proportion",
            "cn_proportion",
            "amzon_proportion",
            "top3_seller_orders",
            "top3_seller_bsr_orders",
            "top3_brand_orders",
            "top3_brand_bsr_orders",
            "page3_brand_num",
            "page3_seller_num",
            "max_num",
            "most_avg_proportion",
            "new_asin_num_avg_monopoly",
            "new_asin_bsr_orders_avg_monopoly",
            "total_asin_num",
            "orders",
            "bsr_orders",
            "created_time",
            "updated_time",
            "max_num_asin",
            "is_self_max_num_asin",
            "date_info",
            "gross_profit_fee_sea",
            "gross_profit_fee_air",
            "category_current_id",
            "color_proportion",
            "brand_monopoly",
            "seller_monopoly",
            "orders1",
            "orders2",
            "orders3",
            "orders4",
            "orders5",
            "orders6",
            "orders7",
            "orders8",
            "orders9",
            "orders10",
            "orders11",
            "orders12",
            "max_orders_month",
            "max_bsr_orders_month",
            "multi_color_avg_proportion",
            "multi_size_avg_proportion",
            "q1_bsr_orders",
            "q2_bsr_orders",
            "q3_bsr_orders",
            "q4_bsr_orders",
            "q1_orders",
            "q2_orders",
            "q3_orders",
            "q4_orders",
            "is_new_market_segment",
            "is_first_text",
            "is_ascending_text",
            "is_search_text",
            "st_word_num",
            "lang",
            "is_history_first_text",
            "history_first_appear_month",
            "first_appear_month",
            "sv_rising_rate",
            "sv_decline_rate",
            "sv_change_rate_flag",
            "st_movie_brand_label",
            "total_appear_month",
            "market_cycle_type",
            "rank_lastest",
            "rank_change_rate_lastest",
            "rank_rate_of_change_lastest"
        ],
        partition_dict={
            "site_name": site_name,
            "date_type": date_type,
            "date_info": date_info
        }
    )

    # Run the export script remotely; ignore_err=False makes a non-zero exit
    # abort the job before the table swap happens.
    client = SSHUtil.get_ssh_client()
    SSHUtil.exec_command_async(client, sh, ignore_err=False)
    client.close()

    # Swap table names: the freshly loaded staging table becomes the live
    # table. cp_index_flag=True presumably copies indexes across — confirm
    # against DBUtil.exchange_tb.
    DBUtil.exchange_tb(engine,
                       source_tb_name=export_tb_rel,
                       target_tb_name=export_tb_before,
                       cp_index_flag=True)

    # Post-swap fixups on the (now live) table: restore the two array columns
    # from their comma-separated text form, then (re)build the generated
    # tsvector column and its GIN index for full-text search.
    with engine.connect() as connection:
        sql = f"""
            ALTER TABLE {export_tb_before}
            ALTER COLUMN st_movie_brand_label TYPE INTEGER[]
            USING string_to_array(st_movie_brand_label, ',')::int[];

            ALTER TABLE {export_tb_before}
            ALTER COLUMN total_appear_month TYPE INTEGER[]
            USING string_to_array(total_appear_month, ',')::int[];

            alter table {export_tb_before} drop if exists keyword_tsv;
            alter table {export_tb_before} add column keyword_tsv tsvector generated always as (to_tsvector('english_amazonword', search_term)) STORED;
            drop index if exists {export_tb_before}_keyword_tsv_idx;
            create index {export_tb_before}_keyword_tsv_idx ON {export_tb_before} USING gin (keyword_tsv);
        """
        print("================================执行sql================================")
        print(sql)
        connection.execute(sql)

    # Bookkeeping is skipped entirely for test runs.
    if test_flag != 'test':
        if date_type == 'year':
            export_date_info = year
            export_tb_name = f'{site_name}_aba_last_year'
            export_date_type = date_type
            remark = 'ABA搜索词年表'
        elif date_type == 'month':
            export_date_info = date_info
            export_tb_name = f'{site_name}_aba_last_365_day'
            export_date_type = '365_day'
            remark = 'ABA搜索词年表(最近12月,每月更新)'
            # Backfill the translation queue with search terms not yet present
            # in us_st_translate.
            # NOTE(review): table names here are hard-coded to the 'us' site
            # even though site_name is parameterized everywhere else — confirm
            # this is intentional (e.g. translation only exists for US).
            with engine.connect() as connection:
                sql = f"""
                    insert into us_st_translate (search_term, st_lang)
                    select tmp.search_term, tmp.lang
                    from public.us_aba_last_365_day tmp
                    left join us_st_translate st on tmp.search_term = st.search_term
                    where st.search_term is null;
                """
                print("================================执行sql================================")
                print(sql)
                connection.execute(sql)
        else:
            raise Exception("DateType不合法")

        # Record completion in the workflow_everyday status table (MySQL).
        # NOTE(review): values are interpolated into the SQL via f-string;
        # inputs come from CLI args, so injection risk is low but nonzero.
        engine = DBUtil.get_db_engine("mysql", "us")
        with engine.connect() as connection:
            sql = f"""
                replace into workflow_everyday (
                    site_name, report_date, status, status_val, table_name, date_type, page, is_end, remark, export_db_type
                )
                values (
                    '{site_name}', '{export_date_info}', '导出pg完成', 14, '{export_tb_name}', '{export_date_type}', 'AbaWordYear', '是', '{remark}', 'postgresql_cluster'
                );
            """
            print("================================更新workflow_everyday================================")
            print(sql)
            connection.execute(sql)

    print("success")