import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from pyspark.sql.functions import col

if __name__ == '__main__':
    spark = SparkUtil.get_spark_session("ABA_2023_10_12_export")

    # Search-term analytics for the US site, monthly, Oct-Dec 2023
    sql1 = """
        select date_info,
               search_term,
               st_bsr_cate_1_id_new as category_id,
               market_cycle_type,
               is_first_text,
               is_ascending_text,
               is_high_return_text,
               is_search_text,
               st_movie_label,
               st_brand_label,
               bsr_orders,
               st_word_num,
               st_num,
               rank
        from dwt_aba_st_analytics
        where site_name = 'us'
          and date_type = 'month'
          and date_info in ('2023-10', '2023-11', '2023-12');
    """
    df_dwt_aba_st_analytics = spark.sql(sql1).cache()

    # Top-level BSR category names (category_parent_id = 0) for the US site
    sql2 = """
        select category_id, en_name
        from dim_bsr_category_tree
        where site_name = 'us'
          and category_parent_id = 0;
    """
    df_dim_bsr_category_tree = spark.sql(sql2).cache()

    # Rank change rates for the same months
    sql3 = """
        select search_term, rank_change_rate, rank_rate_of_change, date_info
        from dwt_aba_last_change_rate
        where site_name = 'us'
          and date_type = 'month'
          and date_info in ('2023-10', '2023-11', '2023-12');
    """
    df_dwt_aba_last_change_rate = spark.sql(sql3).cache()

    # Keep only the terms that meet the conditions:
    # first-seen, ascending, or market cycle type 1/2
    df_dwt_aba_st_analytics = df_dwt_aba_st_analytics.filter(
        "(is_first_text = 1) or (is_ascending_text = 1) or (market_cycle_type in (1, 2))"
    )

    # Attach the category name and the rank change rates
    df_save = df_dwt_aba_st_analytics.join(
        df_dim_bsr_category_tree, on='category_id', how='left'
    ).join(
        df_dwt_aba_last_change_rate, on=['date_info', 'search_term'], how='left'
    )

    # Rename columns to the export schema
    df_save = df_save.select(
        col('date_info').alias('year_month'),
        col('search_term'),
        col('en_name').alias('category'),
        col('market_cycle_type'),
        col('is_first_text'),
        col('is_ascending_text'),
        col('is_high_return_text'),
        col('is_search_text'),
        col('st_movie_label').alias('movie_label'),
        col('st_brand_label').alias('brand_label'),
        col('bsr_orders'),
        col('st_word_num').alias('word_counts'),
        col('st_num').alias('word_frequency'),
        col('rank'),
        col('rank_change_rate').alias('year_on_year'),
        col('rank_rate_of_change').alias('month_on_month')
    )

    # Preview a sample, then append the result to the export table
    df_save.repartition(5).show(10, truncate=True)
    df_save.write.saveAsTable(name='tmp_aba_2023_export', format='hive', mode='append')
    spark.stop()