"""Export the dwt table of current bids and recent history for ad search terms."""
import os
import sys

# Make the project root importable before pulling in the project-local utils.
sys.path.append(os.path.dirname(sys.path[0]))

from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.db_util import DBUtil, DbTypes


def handle_export():
    """Export the Hive table ``dwt_st_pcp_current`` and refresh dependent tables.

    Steps, in order:

    1. Refuse to run unless ``CommonUtil.judge_not_working_hour()`` is truthy.
    2. Recreate ``st_pcp_current_copy`` with the structure of ``st_pcp_current``.
    3. Build an export shell command via ``CommonUtil.build_export_sh`` and run
       it over SSH.
    4. Swap ``st_pcp_current`` and ``st_pcp_current_copy`` via
       ``DBUtil.exchange_tb``.
    5. Copy ``suggested_bid`` from ``st_pcp_current`` into the
       ``merchantwords_st_detail`` tables.

    Raises:
        AssertionError: if ``CommonUtil.judge_not_working_hour()`` is falsy.
    """
    site_name = "us"
    db_type = DbTypes.postgresql_cluster.name
    export_tb = "st_pcp_current"
    export_tb_copy = f"{export_tb}_copy"
    hive_tb = "dwt_st_pcp_current"

    # Raised explicitly rather than via ``assert`` so the guard still fires
    # when Python runs with -O (which strips assert statements).
    if not CommonUtil.judge_not_working_hour():
        raise AssertionError("工作时间,请谨慎导出!!!!")

    # Start from an empty copy table that mirrors the live table's structure.
    sql = f"""
        drop table if exists {export_tb_copy};
        create table if not exists {export_tb_copy}
        (
            like {export_tb} including indexes including comments
        );
    """
    print("================================执行sql================================")
    print(sql)
    # NOTE(review): the 4th positional argument is presumably dispose_flag
    # (the later calls pass dispose_flag=True by keyword) - confirm.
    DBUtil.exec_sql(db_type, site_name, sql, True)

    # Build the export command that loads the Hive table into the copy table.
    sh = CommonUtil.build_export_sh(
        site_name=site_name,
        db_type=db_type,
        hive_tb=hive_tb,
        export_tb=export_tb_copy,
        col=[
            "site_id",
            "group_id",
            "keyword_id",
            "keyword",
            "match_type",
            "created_at",
            "min_bid",
            "max_bid",
            "suggested_bid",
            "history_json",
        ],
        partition_dict={},
    )

    client = SSHUtil.get_ssh_client()
    try:
        SSHUtil.exec_command_async(client, sh, ignore_err=False)
    finally:
        # Always release the SSH connection, even when the export fails.
        client.close()

    # Swap the table names so the freshly loaded copy becomes the live table.
    DBUtil.exchange_tb(
        DBUtil.get_db_engine(db_type, site_name),
        export_tb,
        export_tb_copy,
        False,
    )
    print("success")

    # Propagate the refreshed suggested_bid into each merchantwords detail table.
    for target_tb in ("merchantwords_st_detail", "merchantwords_st_detail_v2_2024"):
        sql = f"""
            update {target_tb} msd
            set suggested_bid = spc.suggested_bid
            from st_pcp_current spc
            where spc.site_id = 4
              and spc.keyword = msd.keyword;
        """
        DBUtil.exec_sql(db_type, site_name, sql=sql, dispose_flag=True)


if __name__ == '__main__':
    handle_export()