import os
import sys

# Make the project root importable when this script is run directly.
sys.path.append(os.path.dirname(sys.path[0]))

from utils.ssh_util import SSHUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils

if __name__ == '__main__':
    # Entry point: import the site-specific MySQL table `{site_name}_variat`
    # into the Hive ODS table `ods_asin_variat`, partitioned by site_name.
    site_name = CommonUtil.get_sys_arg(1, None)
    # Explicit raise instead of `assert` so the check survives `python -O`.
    if site_name is None:
        raise AssertionError("site_name 不能为空!")

    hive_tb = "ods_asin_variat"
    db_type = "mysql"
    partition_dict = {
        "site_name": site_name
    }
    hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict=partition_dict)
    print(f"hdfs_path is {hdfs_path}")

    import_table = f"{site_name}_variat"
    # Free-form import query; `\$CONDITIONS` is the sqoop split placeholder
    # (backslash-escaped so the generated shell command does not expand it).
    query = f"""
    select
        id,
        asin,
        parent_asin,
        color,
        `size`,
        `style`,
        `state`,
        column_2,
        column_1,
        created_at as created_time,
        updated_at as updated_time
    from {import_table}
    where 1 = 1
    and \$CONDITIONS
    """

    # Verify the source schema matches the Hive table before importing;
    # empty_flag signals the source table has no rows to import.
    empty_flag, check_flag = CommonUtil.check_schema_before_import(
        db_type=db_type,
        site_name=site_name,
        query=query,
        hive_tb_name=hive_tb,
        msg_usr=['wangrui4']
    )
    if not check_flag:
        raise AssertionError(f"导入hive表{hive_tb}表结构检查失败!请检查query是否异常!!")

    if not empty_flag:
        sh = CommonUtil.build_import_sh(
            site_name=site_name,
            db_type=db_type,
            query=query,
            hdfs_path=hdfs_path,
            map_num=50,
            key='id'
        )
        # Remove any previous import output so the new import starts clean.
        HdfsUtils.delete_hdfs_file(hdfs_path)
        client = SSHUtil.get_ssh_client()
        try:
            SSHUtil.exec_command_async(client, sh, ignore_err=False)
            CommonUtil.after_import(hdfs_path=hdfs_path, hive_tb=hive_tb)
        finally:
            # Always release the SSH connection, even when the import fails.
            client.close()

        # Optional post-import consistency check (currently disabled):
        # CommonUtil.check_import_sync_num(db_type=db_type,
        #                                  partition_dict=partition_dict,
        #                                  import_query=query,
        #                                  hive_tb_name=hive_tb,
        #                                  msg_usr=['chenyuanjie','wangrui4','chenjianyun']
        #                                  )