"""Export the ABA search-term analytics Hive table to PostgreSQL via a Sqoop shell script.

Usage::

    python <script> <site_name> <date_type> <date_info> [... test]

If the LAST command-line argument is ``test``, data is exported to the
``postgresql_test`` connection; otherwise it goes to ``postgresql``.

Supported ``date_type`` values (compared against ``DateTypes`` member names):

* ``last30day``: export into ``<site>_aba_last_30_day_copy`` (created from the
  target table's structure and truncated first). After a successful export,
  swap it in for ``<site>_aba_last_30_day`` via ``DBUtil.exchange_tb``.
* ``day`` / ``week`` / ``month``: export into a range partition
  ``<site>_aba_last_<date_type>_<date_info with '-' -> '_'>``. The partition
  is attached to a partitioned master table that is created on demand with the
  structure of the previous period's master table.

Any other ``date_type`` prints an error and exits with a non-zero status.
"""
import os
import sys

# sys.path[0] is the directory containing this script; add its parent so the
# sibling ``utils`` package can be imported when run as a plain script.
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil  # noqa: E402
from utils.common_util import CommonUtil, DateTypes  # noqa: E402
from utils.db_util import DBUtil  # noqa: E402

if __name__ == '__main__':
    # Positional arguments.
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    # The last argument acts as an optional "test" switch.
    test_flag = CommonUtil.get_sys_arg(len(sys.argv) - 1, None)
    print(f"执行参数为{sys.argv}")
    print(test_flag)
    if test_flag == 'test':
        db_type = 'postgresql_test'
        print("导出到测试库中")
    else:
        db_type = "postgresql"
        print("导出到PG库中")

    # Database engine for the selected connection.
    engine = DBUtil.get_db_engine(db_type, site_name)
    suffix = str(date_info).replace("-", "_")

    # Base name shared by all export target tables.
    export_base_tb = f"{site_name}_aba_last"
    # Columns common to every date_type; granularity-specific columns are appended below.
    # NOTE: "amzon_proportion" is spelled as in the target schema; do not "fix" it here.
    export_base_cols = [
        "id", "search_term", "rank", "category_id", "category_current_id", "orders",
        "bsr_orders", "search_volume", "quantity_being_sold", "st_ao_avg", "st_ao_val_rate",
        "new_bsr_orders_proportion", "new_asin_proportion", "page1_title_proportion",
        "price_avg", "total_comments_avg", "rating_avg", "weight_avg", "volume_avg",
        "title_length_avg", "st_num", "aadd_proportion", "sp_proportion", "fbm_proportion",
        "cn_proportion", "amzon_proportion", "most_proportion", "max_num",
        "asin1", "asin2", "asin3", "click_share1", "click_share2", "click_share3",
        "total_click_share", "conversion_share1", "conversion_share2", "conversion_share3",
        "total_conversion_share", "new_asin_num", "total_asin_num", "new_asin_orders",
        "new_asin_bsr_orders", "is_first_text", "is_ascending_text", "is_search_text",
        "top3_seller_orders", "top3_seller_bsr_orders", "top3_brand_orders",
        "top3_brand_bsr_orders", "page3_brand_num", "page3_seller_num", "brand_monopoly",
        "seller_monopoly", "max_num_asin", "is_self_max_num_asin", "date_info",
        "created_time", "updated_time", "gross_profit_fee_air", "gross_profit_fee_sea",
        "multi_color_proportion", "multi_size_proportion", "st_4_20_ao_avg", "st_word_num",
        "st_movie_label", "st_brand_label", "st_brand1", "st_category1",
        "st_brand2", "st_category2", "st_brand3", "st_category3",
    ]

    if date_type == DateTypes.last30day.name:
        export_tb_target = f"{export_base_tb}_30_day"
        export_tb_copy = f"{export_tb_target}_copy"
        # Export into a copy table first; it is swapped in for the target only
        # after the export has run (see the exchange at the end of the script).
        export_table = export_tb_copy
        sql = f"""
            create table if not exists {export_tb_copy}
            (
                like {export_tb_target} including indexes including comments
            );
            truncate table {export_tb_copy};
        """
        DBUtil.engine_exec_sql(engine, sql)
        # Extra columns present in the rolling 30-day table.
        tb_cols = ["color_proportion", "st_4_20_ao_rate"]
        export_cols = export_base_cols + tb_cols
        print("导出的字段:", export_cols)
    elif date_type in (DateTypes.day.name, DateTypes.week.name, DateTypes.month.name):
        # Partition (child) table that receives this run's data.
        export_table = f"{export_base_tb}_{date_type}_{suffix}"
        # Exclusive upper bound of the partition's date_info range.
        next_val = CommonUtil.get_next_val(date_type, date_info)
        year_str = CommonUtil.safeIndex(date_info.split("-"), 0, None)

        if date_type == DateTypes.day.name:
            # Day partitions live in a per-month master table; the previous
            # month's master supplies the structure for a new one.
            tb_cols = []
            year_month = CommonUtil.reformat_date(date_info, "%Y-%m-%d", "%Y-%m")
            master_suffix = year_month.replace("-", "_")
            prev_suffix = CommonUtil.get_month_offset(year_month, -1).replace("-", "_")
        else:
            # Week/month partitions live in a per-year master table; the
            # previous year's master supplies the structure for a new one.
            if date_type == DateTypes.week.name:
                tb_cols = ["is_new_market_segment", "color_proportion"]
            else:
                tb_cols = ["is_new_market_segment", "color_proportion", "st_4_20_ao_rate",
                           "supply_demand", "market_cycle_type"]
            master_suffix = year_str
            prev_suffix = str(int(year_str) - 1)

        export_master_tb = f"{export_base_tb}_{date_type}_{master_suffix}"
        export_tb_before = f"{export_base_tb}_{date_type}_{prev_suffix}"
        export_cols = export_base_cols + tb_cols

        # Create the master table (if missing), attach this run's partition
        # for [date_info, next_val), and clear it so re-runs are not duplicated.
        sql = f"""
            create table if not exists {export_master_tb}
            (
                like {export_tb_before} including indexes including comments
            ) partition by range (date_info);

            create table if not exists {export_table} partition of {export_master_tb}
                for values from ('{date_info}') to ('{next_val}');

            truncate table {export_table};
        """
        DBUtil.engine_exec_sql(engine, sql)
    else:
        print("输入的date_type有误,请检查!!")
        # sys.exit (not the interactive-only quit()) with a non-zero status so
        # callers/schedulers see invalid input as a failure.
        sys.exit(1)

    # Build the Sqoop export shell script for the Hive source partition.
    sh = CommonUtil.build_export_sh(
        site_name=site_name,
        db_type=db_type,
        hive_tb="dwt_aba_st_analytics",
        export_tb=export_table,
        col=export_cols,
        partition_dict={
            "site_name": site_name,
            "date_type": date_type,
            "date_info": date_info,
        },
    )
    client = SSHUtil.get_ssh_client()
    try:
        SSHUtil.exec_command_async(client, sh, ignore_err=False)
    finally:
        # Release the SSH connection even if the remote command fails.
        client.close()

    if date_type == DateTypes.last30day.name:
        # Replace the live 30-day table's data by swapping in the freshly exported copy.
        DBUtil.exchange_tb(engine, export_tb_copy, export_tb_target, cp_index_flag=False)