import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))

from utils.db_util import DBUtil
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil, DateTypes

if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    # The export target flag is passed as the last argument
    sql_flag = CommonUtil.get_sys_arg(len(sys.argv) - 1, None)
    print(f"Execution arguments: {sys.argv}")

    if sql_flag == 'mysql':
        db_type = 'mysql'
        print("Exporting to MySQL")
    elif sql_flag == 'postgresql':
        db_type = 'postgresql'
        print("Exporting to PostgreSQL")
    else:
        # Without a valid flag, db_type would be undefined below, so fail fast
        print(f"Unsupported export target: {sql_flag}; expected 'mysql' or 'postgresql'")
        sys.exit(1)

    # Get the database engine for the target site
    engine = DBUtil.get_db_engine(db_type, site_name)

    # Export the agg table
    export_seller_agg_table = f"{site_name}_seller_asin_account_agg"
    # Keep the export idempotent: delete rows of the same period before re-exporting
    sql = f"""
    delete from {export_seller_agg_table} where date_info = '{date_info}'
    """
    DBUtil.engine_exec_sql(engine, sql)
    sh_agg = CommonUtil.build_export_sh(
        site_name=site_name,
        db_type=db_type,
        hive_tb="dwd_seller_asin_account_agg",
        export_tb=export_seller_agg_table,
        col=[
            "account_id",
            "account_name",
            "asin_new_counts",
            "asin_counts",
            "asin_counts_exists",
            "counts_new_rate",
            "top_20_avg_price",
            "top_20_avg_rating",
            "top_20_avg_total_comments",
            "fb_variat_num",
            "fb_asin_total",
            "fb_variat_prop",
            "ym",
            "week",
            "date_info"
        ],
        partition_dict={
            "site_name": site_name,
            "date_type": date_type,
            "date_info": date_info
        }
    )
    client = SSHUtil.get_ssh_client()
    SSHUtil.exec_command_async(client, sh_agg, ignore_err=False)

    # Export dwt_fd_category_agg
    export_fd_cat_agg = f"{site_name}_seller_category_agg"
    # Clear historical data of the same period
    sql = f"""
    delete from {export_fd_cat_agg} where date_info = '{date_info}'
    """
    DBUtil.engine_exec_sql(engine, sql)
    sh_fd_cat_agg = CommonUtil.build_export_sh(
        site_name=site_name,
        db_type=db_type,
        hive_tb="dwt_fd_category_agg",
        export_tb=export_fd_cat_agg,
        col=[
            "fd_account_id",
            "bsr_cate_1_id",
            "fd_cate_asin_num",
            "fd_cate_new_asin_num",
            "fd_asin_num",
            "bsr_asin_num",
            "fd_cate_asin_per",
            "fd_cate_new_asin_per",
            "fd_market_per",
            "ym",
            "week",
            "date_info"
        ],
        partition_dict={
            "site_name": site_name,
            "date_type": date_type,
            "date_info": date_info
        }
    )
    SSHUtil.exec_command_async(client, sh_fd_cat_agg, ignore_err=False)

    # Export the detail table: write into a copy table first, then swap names afterwards
    export_seller_detail_table = f"{site_name}_seller_asin_account_detail_copy1"
    export_table_target = f"{site_name}_seller_asin_account_detail"
    # Make sure the copy table exists and is empty
    sqls = [
        f"create table if not exists {export_seller_detail_table} ( like {export_table_target} );",
        f"truncate table {export_seller_detail_table};"
    ]
    for sql in sqls:
        DBUtil.engine_exec_sql(engine, sql)
    sh_detail = CommonUtil.build_export_sh(
        site_name=site_name,
        db_type=db_type,
        hive_tb="dwd_seller_asin_account_detail",
        export_tb=export_seller_detail_table,
        col=[
            "account_id",
            "account_name",
            "asin",
            "launch_time",
            "days_diff",
            "is_asin_new"
        ],
        partition_dict={
            "site_name": site_name,
            "date_type": date_type,
            "date_info": date_info
        }
    )
    SSHUtil.exec_command_async(client, sh_detail, ignore_err=False)
    client.close()

    # Swap table names (target -> bak, copy -> target, bak -> copy), one statement at a time
    sqls = [
        f"alter table {export_table_target} rename to {export_table_target}_bak ;",
        f"alter table {export_seller_detail_table} rename to {export_table_target} ;",
        f"alter table {export_table_target}_bak rename to {export_seller_detail_table} ;"
    ]
    for sql in sqls:
        DBUtil.engine_exec_sql(engine, sql)

    # Release the connection pool
    engine.dispose()
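
# Example invocation (a sketch only: the script file name and the argument values
# below are illustrative and not taken from this repository). The positional
# arguments are site_name, date_type, date_info, with the export target flag
# ('mysql' or 'postgresql') passed as the last argument:
#
#   python seller_account_export.py us week 2023-18 postgresql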