check_hadoop_part.py
import json
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))

from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from utils.common_util import CommonUtil
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from utils.ssh_util import SSHUtil


def run_all():
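    """Walk /home/big_data_selection on HDFS and export to CSV all directories that
    contain more than one file for the same part prefix (i.e. duplicated part files)."""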
    import re
    client = HdfsUtils.get_hdfs_cilent()
    check_row_list = []
    for root, dirs, files in client.walk("/home/big_data_selection", 10, status=True):

        for (part_name, part_status) in files:
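            # keep only regular data part files: skip directory entries, raw ods and
            # dwd/pictures paths, .index files and sqoop-style "part-m" outputs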
            if part_status.get("type") != 'FILE':
                continue
            (root_name, root_status) = root
            if "/home/big_data_selection/ods" in root_name or "/home/big_data_selection/dwd/pictures" in root_name:
                continue
            if part_name.endswith(".index"):
                continue
            if part_name.startswith("part-m"):
                continue

            group = re.findall(r"^(part-\d*)(.*)", part_name)
            if group:
                (part_prefix, _) = group[0]
                if part_prefix:
                    check_row_list.append({
                        "path": root_name,
                        "part_prefix": part_prefix,
                        "part_name": part_name,
                        "part_date": CommonUtil.format_timestamp(part_status['modificationTime']),
                        "full_path": f"{root_name}/{part_name}"
                    })

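    # load the collected rows into Spark and count, per directory, how many files share each part prefix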
    spark = SparkUtil.get_spark_session("part_check")
    df_all = spark.createDataFrame(check_row_list)

    df_all = df_all.withColumn("count", F.count("part_prefix").over(Window.partitionBy(['path', 'part_prefix'])))
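    # rank each (path, part_prefix) group by modification time, newest first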
    df_all = df_all.withColumn("date_number", F.row_number().over(
        Window.partitionBy(['path', 'part_prefix']).orderBy(F.col("part_date").desc())))

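    # a prefix that occurs more than once in the same directory indicates a duplicated part file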
    err_df = df_all.where(F.expr("count > 1"))

    CommonUtil.df_export_csv(spark, err_df, "check_hadoop_part")

    print("success")


if __name__ == '__main__':
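    # standalone check: collect all partition locations of these tables and flag
    # any HDFS location that is registered by more than one partition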
    check_tb = [
        "dim_bsr_asin_rank_history",
        "dim_nsr_asin_rank_history",
    ]
    from utils.io_util import IoUtils

    part_list = []
    for tb in check_tb:
        part_list.extend(HdfsUtils.list_all_part_location(tb))

    print(json.dumps(part_list))

    IoUtils.save_json(part_list, "part_list.json")

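    # count how many partitions point at each location; duplicates are collected in errList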
    countMap = {}
    errList = []
    for row in part_list:
        location = row.get("location")
        count = countMap.get(location, 0) + 1
        countMap[location] = count
        if count > 1:
            errList.append(location)

    IoUtils.save_json(list(set(errList)), "errList.json")
    IoUtils.save_json(countMap, "countMap.json")