import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))

from utils.common_util import CommonUtil
from utils.ssh_util import SSHUtil
from utils.hdfs_utils import HdfsUtils

"""
Temporary job to rerun bsr_rank data
"""


def getNowDay(current):
    bsr_start_day = "2023-06-26"
    bsr_end_day = "2023-09-21"
    job_start_day = "2023-09-20"
    # now = CommonUtil.format_now("%Y-%m-%d")
    # rerun 10 days of data per run
    day_num = 10
    # number of days elapsed since the job started
    offset = CommonUtil.calculate_date_offset(job_start_day, current)
    # walk a 10-day window backwards from bsr_end_day, one window per elapsed day
    now_end_day = CommonUtil.get_day_offset(bsr_end_day, (day_num * offset + 1) * -1)
    now_start_day = CommonUtil.get_day_offset(bsr_end_day, (day_num * offset + day_num) * -1)
    if now_start_day < bsr_start_day:
        now_start_day = bsr_start_day
    if now_start_day >= now_end_day:
        # the whole date range has already been processed
        return None
    return now_start_day, now_end_day


if __name__ == '__main__':
    now = CommonUtil.format_now("%Y-%m-%d")
    val = getNowDay(now)
    if not val:
        CommonUtil.send_wx_msg(["wujicang"], "Reminder", "Rerun of dwd_bsr_asin_rank is no longer needed, please disable the job manually")
        print("job finished")
    else:
        now_start_day, now_end_day = val
        print(f"recalculating data for [{now_start_day}] - [{now_end_day}]")
        tmp_day = now_end_day
        client = SSHUtil.get_ssh_client()
        while tmp_day >= now_start_day:
            path_exist = HdfsUtils.path_exist(CommonUtil.build_hdfs_path("dwd_bsr_asin_rank", {
                "site_name": "us",
                "date_type": "last30day",
                "date_info": tmp_day
            }))
            if path_exist:
                # recompute dwd_bsr_asin_rank for this day
                cmd1 = f"""
/opt/module/spark/bin/spark-submit \
    --master yarn \
    --driver-memory 2g \
    --executor-memory 10g \
    --executor-cores 4 \
    --num-executors 25 \
    --queue default \
    /opt/module/spark/demo/py_demo/dwd/dwd_bsr_asin_rank.py us {tmp_day}
"""
                SSHUtil.exec_command_async(client, cmd1, True)
                # export the recomputed partition
                cmd2 = f"""
/opt/module/anaconda3/envs/pyspark/bin/python3.8 /opt/module/spark/demo/py_demo/sqoop_export/dwd_bsr_asin_rank.py us {tmp_day}
"""
                SSHUtil.exec_command_async(client, cmd2, True)
            # always step back one day, even if the partition is missing,
            # otherwise the loop would never terminate
            tmp_day = CommonUtil.get_day_offset(tmp_day, -1)
        CommonUtil.send_wx_msg(["wujicang"], "Reminder",
                               f"Rerun and export of dwd_bsr_asin_rank for [{now_start_day}] - [{now_end_day}] succeeded")
        client.close()