"""Export the ``dwt_aba_last365`` Hive table into a PostgreSQL ABA table.

Usage::

    python <this_script> <site_name> <date_type> <date_info> [test]

* ``date_type`` must equal ``DateTypes.year.name`` or
  ``DateTypes.last365day.name``.
* When the last command-line argument is ``test`` the ``postgresql_test``
  engine is used, otherwise ``postgresql``.

The data is first exported into ``<target>_copy`` and then swapped with the
target table through ``DBUtil.exchange_tb``.
"""
import os
import sys

# Make the project root importable so the ``utils`` package resolves when the
# script is launched directly.
sys.path.append(os.path.dirname(sys.path[0]))
from utils.db_util import DBUtil
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil, DateTypes

# Columns handed to ``CommonUtil.build_export_sh``, in the original order.
EXPORT_COLUMNS = [
    "id",
    "search_term",
    "category_id",
    "rank",
    "top_rank",
    "st_num1",
    "st_num2",
    "st_num3",
    "st_num4",
    "st_num5",
    "st_num6",
    "st_num7",
    "st_num8",
    "st_num9",
    "st_num10",
    "st_num11",
    "st_num12",
    "total_st_num",
    "bsr_orders1",
    "bsr_orders2",
    "bsr_orders3",
    "bsr_orders4",
    "bsr_orders5",
    "bsr_orders6",
    "bsr_orders7",
    "bsr_orders8",
    "bsr_orders9",
    "bsr_orders10",
    "bsr_orders11",
    "bsr_orders12",
    "market_cycle_type1",
    "market_cycle_type2",
    "market_cycle_type3",
    "market_cycle_type4",
    "market_cycle_type5",
    "market_cycle_type6",
    "market_cycle_type7",
    "market_cycle_type8",
    "market_cycle_type9",
    "market_cycle_type10",
    "market_cycle_type11",
    "market_cycle_type12",
    "search_volume1",
    "search_volume2",
    "search_volume3",
    "search_volume4",
    "search_volume5",
    "search_volume6",
    "search_volume7",
    "search_volume8",
    "search_volume9",
    "search_volume10",
    "search_volume11",
    "search_volume12",
    "st_ao_avg",
    "st_ao_val_rate",
    "supply_demand",
    "price_avg",
    "total_comments_avg",
    "rating_avg",
    "weight_avg",
    "volume_avg",
    "aadd_proportion",
    "sp_proportion",
    "fbm_proportion",
    "cn_proportion",
    "amzon_proportion",
    "top3_seller_orders",
    "top3_seller_bsr_orders",
    "top3_brand_orders",
    "top3_brand_bsr_orders",
    "page3_brand_num",
    "page3_seller_num",
    "max_num",
    "most_avg_proportion",
    "new_asin_num_avg_monopoly",
    "new_asin_bsr_orders_avg_monopoly",
    "total_asin_num",
    "orders",
    "bsr_orders",
    "created_time",
    "updated_time",
    "max_num_asin",
    "is_self_max_num_asin",
    "date_info",
    "gross_profit_fee_sea",
    "gross_profit_fee_air",
    "category_current_id",
    "color_proportion",
    "brand_monopoly",
    "seller_monopoly",
    "orders1",
    "orders2",
    "orders3",
    "orders4",
    "orders5",
    "orders6",
    "orders7",
    "orders8",
    "orders9",
    "orders10",
    "orders11",
    "orders12",
    "max_orders_month",
    "max_bsr_orders_month",
    "multi_color_avg_proportion",
    "multi_size_avg_proportion",
    "q1_bsr_orders",
    "q2_bsr_orders",
    "q3_bsr_orders",
    "q4_bsr_orders",
    "q1_orders",
    "q2_orders",
    "q3_orders",
    "q4_orders",
    "is_new_market_segment",
    "is_first_text",
    "is_ascending_text",
    "is_search_text",
    "st_word_num",
]


def main():
    """Run the export: build the copy table, load it over SSH, swap it in.

    Raises:
        ValueError: if a required positional argument is missing or the
            date type is neither ``year`` nor ``last365day``.
    """
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    # The test switch is always read from the last command-line argument.
    test_flag = CommonUtil.get_sys_arg(len(sys.argv) - 1, None)
    print(f"执行参数为{sys.argv}")

    # Fail fast: a missing value would otherwise be interpolated into table
    # names as "None", or crash the string comparison further down after the
    # copy table has already been dropped and recreated.
    if not (site_name and date_type and date_info):
        raise ValueError(
            f"缺少必要参数: site_name={site_name}, "
            f"date_type={date_type}, date_info={date_info}"
        )

    if test_flag == 'test':
        db_type = 'postgresql_test'
        print("导出到测试库中")
    else:
        db_type = "postgresql"
        print("导出到PG库中")

    if date_type == DateTypes.year.name:
        year = date_info
        export_tb_before = f"{site_name}_aba_last_year_{year}"
        # A yearly export reads the December partition of that year.
        rel_date_info = f"{year}-12"
    elif date_type == DateTypes.last365day.name:
        export_tb_before = f"{site_name}_aba_last_365_day"
        rel_date_info = date_info
    else:
        raise ValueError("DateType不合法")

    export_tb_rel = f"{export_tb_before}_copy"
    engine = DBUtil.get_db_engine(db_type, site_name)

    # Recreate the staging (copy) table with the target table's structure.
    with engine.connect() as connection:
        sql = f"""
        drop table if exists {export_tb_rel};
        create table if not exists {export_tb_rel}
        (
            like {export_tb_before} including comments
        );
        """
        print("================================执行sql================================")
        print(sql)
        connection.execute(sql)

    # Compatibility with month_old: partitions up to and including '2023-10'
    # use the "<last365day>_<month_old>" date type (plain string comparison).
    if rel_date_info > '2023-10':
        rel_date_type = DateTypes.last365day.name
    else:
        rel_date_type = f"{DateTypes.last365day.name}_{DateTypes.month_old.name}"

    # Build the export command targeting the staging table.
    sh = CommonUtil.build_export_sh(
        site_name=site_name,
        db_type=db_type,
        hive_tb="dwt_aba_last365",
        export_tb=export_tb_rel,
        col=EXPORT_COLUMNS,
        partition_dict={
            "site_name": site_name,
            "date_type": rel_date_type,
            "date_info": rel_date_info
        }
    )

    client = SSHUtil.get_ssh_client()
    try:
        SSHUtil.exec_command_async(client, sh, ignore_err=False)
    finally:
        # Release the SSH connection even when the export command fails.
        client.close()

    # Swap the freshly loaded staging table with the target table.
    DBUtil.exchange_tb(
        engine,
        source_tb_name=export_tb_rel,
        target_tb_name=export_tb_before,
        cp_index_flag=True
    )

    # NOTE: the production-only (test_flag != 'test') step that inserted a row
    # into the MySQL ``workflow_everyday`` table is currently disabled (it was
    # commented out), so no workflow status is recorded by this script.
    print("success")


if __name__ == '__main__':
    main()