import os
import sys

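# Append the script's parent directory so the sibling utils package can be imported when run as a script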
sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from pyspark.sql.types import StringType, ArrayType
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F
from pyspark.sql.window import Window

"""
根据merchantwords计算词频
"""


def is_number(s):
    """
    Check whether a string represents a number
    :param s:
    :return:
    """
    import re
    return re.match(r"^-?\d+(\.\d+)?$", s) is not None


class DimMWordFrequency:

    def __init__(self, site_name, batch):
        assert site_name is not None, "site_name must not be None"
        assert batch is not None, "batch must not be None"
        self.site_name = site_name
        self.batch = batch

    @staticmethod
    def word_tokenize(keyword: str):
        import re
        keyword = keyword.replace("+", " ")
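        # Strip numbers and stray punctuation (both ASCII and full-width variants)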
        keyword = re.sub(r'\d+\.?\d*|[-",,??/、]', '', keyword).strip()

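        # NLTK is imported inside the function so Spark executors running the UDF load it locally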
        from nltk.tokenize import word_tokenize
        result = word_tokenize(keyword, "english")
        # Tokens to filter out (punctuation, including NLTK's quote tokens)
        filter_arr = [
            " ", "\t", "\r", "\n", "(", ")", ",", ",", "[", "]", "、", "-", ":", "&", "|", "+", "``", "''", "'", "\""
        ]
        return list(filter(lambda x: not is_number(x) and x not in filter_arr, result))

    @staticmethod
    def word_stem(keyword: str):
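        """
        Reduce a single word to its stem with NLTK's Snowball stemmer
        :param keyword:
        :return:
        """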
        from nltk.stem.snowball import SnowballStemmer
        stemmer = SnowballStemmer("english", ignore_stopwords=False)
        return stemmer.stem(keyword)

    def run(self):
        """
        Word-level dimension
        :return:
        """
        spark = SparkUtil.get_spark_session("DimWordFrequency")
        udf_word_tokenize = F.udf(self.word_tokenize, ArrayType(StringType()))

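        # Read all keywords for the requested site/batch partition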
        keywords_all = spark.sql(
            f"select keyword from dwt_merchantwords_st_detail where site_name='{self.site_name}' and batch ='{self.batch}'").cache()
        df_all = keywords_all.withColumn("word", F.explode(udf_word_tokenize(F.col("keyword"))))
        df_all = df_all.groupby(F.col("word")) \
            .agg(F.count("word").alias("frequency")) \
            .withColumn("rank", F.rank().over(Window.orderBy(F.col("frequency").desc()))) \
            .orderBy(F.col("rank").asc()) \
            .select(
            F.col("word"),
            F.col("frequency"),
            F.col("rank"),
            F.lit(self.site_name).alias("site_name"),
            F.lit(self.batch).alias("batch")
        )

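        # Save or update the site/batch partition of the Hive dim table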
        hive_tb = 'dim_m_word_frequency'
        partition_dict = {
            "site_name": self.site_name,
            "batch": self.batch,
        }
        CommonUtil.save_or_update_table(
            spark,
            hive_tb,
            partition_dict,
            df_all,
            True
        )
        print("success")

    def run_stem(self):
        """
        Word-stem dimension
        :return:
        """
        spark = SparkUtil.get_spark_session("DimWordFrequency")
        udf_word_tokenize = F.udf(self.word_tokenize, ArrayType(StringType()))
        udf_word_stem = F.udf(self.word_stem, StringType())

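        # Note: the stem dimension is currently pinned to site 'us' and ignores the batch argument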
        keywords_all = spark.sql("select keyword from dwt_merchantwords_st_detail where site_name='us'").cache()
        df_all = keywords_all.withColumn("word", F.explode(udf_word_tokenize(F.col("keyword"))))
        df_all = df_all.groupby(F.col("word")) \
            .agg(F.count("word").alias("frequency"))

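        # Map each distinct word to its stem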
        df_all = df_all.withColumn("word_stem", udf_word_stem(F.col("word")))

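        # Re-aggregate by stem: sum per-word frequencies and collect the
        # contributing words into a comma-separated list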
        df_all = df_all.groupby(F.col("word_stem")) \
            .agg(
            F.sum("frequency").alias("frequency"),
            F.concat_ws(',', F.collect_set(F.col("word"))).alias('word_list')
        ) \
            .withColumn("rank", F.rank().over(Window.orderBy(F.col("frequency").desc()))) \
            .orderBy(F.col("rank").asc()) \
            .select(
            F.col("word_stem"),
            F.col("word_list"),
            F.col("frequency"),
            F.col("rank"),
            F.lit("us").alias("site_name")
        )

        hive_tb = 'dim_m_word_stem_frequency'
        # Delete the existing partition files first so the append below does not duplicate data
        partition_dict = {
            "site_name": "us"
        }
        hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict)
        HdfsUtils.delete_hdfs_file(hdfs_path)
        partition_by = list(partition_dict.keys())
        print(f"当前存储的表名为:{hive_tb},分区为{partition_by}", )
        df_all.write.saveAsTable(name=hive_tb, format='hive', mode='append', partitionBy=partition_by)
        print("success")



if __name__ == '__main__':
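    # CLI arguments: site_name (defaults to 'us') and batch (defaults to '2024-1')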
    site_name = CommonUtil.get_sys_arg(1, 'us')
    batch = CommonUtil.get_sys_arg(2, '2024-1')
    handle_obj = DimMWordFrequency(site_name, batch)
    handle_obj.run()