import os import tkinter import zipfile from datetime import datetime from tkinter.messagebox import * from Pyspark_job.utils.hdfs_utils import HdfsUtils import paramiko ssh_host = "hadoop5" ssh_port = 22 ssh_user = "root" ssh_pwd = "LrmkEqypH4ZV4S4jA3gq3tSRTNsp2gpjqupLDM5K" def crlf_2_lf(full_path): """ sh windows脚本转换为 unix分隔符 :param full_path: :return: """ WINDOWS_LINE_ENDING = b'\r\n' UNIX_LINE_ENDING = b'\n' with open(full_path, 'rb') as open_file: content = open_file.read() return content.replace(WINDOWS_LINE_ENDING, UNIX_LINE_ENDING) def handle_zip(dir_name, filename): zfName = filename result_path = os.path.join(os.getcwd(), zfName) try: os.remove(result_path) except: pass foo = zipfile.ZipFile(zfName, 'w') for root, dirs, files in os.walk(dir_name): for f in files: full_path = os.path.join(root, f) zip_path = full_path[len(dir_name) + 1:] if full_path.endswith("sh"): foo.writestr(zinfo_or_arcname=zip_path, data=crlf_2_lf(full_path)) pass else: foo.write(full_path, zip_path) foo.close() print("压缩文件成功!!") print(f"压缩文件目录为{result_path}") return result_path def put_and_unzip(local_file, remote_dir): window = tkinter.Tk() window.withdraw() result = askquestion('确认', f'是否确认部署到目录{remote_dir}') if (result == 'no'): return client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) client.connect(hostname=ssh_host, port=ssh_port, username=ssh_user, password=ssh_pwd) print("连接远程服务器成功...") # 时间后缀 suffix = datetime.now().strftime("%m_%d_%H_%M") cmd = f"mv {remote_dir} {remote_dir}_back_{suffix}" print("执行命令中") client.exec_command(cmd) cmd = f"mkdir {remote_dir}" client.exec_command(cmd) print(f"备份远程目录{remote_dir}下的文件中.....") sftp = client.open_sftp() file_name = local_file[local_file.rfind("\\") + 1:] remote_path = f"{remote_dir}/{file_name}" print(f"上传文件【{local_file}】到远程【{remote_path}】中...") sftp.put(local_file, remote_path) print("上传成功!!") cmd = f""" cd {remote_dir} && unzip -d {remote_dir} {file_name}""" client.exec_command(cmd) print(f"解压远程压缩文件{remote_path}中.....") cmd = f"rm -rf {remote_path}" client.exec_command(cmd) print(f"删除远程压缩文件{remote_path}中.....") client.close() print("success") pass def replace_file(local_file, remote_file): client = paramiko.SSHClient() client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) client.connect(hostname=ssh_host, port=ssh_port, username=ssh_user, password=ssh_pwd) print("连接远程服务器成功...") sftp = client.open_sftp() print(f"上传文件【{local_file}】到远程【{remote_file}】中...") sftp.put(local_file, remote_file) print("上传成功!!") pass def zip_yswgutils_to_hdfs(): here = os.path.abspath(os.path.dirname(__file__)) src = os.path.join(here, "Pyspark_job") # 定死 filename = "yswg_utils.zip" result_path = os.path.join(here, filename) if os.path.exists(result_path): os.remove(result_path) foo = zipfile.ZipFile(filename, 'w') for root, dirs, files in os.walk(src): for f in files: full_path = os.path.join(root, f) zip_path = full_path[len(src) + 1:] if full_path.endswith("sh"): foo.writestr(zinfo_or_arcname=zip_path, data=crlf_2_lf(full_path)) pass else: foo.write(full_path, zip_path) foo.close() print("上传环境到hdfs中.................") hdfs_path = f"/lib/{filename}" client = HdfsUtils.get_hdfs_cilent() client.delete(hdfs_path) client.upload(hdfs_path, result_path, cleanup=True) print("删除本地包中.................") os.remove(result_path) pass if __name__ == '__main__': zip_yswgutils_to_hdfs() # local_dir = "E:\Amazon-Selection\\Pyspark_job" # remote_dir = "/opt/module/spark/demo/py_demo" # local_dir = "E:\Amazon-Selection\\Pyspark_job" # remote_dir = "/opt/module/spark-3.2.0-bin-hadoop3.2/python/lib/" # local_dir = "E:\Amazon-Selection\\Pyspark_job" # remote_dir = "/tmp/wjc_py" # result_path = handle_zip(local_dir, "result.zip") # put_and_unzip(result_path, remote_dir) # local_file = r"E:\Amazon-Selection\Pyspark_job\dwd\dwd_asin_title_number.py" # remote_file = "/opt/module/spark/demo/py_demo/dwd/dwd_asin_title_number.py" # local_file = r"E:\Amazon-Selection\Pyspark_job\yswg_utils\dist\yswg_utils-0.1-py3.9.egg" # remote_file = "E:\Amazon-Selection\Pyspark_job\yswg_utils\dist\yswg_utils-0.1-py3.9.egg" # replace_file(local_file, remote_file) pass