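# Daily completeness check for the {site}_{bsr|nsr}_asin_rank result tables:
# for each site and day, verify that a workflow_everyday record exists.
# If the HDFS result exists but was never exported, re-run the sqoop export;
# if today's result is missing entirely, re-trigger the DolphinScheduler
# workflow and send a WeChat alert.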
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))

from utils.ssh_util import SSHUtil
from utils.DolphinschedulerHelper import DolphinschedulerHelper
from utils.db_util import DBUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
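

# Verify that {site}_{bsr|nsr}_asin_rank results exist for `day` and the
# previous `offset - 1` days, queueing exports / recomputation as needed.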
def check_By_offsetDay(offset: int):
    # target day: first CLI argument, defaulting to today
    day = CommonUtil.get_sys_arg(1, CommonUtil.format_now("%Y-%m-%d"))
    site_names = ["us", "uk", "de"]
    start_tasks = []
    export_tasks = []
    err = []
    # workflow_everyday is queried from the "us" MySQL instance for every site
    engine = DBUtil.get_db_engine("mysql", "us")
    for day_offset in range(0, offset):
        day_check = CommonUtil.get_day_offset(day, -day_offset)
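        # check every (rank type, site) combination for this day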
        for rank_type in ["bsr", "nsr"]:
            for site_name in site_names:
                with engine.connect() as connection:
                    sql = f"""
                        select * from workflow_everyday
                        where site_name = '{site_name}'
                          and table_name = '{site_name}_{rank_type}_asin_rank'
                          and report_date = '{day_check}'
                        limit 1;
                    """
                    size = len(list(connection.execute(sql)))
                    if size == 0:
                        print(f"{site_name} {rank_type} {day_check}: no data!")
                        # the data may be computed but not yet exported:
                        # check whether the HDFS result path for this day exists
                        path_exist = HdfsUtils.path_exist(CommonUtil.build_hdfs_path(f"dwd_{rank_type}_asin_rank", {
                            "site_name": site_name,
                            "date_type": "last30day",
                            "date_info": day_check,
                        }))
                        if path_exist:
                            # computed but not exported: queue a re-export command
                            export_tasks.append(
                                f"/opt/module/anaconda3/envs/pyspark/bin/python3.8 "
                                f"/opt/module/spark/demo/py_demo/sqoop_export/dwd_bsr_nsr_asin_rank_day.py "
                                f"{site_name} {day_check} {rank_type}"
                            )
                        else:
                            err.append(f"{site_name} {rank_type} {day_check}: no data!")
                            # only the current day is recomputed; older days are just reported
                            if day_check == day:
                                start_tasks.append({
                                    "process_df_name": f"asin_{rank_type}榜单统计日流程_api",
                                    "startParams": {"site_name": site_name, "date_info": day_check, "wx_user": "wujicang"},
                                })
    # if data is missing and today's run needs recomputation, alert via WeChat
    if err and start_tasks:
        print(err)
        CommonUtil.send_wx_msg(["wujicang", "pengyanbing"], "Severe warning", ";".join(err))
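    # re-trigger the missing daily workflows and watch them to completion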
    for task in start_tasks:
        DolphinschedulerHelper.start_and_watch_process_instance(
            project_name="big_data_selection",
            process_df_name=task['process_df_name'],
            startParams=task['startParams'],
        )
    # run each queued export command on the remote host over SSH
    for task in export_tasks:
        client = SSHUtil.get_ssh_client()
        SSHUtil.exec_command_async(client, task, ignore_err=False)
        print(task)


if __name__ == '__main__':
    # check the data and export tomorrow night:
    # the 2024-05-18 run backfills 60 days, every other day checks only today
    if CommonUtil.format_now("%Y-%m-%d") == '2024-05-18':
        check_By_offsetDay(60)
    else:
        check_By_offsetDay(1)