import os
import sys
from datetime import datetime

sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil, DateTypes
from utils.db_util import DBUtil

# Export script: ships the Hive table `dwt_aba_st_analytics` into PostgreSQL
# via sqoop (run over SSH), creating/truncating the target PG table first and
# updating the workflow-status table when done.
#
# CLI: <site_name> <date_type> <date_info> [... <mode_flag>]
#   mode_flag (always the LAST argv entry) may be:
#     'test'         -> export into the test PG instance
#     'month_append' -> export normally but partition the Hive read by month

if __name__ == '__main__':
    # Positional arguments.
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    # The optional mode flag is read from the last argv slot.
    test_flag = CommonUtil.get_sys_arg(len(sys.argv) - 1, None)
    print(f"执行参数为{sys.argv}")
    print(test_flag)

    # Pick the destination database.
    if test_flag == 'test':
        db_type = 'postgresql_test'
        print("导出到测试库中")
    else:
        # Guard against running heavy exports during working hours.
        CommonUtil.judge_is_work_hours(
            site_name=site_name,
            date_type=date_type,
            date_info=date_info,
            principal='chenyuanjie',
            priority=2,
            export_tools_type=1,
            belonging_to_process=f'新ABA流程_{date_type}',
        )
        # month / week / month_week data goes to the PG cluster; the rest to plain PG.
        if date_type in (DateTypes.month.name, DateTypes.week.name, DateTypes.month_week.name):
            db_type = 'postgresql_cluster'
            print("导出到PG-Cluster库中")
        else:
            db_type = "postgresql"
            print("导出到PG库中")

    # Database engine for DDL on the export target.
    engine = DBUtil.get_db_engine(db_type, site_name)
    # e.g. "2024-05-01" -> "2024_05_01", used in table-name suffixes.
    suffix = str(date_info).replace("-", "_")

    # Base export table name and the columns shared by every date_type.
    export_base_tb = f"{site_name}_aba_last"
    export_base_cols = [
        "id", "search_term", "rank", "category_id", "category_current_id",
        "orders", "bsr_orders", "search_volume", "quantity_being_sold",
        "st_ao_avg", "st_ao_val_rate", "new_bsr_orders_proportion",
        "new_asin_proportion", "page1_title_proportion", "price_avg",
        "total_comments_avg", "rating_avg", "weight_avg", "volume_avg",
        "title_length_avg", "st_num", "aadd_proportion", "sp_proportion",
        "fbm_proportion", "cn_proportion", "amzon_proportion",
        "most_proportion", "max_num", "asin1", "asin2", "asin3",
        "click_share1", "click_share2", "click_share3", "total_click_share",
        "conversion_share1", "conversion_share2", "conversion_share3",
        "total_conversion_share", "new_asin_num", "total_asin_num",
        "new_asin_orders", "new_asin_bsr_orders", "is_first_text",
        "is_ascending_text", "is_search_text", "top3_seller_orders",
        "top3_seller_bsr_orders", "top3_brand_orders",
        "top3_brand_bsr_orders", "page3_brand_num", "page3_seller_num",
        "brand_monopoly", "seller_monopoly", "max_num_asin",
        "is_self_max_num_asin", "date_info", "created_time", "updated_time",
        "gross_profit_fee_air", "gross_profit_fee_sea",
        "multi_color_proportion", "multi_size_proportion", "st_word_num",
        "st_movie_label", "st_brand_label", "st_brand1", "st_category1",
        "st_brand2", "st_category2", "st_brand3", "st_category3",
        "st_bsr_cate_1_id_new", "st_bsr_cate_current_id_new",
        "st_crawl_date", "st_zr_page123_title_appear_rate",
        "st_sp_page123_title_appear_rate", "st_competition_level",
        "amazon_monthly_sales", "st_zr_flow_proportion", "st_ao_val_matrix",
        "st_flow_proportion_matrix", "lang",
    ]

    if date_type in (DateTypes.last30day.name, DateTypes.month_week.name):
        # Rolling-30-day export: write into a _copy shadow table, then swap it
        # over the live table at the end (see exchange_tb below).
        export_tb_target = f"{export_base_tb}_30_day"
        export_tb_copy = f"{export_tb_target}_copy"
        export_table = export_tb_copy
        sql = f"""
        create table if not exists {export_tb_copy} (
            like {export_tb_target} including indexes including comments
        );
        truncate table {export_tb_copy};
        """
        DBUtil.engine_exec_sql(engine, sql)
        # Columns specific to the 30-day export.
        tb_cols = [
            "color_proportion", "is_new_market_segment", "supply_demand",
            "market_cycle_type", "is_high_return_text",
        ]
        export_cols = export_base_cols + tb_cols
        print("导出的字段:", export_cols)
    elif date_type in (DateTypes.day.name, DateTypes.week.name, DateTypes.month.name):
        # Partitioned export: one range partition per date_info under a master
        # table, created `like` the previous period's table.
        export_table = f"{export_base_tb}_{date_type}_{suffix}"
        next_val = CommonUtil.get_next_val(date_type, date_info)
        year_str = CommonUtil.safeIndex(date_info.split("-"), 0, None)
        if date_type == DateTypes.day.name:
            # No day-specific extra columns.
            tb_cols = []
            # Daily partitions hang off a per-month master table; the previous
            # month's master is the DDL template.
            year_month = CommonUtil.reformat_date(date_info, "%Y-%m-%d", "%Y-%m")
            year_month_before = CommonUtil.get_month_offset(year_month, -1).replace("-", "_")
            suffix_y_m = year_month.replace("-", "_")
            export_master_tb = f"{export_base_tb}_{date_type}_{suffix_y_m}"
            export_tb_before = f"{export_base_tb}_{date_type}_{year_month_before}"
        else:
            # week / month share the same per-year master-table layout and
            # template (previous year's table); only the extra columns differ.
            if date_type == DateTypes.week.name:
                tb_cols = ["is_new_market_segment", "color_proportion"]
            else:
                tb_cols = [
                    "is_new_market_segment", "color_proportion",
                    "supply_demand", "market_cycle_type",
                    "is_high_return_text", "st_zr_counts", "st_sp_counts",
                    "st_self_asin_counts", "st_self_asin_proportion",
                ]
            export_master_tb = f"{export_base_tb}_{date_type}_{year_str}"
            year_before = str(int(year_str) - 1)
            export_tb_before = f"{export_base_tb}_{date_type}_{year_before}"
        export_cols = export_base_cols + tb_cols
        # Create the master table (if new) and this period's partition.
        sql = f"""
        create table if not exists {export_master_tb} (
            like {export_tb_before} including indexes including comments
        ) partition by range (date_info);
        create table if not exists {export_table}
            partition of {export_master_tb}
            for values from ('{date_info}') to ('{next_val}');
        truncate table {export_table};
        """
        DBUtil.engine_exec_sql(engine, sql)
    else:
        print("输入的date_type有误,请检查!!")
        # Invalid input is an error: exit nonzero (the original `quit()`
        # exited with status 0, hiding the failure from schedulers).
        sys.exit(1)

    # Hive partition to read from; 'month_append' re-reads the month partition.
    partition_dict = {
        "site_name": site_name,
        "date_type": "month" if test_flag == "month_append" else date_type,
        "date_info": date_info,
    }

    # Build the sqoop export command and run it on the remote host.
    sh = CommonUtil.build_export_sh(
        site_name=site_name,
        db_type=db_type,
        hive_tb="dwt_aba_st_analytics",
        export_tb=export_table,
        col=export_cols,
        partition_dict=partition_dict,
    )
    client = SSHUtil.get_ssh_client()
    try:
        SSHUtil.exec_command_async(client, sh, ignore_err=False)
    finally:
        # Always release the SSH connection, even if the remote command fails.
        client.close()

    # Post-export bookkeeping in the workflow-status table.
    if date_type in (DateTypes.month_week.name, DateTypes.last30day.name):
        # Atomically swap the freshly loaded _copy table into place.
        DBUtil.exchange_tb(
            engine,
            source_tb_name=export_tb_copy,
            target_tb_name=export_tb_target,
            cp_index_flag=False,
        )
        # NOTE(review): table_name is hard-coded to 'us_aba_last_30_day' even
        # though site_name is parameterized — confirm this is intentional.
        update_workflow_sql = f"""
        replace INTO selection.workflow_everyday
        (site_name, report_date, status, status_val, table_name, date_type, page, is_end, remark,export_db_type)
        VALUES('{site_name}', '{datetime.now().date()}', '导出PG数据库完成', 14, 'us_aba_last_30_day', '30_day', 'ABA搜索词', '是', 'ABA搜索词最近30天表','{db_type}');
        """
    else:
        update_workflow_sql = f"""
        UPDATE selection.workflow_everyday
        SET status='导出PG数据库完成', status_val=14,is_end ='是',export_db_type = '{db_type}'
        WHERE site_name= '{site_name}' and date_type='{date_type}' and report_date= '{date_info}' and page ='ABA搜索词'
        """
    # Mark the export done so downstream monitors can see completion.
    CommonUtil.modify_export_workflow_status(update_workflow_sql, site_name, date_type, date_info)