aba_2023_word_frequency.py 5.26 KB
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from pyspark.sql.functions import count, explode, lit, desc, sum
from pyspark.sql.types import ArrayType, StringType
from textblob import Word
from googletrans import Translator


class ABA2023YearWordFrequency(object):

    def __init__(self):
        self.spark = SparkUtil.get_spark_session("spark_task: aba_2023_year_word_frequency")
        self.df_aba_2023 = self.spark.sql(f"select 1+1;")
        self.df_beside_category = self.spark.sql(f"select 1+1;")
        self.df_translate = self.spark.sql(f"select 1+1;")
        self.df_save = self.spark.sql(f"select 1+1;")
        self.df_save1 = self.spark.sql(f"select 1+1;")
        self.df_save2 = self.spark.sql(f"select 1+1;")
        self.df_agg = self.spark.sql(f"select 1+1;")
        # 自定义udf
        self.u_get_singular_form = self.spark.udf.register('get_singular_form', self.get_singular_form, StringType())
        self.u_word_tokenize = self.spark.udf.register('word_tokenize', self.word_tokenize, ArrayType(StringType()))
        # self.u_word_translate = self.spark.udf.register('word_translate', self.word_translate, StringType())

    @staticmethod
    def get_singular_form(word: str):
        """
        将单词全部转化为单数形式
        """
        if word:
            singular_form = Word(word).lemmatize("n")
            # word_object = Word(word)
            # singular_form = word_object.singularize()
            return singular_form
        return word

    @staticmethod
    def word_tokenize(title: str):
        """
        分词器
        """
        from nltk.tokenize import word_tokenize
        result = word_tokenize(title, "english")
        return result

    # @staticmethod
    # def word_translate(word: str):
    #     if word:
    #         try:
    #             translator = Translator()
    #             result = translator.translate(word, src='en', dest='zh-cn')
    #             return result.text
    #         except Exception as e:
    #             # 处理其他未知错误
    #             print(f"An unexpected error occurred: {e}")
    #             return None
    #     return None

    def read_data(self):
        sql1 = f"""
            select 
                search_term, 
                category_id 
            from dwt_aba_last365 
            where site_name = 'us' 
              and date_type = 'last365day' 
              and date_info = '2023-12';
        """
        self.df_aba_2023 = self.spark.sql(sql1).cache()
        print("df_aba_2023的数量:")
        print(self.df_aba_2023.count())

        sql2 = f"""
            select 
                category_id
            from dim_bsr_category_tree
            where site_name = 'us'
              and en_name in ('Audible Books & Originals', 'Books', 'Kindle Store', 'Apps & Games', 'Movies & TV', 'CDs & Vinyl', 'Software', 'Video Games')
              and category_parent_id = 0;
        """
        self.df_beside_category = self.spark.sql(sql2).cache()
        print("df_beside_category的数量:")
        print(self.df_beside_category.count())

        sql3 = f"""
            select 
                word, 
                simple_cn as cn 
            from tmp_en_dict;
        """
        self.df_translate = self.spark.sql(sql3).cache()
        print("df_translate的数量:")
        print(self.df_translate.count())

    def handle_data(self):
        self.df_save = self.df_aba_2023.join(
            self.df_beside_category, on='category_id', how='left_anti'
        ).select('search_term')

        self.df_save = self.df_save.select(explode(self.u_word_tokenize(self.df_save['search_term'])).alias('word'))
        self.df_save = self.df_save.groupby(['word']).agg(
            count('word').alias('word_frequency')
        )
        self.df_save = self.df_save.join(
            self.df_translate, on='word', how='left'
        ).withColumn(
            'word_singular_form',
            self.u_get_singular_form(self.df_save['word'])
        ).cache()

        self.df_save1 = self.df_save.select(
            'word', 'word_frequency', 'cn'
        ).orderBy(
            desc('word_frequency')
        ).withColumn(
            'date_info',
            lit('2023')
        )
        print("df_save1的数量:")
        print(self.df_save1.count())
        self.df_save1.write.saveAsTable(name='tmp_word_frequency', format='hive', mode='append', partitionBy='date_info')
        print("df_save1存储完成!")

        self.df_agg = self.df_save.groupby(['word_singular_form']).agg(
            sum('word_frequency').alias('word_frequency')
        )
        self.df_save2 = self.df_save.select('word', 'cn', 'word_singular_form').join(
            self.df_agg, on='word_singular_form', how='left'
        ).select(
            'word', 'word_frequency', 'cn'
        ).orderBy(
            desc('word_frequency')
        ).withColumn(
            'date_info',
            lit('2023-merge')
        )
        print("df_save2的数量:")
        print(self.df_save2.count())
        self.df_save2.write.saveAsTable(name='tmp_word_frequency', format='hive', mode='append', partitionBy='date_info')
        print("df_save2存储完成!")


if __name__ == '__main__':
    obj = ABA2023YearWordFrequency()
    obj.read_data()
    obj.handle_data()