hdfs_utils.py 8.5 KB
Newer Older
chenyuanjie committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264
from hdfs import InsecureClient
from hdfs.client import Client
from kazoo.client import KazooClient
from hdfs.util import HdfsError
import re


class HdfsUtils(object):
    """Static helpers around a single, lazily-created WebHDFS client.

    The client comes from the ``hdfs`` package (``InsecureClient``). It is
    created on first use by ``get_hdfs_cilent`` and cached on the class.
    """

    # WebHDFS endpoint of the NameNode. An older variant discovered the host via
    # get_namenode_info() and built the URL from host + port 9870.
    __hdfs_url__ = "http://hadoop15:9870/"
    # NOTE(review): declared but not passed to InsecureClient below, so the client
    # uses the library's default user — confirm whether "root" was intended.
    __hdfs_user__ = "root"
    # Cached client instance (process-wide, shared by every method).
    __hdfs_client__ = None
    # Permission for directories created by this class. The hdfs library forwards
    # the value verbatim to WebHDFS, which parses it as an octal string, so 775
    # means rwxrwxr-x. Do NOT change it to 0o775 (that would be sent as "509").
    _DIR_PERMISSION = 775

    @staticmethod
    def get_hdfs_cilent() -> Client:
        """Return the shared HDFS client, creating it on the first call.

        The misspelled name is kept because callers depend on it.

        :return: cached ``InsecureClient`` bound to ``__hdfs_url__``.
        """
        if HdfsUtils.__hdfs_client__ is None:
            HdfsUtils.__hdfs_client__ = InsecureClient(HdfsUtils.__hdfs_url__)
        return HdfsUtils.__hdfs_client__

    @staticmethod
    def get_namenode_info():
        """Find the active NameNode host from the ZooKeeper HA breadcrumb.

        Deprecated: the URL is now configured directly in ``__hdfs_url__``.

        :return: the first ``hadoop<N>`` host name found in the breadcrumb data
            (it is also printed).
        :raises RuntimeError: if the znode is missing or contains no such host.
        """
        zookeeper_path = '/hadoop-ha/nameservice1/ActiveBreadCrumb'
        zk = KazooClient("hadoop14:2181,hadoop15:2181,hadoop16:2181")
        zk.start()
        try:
            if not zk.exists(zookeeper_path):
                raise RuntimeError(f"ZooKeeper node {zookeeper_path} does not exist")
            data, _ = zk.get(zookeeper_path)
        finally:
            # Always release the ZooKeeper session, even when the lookup fails.
            zk.stop()
            zk.close()
        # The node holds binary data; latin-1 decodes any byte sequence, so the
        # host name can then be located with a regex.
        text = data.decode('latin-1')
        matches = re.findall(r'\b(hadoop\d+)\b', text)
        if not matches:
            raise RuntimeError(f"No NameNode host found in {zookeeper_path}")
        host = matches[0]
        print(host)
        return host

    @staticmethod
    def delete_hdfs_file(hdfs_path):
        """Recursively delete an HDFS path.

        NOTE: ``skip_trash=False`` is passed to ``Client.delete``. Per the hdfs
        library docs, the path is therefore moved to the HDFS trash rather than
        removed permanently (unlike ``hadoop fs -rm -r -skipTrash``).

        As a safety guard against deleting high-level directories, the path must
        be longer than ``/home/big_data_selection/dwd/xxx``.

        :param hdfs_path: path to delete.
        :return: the result of ``Client.delete``.
        :raises AssertionError: if ``hdfs_path`` is not long enough.
        """
        tmp = "/home/big_data_selection/dwd/xxx"
        # Explicit raise (same exception type as before) so the guard is not
        # stripped when Python runs with -O.
        if len(hdfs_path) <= len(tmp):
            raise AssertionError('要删除的路径长度不合法,请检查!!')
        cilent = HdfsUtils.get_hdfs_cilent()
        return cilent.delete(hdfs_path, True, False)

    @staticmethod
    def delete_hdfs_file_with_checkpoint(hdfs_path):
        """Recursively delete an HDFS path if it exists, printing the outcome.

        The library's default ``skip_trash`` is used, i.e. a permanent delete,
        equivalent to ``hadoop fs -rm -r -skipTrash``.

        :param hdfs_path: path to delete. An empty value or ``None`` counts as
            "does not exist".
        :return: True if the path existed and was deleted, otherwise False.
        """
        cilent = HdfsUtils.get_hdfs_cilent()
        if not hdfs_path or cilent.status(hdfs_path, False) is None:
            print("路径不存在")
            return False
        print("路径存在")
        # Client.delete returns a bool (False when nothing was deleted), never
        # None, so check truthiness.
        if not cilent.delete(hdfs_path, True):
            print("目录删除失败")
            return False
        print("目录删除成功")
        return True

    @staticmethod
    def list_all_part_location(hive_tb):
        """List every ``.lzo`` part file of a Hive table with its partition info.

        Runs ``hdfs fsck <table_dir> -files -blocks -locations`` over SSH and
        extracts the part-file paths from the output.

        :param hive_tb: Hive table name. Its HDFS directory is taken from
            ``CommonUtil.build_hdfs_path(hive_tb, None)``.
        :return: one dict per distinct part file, with keys:
            ``full_name`` (full HDFS path of the file),
            ``part_by`` (dict of partition column -> value, empty if unpartitioned),
            ``part_name`` (file name),
            ``location`` (``<table_dir>/<partition path>``).
        """
        import shlex
        from utils.common_util import CommonUtil
        from utils.ssh_util import SSHUtil
        path_prefix = CommonUtil.build_hdfs_path(hive_tb, None)

        client = SSHUtil.get_ssh_client()
        try:
            cmd = f"hdfs fsck {shlex.quote(path_prefix)} -files -blocks -locations"
            _, stdout, _ = client.exec_command(cmd)
            res_text = stdout.read().decode('utf-8')
        finally:
            # Close the SSH session even if the command or read fails.
            client.close()

        # Escape the prefix so regex metacharacters in the path match literally.
        prefix_pattern = re.escape(path_prefix)
        find_list = re.findall(fr"({prefix_pattern}(.*)/part.*\.lzo)", res_text)

        result_list = []
        unique_set = set()
        for match in find_list:
            line = str(match[0])
            # fsck prints "<path> <size> bytes, ..."; the path is the first token.
            part_full_name = line.split(" ", 1)[0]
            if part_full_name in unique_set:
                continue
            unique_set.add(part_full_name)
            # Split at the LAST '/', so partition columns whose names start with
            # "part" (e.g. "partition_date=...") are not mistaken for the file.
            part_dir, _, part_name = part_full_name.rpartition("/")
            part_by = part_dir[len(path_prefix) + 1:]
            part_by_val = {}
            for kv in part_by.split("/"):
                if not kv:
                    # Unpartitioned table: the file sits directly in the table dir.
                    continue
                key, _, val = kv.partition("=")
                part_by_val[key] = val
            result_list.append({
                "full_name": part_full_name,
                "part_by": part_by_val,
                "part_name": part_name,
                "location": f"{path_prefix}/{part_by}",
            })
        return result_list

    @staticmethod
    def delete_file_in_folder(hdfs_path: str):
        """Empty an HDFS directory while keeping the directory itself.

        The directory is deleted via ``delete_hdfs_file`` (trash, length guard)
        and then recreated with the same path.

        :param hdfs_path: directory path.
        :return: True if ``hdfs_path`` was an existing directory and was emptied,
            otherwise False.
        """
        cilent = HdfsUtils.get_hdfs_cilent()
        status = cilent.status(hdfs_path, False)
        if status is None or status.get("type") != "DIRECTORY":
            return False
        HdfsUtils.delete_hdfs_file(hdfs_path)
        cilent.makedirs(hdfs_path, permission=HdfsUtils._DIR_PERMISSION)
        return True

    @staticmethod
    def is_checkpoint_exist(hdfs_path):
        """Ensure a (checkpoint) directory exists, creating it if it is missing.

        Prints whether the directory already existed or was created.

        :param hdfs_path: directory path.
        :return: None.
        """
        cilent = HdfsUtils.get_hdfs_cilent()
        if cilent.status(hdfs_path, strict=False) is not None:
            print("目录已存在")
            return
        cilent.makedirs(hdfs_path, permission=HdfsUtils._DIR_PERMISSION)
        print("目录创建成功")

    @staticmethod
    def path_exist(hdfs_path):
        """Return whether an HDFS file or directory exists.

        :param hdfs_path: path to check.
        :return: True if ``status`` finds the path, otherwise False.
        """
        cilent = HdfsUtils.get_hdfs_cilent()
        return cilent.status(hdfs_path, False) is not None

    @staticmethod
    def create_if_not_exist(hdfs_path):
        """Recursively create ``hdfs_path`` if it does not exist yet.

        :param hdfs_path: directory path.
        :return: the result of ``Client.makedirs`` when the directory is created;
            None if it already existed.
        """
        cilent = HdfsUtils.get_hdfs_cilent()
        if cilent.status(hdfs_path, False) is None:
            return cilent.makedirs(hdfs_path, permission=HdfsUtils._DIR_PERMISSION)

    @staticmethod
    def put_to_hdfs(local_path, hdfs_path):
        """Upload a local file or directory to HDFS.

        ``cleanup=True`` asks the library to remove partially uploaded files if
        the upload fails.

        :param local_path: local source path.
        :param hdfs_path: HDFS destination path.
        :return: the result of ``Client.upload``.
        """
        cilent = HdfsUtils.get_hdfs_cilent()
        return cilent.upload(hdfs_path, local_path, cleanup=True)

    @staticmethod
    def read_hdfs_file(hdfs_file):
        """Read a UTF-8 text file from HDFS.

        :param hdfs_file: HDFS file path.
        :return: list of the file's '\\n'-delimited records, each stripped of
            surrounding whitespace.
        """
        cilent = HdfsUtils.get_hdfs_cilent()
        with cilent.read(hdfs_file, encoding='utf-8', delimiter='\n') as reader:
            return [line.strip() for line in reader]

    @staticmethod
    def read_list(hdfs_path):
        """List the entry names directly under an HDFS directory.

        This is best-effort: any error (missing path, connection problem, ...)
        yields None instead of raising.

        :param hdfs_path: directory path.
        :return: list of names, or None on error.
        """
        try:
            cilent = HdfsUtils.get_hdfs_cilent()
            return cilent.list(hdfs_path)
        except Exception:
            return None

    @classmethod
    def exchange_path(cls, path_one: str, path_two: str):
        """Swap the names of two existing HDFS paths.

        Both paths must exist; to move a path to a new name, use a plain rename.
        The swap goes through a temporary sibling of ``path_one``. If the second
        rename fails, ``path_one`` is restored from the temporary name and the
        error is re-raised.

        :param path_one: first path.
        :param path_two: second path.
        :return: None.
        """
        import posixpath
        import uuid
        cilent = cls.get_hdfs_cilent()
        # HDFS paths always use '/', regardless of the local OS. Strip trailing
        # slashes so the temporary path is a sibling, not a child, of path_one.
        base_one = path_one.rstrip("/") or "/"
        tmp = posixpath.join(
            posixpath.dirname(base_one),
            f"{posixpath.basename(base_one)}_tmp_{uuid.uuid1()}"
        )
        cilent.rename(path_one, tmp)
        try:
            cilent.rename(path_two, path_one)
        except Exception:
            # Roll back so path_one is not left stranded under the temporary name.
            cilent.rename(tmp, path_one)
            raise
        cilent.rename(tmp, path_two)
        print(f"交换目录【{path_one}】和【{path_two}】成功!!")

    @staticmethod
    def copy_file(source_path, target_path):
        """Copy every file directly under ``source_path`` into ``target_path``.

        WebHDFS has no server-side copy, so each file is streamed through this
        process: it is read from the source and written to the target. Both
        directories must already exist. Existing target files are not
        overwritten. Sub-directories are not copied. Per-file failures (raised
        by the library as ``HdfsError``) are printed and the loop continues.

        :param source_path: source directory.
        :param target_path: target directory.
        :return: None.
        """
        import posixpath
        client = HdfsUtils.get_hdfs_cilent()
        # Check that the source path exists.
        try:
            client.status(source_path)
        except HdfsError:
            print(f"Source path '{source_path}' does not exist.")
            return

        # Check that the target path exists.
        try:
            client.status(target_path)
        except HdfsError:
            print(f"Target path '{target_path}' does not exist.")
            return

        # Collect every entry under the source path.
        try:
            files = client.list(source_path)
        except HdfsError:
            print(f"Error occurred while listing files in '{source_path}'.")
            return

        for file in files:
            source_file = posixpath.join(source_path, file)
            target_file = posixpath.join(target_path, file)
            try:
                # The hdfs Client has no copy(); stream the content in chunks instead.
                with client.read(source_file, chunk_size=64 * 1024) as reader:
                    client.write(target_file, data=reader)
                print(f"File '{source_file}' copied to '{target_file}' successfully!")
            except HdfsError as e:
                print(f"Error occurred while copying file '{source_file}' to '{target_file}': {e}")


if __name__ == "__main__":
    # Import-only utility module: running it directly does nothing.
    pass