import re from pyspark.sql import SparkSession def load_sensitive_words(hive_table): # 在这里加载敏感词汇,可以使用Hive或其他方式获取数据 # 返回一个包含敏感词的列表 sensitive_words = ['sensitive_word1', 'sensitive_word2', 'sensitive_word3'] return sensitive_words def process_text(text, sensitive_words): # 检查文本中是否包含敏感词 matched_words = [word for word in sensitive_words if re.search(r'\b' + re.escape(word) + r'\b', text, re.IGNORECASE)] return '||'.join(matched_words) def main(): # 创建Spark会话 spark = SparkSession.builder.appName("SensitiveWordsProcessor").getOrCreate() # 从文件加载产品描述数据 input_file_path = '/path/to/product_descriptions.txt' product_descriptions_df = spark.read.text(input_file_path) product_descriptions_df = product_descriptions_df.withColumnRenamed("value", "original_text") # 从Hive表加载敏感词汇 hive_table_name = 'your_hive_table_name' sensitive_words = load_sensitive_words(hive_table_name) # 使用UDF处理文本,将匹配的敏感词添加到新列 spark.udf.register("process_text_udf", lambda text: process_text(text, sensitive_words)) result_df = product_descriptions_df.withColumn("sensitive_words", expr("process_text_udf(original_text)")) # 显示或保存结果 result_df.show(truncate=False) # 可选:将结果保存到新表 result_table_name = 'your_result_table_name' result_df.write.saveAsTable(result_table_name, mode='overwrite') # 停止Spark会话 spark.stop() if __name__ == "__main__": main()