import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))  # add the parent directory to the import path
from utils.db_util import DbTypes, DBUtil
from utils.hdfs_utils import HdfsUtils
from utils.common_util import CommonUtil
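# TemplatesRerun is a base template for re-running (recomputing) a single field
# of a Hive partition table: rerun_read_data() logs the rerun request to MySQL
# and reads the existing partition while dropping the stale column,
# rerun_handle_data() is left to the concrete rerun script to recompute that
# column, and rerun_save_data() overwrites the partition on HDFS and marks the
# MySQL log entry as finished.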
class TemplatesRerun(object):
    def __init__(self):
        self.site_name = str()  # site
        self.date_type = str()  # date granularity
        self.date_info = str()  # date partition value
        self.spark = None  # SparkSession, the entry point of the Spark job
        self.db_name = "big_data_selection"
        self.partitions_num = int()  # number of partitions for the DataFrame
        self.partitions_by = list()  # partition columns of the Hive table
        self.partitions_dict = dict()  # partition column -> value mapping for the Hive table
        self.engine_mysql = DBUtil.get_db_engine(db_type=DbTypes.mysql.name, site_name="us")  # MySQL engine
        self.rerun_range = str()  # range to rerun
        self.page = str()  # module (page) to rerun
        self.db_save = str()  # name of the table to rerun
        self.rerun_field = str()  # field to rerun
        self.task_owner = str()  # owner of the rerun task
        self.rerun_reason = str()  # reason for the rerun
        self.rerun_reason_description = str()  # detailed description of the rerun reason
        # DataFrame placeholders
        self.df_history_data = object()
        self.df_save = object()
    def rerun(self):
        """Rerun template: read the history data, recompute, then save."""
        self.rerun_read_data()
        self.rerun_handle_data()
        self.rerun_save_data()
    def rerun_read_data(self):
        # insert a rerun record (is_finished = 0) into the MySQL table hive_task_rerun_log
        with self.engine_mysql.connect() as connection:
            sql = f"""
            replace into hive_task_rerun_log (
                site_name, date_type, rerun_range, page, table_name, field_name, task_owner, rerun_reason, rerun_reason_description, is_finished
            )
            values (
                '{self.site_name}', '{self.date_type}', '{self.rerun_range}', '{self.page}', '{self.db_save}',
                '{self.rerun_field}', '{self.task_owner}', '{self.rerun_reason}', '{self.rerun_reason_description}', 0
            );
            """
            connection.execute(sql)
        # build the partition filter
        partition_conditions = []
        for key, value in self.partitions_dict.items():
            partition_conditions.append(f"{key} = '{value}'")
        query_condition = f"WHERE {' AND '.join(partition_conditions)}"
        # read the historical data and drop the column that is about to be recomputed
        sql = f"""
        select * from {self.db_save} {query_condition};
        """
        self.df_history_data = self.spark.sql(sql).drop(self.rerun_field).repartition(self.partitions_num).cache()
        print("History data before the rerun:")
        self.df_history_data.show(10, True)
    def rerun_handle_data(self):
        # todo: recompute the dropped field and build self.df_save in the concrete rerun script
        pass
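        # Hypothetical sketch of a concrete implementation (table and column
        # names below are placeholders, not project code): recompute the column
        # and join it back onto the cached history data, e.g.
        #     df_new = self.spark.sql("select asin, demo_field from demo_source_table")
        #     self.df_save = self.df_history_data.join(df_new, on="asin", how="left")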
    def rerun_save_data(self):
        if self.df_history_data.count() == self.df_save.count():
            print(f"Saving to table: {self.db_save}, partitioned by {self.partitions_by}")
            hdfs_path = CommonUtil.build_hdfs_path(self.db_save, self.partitions_dict)
            HdfsUtils.delete_file_in_folder(hdfs_path)
            self.df_save.repartition(self.partitions_num).write.saveAsTable(name=self.db_save, format='hive', mode='append', partitionBy=self.partitions_by)
            print("Rerun succeeded!")
            # update the rerun log and mark it as finished (is_finished = 1)
            with self.engine_mysql.connect() as connection:
                sql = f"""
                replace into hive_task_rerun_log (
                    site_name, date_type, rerun_range, page, table_name, field_name, task_owner, rerun_reason, rerun_reason_description, is_finished
                )
                values (
                    '{self.site_name}', '{self.date_type}', '{self.rerun_range}', '{self.page}', '{self.db_save}',
                    '{self.rerun_field}', '{self.task_owner}', '{self.rerun_reason}', '{self.rerun_reason_description}', 1
                );
                """
                connection.execute(sql)
        else:
            print("Rerun failed: row counts of the history data and the new data do not match!")
            sys.exit(1)
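
# --------------------------------------------------------------------------
# Hypothetical usage sketch (illustrative only; the class name, table, column
# and partition values below are placeholders, not project code). A concrete
# rerun script subclasses TemplatesRerun, fills in the run parameters, supplies
# a SparkSession (a plain Hive-enabled session is assumed here; the project may
# build it through its own helper), implements rerun_handle_data() and then
# calls rerun().
if __name__ == '__main__':
    from pyspark.sql import SparkSession, functions as F

    class DemoFieldRerun(TemplatesRerun):
        def __init__(self):
            super().__init__()
            self.site_name = "us"                # placeholder site
            self.date_type = "month"             # placeholder date granularity
            self.date_info = "2023-10"           # placeholder partition value
            self.db_save = "dwt_demo_table"      # placeholder Hive table
            self.rerun_field = "demo_field"      # placeholder column to recompute
            self.page = "demo_page"              # placeholder module name
            self.rerun_range = self.date_info
            self.task_owner = "someone"
            self.rerun_reason = "demo"
            self.rerun_reason_description = "usage sketch"
            self.partitions_num = 10
            self.partitions_by = ["site_name", "date_type", "date_info"]
            self.partitions_dict = {
                "site_name": self.site_name,
                "date_type": self.date_type,
                "date_info": self.date_info,
            }
            self.spark = SparkSession.builder.enableHiveSupport().getOrCreate()

        def rerun_handle_data(self):
            # Recompute the dropped column and attach it to the history data;
            # a null literal stands in for the real calculation.
            self.df_save = self.df_history_data.withColumn(self.rerun_field, F.lit(None).cast("string"))

    DemoFieldRerun().rerun()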