import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType, StructType, StructField, BooleanType, MapType

"""
merchantwords search-term tokenization and word-frequency statistics
"""


def is_number(str):
    """
    Check whether a token is numeric.
    :param str:
    :return:
    """
    import re
    return re.match(r"^-?\d+(\.\d+)?$", str) is not None


def word_tokenize(keyword: str):
    import re
    # strip digit runs and common ASCII / full-width punctuation before tokenizing
    keyword = re.sub(r'(\d+\.?\d*|-|\"|,|，|？|\?|/|、|）)', '', keyword).strip()
    from nltk.tokenize import word_tokenize
    result = word_tokenize(keyword, "english")
    # punctuation tokens to filter out of the tokenizer output
    filter_arr = [
        " ", "\t", "\r", "\n", "(", ")", ",", "，", "[", "]", "、", "-", ":", "&", "|", "+",
        "``", "''", "'", "\""
    ]
    return list(filter(lambda x: not is_number(x) and x not in filter_arr, result))


def run():
    spark = SparkUtil.get_spark_session("app_name")
    udf_word_tokenize = F.udf(word_tokenize, ArrayType(StringType()))
    keywords_all = spark.sql("select keyword from dwt_merchantwords_st_detail where site_name='us'").cache()
    df_all = keywords_all.withColumn("word", F.explode(udf_word_tokenize(F.col("keyword"))))
    df_all = df_all.groupby(F.col("word")) \
        .agg(F.count("word").alias("frequency")) \
        .orderBy(F.col("frequency").desc()) \
        .select(
            F.col("word"),
            F.col("frequency"),
            F.lit("us").alias("site_name")
        )

    hive_tb = 'tmp_word_frequency'
    # deduplicate: drop the existing partition data before writing
    partition_dict = {
        "site_name": "us"
    }
    hdfs_path = CommonUtil.build_hdfs_path(hive_tb, partition_dict)
    HdfsUtils.delete_hdfs_file(hdfs_path)
    partition_by = list(partition_dict.keys())
    print(f"Target table: {hive_tb}, partitions: {partition_by}")
    df_all.write.saveAsTable(name=hive_tb, format='hive', mode='append', partitionBy=partition_by)


def word_pluralize(keyword: str):
    from textblob import Word
    # singular form
    singularize = Word(keyword).singularize().string
    # plural form
    pluralize = Word(singularize).pluralize().string
    result = {
        "text": keyword,
        "singularize": singularize,
        "pluralize": pluralize,
        "pluralizeFlag": keyword == pluralize,
        "not_regular": keyword not in [singularize, pluralize]
    }
    return result


def word_stem(keyword: str):
    from nltk.stem.snowball import SnowballStemmer
    stemmer = SnowballStemmer("english", ignore_stopwords=False)
    return stemmer.stem(keyword)


def word_test():
    spark = SparkUtil.get_spark_session("word_test")
    udf_word_pluralize = F.udf(word_pluralize, StructType(
        [
            StructField('text', StringType(), True),
            StructField('singularize', StringType(), True),
            StructField('pluralize', StringType(), True),
            StructField('pluralizeFlag', BooleanType(), True),
            StructField('not_regular', BooleanType(), True),
        ]
    ))
    udf_word_stem = F.udf(word_stem, StringType())

    keywords_all = spark.sql("select word,frequency from tmp_word_frequency").cache()
    keywords_all = keywords_all.withColumn("resultMap", udf_word_pluralize(F.col("word"))).select(
        F.col("word"),
        F.col("frequency"),
        F.col("resultMap").getField("singularize").alias("singularize"),
        F.col("resultMap").getField("pluralize").alias("pluralize"),
        F.col("resultMap").getField("pluralizeFlag").alias("pluralizeFlag"),
        F.col("resultMap").getField("not_regular").alias("not_regular"),
    ).where("(pluralizeFlag == true) or (not_regular == true)")

    # compute word stems
    keywords_all = keywords_all.withColumn("word_stem", udf_word_stem(F.col("word")))
    keywords_all = keywords_all.withColumn("singularize_stem", udf_word_stem(F.col("singularize")))
    keywords_all = keywords_all.withColumn("pluralize_stem", udf_word_stem(F.col("pluralize")))

    hive_tb = 'tmp_word_not_regular_v2'
    keywords_all.write.saveAsTable(name=hive_tb, format='hive', mode='append')
    print("success")


def word_for_calc():
    spark = SparkUtil.get_spark_session("word_for_calc")
    keywords_all = spark.sql("""
select *
from (
         select word,
                sum(volume) as volume
         from (
                  select regexp_extract(keyword, 'for (.*)', 0) as word,
                         volume
                  from big_data_selection.dwt_merchantwords_st_detail
              ) tmp
         where word != ''
         group by word
     ) tmp
order by volume desc
""")
    keywords_all.write.saveAsTable(name="tmp_for_market", format='hive', mode='append')
    print("success")


def word_for_download():
    spark = SparkUtil.get_spark_session("word_for_calc")
    keywords_all = spark.sql("""
select word
from tmp_for_market
order by volume desc
""")
    CommonUtil.df_export_csv(spark, keywords_all, csv_name='word_for_calc', limit=200 * 10000)
    print("success")
    pass


if __name__ == '__main__':
    # word_for_calc()
    word_for_download()
    print("success")
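

# A minimal local sketch (not invoked by the Spark job): shows what word_tokenize,
# word_pluralize and word_stem return for a sample keyword. Assumes NLTK's punkt
# tokenizer data and the TextBlob corpora are already downloaded; the sample
# keyword and expected outputs below are illustrative only.
def _local_demo():
    sample = "wireless headphones for running, 2-pack"  # hypothetical keyword
    print(word_tokenize(sample))        # e.g. ['wireless', 'headphones', 'for', 'running', 'pack']
    print(word_pluralize("headphones")) # e.g. {'text': 'headphones', 'singularize': 'headphone', ...}
    print(word_stem("running"))         # e.g. 'run'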