import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from pyspark.sql.functions import count, explode, lit, desc, sum as spark_sum
from pyspark.sql.types import ArrayType, StringType
from textblob import Word
# from googletrans import Translator  # only needed by the commented-out word_translate UDF


class ABA2023YearWordFrequency(object):

    def __init__(self):
        self.spark = SparkUtil.get_spark_session("spark_task: aba_2023_year_word_frequency")
        # Placeholder DataFrames; all of them are overwritten in read_data()/handle_data()
        self.df_aba_2023 = self.spark.sql("select 1+1")
        self.df_beside_category = self.spark.sql("select 1+1")
        self.df_translate = self.spark.sql("select 1+1")
        self.df_save = self.spark.sql("select 1+1")
        self.df_save1 = self.spark.sql("select 1+1")
        self.df_save2 = self.spark.sql("select 1+1")
        self.df_agg = self.spark.sql("select 1+1")
        # Register custom UDFs
        self.u_get_singular_form = self.spark.udf.register('get_singular_form', self.get_singular_form, StringType())
        self.u_word_tokenize = self.spark.udf.register('word_tokenize', self.word_tokenize, ArrayType(StringType()))
        # self.u_word_translate = self.spark.udf.register('word_translate', self.word_translate, StringType())

    @staticmethod
    def get_singular_form(word: str):
        """Reduce a word to its singular form (noun lemma)."""
        if word:
            singular_form = Word(word).lemmatize("n")
            # word_object = Word(word)
            # singular_form = word_object.singularize()
            return singular_form
        return word

    @staticmethod
    def word_tokenize(title: str):
        """Tokenizer: split a search term into English words."""
        # Imported inside the UDF so each executor resolves nltk locally;
        # the NLTK 'punkt' tokenizer data must be available on every executor.
        from nltk.tokenize import word_tokenize
        if not title:
            return []
        return word_tokenize(title, "english")

    # @staticmethod
    # def word_translate(word: str):
    #     if word:
    #         try:
    #             translator = Translator()
    #             result = translator.translate(word, src='en', dest='zh-cn')
    #             return result.text
    #         except Exception as e:
    #             # Handle any unexpected error
    #             print(f"An unexpected error occurred: {e}")
    #             return None
    #     return None

    def read_data(self):
        sql1 = """
            select search_term, category_id
            from dwt_aba_last365
            where site_name = 'us'
              and date_type = 'last365day'
              and date_info = '2023-12'
        """
        self.df_aba_2023 = self.spark.sql(sql1).cache()
        print("df_aba_2023 count:")
        print(self.df_aba_2023.count())

        sql2 = """
            select category_id
            from dim_bsr_category_tree
            where site_name = 'us'
              and en_name in ('Audible Books & Originals', 'Books', 'Kindle Store', 'Apps & Games',
                              'Movies & TV', 'CDs & Vinyl', 'Software', 'Video Games')
              and category_parent_id = 0
        """
        self.df_beside_category = self.spark.sql(sql2).cache()
        print("df_beside_category count:")
        print(self.df_beside_category.count())

        sql3 = """
            select word, simple_cn as cn
            from tmp_en_dict
        """
        self.df_translate = self.spark.sql(sql3).cache()
        print("df_translate count:")
        print(self.df_translate.count())

    def handle_data(self):
        # Keep only search terms outside the excluded root categories (left anti join)
        self.df_save = self.df_aba_2023.join(
            self.df_beside_category, on='category_id', how='left_anti'
        ).select('search_term')
        # Tokenize each search term and explode into one word per row
        self.df_save = self.df_save.select(explode(self.u_word_tokenize(self.df_save['search_term'])).alias('word'))
        # Count the raw frequency of every word
        self.df_save = self.df_save.groupby(['word']).agg(
            count('word').alias('word_frequency')
        )
        # Attach the Chinese translation and the singular form of each word
        self.df_save = self.df_save.join(
            self.df_translate, on='word', how='left'
        ).withColumn(
            'word_singular_form', self.u_get_singular_form(self.df_save['word'])
        ).cache()

        self.df_save1 = self.df_save.select(
            'word', 'word_frequency', 'cn'
        ).orderBy(
            desc('word_frequency')
        ).withColumn(
            'date_info', lit('2023')
        )
        print("df_save1 count:")
        print(self.df_save1.count())
        self.df_save1.write.saveAsTable(name='tmp_word_frequency', format='hive', mode='append', partitionBy='date_info')
        print("df_save1 saved!")

        # Aggregate frequencies of words that share the same singular form
        self.df_agg = self.df_save.groupby(['word_singular_form']).agg(
            spark_sum('word_frequency').alias('word_frequency')
        )
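        # Join the merged (singular-form) frequency back onto each original word,
        # so every surface form reports the combined count of its lemma group.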
        self.df_save2 = self.df_save.select('word', 'cn', 'word_singular_form').join(
            self.df_agg, on='word_singular_form', how='left'
        ).select(
            'word', 'word_frequency', 'cn'
        ).orderBy(
            desc('word_frequency')
        ).withColumn(
            'date_info', lit('2023-merge')
        )
        print("df_save2 count:")
        print(self.df_save2.count())
        self.df_save2.write.saveAsTable(name='tmp_word_frequency', format='hive', mode='append', partitionBy='date_info')
        print("df_save2 saved!")


if __name__ == '__main__':
    obj = ABA2023YearWordFrequency()
    obj.read_data()
    obj.handle_data()
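
# Usage (assumed; the exact invocation depends on how SparkUtil configures the
# session and on the actual file name, which is hypothetical here):
#   spark-submit aba_2023_year_word_frequency.py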