import re
from pyspark.sql import SparkSession

def load_sensitive_words(hive_table):
    # 在这里加载敏感词汇,可以使用Hive或其他方式获取数据
    # 返回一个包含敏感词的列表
    sensitive_words = ['sensitive_word1', 'sensitive_word2', 'sensitive_word3']
    return sensitive_words

def process_text(text, sensitive_words):
    # 检查文本中是否包含敏感词
    matched_words = [word for word in sensitive_words if re.search(r'\b' + re.escape(word) + r'\b', text, re.IGNORECASE)]
    return '||'.join(matched_words)

def main():
    # 创建Spark会话
    spark = SparkSession.builder.appName("SensitiveWordsProcessor").getOrCreate()

    # 从文件加载产品描述数据
    input_file_path = '/path/to/product_descriptions.txt'
    product_descriptions_df = spark.read.text(input_file_path)
    product_descriptions_df = product_descriptions_df.withColumnRenamed("value", "original_text")

    # 从Hive表加载敏感词汇
    hive_table_name = 'your_hive_table_name'
    sensitive_words = load_sensitive_words(hive_table_name)

    # 使用UDF处理文本,将匹配的敏感词添加到新列
    spark.udf.register("process_text_udf", lambda text: process_text(text, sensitive_words))
    result_df = product_descriptions_df.withColumn("sensitive_words", expr("process_text_udf(original_text)"))

    # 显示或保存结果
    result_df.show(truncate=False)

    # 可选:将结果保存到新表
    result_table_name = 'your_result_table_name'
    result_df.write.saveAsTable(result_table_name, mode='overwrite')

    # 停止Spark会话
    spark.stop()

if __name__ == "__main__":
    main()