import os
import re
import uuid
from hdfs import InsecureClient
from hdfs.client import Client
from hdfs.util import HdfsError
from kazoo.client import KazooClient
class HdfsUtils(object):
    __hdfs_url__ = "http://hadoop15:9870/"
    __hdfs_user__ = "root"
    __hdfs_client__ = None
    # Create the HDFS client (lazy singleton)
    @staticmethod
    def get_hdfs_client() -> Client:
        if HdfsUtils.__hdfs_client__ is None:
            HdfsUtils.__hdfs_client__ = InsecureClient(HdfsUtils.__hdfs_url__)
        return HdfsUtils.__hdfs_client__
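    # Usage sketch: the connection is created lazily on the first call and the
    # same instance is reused afterwards, e.g.
    #   client = HdfsUtils.get_hdfs_client()
    #   print(client.status("/", strict=False))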
    @staticmethod
    def get_namenode_info():
        """
        Deprecated: resolve the active NameNode host from ZooKeeper.
        :return: hostname of the active NameNode, or None if it cannot be found
        """
        zk = KazooClient("hadoop14:2181,hadoop15:2181,hadoop16:2181")
        zk.start()
        host = None
        try:
            zookeeper_path = '/hadoop-ha/nameservice1/ActiveBreadCrumb'
            if zk.exists(zookeeper_path):
                data, _ = zk.get(zookeeper_path)
                data = data.decode('latin-1')
                pattern = r'\b(hadoop\d+)\b'
                matches = re.findall(pattern, data)
                host = matches[0]
                print(host)
        finally:
            # Always close the ZooKeeper connection (the original leaked it).
            zk.stop()
        return host
    @staticmethod
    def delete_hdfs_file(hdfs_path):
        """
        Delete an HDFS path recursively: hadoop fs -rm -r /path/to/hdfs
        (skip_trash=False here, so the path is moved to trash rather than purged)
        :param hdfs_path:
        :return:
        """
        # Guard against accidentally deleting a path too close to the root.
        tmp = "/home/big_data_selection/dwd/xxx"
        assert len(hdfs_path) > len(tmp), 'The path to delete is too short to be safe, please check!!'
        client = HdfsUtils.get_hdfs_client()
        return client.delete(hdfs_path, recursive=True, skip_trash=False)
    @staticmethod
    def delete_hdfs_file_with_checkpoint(hdfs_path):
        """
        Delete an HDFS path: hadoop fs -rm -r -skipTrash /path/to/hdfs
        :param hdfs_path:
        :return:
        """
        client = HdfsUtils.get_hdfs_client()
        if hdfs_path == '' or client.status(hdfs_path, strict=False) is None:
            print("Path does not exist")
            return False
        else:
            print("Path exists")
            # Client.delete returns a boolean (never None), so test it directly.
            value = client.delete(hdfs_path, recursive=True)
            if not value:
                print("Failed to delete the directory")
                return False
            print("Directory deleted successfully")
            return True
    @staticmethod
    def list_all_part_location(hive_tb):
        """
        List every partition's part-file location for a Hive table.
        :param hive_tb:
        :return: list of dicts with full_name / part_by / part_name / location
        """
        from utils.common_util import CommonUtil
        from utils.ssh_util import SSHUtil
        path_prefix = CommonUtil.build_hdfs_path(hive_tb, None)
        client = SSHUtil.get_ssh_client()
        cmd = f"hdfs fsck {path_prefix} -files -blocks -locations"
        stdin, stdout, stderr = client.exec_command(cmd)
        res_text = stdout.read().decode('utf-8')
        client.close()
        find_list = re.findall(
            fr"({path_prefix}(.*)/part.*\.lzo)",
            res_text
        )
        result_list = []
        unique_set = set()
        for it in find_list:
            line = str(it[0])
            # Keep only the path itself; split() is safe even when the match
            # contains no space (line.find(" ") returned -1 and chopped a char).
            part_full_name = line.split(" ")[0]
            # Layout: <path_prefix>/<partition dirs>/part-xxx.lzo
            part_by = part_full_name[len(path_prefix) + 1:part_full_name.rfind("/")]
            part_name = part_full_name[part_full_name.rfind("/") + 1:]
            part_by_val = {}
            for kv in part_by.split("/"):
                kv_arr = kv.split("=")
                part_by_val[kv_arr[0]] = kv_arr[1]
            row = {
                "full_name": part_full_name,
                "part_by": part_by_val,
                "part_name": part_name,
                "location": f"{path_prefix}/{part_by}"
            }
            # Deduplicate on the full path
            if part_full_name not in unique_set:
                result_list.append(row)
                unique_set.add(part_full_name)
        return result_list
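    # Illustrative shape of one returned row (values are hypothetical):
    #   {"full_name": ".../site_name=us/dt=2023-01-01/part-00000.lzo",
    #    "part_by": {"site_name": "us", "dt": "2023-01-01"},
    #    "part_name": "part-00000.lzo",
    #    "location": ".../site_name=us/dt=2023-01-01"}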
    @staticmethod
    def delete_file_in_folder(hdfs_path: str):
        """
        Delete the contents of an HDFS directory while keeping the directory itself.
        :param hdfs_path: path
        :return:
        """
        client = HdfsUtils.get_hdfs_client()
        status = client.status(hdfs_path, strict=False)
        if status is not None and status.get("type") == "DIRECTORY":
            HdfsUtils.delete_hdfs_file(hdfs_path)
            # Octal permissions must be passed as a string.
            client.makedirs(hdfs_path, permission='775')
            return True
        else:
            return False
    @staticmethod
    def is_checkpoint_exist(hdfs_path):
        """
        Ensure a checkpoint directory exists, creating it if necessary.
        """
        client = HdfsUtils.get_hdfs_client()
        try:
            status = client.status(hdfs_path)
            if status is not None:
                print("Directory already exists")
        except HdfsError:
            client.makedirs(hdfs_path, permission='775')
            print("Directory created successfully")
    @staticmethod
    def path_exist(hdfs_path):
        """
        Check whether an HDFS path exists.
        :param hdfs_path:
        :return:
        """
        client = HdfsUtils.get_hdfs_client()
        status = client.status(hdfs_path, strict=False)
        return status is not None
    @staticmethod
    def create_if_not_exist(hdfs_path):
        """
        Recursively create the path if it does not already exist.
        :param hdfs_path:
        :return:
        """
        client = HdfsUtils.get_hdfs_client()
        # Only create when the path is missing
        status = client.status(hdfs_path, strict=False)
        if status is None:
            return client.makedirs(hdfs_path, permission='775')
    # Upload a local file to HDFS
    @staticmethod
    def put_to_hdfs(local_path, hdfs_path):
        client = HdfsUtils.get_hdfs_client()
        return client.upload(hdfs_path, local_path, cleanup=True)
    # Read the contents of an HDFS file, one stripped line per list entry
    @staticmethod
    def read_hdfs_file(hdfs_file):
        client = HdfsUtils.get_hdfs_client()
        lines = []
        with client.read(hdfs_file, encoding='utf-8', delimiter='\n') as reader:
            for line in reader:
                lines.append(line.strip())
        return lines
    @staticmethod
    def read_list(hdfs_path):
        """
        List the file names under an HDFS path.
        :param hdfs_path:
        :return: list of names, or None if the path cannot be listed
        """
        try:
            client = HdfsUtils.get_hdfs_client()
            return client.list(hdfs_path)
        except HdfsError:
            return None
    @classmethod
    def exchange_path(cls, path_one: str, path_two: str):
        """
        Swap two HDFS directory names. Note that both paths must exist;
        otherwise use a plain rename instead.
        :param path_one:
        :param path_two:
        :return:
        """
        client = HdfsUtils.get_hdfs_client()
        # Rename through a unique temporary name so the two paths never collide.
        tmp = os.path.join(
            os.path.dirname(path_one),
            f"{os.path.basename(path_one)}_tmp_{uuid.uuid1()}"
        )
        client.rename(path_one, tmp)
        client.rename(path_two, path_one)
        client.rename(tmp, path_two)
        print(f"Swapped directories [{path_one}] and [{path_two}] successfully!!")
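    # e.g. HdfsUtils.exchange_path("/tmp/dir_a", "/tmp/dir_b")
    # (hypothetical paths; both directories must already exist)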
    # Copy the files under one HDFS directory into another existing directory
    @staticmethod
    def copy_file(source_path, target_path):
        client = HdfsUtils.get_hdfs_client()
        # Check that the source path exists
        try:
            client.status(source_path)
        except HdfsError:
            print(f"Source path '{source_path}' does not exist.")
            return
        # Check that the target path exists
        try:
            client.status(target_path)
        except HdfsError:
            print(f"Target path '{target_path}' does not exist.")
            return
        # List all files under the source path
        try:
            files = client.list(source_path)
        except HdfsError:
            print(f"Error occurred while listing files in '{source_path}'.")
            return
        for file in files:
            source_file = source_path + '/' + file
            target_file = target_path + '/' + file
            try:
                # The python hdfs client has no copy() and WebHDFS offers no
                # server-side copy, so stream the bytes through the client.
                with client.read(source_file) as reader:
                    client.write(target_file, reader)
                print(f"File '{source_file}' copied to '{target_file}' successfully!")
            except HdfsError as e:
                print(f"Error occurred while copying file '{source_file}' to '{target_file}': {e}")
if __name__ == '__main__':
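    # A minimal smoke test, assuming that __hdfs_url__ above points at a
    # reachable NameNode and that /tmp/hdfs_utils_demo is a safe scratch path
    # (both are assumptions, not part of the original module).
    demo_path = "/tmp/hdfs_utils_demo"
    HdfsUtils.create_if_not_exist(demo_path)
    print(HdfsUtils.path_exist(demo_path))
    print(HdfsUtils.read_list("/tmp"))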