import os
import sys
import json
sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.db_util import DBUtil
from utils.common_util import CommonUtil
from utils.common_util import DateTypes
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
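
# The script expects three positional arguments: site_name, date_type and date_info.
# Example invocation (script name and argument values are illustrative, not taken from this project):
#   python this_script.py us month 2023-01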
if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    assert site_name is not None, "site_name 不能为空!"
    assert date_type is not None, "date_type 不能为空!"
    assert date_info is not None, "date_info 不能为空!"
    hive_table = "dwt_flow_asin"
    partition_dict = {
        "site_name": site_name,
        "date_type": date_type,
        "date_info": date_info
    }
    # Threshold warning for key fields (currently disabled)
    # CommonUtil.check_fields_and_warning(hive_tb_name=hive_table, partition_dict=partition_dict)
    # Build the partition filter for this calculation
    msg_params = ""
    # Parse partition_dict into partition query conditions
    partition_conditions = []
    for key, value in partition_dict.items():
        if value is not None:
            msg_params += f"{value} "
            partition_conditions.append(f"{key} = '{value}'")
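    # For example (illustrative values only), site_name=us, date_type=month, date_info=2023-01
    # yields msg_params = "us month 2023-01 " and
    # partition_conditions = ["site_name = 'us'", "date_type = 'month'", "date_info = '2023-01'"].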
    base_msg = f"{hive_table} {msg_params} "
    site_name = partition_dict.get("site_name")
    date_type = partition_dict.get("date_type")
    spark_session = SparkUtil.get_spark_sessionV3("check_fields_rule")
    # Load the maintained field-verification config table from PostgreSQL
    config_table_query = f"""select * from hive_field_verify_config
                             where table_name ='{hive_table}'
                             and site_name = '{site_name}'
                             and use_flag = 1 """
    conn_info = DBUtil.get_connection_info('postgresql', 'us')
    check_field_df = SparkUtil.read_jdbc_query(
        session=spark_session,
        url=conn_info["url"],
        pwd=conn_info["pwd"],
        username=conn_info["username"],
        query=config_table_query
    )
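    # Each config row is expected to carry at least field_name, verify_desc, verify_type,
    # config_json and msg_usr_list (only these columns are used below), in addition to the
    # table_name / site_name / use_flag columns referenced in the filter above.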
    # Collect the verification rules
    check_field_list = check_field_df.select('field_name', 'verify_desc', 'verify_type', 'config_json',
                                             'msg_usr_list').collect()
    if not check_field_list:
        print("============================无验证匹配条件跳过验证===================================")
        exit()
    # Create a DataFrame to store the verification results
    # Define the column structure
    schema = StructType([
        StructField("验证描述", StringType(), True),
        StructField("验证类型", StringType(), True),
        StructField("校验字段", StringType(), True),
        StructField("校验条件查询占比", StringType(), True),
        StructField("验证占比临界值上限", StringType(), True),
        StructField("验证占比临界值下限", StringType(), True),
        StructField("是否验证通过", IntegerType(), True),
    ])
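    # Column names above, in order: verification description, verification type, verified field,
    # queried ratio under the check condition, upper ratio threshold, lower ratio threshold,
    # and whether the check passed (1 = pass, 0 = fail).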
    # Create an empty DataFrame with the defined structure
    check_df = spark_session.createDataFrame([], schema)
    # Assemble the base verification SQL
    query = f"""
        SELECT COUNT(1) AS total_count
        FROM {hive_table}
    """
    # Append the where clause (the asserts above guarantee the partition conditions are non-empty)
    query_total = query
    if partition_conditions:
        query_total = query + f" WHERE {' AND '.join(partition_conditions)}"
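    # With the illustrative arguments above, query_total is roughly:
    #   SELECT COUNT(1) AS total_count FROM dwt_flow_asin
    #   WHERE site_name = 'us' AND date_type = 'month' AND date_info = '2023-01'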
    # Run the SQL to get the total row count of the partition
    total_df = spark_session.sql(query_total).cache()
    total_count = int(total_df.collect()[0]['total_count'])
    for row in check_field_list:
        match_flag = True  # True when a partition config matches the current site_name / date_type
        field_name = row['field_name']
        verify_type = row['verify_type']
        config_json = json.loads(row['config_json'])
        msg_usr = row['msg_usr_list']
        msg_usr_list = [user.strip() for user in msg_usr.split(",")] if msg_usr else []
        sql_condition = config_json['sql_condition']
        partition_conf_list = config_json['partition_conf']
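        # config_json is expected to look roughly like this (keys taken from the accesses below,
        # values purely illustrative):
        # {
        #     "sql_condition": "asin_type = 1",
        #     "partition_conf": [
        #         {"site_name": "us", "date_type": "month", "min_rate": 0.1, "max_rate": 0.9}
        #     ]
        # }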
        # Find the threshold config that matches the current site_name / date_type
        for conf in partition_conf_list:
            conf_site_name = conf["site_name"]
            conf_date_type = conf["date_type"]
            if site_name == conf_site_name and date_type == conf_date_type:
                match_flag = True
                break
            else:
                match_flag = False
        # assert base_rate is not None, f"未配置{field_name}验证周期{date_type}的基准值,请检查!"
        # No suitable matching dimension is configured, so stop verifying
        if not match_flag:
            break
        # Append the per-field filter condition (fall back to the partition-only query when none is configured)
        query_field_check = query_total
        if sql_condition:
            query_field_check = query_total + f" AND {sql_condition} "
        # Run the SQL to count the rows matching the check condition
        check_count_df = spark_session.sql(query_field_check).cache()
        check_count = int(check_count_df.collect()[0]['total_count'])
        calculate_rate = round((check_count / total_count), 3)
        warning_max = conf['max_rate']
        warning_min = conf['min_rate']
        verify_flag = 1 if (calculate_rate <= warning_max) and (calculate_rate >= warning_min) else 0
        # Cast the numeric values to str so they match the StringType columns defined in the schema
        ratio_df = spark_session.createDataFrame(
            [(row['verify_desc'], verify_type, field_name, str(calculate_rate), str(warning_max), str(warning_min), verify_flag)],
            schema
        ).repartition(1)
        check_df = check_df.unionByName(ratio_df, False)
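    # At this point check_df holds one result row per verified field; a row passes (是否验证通过 = 1)
    # only when its ratio falls inside the [lower, upper] threshold range from the matched config.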
    if check_df.count() < 1:
        print("无验证项验证")
        exit()
    check_df.show(50, truncate=False)
    # Check the verification results for any failed checks
    schema_flag = bool(check_df.select(F.min("是否验证通过").alias("result")).first().asDict()['result'])
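    # The minimum of 是否验证通过 is 0 as soon as any single check failed, so schema_flag is
    # False exactly when at least one verification did not pass.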
    if not schema_flag:
        # Send a WeChat alert when any verification failed
        msg = f"数据表:{hive_table} {msg_params},计算数据存在验证不通过,请检查数据是否异常!!具体信息请查看日志!!"
        CommonUtil.send_wx_msg(['chenjianyun', 'wangrui4'], f"\u26A0 {hive_table} {msg_params}流程数据导出前验证异常", msg)
    spark_session.stop()