"""Import the ASIN image table from PostgreSQL into Hive via a generated
sqoop shell command, then verify the imported row count matches the source.

An optional export step (Hive -> PostgreSQL) is kept below, disabled.
"""
import os
import sys

# Make the project root importable when this file is run as a script.
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.db_util import DBUtil

if __name__ == '__main__':
    site_name = "us"
    hive_tb = "tmp_asin_image"  # fixed: was an f-string with no placeholders

    # NOTE(review): partition value "us14" differs from site_name "us".
    # Presumably it tags data pulled from the pg14 instance (db_type below is
    # "postgresql_14") — confirm this is intentional and not a typo.
    partition_dict = {
        "site_name": "us14",
    }

    # Target HDFS path derived from the Hive table name and its partition spec.
    hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
    print(f"hdfs_path is {hdfs_path}")

    # Free-form import query. "\$CONDITIONS" keeps the literal "$CONDITIONS"
    # token intact when the query is embedded in the generated shell command;
    # sqoop substitutes it with split predicates at run time.
    query = f"""
        select
            asin,
            img_url,
            img_order_by,
            created_at,
            updated_at,
            data_type
        from {site_name}_asin_image_pyb_copy
        where 1 = 1
        and \$CONDITIONS
    """
    print(query)

    db_type = "postgresql_14"

    # Pre-import check: schema compatibility between source query and Hive
    # table; empty_flag is set when the source has nothing to import.
    empty_flag, check_flag = CommonUtil.check_schema_before_import(
        db_type=db_type,
        site_name=site_name,
        query=query,
        hive_tb_name=hive_tb,
        msg_usr=['chenyuanjie']
    )

    if not empty_flag:
        # Build and run the sqoop import over SSH, splitting on 'id'
        # across 10 parallel mappers.
        sh = CommonUtil.build_import_sh(
            site_name=site_name,
            db_type=db_type,
            query=query,
            hdfs_path=hdfs_path,
            map_num=10,
            key='id'
        )
        client = SSHUtil.get_ssh_client()
        SSHUtil.exec_command_async(client, sh, ignore_err=False)
        # Register/refresh the Hive partition now that files are in HDFS.
        CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
        client.close()

        # Post-import check -- verify row-count consistency between the
        # source query and the imported Hive partition.
        CommonUtil.check_import_sync_num(
            db_type=db_type,
            partition_dict=partition_dict,
            import_query=query,
            hive_tb_name=hive_tb,
            msg_usr=['chenyuanjie']
        )

    # # Export to the pg database (disabled)
    # db_type = "postgresql"
    # export_tb = f"{site_name}_asin_image_copy"
    #
    # # Export table name
    # sh = CommonUtil.build_export_sh(
    #     site_name=site_name,
    #     db_type=db_type,
    #     hive_tb="tmp_asin_image_copy",
    #     export_tb=export_tb,
    #     col=[
    #         "asin",
    #         "img_url",
    #         "img_order_by",
    #         "created_at",
    #         "updated_at",
    #         "data_type"
    #     ],
    #     partition_dict={
    #         "site_name": site_name
    #     }
    # )
    # client = SSHUtil.get_ssh_client()
    # SSHUtil.exec_command_async(client, sh, ignore_err=False)
    # client.close()