aba_2023_10_12_export.py
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from pyspark.sql.functions import col


if __name__ == '__main__':
    spark = SparkUtil.get_spark_session("ABA_2023_10_12_export")

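    # ABA search-term metrics for the US site, monthly data for 2023-10 through 2023-12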
    sql1 = """
        select 
            date_info,
            search_term,
            st_bsr_cate_1_id_new as category_id,
            market_cycle_type,
            is_first_text,
            is_ascending_text,
            is_high_return_text,
            is_search_text,
            st_movie_label,
            st_brand_label,
            bsr_orders,
            st_word_num,
            st_num,
            rank
        from dwt_aba_st_analytics
        where site_name = 'us' 
        and date_type = 'month' 
        and date_info in ('2023-10','2023-11','2023-12');
    """
    df_dwt_aba_st_analytics = spark.sql(sql1).cache()

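    # Top-level BSR category names (category_parent_id = 0) for the US site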
    sql2 = """
        select 
            category_id, 
            en_name
        from dim_bsr_category_tree
        where site_name = 'us'
        and category_parent_id = 0;
    """
    df_dim_bsr_category_tree = spark.sql(sql2).cache()

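    # Rank change rates for the same months, used below as year-on-year and month-on-month indicators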
    sql3 = """
        select 
            search_term,
            rank_change_rate,
            rank_rate_of_change,
            date_info
        from dwt_aba_last_change_rate
        where site_name = 'us' 
        and date_type = 'month' 
        and date_info in ('2023-10','2023-11','2023-12');
    """
    df_dwt_aba_last_change_rate = spark.sql(sql3).cache()

    # Filter for search terms that meet the conditions (new, ascending, or market cycle type 1/2)
    df_dwt_aba_st_analytics = df_dwt_aba_st_analytics.filter(
        "(is_first_text = 1) or (is_ascending_text = 1) or (market_cycle_type in (1, 2))"
    )

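    # Attach category names and rank-change rates to the filtered search terms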
    df_save = df_dwt_aba_st_analytics.join(
        df_dim_bsr_category_tree, on='category_id', how='left'
    ).join(
        df_dwt_aba_last_change_rate, on=['date_info', 'search_term'], how='left'
    )

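    # Rename columns to the export schema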
    df_save = df_save.select(
        col('date_info').alias('year_month'),
        col('search_term'),
        col('en_name').alias('category'),
        col('market_cycle_type'),
        col('is_first_text'),
        col('is_ascending_text'),
        col('is_high_return_text'),
        col('is_search_text'),
        col('st_movie_label').alias('movie_label'),
        col('st_brand_label').alias('brand_label'),
        col('bsr_orders'),
        col('st_word_num').alias('word_counts'),
        col('st_num').alias('word_frequency'),
        col('rank'),
        col('rank_change_rate').alias('year_on_year'),
        col('rank_rate_of_change').alias('month_on_month')
    )
    # Repartition to limit the number of output files, preview a sample, then append to the export table
    df_save = df_save.repartition(5)
    df_save.show(10, truncate=True)
    df_save.write.saveAsTable(name='tmp_aba_2023_export', format='hive', mode='append')
    spark.stop()