bsr_rank_recalc.py
2.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.ssh_util import SSHUtil
from utils.hdfs_utils import HdfsUtils
"""
Temporarily re-run (backfill) the bsr_rank data.
"""
def getNowDay(current):
    """Select the slice of the backfill window to re-run on day ``current``.

    The overall window to recompute is 2023-06-26 .. 2023-09-21, and the
    recurring job first ran on 2023-09-20.  Each run handles a batch of 10
    days, walking backwards from the end of the window; the slice start is
    clamped to the beginning of the window.

    Args:
        current: The run date as a ``"%Y-%m-%d"`` string.

    Returns:
        A ``(start_day, end_day)`` tuple of date strings, or ``None`` when
        the clamped start is not strictly before the end (nothing left to
        re-run).
    """
    window_first = "2023-06-26"
    window_last = "2023-09-21"
    job_first_run = "2023-09-20"
    # Ten tasks (days) are processed per daily run.
    batch_size = 10

    # NOTE(review): presumably the number of days from job_first_run to
    # `current` -- confirm against CommonUtil.calculate_date_offset.
    runs_elapsed = CommonUtil.calculate_date_offset(job_first_run, current)
    days_already_covered = batch_size * runs_elapsed

    slice_end = CommonUtil.get_day_offset(window_last, -1 * (days_already_covered + 1))
    slice_start = CommonUtil.get_day_offset(window_last, -1 * (days_already_covered + batch_size))

    # Dates are compared as strings; this assumes get_day_offset returns
    # zero-padded "%Y-%m-%d" values so lexical order equals date order.
    slice_start = max(slice_start, window_first)

    return (slice_start, slice_end) if slice_start < slice_end else None
if __name__ == '__main__':
    # Script entry point: work out today's slice of the backfill window and,
    # for each day in it (newest first), re-run the Spark job and the export
    # script over SSH.  A WeChat-style notification is sent either way.
    now = CommonUtil.format_now("%Y-%m-%d")
    val = getNowDay(now)
    if not val:
        # Nothing left to recompute: ask the owner to switch the job off.
        CommonUtil.send_wx_msg(["wujicang"], "提醒", "重跑 dwd_bsr_asin_rank任务无需进行,请手动关闭")
        print("程序结束")
    else:
        now_start_day, now_end_day = val
        print(f"重新计算【{now_start_day}】-【{now_end_day}】数据中")
        tmp_day = now_end_day
        client = SSHUtil.get_ssh_client()
        # try/finally guarantees the SSH client is released even when an HDFS
        # check, a remote command or the notification raises.
        try:
            # Walk backwards one day at a time, inclusive of both endpoints.
            while tmp_day >= now_start_day:
                path_exist = HdfsUtils.path_exist(CommonUtil.build_hdfs_path("dwd_bsr_asin_rank", {
                    "site_name": "us",
                    "date_type": "last30day",
                    "date_info": tmp_day
                }))
                # Only days whose partition path already exists are recomputed.
                if path_exist:
                    # The trailing backslashes are string-literal line
                    # continuations, so the command is sent as a single line.
                    cmd1 = f"""
/opt/module/spark/bin/spark-submit \
--master yarn \
--driver-memory 2g \
--executor-memory 10g \
--executor-cores 4 \
--num-executors 25 \
--queue default \
/opt/module/spark/demo/py_demo/dwd/dwd_bsr_asin_rank.py us {tmp_day}
"""
                    # NOTE(review): meaning of the third argument (True) is
                    # defined by SSHUtil.exec_command_async -- confirm there.
                    SSHUtil.exec_command_async(client, cmd1, True)
                    cmd2 = f"""
/opt/module/anaconda3/envs/pyspark/bin/python3.8 /opt/module/spark/demo/py_demo/sqoop_export/dwd_bsr_asin_rank.py us {tmp_day}
"""
                    SSHUtil.exec_command_async(client, cmd2, True)
                # Step to the previous day regardless of whether the path
                # existed, so the loop always terminates.
                tmp_day = CommonUtil.get_day_offset(tmp_day, -1)
            CommonUtil.send_wx_msg(["wujicang"], "提醒", f"重跑 dwd_bsr_asin_rank并导出【{now_start_day}】-【{now_end_day}】成功")
        finally:
            client.close()