"""Load one batch of the Hive table ``dwt_merchantwords_merge`` into PostgreSQL.

Usage: ``python <script> [site_name] [batch]``. The fallbacks are ``us`` and
``2024-07-01`` (assuming ``CommonUtil.get_sys_arg(index, default)`` reads a
positional command-line argument -- confirm against ``utils.common_util``).
"""
import os
import random
import sys

# Put the parent of sys.path[0] on the import path so the ``utils`` package
# below can be resolved; this must run before the project imports.
sys.path.append(os.path.dirname(sys.path[0]))

from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.db_util import DbTypes, DBUtil

# Columns handed to CommonUtil.build_export_sh, in export order.
_EXPORT_COLUMNS = (
    "keyword",
    "volume",
    "avg_3m",
    "avg_12m",
    "depth",
    "results_count",
    "sponsored_ads_count",
    "page_1_reviews",
    "appearance",
    "last_seen",
    "update_time",
    "site_name",
    "lang",
    "st_ao_val",
    "st_zr_flow_proportion",
    "asin_total_num",
    "asin_num",
    "self_asin_num",
    "self_asin_proportion",
    "st_sp_counts",
    "st_zr_counts",
    "st_monthly_sales",
    "listing_sales_avg",
    "reviews_avg",
    "rating_avg",
    "price_avg",
    "last_batch",
    "package_quantity",
)


def export() -> None:
    """Rebuild the MerchantWords detail table from the Hive partition.

    Steps, in order:

    1. Drop and recreate ``<export_tb>_copy`` with the layout of ``<export_tb>``.
    2. Build an export command with ``CommonUtil.build_export_sh`` for the
       ``site_name``/``batch`` partition and run it over SSH.
    3. Recreate the generated ``keyword_tsv`` column on the copy table.
    4. Create the search and sort indexes on the copy table.
    5. Call ``DBUtil.exchange_tb`` with the copy as source and the live table
       as target.

    The SSH client is always closed, including when the export command fails.
    """
    site_name = CommonUtil.get_sys_arg(1, 'us')
    batch = CommonUtil.get_sys_arg(2, '2024-07-01')
    db_type = DbTypes.postgresql_cluster.name

    # Only the 'de' site uses a site-prefixed target table.
    export_tb = "merchantwords_st_detail_v2_2024"
    if site_name == 'de':
        export_tb = f"{site_name}_{export_tb}"

    hive_tb = "dwt_merchantwords_merge"
    export_tb_copy = f"{export_tb}_copy"

    # Create the staging table.
    sql = f"""
        drop table if exists {export_tb_copy};
        create table if not exists {export_tb_copy}
        (
            like {export_tb} including comments
        );
    """
    DBUtil.exec_sql(db_type, site_name, sql, dispose_flag=True)

    # Build the export command for the requested partition.
    sh = CommonUtil.build_export_sh(
        site_name=site_name,
        db_type=db_type,
        hive_tb=hive_tb,
        export_tb=export_tb_copy,
        # Pass a fresh list, as the original call did, so the helper can
        # never mutate the module-level constant.
        col=list(_EXPORT_COLUMNS),
        partition_dict={
            "site_name": site_name,
            "batch": batch,
        },
    )

    client = SSHUtil.get_ssh_client()
    try:
        SSHUtil.exec_command_async(client, sh, ignore_err=False)
    finally:
        # Release the SSH connection even when the export command raises.
        client.close()

    # Rebuild the "generated always" column on the staging table.
    sql = f"""
        alter table {export_tb_copy} drop if exists keyword_tsv ;
        alter table {export_tb_copy} add column keyword_tsv tsvector generated always as (to_tsvector('english_amazonword', keyword)) STORED;
    """
    DBUtil.exec_sql(db_type, site_name, sql, dispose_flag=True)

    # NOTE(review): the suffix is random in 1..200, so two runs can pick the
    # same value; if an index with that name still exists from an earlier run
    # the "create index" below would fail -- confirm how old indexes are
    # cleaned up.
    suffix_int = random.randint(1, 200)

    # Create the indexes. Names are derived from the live table name
    # (export_tb) while the indexes themselves are built on the staging table.
    sql = f"""
        create index {export_tb}_keyword_tsv_idx_{suffix_int} on {export_tb_copy} using gin (keyword_tsv);
        create index {export_tb}_volume_idx_{suffix_int} on {export_tb_copy} using btree (volume desc);
        create index {export_tb}_keyword_idx_{suffix_int} on {export_tb_copy} using btree (keyword);
        create index {export_tb}_last_batch_idx_{suffix_int} on {export_tb_copy} using btree (last_batch);
        create index {export_tb}_st_ao_val_idx_{suffix_int} on {export_tb_copy} using btree (st_ao_val);
        create index {export_tb}_asin_total_num_idx_{suffix_int} on {export_tb_copy} using btree (asin_total_num);
    """
    DBUtil.exec_sql(db_type, site_name, sql, dispose_flag=True)

    # Disabled step, kept for reference: update the suggested_bid field.
    # sql = f"""
    # with tb_result as (
    #     select msd.keyword, spc.suggested_bid
    #     from {export_tb_copy} msd
    #     left join st_pcp_current spc on spc.keyword = msd.keyword and spc.site_id = 4
    # )
    # update {export_tb_copy} tmp
    # set suggested_bid = tb_result.suggested_bid
    # from tb_result
    # where tmp.keyword = tb_result.keyword
    # """
    # DBUtil.exec_sql(db_type, site_name, sql, dispose_flag=True)

    # Swap table names so the freshly loaded copy becomes the live table.
    DBUtil.exchange_tb(
        DBUtil.get_db_engine(db_type, site_name),
        source_tb_name=export_tb_copy,
        target_tb_name=export_tb,
        cp_index_flag=False,
    )


if __name__ == '__main__':
    export()