tmp_word_frequency.py
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils

from utils.spark_util import SparkUtil
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType, StructType, StructField, BooleanType, MapType

"""
merchantwords 搜索词分词词频
"""


def is_number(s):
    """
    Check whether a string is a number (optionally negative, optionally decimal).
    :param s: string to test
    :return: True if the whole string is numeric
    """
    import re
    return re.match(r"^-?\d+(\.\d+)?$", s) is not None


def word_tokenize(keyword: str):
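    """
    Tokenize a search keyword with NLTK after stripping numbers and punctuation,
    then drop numeric and punctuation-only tokens.
    :param keyword: raw search keyword
    :return: list of cleaned tokens
    """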
    import re
    # strip numbers, dashes, quotes and common (including fullwidth) punctuation before tokenizing
    keyword = re.sub(r'(\d+\.?\d*|-|"|,|，|？|\?|/|、)', '', keyword).strip()

    from nltk.tokenize import word_tokenize
    result = word_tokenize(keyword, "english")
    # token artifacts to filter out: whitespace, punctuation, and NLTK's ``/'' quote tokens
    filter_arr = [
        " ", "\t", "\r", "\n", "(", ")", ",", "，", "[", "]", "、", "-", ":", "&", "|", "+", "``", "''", "'", "\""
    ]

    return list(filter(lambda x: not is_number(x) and x not in filter_arr, result))


def run():
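    """
    Tokenize all 'us' keywords from dwt_merchantwords_st_detail, count how often
    each word appears, and write the result to the tmp_word_frequency Hive table
    partitioned by site_name.
    """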
    spark = SparkUtil.get_spark_session("app_name")

    udf_word_tokenize = F.udf(word_tokenize, ArrayType(StringType()))

    keywords_all = spark.sql("select keyword from dwt_merchantwords_st_detail where site_name='us'").cache()
    df_all = keywords_all.withColumn("word", F.explode(udf_word_tokenize(F.col("keyword"))))
    df_all = df_all.groupby(F.col("word")) \
        .agg(F.count("word").alias("frequency")) \
        .orderBy(F.col("frequency").desc()) \
        .select(
            F.col("word"),
            F.col("frequency"),
            F.lit("us").alias("site_name")
        )

    hive_tb = 'tmp_word_frequency'
    # deduplicate: delete the existing HDFS partition before re-writing it
    partition_dict = {
        "site_name": "us"
    }
    hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict)
    HdfsUtils.delete_hdfs_file(hdfs_path)
    partition_by = list(partition_dict.keys())
    print(f"当前存储的表名为:{hive_tb},分区为{partition_by}", )
    df_all.write.saveAsTable(name=hive_tb, format='hive', mode='append', partitionBy=partition_by)


def word_pluralize(keyword: str):
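    """
    Use TextBlob to derive the singular and plural forms of a word and flag
    whether the word is already plural or is irregular (matches neither form).
    :return: dict with text / singularize / pluralize / pluralizeFlag / not_regular
    """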
    from textblob import Word
    # singular form
    singularize = Word(keyword).singularize().string
    # plural form (of the singular)
    pluralize = Word(singularize).pluralize().string

    result = {
        "text": keyword,
        "singularize": singularize,
        "pluralize": pluralize,
        "pluralizeFlag": keyword == pluralize,
        "not_regular": keyword not in [singularize, pluralize]
    }
    return result


def word_stem(keyword: str):
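    """
    Return the Snowball (English) stem of a word.
    """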
    from nltk.stem.snowball import SnowballStemmer
    stemmer = SnowballStemmer("english", ignore_stopwords=False)
    return stemmer.stem(keyword)


def word_test():
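    """
    For every word in tmp_word_frequency, compute its singular/plural forms and
    stems, keep only words that are plural or irregular, and save them to the
    tmp_word_not_regular_v2 Hive table.
    """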
    spark = SparkUtil.get_spark_session("word_test")
    udf_word_pluralize = F.udf(word_pluralize, StructType(
        [
            StructField('text', StringType(), True),
            StructField('singularize', StringType(), True),
            StructField('pluralize', StringType(), True),
            StructField('pluralizeFlag', BooleanType(), True),
            StructField('not_regular', BooleanType(), True),
        ]
    ))

    udf_word_stem = F.udf(word_stem, StringType())

    keywords_all = spark.sql("select word,frequency from tmp_word_frequency").cache()

    keywords_all = keywords_all.withColumn("resultMap", udf_word_pluralize(F.col("word"))).select(
        F.col("word"),
        F.col("frequency"),
        F.col("resultMap").getField("singularize").alias("singularize"),
        F.col("resultMap").getField("pluralize").alias("pluralize"),
        F.col("resultMap").getField("pluralizeFlag").alias("pluralizeFlag"),
        F.col("resultMap").getField("not_regular").alias("not_regular"),
    ).where("(pluralizeFlag == true) or (not_regular == true)")

    # compute word stems (Snowball stemmer) for the original, singular and plural forms
    keywords_all = keywords_all.withColumn("word_stem", udf_word_stem(F.col("word")))
    keywords_all = keywords_all.withColumn("singularize_stem", udf_word_stem(F.col("singularize")))
    keywords_all = keywords_all.withColumn("pluralize_stem", udf_word_stem(F.col("pluralize")))

    hive_tb = 'tmp_word_not_regular_v2'

    keywords_all.write.saveAsTable(name=hive_tb, format='hive', mode='append')
    print("success")


def word_for_calc():
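    """
    Extract the 'for ...' phrase from every keyword in dwt_merchantwords_st_detail,
    sum search volume per phrase, and save the result to tmp_for_market ordered by
    volume.
    """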
    spark = SparkUtil.get_spark_session("word_for_calc")
    keywords_all = spark.sql("""
    select *
    from (
        select word, sum(volume) as volume
        from (
            select regexp_extract(keyword, 'for (.*)', 0) as word, volume
            from big_data_selection.dwt_merchantwords_st_detail
            ) tmp
        where word != ''
        group by word
        ) tmp
    order by volume desc
    """)

    keywords_all.write.saveAsTable(name="tmp_for_market", format='hive', mode='append')
    print("success")


def word_for_download():
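    """
    Export up to 2,000,000 'for ...' phrases from tmp_for_market (ordered by
    volume) to a CSV file via CommonUtil.df_export_csv.
    """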
    spark = SparkUtil.get_spark_session("word_for_download")
    keywords_all = spark.sql("""
    select word
    from tmp_for_market
    order by volume desc
    """)
    CommonUtil.df_export_csv(spark, keywords_all, csv_name='word_for_calc', limit=200 * 10000)
    print("success")


if __name__ == '__main__':
    # word_for_calc()
    word_for_download()
    print("success")