import os
import sys
import json

sys.path.append(os.path.dirname(sys.path[0]))
from utils.db_util import DBUtil
from utils.common_util import CommonUtil
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    assert site_name is not None, "site_name 不能为空!"
    assert date_type is not None, "date_type 不能为空!"
    assert date_info is not None, "date_info 不能为空!"

    hive_table = "dwt_flow_asin"
    partition_dict = {
        "site_name": site_name,
        "date_type": date_type,
        "date_info": date_info
    }

    # Build the partition filter conditions and the alert-message prefix from partition_dict
    msg_params = ""
    partition_conditions = []
    for key, value in partition_dict.items():
        if value is not None:
            msg_params += f"{value} "
            partition_conditions.append(f"{key} = '{value}'")
    base_msg = f"{hive_table} {msg_params} "

    site_name = partition_dict.get("site_name")
    date_type = partition_dict.get("date_type")
    spark_session = SparkUtil.get_spark_sessionV3("check_fields_rule")

    # Load the field-verification rules maintained in the PostgreSQL config table
    config_table_query = f"""select * from hive_field_verify_config
                             where table_name = '{hive_table}'
                               and site_name = '{site_name}'
                               and use_flag = 1 """
    conn_info = DBUtil.get_connection_info('postgresql', 'us')
    check_field_df = SparkUtil.read_jdbc_query(
        session=spark_session,
        url=conn_info["url"],
        pwd=conn_info["pwd"],
        username=conn_info["username"],
        query=config_table_query
    )

    # Collect the verification rules; skip the whole check if none are configured
    check_field_list = check_field_df.select(
        'field_name', 'verify_desc', 'verify_type', 'config_json', 'msg_usr_list'
    ).collect()
    if not check_field_list:
        print("============================无验证匹配条件跳过验证===================================")
        exit()

    # DataFrame used to accumulate the verification results
    schema = StructType([
        StructField("验证描述", StringType(), True),
        StructField("验证类型", StringType(), True),
        StructField("校验字段", StringType(), True),
        StructField("校验条件查询占比", StringType(), True),
        StructField("验证占比临界值上限", StringType(), True),
        StructField("验证占比临界值下限", StringType(), True),
        StructField("是否验证通过", IntegerType(), True),
    ])
    check_df = spark_session.createDataFrame([], schema)

    # Base count query; append the partition filter as the WHERE clause
    query = f"""
    SELECT COUNT(1) AS total_count FROM {hive_table}
    """
    query_total = query
    if partition_conditions:
        query_total = query + f" WHERE {' AND '.join(partition_conditions)}"

    # Total row count of the partition under check
    total_df = spark_session.sql(query_total).cache()
    total_count = int(total_df.collect()[0]['total_count'])

    for row in check_field_list:
        vertify_flag = True
        field_name = row['field_name']
        verify_type = row['verify_type']
        config_json = json.loads(row['config_json'])
        msg_usr = row['msg_usr_list']
        msg_usr_list = [user.strip() for user in msg_usr.split(",")] if msg_usr else []
        sql_condition = config_json['sql_condition']
        partition_conf_list = config_json['partition_conf']

        # Find the threshold config that matches the current site_name + date_type
        for conf in partition_conf_list:
            conf_site_name = conf["site_name"]
            conf_date_type = conf["date_type"]
            if site_name == conf_site_name and date_type == conf_date_type:
                vertify_flag = True
                break
            else:
                vertify_flag = False

        # No matching dimension configured: stop the verification loop
        if not vertify_flag:
            break

        # Append the rule's own SQL condition to the partition filter
        query_field_check = query_total
        if sql_condition:
            query_field_check = query_total + f" AND {sql_condition} "

        # Run the rule query and compute its row ratio against the partition total
        check_count_df = spark_session.sql(query_field_check).cache()
        check_count = int(check_count_df.collect()[0]['total_count'])
        calcult_rate = round((check_count / total_count), 3)
        waring_max = conf['max_rate']
        waring_min = conf['min_rate']
        verify_flag = 1 if (calcult_rate <= waring_max) and (calcult_rate >= waring_min) else 0

        # Cast numeric values to str so they match the StringType columns declared in schema
        ratio_df = spark_session.createDataFrame(
            [(row['verify_desc'], verify_type, field_name, str(calcult_rate), str(waring_max), str(waring_min), verify_flag)],
            schema
        ).repartition(1)
        check_df = check_df.unionByName(ratio_df, False)

    if check_df.count() < 1:
        print("无验证项验证")
        exit()

    check_df.show(50, truncate=False)

    # If any rule failed (是否验证通过 == 0), send a WeChat alert before the export flow
    schema_flag = bool(check_df.select(F.min("是否验证通过").alias("result")).first().asDict()['result'])
    if not schema_flag:
        msg = f"数据表:{hive_table} {msg_params},计算数据存在验证不通过,请检查数据是否异常!!具体信息请查看日志!!"
        CommonUtil.send_wx_msg(['chenjianyun'], f"\u26A0 {hive_table} {msg_params}流程数据导出前验证异常", msg)

    spark_session.stop()
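# ---------------------------------------------------------------------------
# Illustrative sketch (not executed): the shape of `config_json` that the loop
# above expects from `hive_field_verify_config`. Only the key names are taken
# from this script (`sql_condition`, `partition_conf`, `site_name`, `date_type`,
# `max_rate`, `min_rate`); the concrete column, rates, and values below are
# hypothetical examples, not the real configuration.
#
# {
#     "sql_condition": "some_checked_field >= 0",
#     "partition_conf": [
#         {"site_name": "us", "date_type": "month", "max_rate": 1.0, "min_rate": 0.9}
#     ]
# }
#
# Assumed invocation (positional args map to site_name / date_type / date_info;
# the script file name here is a placeholder):
#   spark-submit check_fields_rule.py us month 2023-09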