1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# author : wangrui
# date : 2023/8/4 16:54
import os
import sys
import datetime as DT
import time
import pandas as pd
import concurrent.futures
sys.path.append(os.path.dirname(sys.path[0])) # 上级目录
from utils.common_util import CommonUtil
from utils.db_util import DBUtil
import subprocess
def scan_delayed_export_commands():
    """Fetch the pending export commands that share the lowest priority value.

    Queries ``export_command_records`` for rows with ``status = 1`` whose
    ``priority`` equals the minimum ``priority`` among all ``status = 1`` rows.

    Returns:
        dict mapping each ``commands`` value to its ``principal`` value.
        Empty when no row has ``status = 1``. If the same command appears
        more than once, the last row read wins.
    """
    engine = DBUtil.get_db_engine('postgresql_cluster', 'us')
    # Only the batch with the smallest priority number is returned, so rows
    # with a larger priority value wait until this batch leaves status 1.
    sql = """
        select a.commands as commands, a.principal as principal
        from export_command_records a
        inner join
        (select min(priority) as min_priority from export_command_records where status = 1) b
        on a.priority = b.min_priority and a.status =1
    """
    # NOTE(review): passing a plain SQL string to Connection.execute only
    # works on SQLAlchemy < 2.0 (assuming DBUtil hands back a SQLAlchemy
    # engine — confirm); 2.0 requires wrapping it in sqlalchemy.text().
    # The ``with`` block closes the connection on exit; no explicit close().
    with engine.connect() as connection:
        return {row.commands: row.principal for row in connection.execute(sql)}
def execute_missions_with_os(cmd):
    """Run ``cmd`` through the shell and capture both output streams.

    Args:
        cmd: shell command line, executed with ``shell=True``.

    Returns:
        Tuple ``(cmd, returncode, stdout_bytes, stderr_bytes)``; a non-zero
        exit status is reported in ``returncode`` rather than raised.
    """
    completed = subprocess.run(
        cmd,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    return cmd, completed.returncode, completed.stdout, completed.stderr
def _sql_quote(value):
    """Return ``value`` rendered as a single-quoted SQL string literal.

    Embedded single quotes are doubled, so shell commands that contain
    quotes neither break the literal nor escape from it.
    """
    return "'" + str(value).replace("'", "''") + "'"


def _update_mission_status(cmd, old_status, new_status):
    """Move the ``export_command_records`` row for ``cmd`` from ``old_status`` to ``new_status``."""
    sql = f"""
    update export_command_records set status = {int(new_status)}, updated_at = CURRENT_TIMESTAMP where commands = {_sql_quote(cmd)} and status = {int(old_status)}
    """
    DBUtil.exec_sql('postgresql_cluster', 'us', sql)


def execute_export_missions(export_missions_dict):
    """Run one batch of export commands concurrently and record the outcome.

    Every command is first moved from status 1 to 2, then all commands run
    in a thread pool. As each finishes, its row is moved from 2 to 3 (exit
    code 0) or from 2 to 4 (non-zero exit). Failures, and any exception
    while handling a result, trigger a WeChat alert to the command's
    principal(s) plus a fixed list of maintainers.

    Args:
        export_missions_dict: mapping of shell command -> comma-separated
            principal string. An empty mapping is a no-op.
    """
    if not export_missions_dict:
        return

    all_export_missions_list = list(export_missions_dict)
    print("============================当前批次的导出命令如下===================================")
    for mission in all_export_missions_list:
        print(mission)

    # Claim the whole batch before running anything.
    for export_mission in all_export_missions_list:
        _update_mission_status(export_mission, 1, 2)

    # Maintainers alerted on every failure, in addition to the principals.
    user_list = ['chenjianyun', 'fangxingjun', 'wujicang', 'pengyanbing', 'chenyuanjie', 'leichao', 'chenbo']

    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = {executor.submit(execute_missions_with_os, cmd): cmd for cmd in all_export_missions_list}
        for future in concurrent.futures.as_completed(futures):
            cmd = futures[future]
            principal = str(export_missions_dict.get(cmd))
            failure_msg = cmd.split('/')[-1]
            person_in_charge = principal.split(",")
            users = person_in_charge + user_list
            try:
                _, returncode, stdout, stderr = future.result()
                if not returncode:
                    print(f"Command:'{cmd}'执行成功")
                    # errors="replace": non-UTF-8 output must not turn a
                    # successful export into a reported failure.
                    output = stdout.decode(errors="replace")
                    print(f"执行信息为:{output}")
                    _update_mission_status(cmd, 2, 3)
                else:
                    print(f"Command:'{cmd}'执行失败")
                    error_output = stderr.decode(errors="replace")
                    print(f"失败信息:{error_output}")
                    # Record the failure first so an alerting error cannot
                    # leave the row stuck at status 2.
                    _update_mission_status(cmd, 2, 4)
                    CommonUtil.send_wx_msg(users, "\u26A0 数据导出异常", f"任务信息:{failure_msg}\n负责人:{person_in_charge}")
            except Exception as e:
                print(f"导出任务未能正常执行: {e}")
                CommonUtil.send_wx_msg(users, "\u26A0 数据导出异常", f"任务信息:{failure_msg}\n负责人:{person_in_charge}")
    print("============================该批次导出任务执行完毕===================================")
def is_in_time_range(start_time, end_time, check_time):
    """Return whether ``check_time`` lies in the inclusive window [start, end].

    When ``start_time`` is later than ``end_time`` the window is treated as
    wrapping past midnight (e.g. 19:00 -> 08:00), so a time matches if it is
    at/after the start OR at/before the end.
    """
    wraps_midnight = start_time > end_time
    if wraps_midnight:
        return check_time >= start_time or check_time <= end_time
    return check_time >= start_time and check_time <= end_time
def judge_mission_is_execute(current_time, export_missions_dict):
    """Drop 'dwt_st_asin_reverse.py' commands from the batch on weekdays from 04:00.

    Commands whose text contains 'dwt_st_asin_reverse.py' are removed from
    ``export_missions_dict`` in place when ``current_time`` is at or after
    04:00 and today (per ``datetime.now()``) is Monday-Friday. Returns None.
    """
    skip_marker = 'dwt_st_asin_reverse.py'
    to_skip = [command for command in list(export_missions_dict) if skip_marker in command]
    if not to_skip:
        return
    if current_time < DT.time(4, 0):
        return
    if DT.datetime.now().weekday() >= 5:  # Saturday/Sunday: keep everything
        return
    for command in to_skip:
        export_missions_dict.pop(command)
if __name__ == '__main__':
    # Export window: 19:00 today through 08:00 the next morning; weekends are
    # always allowed.
    start_time = DT.time(19, 0)
    end_time = DT.time(8, 0)
    while True:
        # One clock read per iteration so the time-of-day and weekday checks
        # cannot disagree around midnight.
        now = DT.datetime.now()
        current_time = now.time()
        if is_in_time_range(start_time, end_time, current_time) or now.weekday() >= 5:
            export_missions_dict = scan_delayed_export_commands()
            # Filter before the emptiness check: if every scanned mission is
            # skipped, sleep instead of rescanning the DB in a tight loop.
            judge_mission_is_execute(current_time, export_missions_dict)
            if not export_missions_dict:
                print("当前没有需要导出的任务,15min后继续扫描")
                time.sleep(900)  # sleep 15 minutes
            else:
                execute_export_missions(export_missions_dict)
        else:
            print("时间不满足,暂不进行数据导出操作")
            sys.exit(0)