# Spark job: match sensitive words from dim_sensitive against description text
import os
import sys
import re

sys.path.append(os.path.dirname(sys.path[0]))  # make the parent directory importable

from utils.templates import Templates
from pyspark.sql import functions as F
from pyspark.sql.types import StringType


class MatchSensitive(Templates):

    def __init__(self):
        super().__init__()
        self.db_save = 'dwd_sensitive_match'
        self.spark = self.create_spark_object(app_name=self.db_save)
        # Placeholder DataFrames, overwritten in read_data()/handle_data()
        self.df_save = self.spark.sql("select 1+1;")
        self.df_sensitive = self.spark.sql("select 1+1;")
        self.description_df = self.spark.sql("select 1+1;")
        self.exploded_df = self.spark.sql("select 1+1;")
        self.partitions_num = 5
        # Objects for the custom UDF (session-level registration kept for reference)
        # self.find_sensitive = self.spark.udf.register("find_sensitive", self.find_sensitive, StringType())

    @staticmethod
    def find_sensitive(ele_list: list):
        # Build one compiled pattern from the whole sensitive-word list, then return a
        # Spark UDF that matches text against it. The word boundaries enforce whole-word
        # matches; the negative lookbehind drops hits preceded by +, *, -, %, . or '.
        pattern = re.compile(
            r'(?<!\+|\*|\-|\%|\.|\')\b({})\b'.format('|'.join([re.escape(x) for x in ele_list])),
            flags=re.IGNORECASE
        )

        def udf_find_sensitive(match_text):
            matches = re.findall(pattern, match_text)
            if matches:
                return '||'.join(set(matches))
            return None

        return F.udf(udf_find_sensitive, StringType())
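
    # A minimal illustration (hypothetical words/text, not from the job itself) of how the
    # matcher behaves: matches are whole-word and case-insensitive, and hits preceded by
    # +, *, -, %, . or ' are suppressed by the negative lookbehind. With
    # ele_list = ["foo", "bar"], the text "+foo or Bar." yields "Bar".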

    def read_data(self):
        # Read the sensitive-word list from dim_sensitive
        sql = """
            select
                sensitive
            from
                dim_sensitive;
        """
        print(sql)
        self.df_sensitive = self.spark.sql(sqlQuery=sql).cache()
        # Read the raw descriptions from a local text file, one per line
        with open("/home/chenyuanjie/data/description.txt", 'r', encoding='utf-8') as file:
            description_list = file.readlines()
        # Convert the list to a DataFrame with a single "description" column
        self.description_df = self.spark.createDataFrame(
            [(description,) for description in description_list],
            ["description"]
        )
        self.description_df.show()
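
    # Note: readlines() keeps the trailing newline on each description; handle_data()
    # strips it with regexp_replace before matching.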

    def handle_data(self):
        # Earlier approaches, kept commented out for reference:
        # (1) chain of when/like expressions, one per sensitive word
        # self.df_save = self.description_df.withColumn("matched", F.concat(*[F.when(F.col("description").like(f"%{sensitive}%"),
        #     F.lit(sensitive)) for sensitive in self.df_sensitive.select("sensitive").rdd.flatMap(lambda x: x).collect()],
        #     F.lit("||")))
        # self.df_save.show()
        # (2) explode the description into words, then join against the sensitive words
        # self.exploded_df = self.description_df.withColumn("word", F.explode(F.split(F.col("description"), " ")))
        # self.df_save = self.exploded_df.join(self.df_sensitive, self.exploded_df.word == self.df_sensitive.sensitive)
        # (3) build one regex from all sensitive words and use regexp_extract
        # sensitive_words = self.df_sensitive.select('sensitive').rdd.flatMap(lambda x: x).collect()
        # escaped_sensitive_words = [re.escape(word) for word in sensitive_words]
        # pattern = f"({'|'.join(escaped_sensitive_words)})"
        # self.df_save = self.description_df.withColumn("sensitive", F.regexp_extract(F.col("description"), pattern, 0))
        # group by the original description and join the matched sensitive words with ||
        # self.df_save = self.df_save.groupBy("description").agg(F.collect_list("sensitive").alias("sensitive_list"))
        # self.df_save = self.df_save.withColumn("matched", F.concat_ws("||", "sensitive_list"))

        # Strip the line breaks left over from readlines()
        self.description_df = self.description_df.withColumn(
            "description", F.regexp_replace(F.col("description"), r"\r\n|\n", "")
        )
        # Pull the sensitive-word column down to a local Python list
        sensitive_words = self.df_sensitive.toPandas()["sensitive"].values.tolist()
        # Apply a UDF built from the full sensitive-word list to every description
        self.df_save = self.description_df.withColumn(
            "matched", self.find_sensitive(sensitive_words)(F.col("description"))
        )
        # Keep only the needed columns
        # self.df_save = self.df_save.select("description", "matched")
        self.df_save.show()


if __name__ == '__main__':
    handle_obj = MatchSensitive()
    handle_obj.run()
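
# A quick way to sanity-check the regex locally without Spark (a sketch that assumes the
# same pattern construction as find_sensitive; the word list and text below are made up):
#
#   import re
#   words = ["credit card", "ssn"]
#   pattern = re.compile(r"(?<!\+|\*|\-|\%|\.|\')\b({})\b".format("|".join(re.escape(w) for w in words)),
#                        flags=re.IGNORECASE)
#   print("||".join(set(re.findall(pattern, "Please mask the Credit Card number"))))  # -> Credit Card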