"""
Consistency checks for HDFS part files produced by the selection pipeline.

run_all() walks /home/big_data_selection and flags directories that contain
more than one file sharing the same "part-NNNNN" prefix. The __main__ block
checks the listed tables for partitions whose HDFS location appears more
than once.
"""
import json
import os
import re
import sys

sys.path.append(os.path.dirname(sys.path[0]))

from pyspark.sql import functions as F
from pyspark.sql.window import Window

from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from utils.ssh_util import SSHUtil


def run_all():
    client = HdfsUtils.get_hdfs_cilent()
    check_row_list = []
    # walk() with status=1 yields ((dir_path, dir_status), dirs, files),
    # where dirs/files are lists of (name, status) tuples.
    for root, dirs, files in client.walk("/home/big_data_selection", 10, status=1):
        (root_name, root_status) = root
        # skip ods data and picture dumps
        if "/home/big_data_selection/ods" in root_name \
                or "/home/big_data_selection/dwd/pictures" in root_name:
            continue
        for (part_name, part_status) in files:
            if part_status.get("type") != 'FILE':
                continue
            # ignore .index files and part-m-* files
            if part_name.endswith(".index"):
                continue
            if part_name.startswith("part-m"):
                continue
            # extract the "part-NNNNN" prefix; anything after it (e.g. a
            # file extension) is irrelevant for the duplicate check
            group = re.findall(r"^(part-\d*)(.*)", part_name)
            if group:
                (part_prefix, _) = group[0]
                if part_prefix:
                    check_row_list.append({
                        "path": root_name,
                        "part_prefix": part_prefix,
                        "part_name": part_name,
                        "part_date": CommonUtil.format_timestamp(part_status['modificationTime']),
                        "full_path": f"{root_name}/{part_name}",
                    })

    spark = SparkUtil.get_spark_session("part_check")
    df_all = spark.createDataFrame(check_row_list)
    # count how many files share the same prefix within one directory and
    # rank them by modification time (newest first)
    df_all = df_all.withColumn(
        "count",
        F.count("part_prefix").over(Window.partitionBy(['path', 'part_prefix']))
    )
    df_all = df_all.withColumn(
        "date_number",
        F.row_number().over(
            Window.partitionBy(['path', 'part_prefix']).orderBy(F.col("part_date").desc())
        )
    )
    # any prefix that occurs more than once in a directory is a duplicate
    err_df = df_all.where(F.expr("count > 1"))
    CommonUtil.df_export_csv(spark, err_df, "check_hadoop_part")
    print("success")


if __name__ == '__main__':
    check_tb = [
        "dim_bsr_asin_rank_history",
        "dim_nsr_asin_rank_history",
    ]
    from utils.io_util import IoUtils

    # collect every partition location of the tables above
    part_list = []
    for tb in check_tb:
        part_list = part_list + HdfsUtils.list_all_part_location(tb)

    print(json.dumps(part_list))
    IoUtils.save_json(part_list, "part_list.json")

    # count how often each HDFS location appears; a location referenced by
    # more than one partition is flagged as an error
    countMap = {}
    errList = []
    for row in part_list:
        location = row.get("location")
        count = (countMap.get(location) or 0) + 1
        countMap[location] = count
        if count > 1:
            errList.append(location)

    IoUtils.save_json(list(set(errList)), "errList.json")
    IoUtils.save_json(countMap, "countMap.json")