import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))

from utils.hdfs_utils import HdfsUtils
from utils.ssh_util import SSHUtil
from utils.db_util import DBUtil, DbTypes
from utils.common_util import CommonUtil


def handle_first_import():
    """One-off bulk import: builds and prints the sqoop-style import command for the 2024 detail table (does not execute it)."""
    conn_info = DBUtil.get_connection_info(DbTypes.postgresql_test.name, "us")
    # raw f-string so \$CONDITIONS stays a literal for the shell; plain f-strings treat \$ as an invalid escape
    query = rf"""
    select * from de_merchantwords_st_detail_2024
    where 1=1
    and \$CONDITIONS
"""
    cmd1 = CommonUtil.build_import_sh_tmp_inner(
        conn_info,
        query,
        "de_tmp_merchantwords_st_detail_2024",
        50,
        "id"
    )
    print(cmd1)


def handle_append_import():
    """Drops and re-imports the temporary append table over SSH."""
    conn_info = DBUtil.get_connection_info(DbTypes.postgresql.name, "us")
    hive_tb = "tmp_merchantwords_st_detail_append"
    query = rf"""
    select * from merchantwords_st_detail_append
    where 1=1
    and \$CONDITIONS
"""
    # drop the table first
    client = SSHUtil.get_ssh_client()
    cmd = rf"""{CommonUtil.__hive_home__} -e "drop table if exists big_data_selection.{hive_tb};" """
    print("==================================== dropping the temporary import table ====================================")
    print(cmd)
    SSHUtil.exec_command_async(client, cmd, ignore_err=False)

    cmd = CommonUtil.build_import_sh_tmp_inner(conn_info, query, hive_tb)
    print("==================================== importing into the temporary table ====================================")
    print(cmd)
    SSHUtil.exec_command_async(client, cmd, ignore_err=False)
    client.close()


def handle_append_import_ods(site_name, date_info):
    """Imports one site/date partition of the append table into the ods hive table."""
    assert site_name is not None, "site_name must not be empty!"
    assert date_info is not None, "date_info must not be empty!"
    hive_tb = "ods_merchantwords_st_detail_append"
    partition_dict = {
        "site_name": site_name,
        "date_info": date_info,
    }
    hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
    print(f"hdfs_path is {hdfs_path}")
    query = rf"""select keyword,
       volume,
       avg_3m,
       avg_12m,
       depth,
       results_count,
       sponsored_ads_count,
       page_1_reviews,
       appearance,
       last_seen,
       update_time,
       source_type
from merchantwords_st_detail_append
where 1 = 1
  and site_name = '{site_name}'
  and import_day = '{date_info}'
  and \$CONDITIONS
"""
    print("================================ sql ====================================")
    print(query)

    db_type = DbTypes.postgresql.name
    empty_flag, check_flag = CommonUtil.check_schema_before_import(db_type=db_type,
                                                                   site_name=site_name,
                                                                   query=query,
                                                                   hive_tb_name=hive_tb,
                                                                   msg_usr=['wujicang']
                                                                   )
    assert check_flag, f"Schema check of the source table failed for import into hive table {hive_tb}! Please check whether the query is wrong!"
    assert not empty_flag, "The data to import is empty, please check!"

    sh = CommonUtil.build_import_sh(site_name=site_name,
                                    db_type=db_type,
                                    query=query,
                                    hdfs_path=hdfs_path)
    # delete the HDFS path before importing
    HdfsUtils.delete_hdfs_file(hdfs_path)
    client = SSHUtil.get_ssh_client()
    SSHUtil.exec_command_async(client, sh, ignore_err=False)
    CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
    client.close()


if __name__ == '__main__':
    update_flag = CommonUtil.get_sys_arg(1, None)
    if update_flag == 'update':
        site_name = CommonUtil.get_sys_arg(2, 'us')
        date_info = CommonUtil.get_sys_arg(3, CommonUtil.format_now('%Y-%m-%d'))
        handle_append_import_ods(site_name, date_info)
    else:
        handle_first_import()
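
# Usage sketch, derived from the __main__ dispatch above (the script filename is
# illustrative, not taken from the source):
#   python this_script.py                        # first-time import: prints the generated import command
#   python this_script.py update us 2024-01-01   # append-import the us / 2024-01-01 partition into the ods table
# With only 'update' given, site_name defaults to 'us' and date_info to today (%Y-%m-%d).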