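"""
Deployment helpers for the Amazon-Selection PySpark jobs: zip the local
Pyspark_job sources (converting shell-script line endings to LF), push the
archive to a remote Spark server over SSH/SFTP, or upload it to HDFS.
"""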
import os
import tkinter
import zipfile
from datetime import datetime
from tkinter.messagebox import askquestion

import paramiko

from Pyspark_job.utils.hdfs_utils import HdfsUtils

ssh_host = "hadoop5"
ssh_port = 22
ssh_user = "root"
ssh_pwd = "LrmkEqypH4ZV4S4jA3gq3tSRTNsp2gpjqupLDM5K"
def crlf_2_lf(full_path):
    """
    Convert a Windows shell script (CRLF line endings) to Unix (LF) line endings.
    :param full_path: path to the local file
    :return: file contents as bytes with LF line endings
    """
    WINDOWS_LINE_ENDING = b'\r\n'
    UNIX_LINE_ENDING = b'\n'
    with open(full_path, 'rb') as open_file:
        content = open_file.read()
    return content.replace(WINDOWS_LINE_ENDING, UNIX_LINE_ENDING)
def handle_zip(dir_name, filename):
    """Zip the contents of dir_name into filename (created in the current working directory)."""
    zfName = filename
    result_path = os.path.join(os.getcwd(), zfName)
    # Remove a stale archive left over from a previous run, if any
    if os.path.exists(result_path):
        os.remove(result_path)
    foo = zipfile.ZipFile(zfName, 'w')
    for root, dirs, files in os.walk(dir_name):
        for f in files:
            full_path = os.path.join(root, f)
            zip_path = full_path[len(dir_name) + 1:]
            if full_path.endswith("sh"):
                # Shell scripts get their Windows line endings converted before archiving
                foo.writestr(zinfo_or_arcname=zip_path, data=crlf_2_lf(full_path))
            else:
                foo.write(full_path, zip_path)
    foo.close()
    print("Archive created successfully!")
    print(f"Archive path: {result_path}")
    return result_path
def put_and_unzip(local_file, remote_dir):
    """Back up remote_dir, upload the local zip archive into it over SFTP, and unzip it remotely."""
    window = tkinter.Tk()
    window.withdraw()
    result = askquestion('Confirm', f'Deploy to remote directory {remote_dir}?')
    if result == 'no':
        return
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    client.connect(hostname=ssh_host, port=ssh_port, username=ssh_user, password=ssh_pwd)
    print("Connected to the remote server...")
    # Timestamp suffix for the backup of the current remote directory
    suffix = datetime.now().strftime("%m_%d_%H_%M")
    print(f"Backing up the remote directory {remote_dir}.....")
    # exec_command is asynchronous, so wait for each command's exit status
    # before issuing the next one
    _, stdout, _ = client.exec_command(f"mv {remote_dir} {remote_dir}_back_{suffix}")
    stdout.channel.recv_exit_status()
    _, stdout, _ = client.exec_command(f"mkdir {remote_dir}")
    stdout.channel.recv_exit_status()
    sftp = client.open_sftp()
    file_name = local_file[local_file.rfind("\\") + 1:]
    remote_path = f"{remote_dir}/{file_name}"
    print(f"Uploading [{local_file}] to remote [{remote_path}]...")
    sftp.put(local_file, remote_path)
    print("Upload succeeded!")
    print(f"Unzipping the remote archive {remote_path}.....")
    _, stdout, _ = client.exec_command(f"cd {remote_dir} && unzip -d {remote_dir} {file_name}")
    stdout.channel.recv_exit_status()
    print(f"Deleting the remote archive {remote_path}.....")
    _, stdout, _ = client.exec_command(f"rm -rf {remote_path}")
    stdout.channel.recv_exit_status()
    client.close()
    print("success")
def replace_file(local_file, remote_file):
    """Upload a single local file over SFTP, replacing remote_file on the server."""
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    client.connect(hostname=ssh_host, port=ssh_port, username=ssh_user, password=ssh_pwd)
    print("Connected to the remote server...")
    sftp = client.open_sftp()
    print(f"Uploading [{local_file}] to remote [{remote_file}]...")
    sftp.put(local_file, remote_file)
    print("Upload succeeded!")
    client.close()
def zip_yswgutils_to_hdfs():
    """Zip the local Pyspark_job directory and upload the archive to /lib on HDFS."""
    here = os.path.abspath(os.path.dirname(__file__))
    src = os.path.join(here, "Pyspark_job")
    # Fixed archive name expected by the Spark jobs
    filename = "yswg_utils.zip"
    result_path = os.path.join(here, filename)
    if os.path.exists(result_path):
        os.remove(result_path)
    foo = zipfile.ZipFile(result_path, 'w')
    for root, dirs, files in os.walk(src):
        for f in files:
            full_path = os.path.join(root, f)
            zip_path = full_path[len(src) + 1:]
            if full_path.endswith("sh"):
                # Shell scripts get their Windows line endings converted before archiving
                foo.writestr(zinfo_or_arcname=zip_path, data=crlf_2_lf(full_path))
            else:
                foo.write(full_path, zip_path)
    foo.close()
    print("Uploading the environment to HDFS.................")
    hdfs_path = f"/lib/{filename}"
    client = HdfsUtils.get_hdfs_cilent()
    client.delete(hdfs_path)
    client.upload(hdfs_path, result_path, cleanup=True)
    print("Deleting the local archive.................")
    os.remove(result_path)
if __name__ == '__main__':
    zip_yswgutils_to_hdfs()

    # local_dir = "E:\Amazon-Selection\\Pyspark_job"
    # remote_dir = "/opt/module/spark/demo/py_demo"
    # local_dir = "E:\Amazon-Selection\\Pyspark_job"
    # remote_dir = "/opt/module/spark-3.2.0-bin-hadoop3.2/python/lib/"
    # local_dir = "E:\Amazon-Selection\\Pyspark_job"
    # remote_dir = "/tmp/wjc_py"
    # result_path = handle_zip(local_dir, "result.zip")
    # put_and_unzip(result_path, remote_dir)

    # local_file = r"E:\Amazon-Selection\Pyspark_job\dwd\dwd_asin_title_number.py"
    # remote_file = "/opt/module/spark/demo/py_demo/dwd/dwd_asin_title_number.py"
    # local_file = r"E:\Amazon-Selection\Pyspark_job\yswg_utils\dist\yswg_utils-0.1-py3.9.egg"
    # remote_file = "E:\Amazon-Selection\Pyspark_job\yswg_utils\dist\yswg_utils-0.1-py3.9.egg"
    # replace_file(local_file, remote_file)