# us_aba_last365_word_frequency.py — (file-listing residue "2.34 KB" removed; kept as a comment so the module parses)
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))

from utils.spark_util import SparkUtil
from pyspark.sql.functions import count, col


class WordFrequency(object):
    """Brand word-frequency report over the last 12 months of US ABA search terms.

    Pulls brand-labeled search terms from ``dwt_aba_st_analytics``, left-joins
    their first-matched brand from ``dws_st_brand_info``, counts occurrences per
    brand, sanity-checks the total against the source row count, and writes the
    result as ``^``-delimited CSV to HDFS.
    """

    # The 12 calendar months that make up the "last 365 days" window.
    # Shared by both queries so the window can never drift between them.
    DATE_INFO_MONTHS = (
        '2024-10', '2024-09', '2024-08', '2024-07', '2024-06', '2024-05',
        '2024-04', '2024-03', '2024-02', '2024-01', '2023-12', '2023-11',
    )

    def __init__(self, site_name='us', max_rank=1000000,
                 output_path="hdfs:///user/chenyuanjie/test1/"):
        # Defaults reproduce the original hard-coded behavior exactly;
        # parameters allow reuse for other sites / rank cutoffs / targets.
        self.site_name = site_name
        self.max_rank = max_rank
        self.output_path = output_path
        self.spark = SparkUtil.get_spark_session("us_aba_last365_word_frequency")

    def run(self):
        """Execute the full extract → join → aggregate → validate → save pipeline."""
        # Build the `date_info in (...)` literal once; both queries share it.
        months_sql = ", ".join(f"'{m}'" for m in self.DATE_INFO_MONTHS)

        # Trailing ';' removed from the queries: spark.sql() takes a single
        # statement and raises ParseException on a terminator in many versions.
        sql1 = f"""
            select search_term, date_info
            from dwt_aba_st_analytics
            where site_name = '{self.site_name}'
              and date_type = 'month'
              and date_info in ({months_sql})
              and rank <= {self.max_rank}
              and st_brand_label = 1
        """
        df_st = self.spark.sql(sql1).cache()
        st_count = df_st.count()  # materializes the cache; reused for validation below
        print("df_st数量是:")
        print(st_count)

        sql2 = f"""
            select search_term, first_match_brand as brand, date_info
            from dws_st_brand_info
            where site_name = '{self.site_name}'
              and date_type = 'month'
              and date_info in ({months_sql})
              and st_brand_label = 1
        """
        df_brand = self.spark.sql(sql2).cache()
        print("df_brand数量是:")
        print(df_brand.count())

        # Attach the brand to each (month, search_term); month is only a join key.
        df_save = df_st.join(
            df_brand, on=['date_info', 'search_term'], how='left'
        ).drop('date_info')
        print("df_save数量是:")
        print(df_save.count())

        # Occurrences per brand, most frequent first.
        df_save = df_save.groupby(['brand']).agg(
            count('brand').alias('frequency')
        ).orderBy('frequency', ascending=False)
        df_save.show(20, False)

        df_save = df_save.withColumn("frequency", col("frequency").cast("int"))
        total_sum = df_save.select("frequency").groupBy().sum().collect()[0][0]
        # NOTE(review): count('brand') skips NULL brands produced by the left
        # join, and a search_term matching multiple brands inflates the join's
        # row count, so this check only passes on a fully-matched 1:1 join —
        # confirm that is the intended invariant.
        if total_sum == st_count:
            print('验证成功')
        else:
            print('验证失败')

        df_save.write.mode("overwrite").format("csv") \
            .option("delimiter", "^") \
            .option("lineSep", "\n") \
            .option("header", "false") \
            .option("compression", "none") \
            .save(self.output_path)


if __name__ == '__main__':
    # Script entry point: build the job and run the full pipeline.
    WordFrequency().run()