# author : wangrui
# date : 2023/8/4 16:54
"""Poll ``export_command_records`` for pending export commands and run them.

Status values as used by this script:
    1 - selected by the scan as waiting to run
    2 - set just before the command is handed to the thread pool
    3 - set when the command exits with return code 0
    4 - set when the command exits with a non-zero return code
"""
import os
import sys
import datetime as DT
import time
import pandas as pd
import concurrent.futures

sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from utils.common_util import CommonUtil
from utils.db_util import DBUtil
import subprocess

# Positional arguments passed to every DBUtil call in this script.
_DB_TARGET = ('postgresql_cluster', 'us')

# Users who are always notified about a failed export, in addition to the
# principals recorded on the failing command.
_DEFAULT_ALERT_USERS = ['chenjianyun', 'fangxingjun', 'wujicang',
                        'pengyanbing', 'chenyuanjie', 'leichao', 'chenbo']

# Pause between scans when there is nothing to execute (15 minutes).
_SCAN_INTERVAL_SECONDS = 900


def _quote_sql_literal(value):
    """Return *value* as a single-quoted SQL literal with embedded quotes doubled."""
    # Doubling "'" is the standard SQL escape; without it a command that
    # contains a quote would terminate the literal early and break the statement.
    return "'" + str(value).replace("'", "''") + "'"


def _status_update_sql(command, from_status, to_status):
    """Build the UPDATE that moves *command* from one status to another."""
    return f"""
        update export_command_records
        set status = {to_status}, updated_at = CURRENT_TIMESTAMP
        where commands = {_quote_sql_literal(command)} and status = {from_status}
    """


def _split_principals(principal):
    """Split a comma-separated principal value into a list of non-empty names."""
    if principal is None:
        # Avoid turning a missing principal into the bogus user name 'None'.
        return []
    return [name.strip() for name in str(principal).split(",") if name.strip()]


def scan_delayed_export_commands():
    """Fetch the pending export commands that share the lowest priority value.

    Returns:
        dict mapping each command string (``commands`` column) to its
        ``principal`` column value, for rows with ``status = 1`` whose
        priority equals the minimum priority among ``status = 1`` rows.
    """
    export_missions_dict = {}
    engine = DBUtil.get_db_engine(*_DB_TARGET)
    sql = """
        select a.commands as commands, a.principal as principal
        from export_command_records a
        inner join (select min(priority) as min_priority
                    from export_command_records
                    where status = 1) b
        on a.priority = b.min_priority and a.status = 1
    """
    # NOTE(review): a plain string is passed to execute(); if the engine is
    # SQLAlchemy 2.x this needs sqlalchemy.text(sql) - confirm the version.
    with engine.connect() as connection:
        # The context manager closes the connection on exit.
        for row in connection.execute(sql):
            export_missions_dict[row.commands] = row.principal
    return export_missions_dict


def execute_missions_with_os(cmd):
    """Run *cmd* through the shell and capture its output.

    NOTE(review): the command string is executed with ``shell=True`` exactly
    as given; callers in this file pass values read from the database, so
    that table must only be writable by trusted users.

    Args:
        cmd: shell command line to execute.

    Returns:
        Tuple ``(cmd, returncode, stdout_bytes, stderr_bytes)``.
    """
    completed = subprocess.run(cmd, shell=True,
                               stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    return cmd, completed.returncode, completed.stdout, completed.stderr


def execute_export_missions(export_missions_dict):
    """Run every command in *export_missions_dict* concurrently and record results.

    Each command is first moved from status 1 to 2, then executed in a thread
    pool. On completion the row is moved from status 2 to 3 (return code 0) or
    4 (non-zero return code); failures also trigger a WeChat alert.

    Args:
        export_missions_dict: mapping of command string to its principal
            value (comma-separated user names, or None).
    """
    if not export_missions_dict:
        return

    all_export_missions_list = list(export_missions_dict)
    print("============================当前批次的导出命令如下===================================")
    for mission in all_export_missions_list:
        print(mission)

    # Flag the whole batch as picked up before any command starts.
    for mission in all_export_missions_list:
        DBUtil.exec_sql(*_DB_TARGET, _status_update_sql(mission, 1, 2))

    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = {executor.submit(execute_missions_with_os, cmd): cmd
                   for cmd in all_export_missions_list}
        for future in concurrent.futures.as_completed(futures):
            cmd = futures[future]
            failure_msg = cmd.split('/')[-1]
            person_in_charge = _split_principals(export_missions_dict.get(cmd))
            users = person_in_charge + _DEFAULT_ALERT_USERS
            try:
                _, returncode, stdout, stderr = future.result()
                if not returncode:
                    print(f"Command:'{cmd}'执行成功")
                    # errors="replace": undecodable output must not turn a
                    # successful run into the exception path below.
                    print(f"执行信息为:{stdout.decode(errors='replace')}")
                    new_status = 3
                else:
                    print(f"Command:'{cmd}'执行失败")
                    print(f"失败信息:{stderr.decode(errors='replace')}")
                    new_status = 4
                    CommonUtil.send_wx_msg(users, "\u26A0 数据导出异常",
                                           f"任务信息:{failure_msg}\n负责人:{person_in_charge}")
                DBUtil.exec_sql(*_DB_TARGET, _status_update_sql(cmd, 2, new_status))
            except Exception as e:
                # Boundary handler: one broken mission must not stop the
                # remaining futures from being processed.
                # NOTE(review): the row is left at status 2 in this path.
                print(f"导出任务未能正常执行: {e}")
                CommonUtil.send_wx_msg(users, "\u26A0 数据导出异常",
                                       f"任务信息:{failure_msg}\n负责人:{person_in_charge}")
    print("============================该批次导出任务执行完毕===================================")


def is_in_time_range(start_time, end_time, check_time):
    """Return True if *check_time* lies in the inclusive window [start, end].

    When ``start_time > end_time`` the window is treated as wrapping past
    midnight (e.g. 19:00 -> 08:00).
    """
    if start_time <= end_time:
        return start_time <= check_time <= end_time
    return start_time <= check_time or check_time <= end_time


def judge_mission_is_execute(current_time, export_missions_dict):
    """Remove deferred commands from *export_missions_dict* in place.

    On Monday-Friday, when *current_time* is 04:00 or later, every command
    whose text contains ``dwt_st_asin_reverse.py`` is dropped from the dict.

    Args:
        current_time: ``datetime.time`` to compare against 04:00.
        export_missions_dict: mapping of command string to principal; mutated.
    """
    ignore_mission_name = 'dwt_st_asin_reverse.py'
    ignore_mission_list = [mission for mission in export_missions_dict
                           if ignore_mission_name in mission]
    if current_time >= DT.time(4, 0) and DT.datetime.now().weekday() < 5:
        for ignore_mission in ignore_mission_list:
            export_missions_dict.pop(ignore_mission, None)


if __name__ == '__main__':
    start_time = DT.time(19, 0)  # 19:00 in the evening
    end_time = DT.time(8, 0)  # 08:00 the next morning
    while True:
        now = DT.datetime.now()
        current_time = now.time()
        if is_in_time_range(start_time, end_time, current_time) or now.weekday() >= 5:
            export_missions_dict = scan_delayed_export_commands()
            if export_missions_dict:
                judge_mission_is_execute(current_time, export_missions_dict)
            if not export_missions_dict:
                # Also reached when every scanned mission was filtered out:
                # sleeping here prevents a tight re-scan loop against the DB.
                print("当前没有需要导出的任务,15min后继续扫描")
                time.sleep(_SCAN_INTERVAL_SECONDS)  # sleep for 15 minutes
            else:
                execute_export_missions(export_missions_dict)
        else:
            print("时间不满足,暂不进行数据导出操作")
            sys.exit(0)