#  Utility for testing/submitting local .py (PySpark) scripts
import os
import sys
import time

sys.path.append(os.path.dirname(sys.path[0]))
import paramiko

from utils.common_util import CommonUtil

ssh_host = "hadoop5"
ssh_port = 22
ssh_user = "root"
ssh_pwd = "LrmkEqypH4ZV4S4jA3gq3tSRTNsp2gpjqupLDM5K"
remote_dir = "/tmp/wjc_py/"
remote_py = "/opt/module/anaconda3/envs/pyspark/bin/python"
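# NOTE (assumption): remote_py is the interpreter of the "pyspark" conda env on the remote
# host. It is not referenced anywhere below; if the submitted job needed that exact
# interpreter, it could be passed along, e.g. via "--conf spark.pyspark.python=<remote_py>"
# on the spark-submit command line (not wired up in this script).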


def put_and_run(py_file, args):
    client = paramiko.SSHClient()
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    client.connect(hostname=ssh_host, port=ssh_port, username=ssh_user, password=ssh_pwd)
    print("连接远程服务器成功...")
    sftp = client.open_sftp()

    # Extract the bare file name from the local path (handles both Windows and POSIX separators)
    file_name = os.path.basename(py_file.replace("\\", "/"))
    remote_file = f"{remote_dir}{file_name}"

    print(f"上传文件【{py_file}】到远程【{remote_file}】中...")

    sftp.put(py_file, remote_file)
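    # Assumption: remote_dir is expected to already exist on the remote host; the sftp.put()
    # above does not create missing directories. If that is not guaranteed, a guard such as
    # this sketch could be placed before the upload:
    # try:
    #     sftp.stat(remote_dir)
    # except IOError:
    #     sftp.mkdir(remote_dir)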
    argstr = " ".join(args)
    cmd = """
            /opt/module/spark/bin/spark-submit  \
        --driver-memory 500M \
        --executor-memory 500M \
        --executor-cores 1 \
        --num-executors 3 \
        --queue spark \
        {py_file} {args}
    """
    cmd = cmd.format(py_file=remote_file, args=argstr)

    print(f"执行远程命令:【{cmd}】 中...")
    stdin, stdout, stderr = client.exec_command(cmd)
    print(stdout.read().decode('utf-8'))
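    # Sketch (optional): stderr from the remote command is currently discarded; it could be
    # printed as well for easier debugging:
    # print(stderr.read().decode('utf-8'))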
    client.close()


def submit_remote_run(file_path, args):
    """
    测试算法用  使用local模式
    :param file_path:  本地文件路径
    :param args: 参数
    :return:
    """
    argstr = " ".join(args)
    dir = "/tmp/wjc_py/"
    cmd = """
            /opt/module/spark/bin/spark-submit  \\
            --driver-memory 500M \\
            --executor-memory 500M \\
            --executor-cores 1 \\
            --num-executors 3 \\
            --queue spark \\
            {py_file} {args}
        """
    cmd = cmd.format(py_file=remote_dir + file_path, args=argstr)
    print("=====================执行远程命令========================")
    print(cmd)
    print("=====================执行远程命令========================")
    fr = os.popen(cmd, "r")
    print(fr.read())
    fr.close()


def submit_yarn_run(file_path, args):
    """
    直接提交spark到yarn
    :param file_path:  本地文件路径
    :param args: 参数
    :return:
    """
    arg_str1 = " ".join(args)
    arg_str2 = "_".join(args)
    dir = "/tmp/wjc_py/"
    file_name = file_path[file_path.rfind("/") + 1:file_path.rfind(".")]
    log_path = f"/tmp/wjc_java/log/{file_name}_{arg_str2}.log"
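    # NOTE (assumption): log_path is only printed at the end of this function; nothing here
    # redirects the spark-submit output to that file, so the submitted job itself is expected
    # to write it.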

    py_file = remote_dir + file_path

    cmd = f"""
 
/opt/module/spark/bin/spark-submit  \\
--master yarn \\
--driver-memory 2g \\
--executor-memory 10g \\
--executor-cores 4 \\
--num-executors 25 \\
--queue spark \\
{py_file} {arg_str1}
        """

    print("=====================执行远程命令========================")
    print(cmd)
    print("=====================执行远程命令========================")
    fr = os.popen(cmd, "r")
    print(fr.read())
    fr.close()
    print("====================日志文件位于========================")
    print(log_path)


if __name__ == '__main__':
    submit_yarn_run(
        file_path="my_kafka/keyword_pcp_listener.py",
        args=[]
    )
    submit_remote_run(
        file_path="my_kafka/keyword_pcp_listener.py",
        args=[]
    )
    submit_yarn_run(
        file_path="my_kafka/keyword_pcp_listener.py",
        args=[]
    )
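    # Hypothetical example (placeholder local path; args mirror the samples below): upload a
    # local script with put_and_run over SFTP and execute it on the remote host. Disabled by
    # default.
    # put_and_run(
    #     py_file=r"D:\code\sparkTest\tmp1.py",
    #     args=["us", "day", "2023-01-01"]
    # )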
    # submit_remote_run(
    #     file_path="script/test_overwrite_insert.py",
    #     args=[
    #         "us",
    #         "day",
    #         "2023-01-01"
    #     ]
    # )

    # submit_yarn_run(
    #     file_path="dwd/dwd_st_volume_fba.py",
    #     args=[
    #         "us",
    #         "day",
    #         "2023-01-01"
    #     ]
    # )

    # submit_yarn_run(
    #     file_path="sparkTest/tmp1.py",
    #     args=[
    #         # "us",
    #         # "last365day",
    #         # "2023-01",
    #         "us",
    #         "day",
    #         "2023-01-01"
    #     ]
    # )

    # submit_yarn_run(
    #     file_path="dim/dim_header_category_bsr.py",
    #     args=["us"]
    # )

    # HdfsUtils.delete_hdfs_file("/home/big_data_selection/dwd/dwd_bsr_asin_rank/site_name=us/date_type=last30day/date_info=2023-01-30")
    # HdfsUtils.delete_hdfs_file("/home/big_data_selection/dim/dim_bsr_asin_rank")

    # submit_yarn_run(
    #     file_path="dwd/dwt_st_sv_last365.py",
    #     args=["us", "2023-01-30"]
    # )

    # for i in range(4):
    #     submit_remote_run(
    #         file_path="sparkTest/test_read_hive.py",
    #         args=["31231", "123q31", "adohahso"]
    #     )
    pass