# For local testing of py code: upload/submit Spark jobs from a dev machine.
import os
import sys
import time

sys.path.append(os.path.dirname(sys.path[0]))
import paramiko
from utils.common_util import CommonUtil

# Remote host and paths used for uploading and running the test scripts.
ssh_host = "hadoop5"
ssh_port = 22
ssh_user = "root"
ssh_pwd = "LrmkEqypH4ZV4S4jA3gq3tSRTNsp2gpjqupLDM5K"
remote_dir = "/tmp/wjc_py/"
remote_py = "/opt/module/anaconda3/envs/pyspark/bin/python"


def put_and_run(py_file, args):
    """
    Upload a local .py file to the remote host over SFTP, then run it there
    with spark-submit via SSH.
    :param py_file: local file path (Windows-style separators expected)
    :param args: arguments passed to the script
    """
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    client.connect(hostname=ssh_host, port=ssh_port, username=ssh_user, password=ssh_pwd)
    print("Connected to the remote server...")
    sftp = client.open_sftp()
    file_name = py_file[py_file.rfind("\\") + 1:]
    remote_file = f"{remote_dir}{file_name}"
    print(f"Uploading file [{py_file}] to remote [{remote_file}]...")
    sftp.put(py_file, remote_file)
    argstr = " ".join(args)
    cmd = """
/opt/module/spark/bin/spark-submit \\
--driver-memory 500M \\
--executor-memory 500M \\
--executor-cores 1 \\
--num-executors 3 \\
--queue spark \\
{py_file} {args}
"""
    cmd = cmd.format(py_file=remote_file, args=argstr)
    print(f"Executing remote command: [{cmd}] ...")
    stdin, stdout, stderr = client.exec_command(cmd)
    print(stdout.read().decode('utf-8'))
    client.close()


def submit_remote_run(file_path, args):
    """
    For algorithm testing; runs in local mode.
    :param file_path: file path relative to /tmp/wjc_py/
    :param args: arguments passed to the script
    :return:
    """
    argstr = " ".join(args)
    dir = "/tmp/wjc_py/"
    cmd = """
/opt/module/spark/bin/spark-submit \\
--driver-memory 500M \\
--executor-memory 500M \\
--executor-cores 1 \\
--num-executors 3 \\
--queue spark \\
{py_file} {args}
"""
    cmd = cmd.format(py_file=dir + file_path, args=argstr)
    print("===================== executing command ========================")
    print(cmd)
    print("===================== executing command ========================")
    fr = os.popen(cmd, "r")
    print(fr.read())
    fr.close()


def submit_yarn_run(file_path, args):
    """
    Submit the Spark job directly to YARN.
    :param file_path: file path relative to /tmp/wjc_py/
    :param args: arguments passed to the script
    :return:
    """
    arg_str1 = " ".join(args)
    arg_str2 = "_".join(args)
    dir = "/tmp/wjc_py/"
    file_name = file_path[file_path.rfind("/") + 1:file_path.rfind(".")]
    log_path = f"/tmp/wjc_java/log/{file_name}_{arg_str2}.log"
    py_file = dir + file_path
    cmd = f"""
/opt/module/spark/bin/spark-submit \\
--master yarn \\
--driver-memory 2g \\
--executor-memory 10g \\
--executor-cores 4 \\
--num-executors 25 \\
--queue spark \\
{py_file} {arg_str1}
"""
    print("===================== executing command ========================")
    print(cmd)
    print("===================== executing command ========================")
    fr = os.popen(cmd, "r")
    print(fr.read())
    fr.close()
    print("==================== log file located at ========================")
    print(log_path)
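
# put_and_run() reads the whole of stdout in a single blocking call and never
# looks at stderr, so a long spark-submit run prints nothing until it finishes
# and errors are dropped. The function below is a minimal sketch of a streaming
# alternative, not something the original script uses; the name
# run_remote_streaming is illustrative. It relies only on paramiko calls known
# to exist: SSHClient.exec_command, readline on the returned ChannelFile, and
# Channel.recv_exit_status.
def run_remote_streaming(client, cmd):
    """Sketch: run cmd on an already-connected paramiko SSHClient, echoing output live."""
    stdin, stdout, stderr = client.exec_command(cmd)
    while True:
        line = stdout.readline()
        if not line:
            break
        # readline may return bytes or str depending on paramiko version and file mode
        print(line.decode("utf-8") if isinstance(line, bytes) else line, end="")
    exit_status = stdout.channel.recv_exit_status()  # block until the remote command exits
    err = stderr.read()
    if err:
        print(err.decode("utf-8"))
    return exit_status
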
args=["us", "2023-01-30"] # ) # for i in range(4): # submit_remote_run( # file_path="sparkTest/test_read_hive.py", # args=["31231", "123q31", "adohahso"] # ) pass