import json
import os
import sys
sys.path.append(os.path.dirname(sys.path[0]))
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from utils.common_util import CommonUtil
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from utils.ssh_util import SSHUtil
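
# Purpose, as inferred from the logic below: scan the HDFS warehouse under
# /home/big_data_selection for directories that contain more than one data file
# with the same "part-<number>" prefix, and export the duplicates for review.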
def run_all():
    import re
    client = HdfsUtils.get_hdfs_cilent()
    check_row_list = []
    # walk the warehouse root up to 10 levels deep, asking for file status objects
    for root, dirs, files in client.walk("/home/big_data_selection", 10, status=1):
        for (part_name, part_status) in files:
            if part_status.get("type") != 'FILE':
                continue
            (root_name, root_status) = root
            # skip ods tables and picture data
            if "/home/big_data_selection/ods" in root_name or "/home/big_data_selection/dwd/pictures" in root_name:
                continue
            # skip index files and map-side "part-m" outputs
            if part_name.endswith(".index"):
                continue
            if part_name.startswith("part-m"):
                continue
            group = re.findall(r"^(part-\d*)(.*)", part_name)
            if group:
                (part_prefix, _suffix) = group[0]
                if part_prefix:
                    check_row_list.append({
                        "path": root_name,
                        "part_prefix": part_prefix,
                        "part_name": part_name,
                        "part_date": CommonUtil.format_timestamp(part_status['modificationTime']),
                        "full_path": f"{root_name}/{part_name}"
                    })

    spark = SparkUtil.get_spark_session("part_check")
    df_all = spark.createDataFrame(check_row_list)
    # the same prefix appearing more than once under one path means duplicate part files
    df_all = df_all.withColumn("count", F.count("part_prefix").over(Window.partitionBy(['path', 'part_prefix'])))
    # rank duplicates within each (path, prefix) group by modification time, newest first
    df_all = df_all.withColumn("date_number", F.row_number().over(
        Window.partitionBy(['path', 'part_prefix']).orderBy(F.col("part_date").desc())))
    err_df = df_all.where(F.expr("count > 1"))
    CommonUtil.df_export_csv(spark, err_df, "check_hadoop_part")
    print("success")
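
# Note: run_all() is defined above but is not invoked by the __main__ block
# below, which runs a separate check on table partition locations for a fixed
# list of tables. Call run_all() manually (for example from an interactive
# session) if the directory-level duplicate scan is needed.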
if __name__ == '__main__':
    # tables whose partition locations are checked for duplicates
    check_tb = [
        "dim_bsr_asin_rank_history",
        "dim_nsr_asin_rank_history",
    ]
    from utils.io_util import IoUtils

    part_list = []
    for tb in check_tb:
        part_list = part_list + HdfsUtils.list_all_part_location(tb)
    print(json.dumps(part_list))
    IoUtils.save_json(part_list, "part_list.json")

    # count how many partitions share the same location; any location seen
    # more than once is recorded as an error
    count_map = {}
    err_list = []
    for row in part_list:
        location = row.get("location")
        count = (count_map.get(location) or 0) + 1
        count_map[location] = count
        if count > 1:
            err_list.append(location)
    IoUtils.save_json(list(set(err_list)), "errList.json")
    IoUtils.save_json(count_map, "countMap.json")