"""Verify that the daily bsr/nsr asin-rank exports exist and repair gaps.

For each checked day, rank type and site, the script looks for a row in the
``workflow_everyday`` table.  When the row is missing it either queues a
re-export (the HDFS partition is already there) or records an error and, for
the reference day only, queues a scheduler run to recompute the data.
"""
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.DolphinschedulerHelper import DolphinschedulerHelper
from utils.db_util import DBUtil
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils


def check_By_offsetDay(offset: int) -> None:
    """Check the last ``offset`` days of bsr/nsr rank data and trigger repairs.

    The reference day is read with ``CommonUtil.get_sys_arg(1, <today>)``
    (presumably the first command-line argument, falling back to today's date
    -- confirm against ``CommonUtil``).  Days ``reference - 0`` through
    ``reference - (offset - 1)`` are checked for the sites ``us``, ``uk`` and
    ``de`` and the rank types ``bsr`` and ``nsr``.

    For every combination without a ``workflow_everyday`` row:

    * if the matching ``dwd_{type}_asin_rank`` HDFS path exists, a sqoop
      export command is queued;
    * otherwise an error message is recorded and, when the missing day is the
      reference day itself, a scheduler process start is queued.

    After the scan, an alert is sent (only if both errors and scheduler tasks
    were collected), the queued scheduler processes are started and watched,
    and the queued export commands are executed over SSH.

    :param offset: number of days to check, counting back from the reference
        day (``1`` checks the reference day only).
    :return: None. The function works purely through side effects.
    """
    day = CommonUtil.get_sys_arg(1, CommonUtil.format_now("%Y-%m-%d", ))
    site_names = ["us", "uk", "de"]
    rank_types = ['bsr', 'nsr']
    start_tasks = []
    export_tasks = []
    err = []

    # The engine does not depend on the loop variables, so build it once
    # instead of once per (day, type, site) combination.
    engine = DBUtil.get_db_engine("mysql", "us")

    for day_offset in range(offset):
        day_check = CommonUtil.get_day_offset(day, -day_offset)
        for rank_type in rank_types:
            for site_name in site_names:
                # NOTE(review): values come from the fixed lists above and
                # from `day_check`; consider bound parameters if `day` can
                # ever be supplied by an untrusted caller.
                sql = f""" select * from workflow_everyday where site_name ='{site_name}' and table_name='{site_name}_{rank_type}_asin_rank' and report_date ='{day_check}' limit 1; """

                # Hold the connection only for the query itself.
                with engine.connect() as connection:
                    record_found = len(list(connection.execute(sql))) > 0
                if record_found:
                    continue

                print(f"{site_name} {rank_type} {day_check} 日 不存在 数据!!!")

                # Determine whether the data was computed but just not exported.
                path_exist = HdfsUtils.path_exist(CommonUtil.build_hdfs_path(f"dwd_{rank_type}_asin_rank", {
                    "site_name": site_name,
                    "date_type": "last30day",
                    "date_info": day_check,
                }))
                if path_exist:
                    # Data is on HDFS: only the export has to be re-run.
                    export_tasks.append(f""" /opt/module/anaconda3/envs/pyspark/bin/python3.8 /opt/module/spark/demo/py_demo/sqoop_export/dwd_bsr_nsr_asin_rank_day.py {site_name} {day_check} {rank_type} """)
                    continue

                err.append(f"{site_name} {rank_type} {day_check} 日 不存在 数据!!!")
                # Only the reference day is recomputed; older gaps are
                # reported but not rescheduled.
                if day_check == day:
                    start_tasks.append({
                        "process_df_name": f"asin_{rank_type}榜单统计日流程_api",
                        "startParams": {"site_name": site_name, "date_info": day_check, "wx_user": "wujicang"}
                    })

    # Something is wrong: notify the owners.
    # NOTE(review): the alert is skipped when errors exist but no scheduler
    # task was queued (e.g. gaps only on older days) -- confirm this is
    # intended.
    if len(err) > 0 and len(start_tasks) > 0:
        print(err)
        CommonUtil.send_wx_msg(["wujicang", "pengyanbing"], "严重警告", ";".join(err))

    for task in start_tasks:
        DolphinschedulerHelper.start_and_watch_process_instance(
            project_name="big_data_selection",
            process_df_name=task['process_df_name'],
            startParams=task['startParams'],
        )

    for task in export_tasks:
        # A fresh client per command, as in the original flow; the command is
        # started asynchronously, so the client is not closed here.
        client = SSHUtil.get_ssh_client()
        SSHUtil.exec_command_async(client, task, ignore_err=False)
        print(task)


if __name__ == '__main__':
    # Original note: check the data and export it tomorrow night.
    # On 2024-05-18 the check covers 60 days; on any other date it covers
    # only the reference day.
    if CommonUtil.format_now("%Y-%m-%d", ) == '2024-05-18':
        check_By_offsetDay(60)
    else:
        check_By_offsetDay(1)