# vertify_dwt_flow_asin.py (6.7 KB)
import os
import sys
import json

sys.path.append(os.path.dirname(sys.path[0]))
from utils.ssh_util import SSHUtil
from utils.db_util import DBUtil
from utils.common_util import CommonUtil
from utils.common_util import DateTypes
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

def select_partition_conf(partition_conf_list, site_name, date_type):
    """Return the first threshold config matching the site and date type.

    Args:
        partition_conf_list: Iterable of dicts read from the ``partition_conf``
            entry of a field's ``config_json``; may be ``None`` or empty.
        site_name: Site partition value being validated.
        date_type: Date-type partition value being validated.

    Returns:
        The first dict whose ``site_name`` and ``date_type`` both equal the
        given values, or ``None`` when nothing matches.
    """
    return next(
        (
            conf for conf in (partition_conf_list or [])
            if conf.get("site_name") == site_name and conf.get("date_type") == date_type
        ),
        None,
    )


def evaluate_rate(check_count, total_count, min_rate, max_rate):
    """Compute the share of matching rows and whether it lies within bounds.

    Args:
        check_count: Number of rows satisfying the field's check condition.
        total_count: Number of rows in the partition.
        min_rate: Inclusive lower bound for the share.
        max_rate: Inclusive upper bound for the share.

    Returns:
        A ``(rate, passed)`` tuple where ``rate`` is rounded to 3 decimals and
        ``passed`` is 1 when ``min_rate <= rate <= max_rate``, else 0. An empty
        partition (``total_count <= 0``) yields ``(0.0, 0)``: no share can be
        computed, so the check is reported as failed instead of raising
        ZeroDivisionError.
    """
    if total_count <= 0:
        return 0.0, 0
    rate = round(check_count / total_count, 3)
    return rate, 1 if min_rate <= rate <= max_rate else 0


def main():
    """Validate key-field ratios of one ``dwt_flow_asin`` partition.

    Reads ``site_name``, ``date_type`` and ``date_info`` from the command line,
    loads the enabled rules for the table from ``hive_field_verify_config``,
    and for every rule that has thresholds configured for this site/date type
    computes the share of partition rows matching the rule's SQL condition.
    The results are printed, and a WeChat warning is sent when any share falls
    outside its configured bounds.

    Raises:
        ValueError: If any of the three command-line arguments is missing.
    """
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)

    hive_table = "dwt_flow_asin"
    partition_dict = {
        "site_name": site_name,
        "date_type": date_type,
        "date_info": date_info
    }
    # Explicit validation instead of `assert`, which is stripped under `python -O`.
    for arg_name, arg_value in partition_dict.items():
        if arg_value is None:
            raise ValueError(f"{arg_name} 不能为空!")

    # Human-readable partition description used in the warning message.
    msg_params = "".join(f"{value} " for value in partition_dict.values())
    # Partition filter shared by the total count and every per-field count.
    # NOTE(review): values come straight from the command line and are
    # interpolated into SQL; this assumes a trusted scheduler supplies them.
    partition_conditions = [f"{key} = '{value}'" for key, value in partition_dict.items()]
    query_total = f"""
                SELECT COUNT(1) AS total_count
                FROM {hive_table}
             WHERE {' AND '.join(partition_conditions)}"""

    # Result layout; the rate and threshold columns are declared as strings.
    schema = StructType([
        StructField("验证描述", StringType(), True),
        StructField("验证类型", StringType(), True),
        StructField("校验字段", StringType(), True),
        StructField("校验条件查询占比", StringType(), True),
        StructField("验证占比临界值上限", StringType(), True),
        StructField("验证占比临界值下限", StringType(), True),
        StructField("是否验证通过", IntegerType(), True),
    ])

    spark_session = SparkUtil.get_spark_sessionV3("check_fields_rule")
    try:
        # Load the maintained field-validation rules for this table and site.
        config_table_query = f"""select * from hive_field_verify_config 
                                        where table_name ='{hive_table}' 
                                        and site_name = '{site_name}'
                                        and use_flag = 1 """
        conn_info = DBUtil.get_connection_info('postgresql', 'us')
        check_field_df = SparkUtil.read_jdbc_query(
            session=spark_session,
            url=conn_info["url"],
            pwd=conn_info["pwd"],
            username=conn_info["username"],
            query=config_table_query
        )
        check_field_list = check_field_df.select(
            'field_name', 'verify_desc', 'verify_type', 'config_json', 'msg_usr_list'
        ).collect()
        if not check_field_list:
            print("============================无验证匹配条件跳过验证===================================")
            return

        # Row count of the whole partition: the denominator of every rate.
        total_count = int(spark_session.sql(query_total).collect()[0]['total_count'])

        result_rows = []
        for row in check_field_list:
            config_json = json.loads(row['config_json'])
            matched_conf = select_partition_conf(
                config_json.get('partition_conf'), site_name, date_type
            )
            # No thresholds for this site/date type: skip only this field and
            # keep validating the remaining ones.
            if matched_conf is None:
                continue

            sql_condition = config_json.get('sql_condition')
            if sql_condition:
                # Parenthesise the condition so an `OR` inside it cannot
                # escape the partition filter.
                check_query = f"{query_total} AND ({sql_condition}) "
                check_count = int(spark_session.sql(check_query).collect()[0]['total_count'])
            else:
                # Without an extra condition every partition row matches.
                check_count = total_count

            warning_max = matched_conf['max_rate']
            warning_min = matched_conf['min_rate']
            rate, passed = evaluate_rate(check_count, total_count, warning_min, warning_max)
            result_rows.append((
                row['verify_desc'],
                row['verify_type'],
                row['field_name'],
                # Stringify explicitly to match the StringType columns.
                str(rate),
                str(warning_max),
                str(warning_min),
                passed,
            ))

        if not result_rows:
            print("无验证项验证")
            return

        # Build the report once from all collected rows and print it to the log.
        check_df = spark_session.createDataFrame(result_rows, schema)
        check_df.show(50, truncate=False)

        # Notify when at least one check is outside its configured bounds.
        if not all(result[-1] for result in result_rows):
            msg = f"数据表:{hive_table} {msg_params},计算数据存在验证不通过,请检查数据是否异常!!具体信息请查看日志!!"
            CommonUtil.send_wx_msg(['chenjianyun', 'wangrui4'], f"\u26A0 {hive_table} {msg_params}流程数据导出前验证异常", msg)
    finally:
        # Release the Spark session on every exit path, not only on failure.
        spark_session.stop()


if __name__ == '__main__':
    main()