# bsr_rank_recalc.py
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))

from utils.common_util import CommonUtil
from utils.ssh_util import SSHUtil
from utils.hdfs_utils import HdfsUtils

"""
临时重跑bsr_rank数据
"""


def getNowDay(current):
    """Compute the date window to recalculate for the run happening on *current*.

    Each daily run re-processes ``day_num`` days, walking backwards from
    ``bsr_end_day`` toward ``bsr_start_day``.

    :param current: today's date as an ISO ``yyyy-MM-dd`` string.
    :return: ``(now_start_day, now_end_day)`` tuple of ISO date strings
             (inclusive bounds), or ``None`` once the whole range has been
             covered — the caller treats ``None`` as "job finished".
    """
    # Fixed overall range to rebuild. ISO yyyy-MM-dd strings are used
    # throughout, so plain string comparison orders the same as dates.
    bsr_start_day = "2023-06-26"
    bsr_end_day = "2023-09-21"

    # Calendar day the rerun job was first scheduled on.
    job_start_day = "2023-09-20"

    # Number of days re-processed per run.
    day_num = 10

    # How many runs happened before this one (0 on job_start_day itself).
    offset = CommonUtil.calculate_date_offset(job_start_day, current)

    # This run's window: the `offset`-th block of `day_num` days, counting
    # backwards from (but excluding) bsr_end_day.
    now_end_day = CommonUtil.get_day_offset(bsr_end_day, (day_num * offset + 1) * -1)
    now_start_day = CommonUtil.get_day_offset(bsr_end_day, (day_num * offset + day_num) * -1)

    # Clamp so the window never reaches before the overall start.
    if now_start_day < bsr_start_day:
        now_start_day = bsr_start_day

    # Fix: strict ">" (was ">=") so a clamped single-day window
    # (start == end) is still processed instead of silently skipped.
    # A batch entirely before bsr_start_day still yields start > end
    # after clamping, so termination behavior is unchanged.
    if now_start_day > now_end_day:
        return None

    return (now_start_day, now_end_day)


if __name__ == '__main__':
    # Decide which date window (if any) this run must rebuild.
    now = CommonUtil.format_now("%Y-%m-%d")
    val = getNowDay(now)
    if not val:
        # Entire range already covered: notify the owner and do nothing.
        CommonUtil.send_wx_msg(["wujicang"], "提醒", "重跑 dwd_bsr_asin_rank任务无需进行,请手动关闭")
        print("程序结束")
    else:
        now_start_day, now_end_day = val
        print(f"重新计算【{now_start_day}】-【{now_end_day}】数据中")

        tmp_day = now_end_day
        client = SSHUtil.get_ssh_client()
        # Fix: try/finally guarantees the SSH connection is released even if
        # an HDFS check or a remote command raises mid-loop (the original
        # only closed the client on the happy path).
        try:
            # Walk the window backwards one day at a time.
            while tmp_day >= now_start_day:

                # Only re-run days whose warehouse output path already exists.
                path_exist = HdfsUtils.path_exist(CommonUtil.build_hdfs_path("dwd_bsr_asin_rank", {
                    "site_name": "us",
                    "date_type": "last30day",
                    "date_info": tmp_day
                }))

                if path_exist:
                    # Step 1: recompute the dwd layer for this day via Spark on YARN.
                    cmd1 = f"""
                     /opt/module/spark/bin/spark-submit  \
                    --master yarn \
                    --driver-memory 2g \
                    --executor-memory 10g \
                    --executor-cores 4 \
                    --num-executors 25 \
                    --queue default \
                    /opt/module/spark/demo/py_demo/dwd/dwd_bsr_asin_rank.py us {tmp_day}
            """
                    SSHUtil.exec_command_async(client, cmd1, True)
                    # Step 2: export the recomputed day (sqoop export script).
                    cmd2 = f"""
                    /opt/module/anaconda3/envs/pyspark/bin/python3.8 /opt/module/spark/demo/py_demo/sqoop_export/dwd_bsr_asin_rank.py us {tmp_day}
                     """
                    SSHUtil.exec_command_async(client, cmd2, True)

                tmp_day = CommonUtil.get_day_offset(tmp_day, -1)

            CommonUtil.send_wx_msg(["wujicang"], "提醒", f"重跑 dwd_bsr_asin_rank并导出【{now_start_day}】-【{now_end_day}】成功")
        finally:
            client.close()