import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from pyspark.sql.functions import count, col


class WordFrequency(object):

    def __init__(self):
        self.spark = SparkUtil.get_spark_session("us_aba_last365_word_frequency")

    def run(self):
        # Brand-labeled search terms from the last 12 monthly partitions,
        # limited to the top 1,000,000 terms by rank.
        sql1 = """
        select search_term, date_info
        from dwt_aba_st_analytics
        where site_name = 'us'
          and date_type = 'month'
          and date_info in ('2024-10', '2024-09', '2024-08', '2024-07', '2024-06', '2024-05',
                            '2024-04', '2024-03', '2024-02', '2024-01', '2023-12', '2023-11')
          and rank <= 1000000
          and st_brand_label = 1
        """
        df_st = self.spark.sql(sql1).cache()
        print("df_st row count:")
        print(df_st.count())

        # First matched brand for each search term over the same 12 months.
        sql2 = """
        select search_term, first_match_brand as brand, date_info
        from dws_st_brand_info
        where site_name = 'us'
          and date_type = 'month'
          and date_info in ('2024-10', '2024-09', '2024-08', '2024-07', '2024-06', '2024-05',
                            '2024-04', '2024-03', '2024-02', '2024-01', '2023-12', '2023-11')
          and st_brand_label = 1
        """
        df_brand = self.spark.sql(sql2).cache()
        print("df_brand row count:")
        print(df_brand.count())

        # Attach the brand to each (month, search term) pair; the month column
        # is only needed as a join key and is dropped afterwards.
        df_save = df_st.join(
            df_brand, on=['date_info', 'search_term'], how='left'
        ).drop('date_info')
        print("df_save row count:")
        print(df_save.count())

        # Count how many times each brand occurs across the whole year.
        df_save = df_save.groupby(['brand']).agg(
            count('brand').alias('frequency')
        ).orderBy('frequency', ascending=False)
        df_save.show(20, False)

        df_save = df_save.withColumn("frequency", col("frequency").cast("int"))

        # Sanity check: the frequencies should sum to the number of search-term
        # rows. A mismatch means the join duplicated rows (duplicate keys in
        # df_brand) or some terms had no matching brand (count() skips nulls).
        total_sum = df_save.select("frequency").groupBy().sum().collect()[0][0]
        if total_sum == df_st.count():
            print('Validation passed')
        else:
            print('Validation failed')

        # Write out as ^-delimited, uncompressed CSV without a header.
        output_path = "hdfs:///user/chenyuanjie/test1/"
        df_save.write.mode("overwrite") \
            .format("csv") \
            .option("delimiter", "^") \
            .option("lineSep", "\n") \
            .option("header", "false") \
            .option("compression", "none") \
            .save(output_path)


if __name__ == '__main__':
    obj = WordFrequency()
    obj.run()