import os import sys sys.path.append(os.path.dirname(sys.path[0])) from utils.ssh_util import SSHUtil from utils.common_util import CommonUtil if __name__ == '__main__': site_name = CommonUtil.get_sys_arg(1, None) year_month = CommonUtil.get_sys_arg(2, None) assert site_name is not None, "site_name 不能为空!" assert year_month is not None, "year_month 不能为空!" year,month = year_month.split("-") # 导出到pg数据库 db_type = "postgresql_cluster" export_tb = f"{site_name}_st_month_{year}_{month}" sh = CommonUtil.build_export_sh( site_name=site_name, db_type=db_type, hive_tb="tmp_st_month_2209_2303", export_tb=export_tb, col=[ "search_term", "st_ao_val", "st_type", "st_rank", "st_rank_avg", "st_search_num", "st_search_rate", "st_search_sum", "st_adv_counts", "st_quantity_being_sold", "asin", "asin_st_zr_orders", "asin_st_zr_orders_sum", "asin_st_zr_flow", "asin_st_sp_orders", "asin_st_sp_orders_sum", "asin_st_sp_flow", "st_asin_zr_page", "st_asin_zr_page_row", "st_asin_zr_page_rank", "st_asin_zr_updated_at", "st_asin_sp_page", "st_asin_sp_page_rank", "st_asin_sp_page_row", "st_asin_sp_updated_at", "st_asin_sb1_page", "st_asin_sb1_updated_at", "st_asin_sb2_page", "st_asin_sb2_updated_at", "st_asin_sb3_page", "st_asin_sb3_updated_at", "st_asin_ac_page", "st_asin_ac_updated_at", "st_asin_bs_page", "st_asin_bs_updated_at", "st_asin_er_page", "st_asin_er_updated_at", "st_asin_tr_page", "st_asin_tr_updated_at", "created_at", "updated_at" ], partition_dict={ "site_name": site_name, "year_month": year_month } ) client = SSHUtil.get_ssh_client() SSHUtil.exec_command_async(client, sh, ignore_err=False) client.close() pass