"""HDFS helper utilities built on the ``hdfs`` (WebHDFS) client."""
import posixpath
import re

from hdfs import InsecureClient
from hdfs.client import Client
from hdfs.util import HdfsError
from kazoo.client import KazooClient


class HdfsUtils(object):
    """Static helpers for common HDFS operations through one shared WebHDFS client."""

    # WebHDFS endpoint of the (currently fixed) NameNode.
    __hdfs_url__ = "http://hadoop15:9870/"
    # NOTE(review): this value is not passed to InsecureClient in get_hdfs_cilent,
    # so the client falls back to the hdfs library's own default user rather than
    # "root" -- confirm which identity is intended.
    __hdfs_user__ = "root"
    # Lazily created client shared by every helper (see get_hdfs_cilent).
    __hdfs_client__ = None

    @staticmethod
    def get_hdfs_cilent() -> Client:
        """Return the shared WebHDFS client, creating it on first use.

        The NameNode URL is fixed in ``__hdfs_url__``; the ZooKeeper-based
        active-NameNode lookup (``get_namenode_info``) is no longer used here.
        """
        if HdfsUtils.__hdfs_client__ is None:
            HdfsUtils.__hdfs_client__ = InsecureClient(HdfsUtils.__hdfs_url__)
        return HdfsUtils.__hdfs_client__

    @staticmethod
    def get_namenode_info():
        """Deprecated: look up the active NameNode host via ZooKeeper HA data.

        Reads the ``ActiveBreadCrumb`` znode and returns the first ``hadoopNN``
        host name found in it.

        :return: the host name, or None if the znode is missing or contains no
            ``hadoopNN`` token.
        """
        zk = KazooClient("hadoop14:2181,hadoop15:2181,hadoop16:2181")
        zk.start()
        try:
            zookeeper_path = '/hadoop-ha/nameservice1/ActiveBreadCrumb'
            if not zk.exists(zookeeper_path):
                return None
            data, _ = zk.get(zookeeper_path)
        finally:
            # Always release the ZooKeeper session; it was previously leaked.
            zk.stop()
            zk.close()
        # The znode payload is binary; latin-1 decodes any byte without error so
        # the host-name regex can scan it.
        matches = re.findall(r'\b(hadoop\d+)\b', data.decode('latin-1'))
        if not matches:
            return None
        host = matches[0]
        print(host)
        return host

    @staticmethod
    def delete_hdfs_file(hdfs_path):
        """Recursively delete ``hdfs_path`` (analogous to ``hadoop fs -rm -r``).

        As a crude safety net, any path whose length does not exceed that of
        ``/home/big_data_selection/dwd/xxx`` is rejected, so that short,
        high-level directories are not wiped by mistake.

        :param hdfs_path: HDFS path to delete.
        :return: whatever the client's ``delete`` call returns.
        :raises AssertionError: if the path is too short.
        """
        tmp = "/home/big_data_selection/dwd/xxx"
        # Explicit raise instead of ``assert`` so the guard survives ``python -O``;
        # AssertionError is kept so existing callers see the same exception type.
        if not len(hdfs_path) > len(tmp):
            raise AssertionError('要删除的路径长度不合法,请检查!!')
        cilent = HdfsUtils.get_hdfs_cilent()
        # Positional args: recursive=True, then False for the third parameter
        # (skip_trash in recent hdfs releases, i.e. presumably the data is moved
        # to the HDFS trash rather than skipping it -- TODO confirm against the
        # installed hdfs version).
        return cilent.delete(hdfs_path, True, False)

    @staticmethod
    def delete_hdfs_file_with_checkpoint(hdfs_path):
        """Recursively delete ``hdfs_path`` if it exists, printing the outcome.

        :param hdfs_path: HDFS path to delete.
        :return: True if the path was deleted; False if it was empty, missing,
            or the client reported that nothing was deleted.
        """
        cilent = HdfsUtils.get_hdfs_cilent()
        if hdfs_path == '' or cilent.status(hdfs_path, False) is None:
            print("路径不存在")
            return False
        print("路径存在")
        # Client.delete returns a bool (never None), so test its truthiness;
        # the previous ``is None`` check could never detect a failure.
        if not cilent.delete(hdfs_path, True):
            print("目录删除失败")
            return False
        print("目录删除成功")
        return True

    @staticmethod
    def list_all_part_location(hive_tb):
        """List every ``part*.lzo`` file under a Hive table's HDFS location.

        Runs ``hdfs fsck <table path> -files -blocks -locations`` over SSH and
        parses its output.

        :param hive_tb: Hive table name, resolved to an HDFS path by
            ``CommonUtil.build_hdfs_path``.
        :return: a list of dicts, one per distinct file, in first-seen order, with
            keys ``full_name`` (full file path), ``part_by`` (dict of partition
            column -> value), ``part_name`` (file name) and ``location``
            (partition directory).
        """
        from utils.common_util import CommonUtil
        from utils.ssh_util import SSHUtil

        path_prefix = CommonUtil.build_hdfs_path(hive_tb, None)
        client = SSHUtil.get_ssh_client()
        try:
            cmd = f"hdfs fsck {path_prefix} -files -blocks -locations"
            _stdin, stdout, _stderr = client.exec_command(cmd)
            res_text = stdout.read().decode('utf-8')
        finally:
            # Close the SSH connection even if the command or the read fails.
            client.close()
        return HdfsUtils._parse_fsck_part_files(res_text, path_prefix)

    @staticmethod
    def _parse_fsck_part_files(res_text, path_prefix):
        """Extract distinct ``part*.lzo`` file records from ``hdfs fsck`` output."""
        # Escape the prefix so characters such as '.' in the path match literally.
        pattern = re.compile(fr"({re.escape(path_prefix)}(.*)/part.*\.lzo)")
        result_list = []
        unique_set = set()
        for match in pattern.finditer(res_text):
            # The match normally ends at ".lzo" and has no space; if it does
            # contain one, the file path is the text before the first space.
            part_full_name = match.group(1).split(" ", 1)[0]
            # De-duplicate (e.g. a ".lzo.index" line yields the same ".lzo" name).
            if part_full_name in unique_set:
                continue
            unique_set.add(part_full_name)

            # Index of the '/' right after the prefix, and of the "/part" that
            # starts the file name (searched from there, not from the start).
            sep = part_full_name.find(path_prefix) + len(path_prefix)
            part_start = part_full_name.find("/part", sep)
            part_by = part_full_name[sep + 1:part_start]
            part_name = part_full_name[part_start + 1:]

            # "k1=v1/k2=v2" -> {"k1": "v1", "k2": "v2"}; an unpartitioned path
            # gives an empty dict instead of raising IndexError.
            part_by_val = dict(
                kv.split("=", 1) for kv in part_by.split("/") if "=" in kv
            )
            result_list.append({
                "full_name": part_full_name,
                "part_by": part_by_val,
                "part_name": part_name,
                "location": f"{path_prefix}/{part_by}",
            })
        return result_list

    @staticmethod
    def delete_file_in_folder(hdfs_path: str):
        """Empty an HDFS directory while keeping the directory itself.

        Implemented as delete-then-recreate: the directory is recreated with
        permission 775, so its original owner/ACLs are not preserved.

        :param hdfs_path: directory to empty.
        :return: True if ``hdfs_path`` was a directory and was emptied, else False.
        """
        cilent = HdfsUtils.get_hdfs_cilent()
        status = cilent.status(hdfs_path, False)
        if status is None or status.get("type") != "DIRECTORY":
            return False
        HdfsUtils.delete_hdfs_file(hdfs_path)
        cilent.makedirs(hdfs_path, permission=775)
        return True

    @staticmethod
    def is_checkpoint_exist(hdfs_path):
        """Ensure ``hdfs_path`` exists, creating it (permission 775) if it does not.

        Despite the name, nothing is returned; the outcome is only printed.
        """
        cilent = HdfsUtils.get_hdfs_cilent()
        try:
            # status() in strict mode raises HdfsError when the path is missing.
            status = cilent.status(hdfs_path)
        except HdfsError:
            cilent.makedirs(hdfs_path, permission=775)
            print("目录创建成功")
        else:
            if status is not None:
                print("目录已存在")

    @staticmethod
    def path_exist(hdfs_path):
        """Return True if ``hdfs_path`` exists on HDFS, else False."""
        cilent = HdfsUtils.get_hdfs_cilent()
        return cilent.status(hdfs_path, False) is not None

    @staticmethod
    def create_if_not_exist(hdfs_path):
        """Recursively create ``hdfs_path`` (permission 775) if it does not exist.

        :return: None in both cases.
        """
        cilent = HdfsUtils.get_hdfs_cilent()
        if cilent.status(hdfs_path, False) is None:
            return cilent.makedirs(hdfs_path, permission=775)

    @staticmethod
    def put_to_hdfs(local_path, hdfs_path):
        """Upload ``local_path`` to ``hdfs_path``.

        ``cleanup=True`` asks the client to remove partially uploaded files on
        error.

        :return: the client's ``upload`` return value.
        """
        cilent = HdfsUtils.get_hdfs_cilent()
        return cilent.upload(hdfs_path, local_path, cleanup=True)

    @staticmethod
    def read_hdfs_file(hdfs_file):
        """Read a UTF-8 text file from HDFS.

        :return: a list of its newline-delimited lines, each stripped of
            surrounding whitespace.
        """
        cilent = HdfsUtils.get_hdfs_cilent()
        with cilent.read(hdfs_file, encoding='utf-8', delimiter='\n') as reader:
            return [line.strip() for line in reader]

    @staticmethod
    def read_list(hdfs_path):
        """Return the entry names directly under ``hdfs_path``.

        Best-effort: any error (missing path, connection failure, ...) yields
        None instead of raising.
        """
        try:
            cilent = HdfsUtils.get_hdfs_cilent()
            return cilent.list(hdfs_path)
        except Exception:
            return None

    @classmethod
    def exchange_path(cls, path_one: str, path_two: str):
        """Swap the names of two existing HDFS paths via a temporary name.

        Both paths must exist; to move a single path, use a plain rename.
        If the second rename fails, ``path_one`` is restored before the error
        is re-raised.

        :param path_one: first path.
        :param path_two: second path.
        """
        import uuid

        cilent = HdfsUtils.get_hdfs_cilent()
        # HDFS paths are always '/'-separated, so use posixpath. Strip any
        # trailing '/' so the temp name is a sibling rather than a child of
        # path_one.
        base = path_one.rstrip("/") or path_one
        tmp = posixpath.join(
            posixpath.dirname(base),
            f"{posixpath.basename(base)}_tmp_{uuid.uuid1()}"
        )
        cilent.rename(path_one, tmp)
        try:
            cilent.rename(path_two, path_one)
        except Exception:
            # Roll back so path_one is not left stranded under the temp name.
            cilent.rename(tmp, path_one)
            raise
        cilent.rename(tmp, path_two)
        print(f"交换目录【{path_one}】和【{path_two}】成功!!")

    @staticmethod
    def copy_file(source_path, target_path):
        """Copy every file directly under ``source_path`` into ``target_path``.

        Both directories must already exist. Existing target files are not
        overwritten; that failure, and any other per-file HdfsError, is printed
        and the loop continues with the next file.
        """
        client = HdfsUtils.get_hdfs_cilent()

        try:
            client.status(source_path)
        except HdfsError:
            print(f"Source path '{source_path}' does not exist.")
            return

        try:
            client.status(target_path)
        except HdfsError:
            print(f"Target path '{target_path}' does not exist.")
            return

        try:
            files = client.list(source_path)
        except HdfsError:
            print(f"Error occurred while listing files in '{source_path}'.")
            return

        for file in files:
            source_file = posixpath.join(source_path, file)
            target_file = posixpath.join(target_path, file)
            try:
                HdfsUtils._copy_single_file(client, source_file, target_file)
                print(f"File '{source_file}' copied to '{target_file}' successfully!")
            except HdfsError as e:
                print(f"Error occurred while copying file '{source_file}' to '{target_file}': {e}")

    @staticmethod
    def _copy_single_file(client, source_file, target_file, chunk_size=65536):
        """Stream one HDFS file to a new HDFS path through the WebHDFS client.

        The hdfs ``Client`` has no server-side copy, so the data is read in
        chunks and written back; ``write`` accepts a generator for streaming.
        """
        with client.read(source_file, chunk_size=chunk_size) as reader:
            client.write(target_file, data=reader)


if __name__ == '__main__':
    pass