aba_2023_year_word_frequency.py
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from pyspark.sql.functions import count, explode, lit, desc, sum, col
from pyspark.sql.types import ArrayType, StringType
from textblob import Word
# googletrans is only needed by the commented-out word_translate UDF below
# from googletrans import Translator


class ABA2023YearWordFrequency(object):

    def __init__(self):
        self.spark = SparkUtil.get_spark_session("spark_task: aba_2023_year_word_frequency")
        # Placeholder DataFrames; populated in read_data() / handle_data()
        self.df_aba_2023 = self.spark.sql("select 1+1")
        self.df_beside_category = self.spark.sql("select 1+1")
        self.df_translate = self.spark.sql("select 1+1")
        self.df_save = self.spark.sql("select 1+1")
        self.df_save1 = self.spark.sql("select 1+1")
        self.df_save2 = self.spark.sql("select 1+1")
        self.df_agg = self.spark.sql("select 1+1")
        # Custom UDFs
        self.u_get_singular_form = self.spark.udf.register('get_singular_form', self.get_singular_form, StringType())
        self.u_word_tokenize = self.spark.udf.register('word_tokenize', self.word_tokenize, ArrayType(StringType()))
        # self.u_word_translate = self.spark.udf.register('word_translate', self.word_translate, StringType())
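        # The callables returned by spark.udf.register are used with the
        # DataFrame API below; the registered names are also available in
        # Spark SQL, e.g. spark.sql("select get_singular_form('boxes')").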

    @staticmethod
    def get_singular_form(word: str):
        """
        将单词全部转化为单数形式
        """
        if word:
            singular_form = Word(word).lemmatize("n")
            # word_object = Word(word)
            # singular_form = word_object.singularize()
            return singular_form
        return word
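
    # For reference, expected lemmatizations (assuming the TextBlob/WordNet
    # corpora are installed, e.g. via `python -m textblob.download_corpora`):
    #   get_singular_form("boxes") -> "box"
    #   get_singular_form("cars")  -> "car"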

    @staticmethod
    def word_tokenize(title: str):
        """
        分词器
        """
        from nltk.tokenize import word_tokenize
        result = word_tokenize(title, "english")
        return result
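
    # Example (NLTK's punkt tokenizer data must be available,
    # e.g. via nltk.download('punkt')):
    #   word_tokenize("wireless phone charger") -> ['wireless', 'phone', 'charger']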

    # @staticmethod
    # def word_translate(word: str):
    #     if word:
    #         try:
    #             translator = Translator()
    #             result = translator.translate(word, src='en', dest='zh-cn')
    #             return result.text
    #         except Exception as e:
    #             # Handle any other unexpected error
    #             print(f"An unexpected error occurred: {e}")
    #             return None
    #     return None

    def read_data(self):
        sql1 = f"""
            select 
                search_term, 
                category_id 
            from dwt_aba_last365 
            where site_name = 'us' 
              and date_type = 'last365day' 
              and date_info = '2023-12';
        """
        self.df_aba_2023 = self.spark.sql(sql1).cache()
        print("df_aba_2023的数量:")
        print(self.df_aba_2023.count())

        sql2 = f"""
            select 
                category_id
            from dim_bsr_category_tree
            where site_name = 'us'
              and en_name in ('Audible Books & Originals', 'Books', 'Kindle Store', 'Apps & Games', 'Movies & TV', 'CDs & Vinyl', 'Software', 'Video Games')
              and category_parent_id = 0;
        """
        self.df_beside_category = self.spark.sql(sql2).cache()
        print("df_beside_category的数量:")
        print(self.df_beside_category.count())

        sql3 = f"""
            select 
                word, 
                simple_cn as cn 
            from tmp_en_dict;
        """
        self.df_translate = self.spark.sql(sql3).cache()
        print("df_translate的数量:")
        print(self.df_translate.count())

    def handle_data(self):
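        # Keep only search terms whose category is NOT one of the excluded
        # media categories: left_anti returns rows with no match on the right.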
        self.df_save = self.df_aba_2023.join(
            self.df_beside_category, on='category_id', how='left_anti'
        ).select('search_term')

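        # Tokenize each remaining search term, explode to one word per row,
        # and count the occurrences of each word.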
        self.df_save = self.df_save.select(explode(self.u_word_tokenize(self.df_save['search_term'])).alias('word'))
        self.df_save = self.df_save.groupby(['word']).agg(
            count('word').alias('word_frequency')
        )
        self.df_save = self.df_save.join(
            self.df_translate, on='word', how='left'
        ).withColumn(
            'word_singular_form',
            self.u_get_singular_form(col('word'))  # col() resolves against the joined result, not the stale pre-join DataFrame
        ).cache()

        self.df_save1 = self.df_save.select(
            'word', 'word_frequency', 'cn'
        ).orderBy(
            desc('word_frequency')
        ).withColumn(
            'date_info',
            lit('2023')
        )
        print("df_save1的数量:")
        print(self.df_save1.count())
        self.df_save1.write.saveAsTable(name='tmp_word_frequency', format='hive', mode='append', partitionBy='date_info')
        print("df_save1存储完成!")

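        # Merge variants sharing a singular form (e.g. "box"/"boxes") by
        # summing word_frequency over word_singular_form, so each original
        # word carries its lemma group's combined count.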
        self.df_agg = self.df_save.groupby(['word_singular_form']).agg(
            sum('word_frequency').alias('word_frequency')
        )
        self.df_save2 = self.df_save.select('word', 'cn', 'word_singular_form').join(
            self.df_agg, on='word_singular_form', how='left'
        ).select(
            'word', 'word_frequency', 'cn'
        ).orderBy(
            desc('word_frequency')
        ).withColumn(
            'date_info',
            lit('2023-merge')
        )
        print("df_save2的数量:")
        print(self.df_save2.count())
        self.df_save2.write.saveAsTable(name='tmp_word_frequency', format='hive', mode='append', partitionBy='date_info')
        print("df_save2存储完成!")


if __name__ == '__main__':
    obj = ABA2023YearWordFrequency()
    obj.read_data()
    obj.handle_data()
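
# A minimal local sketch for smoke-testing the tokenize -> count flow without
# the Hive tables (illustrative only; assumes a local SparkSession and the
# NLTK/TextBlob data noted above):
#
#   from pyspark.sql import SparkSession
#   spark = SparkSession.builder.master("local[2]").getOrCreate()
#   df = spark.createDataFrame([("phone charger",), ("phone chargers",)], ["search_term"])
#   tok = spark.udf.register("tok", ABA2023YearWordFrequency.word_tokenize, ArrayType(StringType()))
#   df.select(explode(tok(df["search_term"])).alias("word")).groupBy("word").count().show()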