# hdfs_utils.py (8.5 KB listing artifact — kept as a comment so the module parses)
from hdfs import InsecureClient
from hdfs.client import Client
from kazoo.client import KazooClient
from hdfs.util import HdfsError
import re


class HdfsUtils(object):
    """Static helpers for interacting with HDFS through WebHDFS.

    All methods share one lazily created ``InsecureClient``.
    NOTE: several method names keep the historical "cilent" misspelling
    because external callers depend on them.
    """

    # WebHDFS endpoint and user for the shared client.
    __hdfs_url__ = "http://hadoop15:9870/"
    __hdfs_user__ = "root"
    # Lazily initialised singleton client (see get_hdfs_cilent).
    __hdfs_client__ = None

    @staticmethod
    def get_hdfs_cilent() -> Client:
        """Return the shared HDFS client, creating it on first use.

        :return: the module-wide ``InsecureClient`` singleton
        """
        if HdfsUtils.__hdfs_client__ is None:
            HdfsUtils.__hdfs_client__ = InsecureClient(HdfsUtils.__hdfs_url__)
        return HdfsUtils.__hdfs_client__

    @staticmethod
    def get_namenode_info():
        """Deprecated: resolve the active NameNode host from ZooKeeper.

        :return: host name such as ``hadoop15``
        :raises ValueError: when the HA breadcrumb znode is missing or
            contains no ``hadoopN`` host (the old code died with a
            ``NameError`` / ``IndexError`` instead)
        """
        zk = KazooClient("hadoop14:2181,hadoop15:2181,hadoop16:2181")
        zk.start()
        try:
            zookeeper_path = '/hadoop-ha/nameservice1/ActiveBreadCrumb'
            if not zk.exists(zookeeper_path):
                raise ValueError(f"zookeeper path not found: {zookeeper_path}")
            data, _ = zk.get(zookeeper_path)
            data = data.decode('latin-1')
        finally:
            # Always release the ZooKeeper session (old code leaked it).
            zk.stop()
        matches = re.findall(r'\b(hadoop\d+)\b', data)
        if not matches:
            raise ValueError(f"no hadoop host found in znode data: {data!r}")
        host = matches[0]
        print(host)
        return host

    @staticmethod
    def delete_hdfs_file(hdfs_path):
        """Delete an HDFS path, like ``hadoop fs -rm -r /path/to/hdfs``.

        A minimum-length guard refuses suspiciously short paths so that a
        bad argument cannot wipe a whole tree.
        :param hdfs_path: absolute HDFS path to remove
        :return: True if the path was deleted, False if it did not exist
        :raises AssertionError: when *hdfs_path* is too short to be safe
        """
        min_len = len("/home/big_data_selection/dwd/xxx")
        # Explicit raise instead of `assert` so the guard survives `python -O`.
        if len(hdfs_path) <= min_len:
            raise AssertionError('要删除的路径长度不合法,请检查!!')
        client = HdfsUtils.get_hdfs_cilent()
        # NOTE(review): positional args mean recursive=True, skip_trash=False,
        # which contradicts the "-skipTrash" wording of the original comment —
        # confirm which behavior is intended before changing it.
        return client.delete(hdfs_path, True, False)

    @staticmethod
    def delete_hdfs_file_with_checkpoint(hdfs_path):
        """Delete an HDFS path after verifying that it exists.

        :param hdfs_path: path to remove (recursively)
        :return: True when the path existed and was removed, else False
        """
        client = HdfsUtils.get_hdfs_cilent()
        if hdfs_path == '' or client.status(hdfs_path, False) is None:
            print("路径不存在")
            return False
        print("路径存在")
        # Client.delete returns a bool, never None, so test it directly.
        # (The old `value is None` branch was unreachable and the method
        # reported success even when the delete failed.)
        if not client.delete(hdfs_path, True):
            print("目录删除失败")
            return False
        print("目录删除成功")
        return True

    @staticmethod
    def list_all_part_location(hive_tb):
        """List every partition part-file location of a Hive table.

        Runs ``hdfs fsck`` over ssh and parses the ``.lzo`` part paths out
        of the report.
        :param hive_tb: hive table name
        :return: list of dicts with keys full_name / part_by / part_name /
            location, de-duplicated by full part path
        """
        from utils.common_util import CommonUtil
        from utils.ssh_util import SSHUtil
        path_prefix = CommonUtil.build_hdfs_path(hive_tb, None)

        client = SSHUtil.get_ssh_client()
        cmd = f"hdfs fsck {path_prefix} -files -blocks -locations"
        stdin, stdout, stderr = client.exec_command(cmd)
        res_text = stdout.read().decode('utf-8')
        client.close()

        find_list = re.findall(fr"({path_prefix}(.*)/part.*\.lzo)", res_text)
        result_list = []
        seen = set()
        for it in find_list:
            line = str(it[0])
            part_full_name = line[0:line.find(" ")]
            part_by = part_full_name[
                part_full_name.find(path_prefix) + len(path_prefix) + 1:
                part_full_name.find("/part")
            ]
            # NOTE(review): `path_prefix + part_by` lacks the "/" separator, so
            # find() returns -1; the +3 offset only lands on the right index by
            # arithmetic coincidence. Kept as-is to preserve behavior.
            part_name = part_full_name[
                part_full_name.find(path_prefix + part_by)
                + len(path_prefix + part_by) + 3:
            ]
            # Parse the "k1=v1/k2=v2" partition spec into a dict.
            part_by_val = {}
            for kv in part_by.split("/"):
                key, _, value = kv.partition("=")
                part_by_val[key] = value
            # Build the row once per file; the old code rebuilt it on every
            # kv pair and appended it before the dict was fully populated.
            if part_full_name not in seen:
                result_list.append({
                    "full_name": part_full_name,
                    "part_by": part_by_val,
                    "part_name": part_name,
                    "location": f"{path_prefix}/{part_by}",
                })
            seen.add(part_full_name)
        return result_list

    @staticmethod
    def delete_file_in_folder(hdfs_path: str):
        """Delete everything under a directory but keep the directory itself.

        :param hdfs_path: directory path
        :return: True when the path was an existing directory and was reset,
            else False
        """
        client = HdfsUtils.get_hdfs_cilent()
        status = client.status(hdfs_path, False)
        if status is not None and status.get("type") == "DIRECTORY":
            HdfsUtils.delete_hdfs_file(hdfs_path)
            client.makedirs(hdfs_path, permission=775)
            return True
        return False

    @staticmethod
    def is_checkpoint_exist(hdfs_path):
        """Ensure *hdfs_path* exists, creating the directory when missing.

        NOTE(review): despite the name this returns nothing; it only creates
        the directory on demand.
        :param hdfs_path: checkpoint directory path
        """
        client = HdfsUtils.get_hdfs_cilent()
        try:
            if client.status(hdfs_path) is not None:
                print("目录已存在")
        except HdfsError:
            client.makedirs(hdfs_path, permission=775)
            print("目录创建成功")

    @staticmethod
    def path_exist(hdfs_path):
        """Return True when *hdfs_path* exists on HDFS.

        :param hdfs_path: path to probe
        :return: bool
        """
        client = HdfsUtils.get_hdfs_cilent()
        return client.status(hdfs_path, False) is not None

    @staticmethod
    def create_if_not_exist(hdfs_path):
        """Recursively create *hdfs_path* when it does not exist yet.

        :param hdfs_path: directory path
        :return: result of ``makedirs`` when created, else None
        """
        client = HdfsUtils.get_hdfs_cilent()
        if client.status(hdfs_path, False) is None:
            return client.makedirs(hdfs_path, permission=775)

    @staticmethod
    def put_to_hdfs(local_path, hdfs_path):
        """Upload a local file or directory to HDFS.

        :param local_path: source on the local filesystem
        :param hdfs_path: destination on HDFS
        :return: remote path of the uploaded file
        """
        client = HdfsUtils.get_hdfs_cilent()
        return client.upload(hdfs_path, local_path, cleanup=True)

    @staticmethod
    def read_hdfs_file(hdfs_file):
        """Read a UTF-8 text file from HDFS.

        :param hdfs_file: file path
        :return: list of stripped lines
        """
        client = HdfsUtils.get_hdfs_cilent()
        with client.read(hdfs_file, encoding='utf-8', delimiter='\n') as reader:
            return [line.strip() for line in reader]

    @staticmethod
    def read_list(hdfs_path):
        """List the file names directly under *hdfs_path*.

        :param hdfs_path: directory path
        :return: list of names, or None on any error (deliberate best-effort)
        """
        try:
            client = HdfsUtils.get_hdfs_cilent()
            return client.list(hdfs_path)
        except Exception:
            return None

    @classmethod
    def exchange_path(cls, path_one: str, path_two: str):
        """Swap two HDFS directory names via a temporary unique name.

        Both paths must already exist, otherwise use a plain rename.
        :param path_one: first directory
        :param path_two: second directory
        """
        import os
        import uuid
        client = HdfsUtils.get_hdfs_cilent()
        # Unique scratch name next to path_one avoids clobbering anything.
        tmp = os.path.join(
            os.path.dirname(path_one),
            f"{os.path.basename(path_one)}_tmp_{uuid.uuid1()}"
        )
        client.rename(path_one, tmp)
        client.rename(path_two, path_one)
        client.rename(tmp, path_two)
        print(f"交换目录【{path_one}】和【{path_two}】成功!!")

    @staticmethod
    def copy_file(source_path, target_path):
        """Copy every file directly under *source_path* into *target_path*.

        Both directories must already exist; per-file errors are reported
        but do not abort the remaining copies.
        :param source_path: source directory
        :param target_path: existing target directory
        """
        client = HdfsUtils.get_hdfs_cilent()
        # Verify that the source directory exists.
        try:
            client.status(source_path)
        except HdfsError:
            print(f"Source path '{source_path}' does not exist.")
            return
        # Verify that the target directory exists.
        try:
            client.status(target_path)
        except HdfsError:
            print(f"Target path '{target_path}' does not exist.")
            return
        # Enumerate the files to copy.
        try:
            files = client.list(source_path)
        except HdfsError:
            print(f"Error occurred while listing files in '{source_path}'.")
            return
        for file in files:
            source_file = source_path + '/' + file
            target_file = target_path + '/' + file
            # NOTE(review): the `hdfs` package's Client does not document a
            # `copy` method — confirm this client actually provides one.
            try:
                client.copy(source_file, target_file)
                print(f"File '{source_file}' copied to '{target_file}' successfully!")
            except HdfsError as e:
                print(f"Error occurred while copying file '{source_file}' to '{target_file}': {e}")

if __name__ == '__main__':
    # Module is meant to be imported as a utility; no CLI behavior yet.
    pass