vertify_dwt_flow_asin.py
import os
import sys
import json

sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.db_util import DBUtil
from utils.common_util import CommonUtil
from utils.common_util import DateTypes
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

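# Verifies key-field ratios of dwt_flow_asin against thresholds maintained in
# hive_field_verify_config before the flow data is exported downstream.
# Example invocation (illustrative argument values, not taken from this file):
#   spark-submit vertify_dwt_flow_asin.py us month 2023-01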
if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    assert site_name is not None, "site_name must not be empty!"
    assert date_type is not None, "date_type must not be empty!"
    assert date_info is not None, "date_info must not be empty!"

    hive_table = "dwt_flow_asin"
    partition_dict = {
        "site_name": site_name,
        "date_type": date_type,
        "date_info": date_info
    }

    # Threshold warning for key fields
    # CommonUtil.check_fields_and_warning(hive_tb_name=hive_table, partition_dict=partition_dict)

    # Determine the partition to verify
    msg_params = ""
    # Parse partition_dict into partition filter conditions
    partition_conditions = []
    for key, value in partition_dict.items():
        if value is not None:
            msg_params += f"{value} "
            partition_conditions.append(f"{key} = '{value}'")
    base_msg = f"{hive_table} {msg_params} "
    site_name = partition_dict.get("site_name")
    date_type = partition_dict.get("date_type")
    spark_session = SparkUtil.get_spark_sessionV3("check_fields_rule")
    # Load the field-verification config maintained in hive_field_verify_config
    config_table_query = f"""select * from hive_field_verify_config 
                                        where table_name ='{hive_table}' 
                                        and site_name = '{site_name}'
                                        and use_flag = 1 """
    conn_info = DBUtil.get_connection_info('postgresql', 'us')
    check_field_df = SparkUtil.read_jdbc_query(
        session=spark_session,
        url=conn_info["url"],
        pwd=conn_info["pwd"],
        username=conn_info["username"],
        query=config_table_query
    )
    # Collect the verification config rows
    check_field_list = check_field_df.select('field_name', 'verify_desc', 'verify_type', 'config_json',
                                             'msg_usr_list').collect()
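    # Each config row's config_json is expected to carry the field check condition and
    # per-partition thresholds, roughly like this (illustrative values, inferred from the keys read below):
    #   {"sql_condition": "asin_type = 1",
    #    "partition_conf": [{"site_name": "us", "date_type": "month", "max_rate": 0.9, "min_rate": 0.1}]}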
    if not check_field_list:
        print("============================ No matching verification config; skipping verification ===================================")
        exit()
    # Define the result schema for the verification summary DataFrame
    schema = StructType([
        StructField("验证描述", StringType(), True),
        StructField("验证类型", StringType(), True),
        StructField("校验字段", StringType(), True),
        StructField("校验条件查询占比", StringType(), True),
        StructField("验证占比临界值上限", StringType(), True),
        StructField("验证占比临界值下限", StringType(), True),
        StructField("是否验证通过", IntegerType(), True),

    ])

    # Create an empty DataFrame with the schema above
    check_df = spark_session.createDataFrame([], schema)
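    # One result row per verified field will be unioned into check_df in the loop below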

    # Assemble the base verification SQL
    query = f"""
                SELECT COUNT(1) AS total_count
                FROM {hive_table}
            """
    # Append the partition filter as a WHERE clause
    if partition_conditions:
        query_total = query + f" WHERE {' AND '.join(partition_conditions)}"
    else:
        query_total = query

    # Run the total-count query; total_count is the denominator for every field's ratio
    total_df = spark_session.sql(query_total).cache()
    total_count = int(total_df.collect()[0]['total_count'])

    for row in check_field_list:
        vertify_flag = True
        field_name = row['field_name']
        verify_type = row['verify_type']
        config_json = json.loads(row['config_json'])
        msg_usr = row['msg_usr_list']
        msg_usr_list = [user.strip() for user in msg_usr.split(",")] if msg_usr else []
        sql_condition = config_json['sql_condition']
        partition_conf_list = config_json['partition_conf']

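        # Check whether this run's site_name/date_type combination is configured for this field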
        for conf in partition_conf_list:
            conf_site_name = conf["site_name"]
            conf_date_type = conf["date_type"]
            if site_name == conf_site_name and date_type == conf_date_type:
                vertify_flag = True
                break
            else:
                vertify_flag = False

        # No matching partition dimension is configured for this run; stop further verification
        if not vertify_flag:
            break

        # Append the field-specific condition from the config, if any
        if sql_condition:
            query_field_check = query_total + f" AND {sql_condition} "
        else:
            query_field_check = query_total

        # Numerator: rows in the partition that also match the field-specific condition
        check_count_df = spark_session.sql(query_field_check).cache()
        check_count = int(check_count_df.collect()[0]['total_count'])

        # Ratio of matching rows, checked against the configured thresholds
        calc_rate = round((check_count / total_count), 3)

        warning_max = conf['max_rate']
        warning_min = conf['min_rate']
        verify_flag = 1 if warning_min <= calc_rate <= warning_max else 0
        # Cast numeric values to str to match the StringType columns of the result schema
        ratio_df = spark_session.createDataFrame(
            [(row['verify_desc'], verify_type, field_name, str(calc_rate), str(warning_max), str(warning_min), verify_flag)],
            schema
        ).repartition(1)
        check_df = check_df.unionByName(ratio_df, False)

    if check_df.count() < 1:
        print("No verification items were run")
        exit()
    check_df.show(50, truncate=False)

    # Check whether any field failed verification; a 0 in "是否验证通过" means failure
    schema_flag = bool(check_df.select(F.min("是否验证通过").alias("result")).first().asDict()['result'])
    if not schema_flag:
        msg = f"Table {hive_table} {msg_params}: some fields failed verification. Please check whether the data is abnormal! See the logs for details!"
        CommonUtil.send_wx_msg(['chenjianyun'], f"\u26A0 {hive_table} {msg_params} pre-export verification failed", msg)
    # Stop the Spark session whether or not verification passed
    spark_session.stop()