import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils

from utils.spark_util import SparkUtil
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType, StructType, StructField, BooleanType

"""
merchantwords 搜索词分词词频
"""


def is_number(text):
    """
    Check whether a token is a (possibly negative) integer or decimal number.
    :param text: token to check
    :return: True if the token is numeric
    """
    import re
    return re.match(r"^-?\d+(\.\d+)?$", text) is not None
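
# Quick sanity check for is_number (illustrative calls, not part of the pipeline):
#   is_number("3.14")  -> True
#   is_number("-5")    -> True
#   is_number("12kg")  -> False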


def word_tokenize(keyword: str):
    # re is imported locally so the UDF stays self-contained when shipped to executors.
    import re
    # Strip numbers and common punctuation (including full-width variants) before tokenizing.
    keyword = re.sub(r'\d+\.?\d*|[-",,??/、]', '', keyword).strip()

    from nltk.tokenize import word_tokenize
    result = word_tokenize(keyword, "english")
    # Punctuation tokens to filter out after tokenization (`` and '' are NLTK's quote tokens):
    filter_arr = [
        " ", "\t", "\r", "\n", "(", ")", ",", ",", "[", "]", "、", "-", ":", "&", "|", "+", "``", "''", "'", "\""
    ]

    return list(filter(lambda x: not is_number(x) and x not in filter_arr, result))
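
# Example (illustrative; exact tokens depend on the installed NLTK punkt data):
#   word_tokenize("iphone 13 case, waterproof") -> ['iphone', 'case', 'waterproof']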


def run():
    spark = SparkUtil.get_spark_session("tmp_word_frequency")

    udf_word_tokenize = F.udf(word_tokenize, ArrayType(StringType()))

    keywords_all = spark.sql("select keyword from dwt_merchantwords_st_detail where site_name='us'").cache()
    df_all = keywords_all.withColumn("word", F.explode(udf_word_tokenize(F.col("keyword"))))
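    # explode fans each keyword out to one row per token, e.g. (illustrative):
    #   'iphone case' -> rows ('iphone case', 'iphone'), ('iphone case', 'case')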
    df_all = df_all.groupby(F.col("word")) \
        .agg(F.count("word").alias("frequency")) \
        .orderBy(F.col("frequency").desc()) \
        .select(
            F.col("word"),
            F.col("frequency"),
            F.lit("us").alias("site_name")
        )

    hive_tb = 'tmp_word_frequency'
    # Drop the existing partition data first so re-runs do not append duplicates.
    partition_dict = {
        "site_name": "us"
    }
    hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict)
    HdfsUtils.delete_hdfs_file(hdfs_path)
    partition_by = list(partition_dict.keys())
    print(f"Target table: {hive_tb}, partitioned by: {partition_by}")
    df_all.write.saveAsTable(name=hive_tb, format='hive', mode='append', partitionBy=partition_by)


def word_pluralize(keyword: str):
    from textblob import Word
    # singular form
    singularize = Word(keyword).singularize().string
    # plural form, derived from the singular
    pluralize = Word(singularize).pluralize().string

    result = {
        "text": keyword,
        "singularize": singularize,
        "pluralize": pluralize,
        # True if the word is already in its plural form
        "pluralizeFlag": keyword == pluralize,
        # True if the word matches neither computed form, i.e. it inflects irregularly
        "not_regular": keyword not in [singularize, pluralize]
    }
    return result
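
# Example of word_pluralize (illustrative; exact output depends on textblob's inflection rules):
#   word_pluralize("boxes")
#   -> {'text': 'boxes', 'singularize': 'box', 'pluralize': 'boxes',
#       'pluralizeFlag': True, 'not_regular': False}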


def word_stem(keyword: str):
    from nltk.stem.snowball import SnowballStemmer
    stemmer = SnowballStemmer("english", ignore_stopwords=False)
    return stemmer.stem(keyword)
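
# Example of word_stem (illustrative):
#   word_stem("running") -> 'run'
#   word_stem("cases")   -> 'case'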


def word_test():
    spark = SparkUtil.get_spark_session("word_test")
    udf_word_pluralize = F.udf(word_pluralize, StructType(
        [
            StructField('text', StringType(), True),
            StructField('singularize', StringType(), True),
            StructField('pluralize', StringType(), True),
            StructField('pluralizeFlag', BooleanType(), True),
            StructField('not_regular', BooleanType(), True),
        ]
    ))

    udf_word_stem = F.udf(word_stem, StringType())

    keywords_all = spark.sql("select word,frequency from tmp_word_frequency").cache()

    keywords_all = keywords_all.withColumn("resultMap", udf_word_pluralize(F.col("word"))).select(
        F.col("word"),
        F.col("frequency"),
        F.col("resultMap").getField("singularize").alias("singularize"),
        F.col("resultMap").getField("pluralize").alias("pluralize"),
        F.col("resultMap").getField("pluralizeFlag").alias("pluralizeFlag"),
        F.col("resultMap").getField("not_regular").alias("not_regular"),
    ).where("(pluralizeFlag == true) or (not_regular == true)")

    # Compute stems for the original word and its singular/plural forms.
    keywords_all = keywords_all.withColumn("word_stem", udf_word_stem(F.col("word")))
    keywords_all = keywords_all.withColumn("singularize_stem", udf_word_stem(F.col("singularize")))
    keywords_all = keywords_all.withColumn("pluralize_stem", udf_word_stem(F.col("pluralize")))

    hive_tb = 'tmp_word_not_regular_v2'

    keywords_all.write.saveAsTable(name=hive_tb, format='hive', mode='append')
    print("success")


def word_for_calc():
    spark = SparkUtil.get_spark_session("word_for_calc")
    keywords_all = spark.sql("""
    select *
    from (
        select word, sum(volume) as volume
        from (
            -- group index 0 keeps the whole match, i.e. the 'for ...' phrase itself
            select regexp_extract(keyword, 'for (.*)', 0) as word, volume
            from big_data_selection.dwt_merchantwords_st_detail
            ) tmp
        where word != ''
        group by word
        ) tmp
    order by volume desc
    """)

    keywords_all.write.saveAsTable(name="tmp_for_market", format='hive', mode='append')
    print("success")


def word_for_download():
    spark = SparkUtil.get_spark_session("word_for_download")
    keywords_all = spark.sql("""
    select word
    from tmp_for_market
    order by volume desc
    """)
    CommonUtil.df_export_csv(spark, keywords_all, csv_name='word_for_calc', limit=200 * 10000)
    print("success")


if __name__ == '__main__':
    # word_for_calc()
    word_for_download()