import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from utils.db_util import DbTypes, DBUtil
from utils.hdfs_utils import HdfsUtils
from utils.common_util import CommonUtil


class TemplatesRerun(object):

    def __init__(self):
        self.site_name = str()  # site
        self.date_type = str()  # date granularity
        self.date_info = str()  # date partition
        self.spark = None  # SparkSession entry point of the Spark job
        self.db_name = "big_data_selection"
        self.partitions_num = int()  # number of partitions for the DataFrame
        self.partitions_by = list()  # partition columns of the Hive partitioned table
        self.partitions_dict = dict()  # partition key/value dict of the Hive partitioned table
        self.engine_mysql = DBUtil.get_db_engine(db_type=DbTypes.mysql.name, site_name="us")  # MySQL connection
        self.rerun_range = str()  # range that needs to be rerun
        self.page = str()  # module that needs to be rerun
        self.db_save = str()  # table name that needs to be rerun
        self.rerun_field = str()  # field that needs to be rerun
        self.task_owner = str()  # owner of the rerun task
        self.rerun_reason = str()  # reason for the rerun
        self.rerun_reason_description = str()  # detailed description of the rerun reason
        # DataFrame objects
        self.df_history_data = object()
        self.df_save = object()

    def rerun(self):
        """Rerun template: read -> handle -> save"""
        self.rerun_read_data()
        self.rerun_handle_data()
        self.rerun_save_data()

    def rerun_read_data(self):
        # Insert a rerun record into the MySQL table hive_task_rerun_log
        with self.engine_mysql.connect() as connection:
            sql = f"""
            replace into hive_task_rerun_log
                (site_name, date_type, rerun_range, page, table_name, field_name,
                 task_owner, rerun_reason, rerun_reason_description, is_finished)
            values
                ('{self.site_name}', '{self.date_type}', '{self.rerun_range}', '{self.page}',
                 '{self.db_save}', '{self.rerun_field}', '{self.task_owner}',
                 '{self.rerun_reason}', '{self.rerun_reason_description}', 0);
            """
            connection.execute(sql)
        # Build the partition filter condition
        partition_conditions = []
        for key, value in self.partitions_dict.items():
            partition_conditions.append(f"{key} = '{value}'")
        query_condition = f"WHERE {' AND '.join(partition_conditions)}"
        # Query the historical data and drop the column that will be recomputed
        sql = f"""
        select * from {self.db_save} {query_condition};
        """
        self.df_history_data = self.spark.sql(sql).drop(self.rerun_field).repartition(self.partitions_num).cache()
        print("Historical data before the rerun:")
        self.df_history_data.show(10, True)

    def rerun_handle_data(self):
        # todo: implemented by each concrete rerun job
        pass

    def rerun_save_data(self):
        if self.df_history_data.count() == self.df_save.count():
            print(f"Target table: {self.db_save}, partition columns: {self.partitions_by}")
            hdfs_path = CommonUtil.build_hdfs_path(self.db_save, self.partitions_dict)
            HdfsUtils.delete_file_in_folder(hdfs_path)
            self.df_save.repartition(self.partitions_num).write.saveAsTable(
                name=self.db_save, format='hive', mode='append', partitionBy=self.partitions_by)
            print("Rerun succeeded!")
            # Update the rerun log, marking the task as finished
            with self.engine_mysql.connect() as connection:
                sql = f"""
                replace into hive_task_rerun_log
                    (site_name, date_type, rerun_range, page, table_name, field_name,
                     task_owner, rerun_reason, rerun_reason_description, is_finished)
                values
                    ('{self.site_name}', '{self.date_type}', '{self.rerun_range}', '{self.page}',
                     '{self.db_save}', '{self.rerun_field}', '{self.task_owner}',
                     '{self.rerun_reason}', '{self.rerun_reason_description}', 1);
                """
                connection.execute(sql)
        else:
            print("Rerun failed!")
            sys.exit()
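

# --- Usage sketch (illustrative only; not part of the original template) ---
# A minimal example of how a concrete rerun job might subclass TemplatesRerun.
# The table name, field name, partition values and the placeholder logic in
# rerun_handle_data below are assumptions for illustration; a real job would
# plug in its own business logic and obtain a SparkSession through the
# project's usual helper before calling rerun().
class ExampleFieldRerun(TemplatesRerun):
    def __init__(self):
        super().__init__()
        self.site_name = "us"
        self.date_type = "month"
        self.date_info = "2023-01"
        self.db_save = "dwt_example_table"  # hypothetical Hive table to rebuild
        self.rerun_field = "example_field"  # hypothetical column to recompute
        self.partitions_by = ["site_name", "date_type", "date_info"]
        self.partitions_dict = {
            "site_name": self.site_name,
            "date_type": self.date_type,
            "date_info": self.date_info,
        }
        self.partitions_num = 10

    def rerun_handle_data(self):
        # Placeholder recomputation: re-attach the dropped column as a null string.
        # A real job would derive the column from business logic instead.
        from pyspark.sql import functions as F
        self.df_save = self.df_history_data.withColumn(
            self.rerun_field, F.lit(None).cast("string"))


if __name__ == "__main__":
    # Running the sketch assumes the project's MySQL/Hive/Spark environment is available.
    job = ExampleFieldRerun()
    # job.spark = ...  # attach a SparkSession created via the project's own helper
    # job.rerun()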