# templates_rerun.py
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # add the parent directory to the import path

from utils.db_util import DbTypes, DBUtil
from utils.hdfs_utils import HdfsUtils
from utils.common_util import CommonUtil


class TemplatesRerun(object):
    def __init__(self):
        self.site_name = str()  # site
        self.date_type = str()  # date granularity
        self.date_info = str()  # date partition value
        self.spark = None  # SparkSession entry point of the job
        self.db_name = "big_data_selection"
        self.partitions_num = int()  # number of partitions for the DataFrame
        self.partitions_by = list()  # partition columns of the Hive table
        self.partitions_dict = dict()  # partition column/value mapping of the Hive table
        self.engine_mysql = DBUtil.get_db_engine(db_type=DbTypes.mysql.name, site_name="us")  # MySQL engine
        self.rerun_range = str()  # range to rerun
        self.page = str()  # module (page) to rerun
        self.db_save = str()  # name of the table to rerun
        self.rerun_field = str()  # field to recompute
        self.task_owner = str()  # owner of the rerun task
        self.rerun_reason = str()  # rerun reason
        self.rerun_reason_description = str()  # rerun reason details
        # DataFrame placeholders
        self.df_history_data = object()
        self.df_save = object()

    def rerun(self):
        """重跑模板"""
        self.rerun_read_data()
        self.rerun_handle_data()
        self.rerun_save_data()

    def rerun_read_data(self):
        # Insert a rerun record into the MySQL table hive_task_rerun_log (is_finished = 0)
        with self.engine_mysql.connect() as connection:
            sql = f"""
            replace into hive_task_rerun_log (
            site_name, date_type, rerun_range, page, table_name, field_name, task_owner, rerun_reason, rerun_reason_description, is_finished
            )
            values (
            '{self.site_name}', '{self.date_type}', '{self.rerun_range}', '{self.page}', '{self.db_save}', 
            '{self.rerun_field}', '{self.task_owner}', '{self.rerun_reason}', '{self.rerun_reason_description}', 0
            );
            """
            connection.execute(sql)

        # Build the partition filter from the partition column/value dict
        partition_conditions = []
        for key, value in self.partitions_dict.items():
            partition_conditions.append(f"{key} = '{value}'")
        query_condition = f"WHERE {' AND '.join(partition_conditions)}"

        # Query the historical data from the Hive table
        sql = f"""
        select * from {self.db_save} {query_condition};
        """
        self.df_history_data = self.spark.sql(sql).drop(self.rerun_field).repartition(self.partitions_num).cache()
        print("重跑前历史数据如下:")
        self.df_history_data.show(10, True)

    def rerun_handle_data(self):
        """To be implemented by subclasses: recompute self.rerun_field and assign the result to self.df_save."""
        # todo
        pass

    def rerun_save_data(self):
        # Only overwrite the partition when the recomputed data has the same row count as the history data
        if self.df_history_data.count() == self.df_save.count():
            print(f"Saving to table: {self.db_save}, partition columns: {self.partitions_by}")
            hdfs_path = CommonUtil.build_hdfs_path(self.db_save, self.partitions_dict)
            HdfsUtils.delete_file_in_folder(hdfs_path)
            self.df_save.repartition(self.partitions_num).write.saveAsTable(name=self.db_save, format='hive', mode='append', partitionBy=self.partitions_by)
            print("Rerun succeeded!")
            # Mark the rerun record as finished (is_finished = 1) in the MySQL log table
            with self.engine_mysql.connect() as connection:
                sql = f"""
                replace into hive_task_rerun_log (
                site_name, date_type, rerun_range, page, table_name, field_name, task_owner, rerun_reason, rerun_reason_description, is_finished
                )
                values (
                '{self.site_name}', '{self.date_type}', '{self.rerun_range}', '{self.page}', '{self.db_save}',
                '{self.rerun_field}', '{self.task_owner}', '{self.rerun_reason}', '{self.rerun_reason_description}', 1
                );
                """
                connection.execute(sql)
        else:
            print("Rerun failed: row counts of the recomputed data and the history data do not match!")
            sys.exit(1)
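

# --- Usage sketch (illustrative, not part of the original template) ---
# A minimal example of how this template is intended to be subclassed: set the
# rerun parameters, implement rerun_handle_data() to recompute the dropped
# field, then call rerun(). The table name, field name, join key, partition
# values and the commented-out Spark bootstrap below are hypothetical
# placeholders, not names taken from this project.
class ExampleFieldRerun(TemplatesRerun):

    def __init__(self, site_name, date_type, date_info):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = "dm_example_table"          # hypothetical target Hive table
        self.rerun_field = "example_field"         # hypothetical field to recompute
        self.partitions_num = 10
        self.partitions_by = ["site_name", "date_type", "date_info"]
        self.partitions_dict = {"site_name": site_name, "date_type": date_type, "date_info": date_info}
        # self.spark = SparkUtil.get_spark_session("ExampleFieldRerun")  # hypothetical Spark bootstrap

    def rerun_handle_data(self):
        # Recompute the dropped field and join it back onto the cached history data.
        sql = "select asin, 1 as example_field from dim_example_source"  # hypothetical recomputation SQL
        df_new_field = self.spark.sql(sql)
        self.df_save = self.df_history_data.join(df_new_field, on="asin", how="left")


if __name__ == "__main__":
    # Example invocation; the arguments are illustrative.
    handler = ExampleFieldRerun(site_name="us", date_type="month", date_info="2023-01")
    handler.rerun()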