update_merchantwords_measure.py 2.21 KB
Newer Older
chenyuanjie committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录
from pyspark.sql.functions import row_number
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from utils.common_util import CommonUtil
from utils.spark_util import SparkUtil

if __name__ == '__main__':
    # One-off job: enrich one partition of the measure table with per-keyword
    # columns from the merchantwords detail table, then write the joined
    # result back to the same measure-table partition.
    spark = SparkUtil.get_spark_session("UpdateMerchantwords")
    hive_tb = 'dwd_merchantwords_measure'

    # Single source of truth for the partition being read and rewritten.
    site_name = 'us'
    batch = '2023-01'
    # NOTE(review): the detail table is filtered on '2023-1' (no zero padding)
    # while the measure table uses '2023-01'. Kept as in the original query;
    # confirm that this is the real partition value and not a typo.
    detail_batch = '2023-1'
    detail_parallelism = 80
    # Only the first 12,000,000 detail rows (ordered by keyword) are joined.
    max_detail_rows = 12000000

    partition_dict = {
        "site_name": site_name,
        "batch": batch
    }

    # Existing measure columns for the target partition. Note that this reads
    # from the same table (hive_tb) that is written at the end of the job.
    sql1 = f"""
    select
        keyword,
        lang,
        st_ao_val,
        st_zr_flow_proportion,
        min_bid,
        max_bid,
        suggested_bid,
        volume,
        avg_3m,
        avg_12m,
        asin_total_num,
        asin_num,
        self_asin_num,
        self_asin_proportion,
        st_sp_counts,
        st_zr_counts,
        st_monthly_sales,
        listing_sales_avg,
        reviews_avg,
        rating_avg,
        price_avg,
        depth
    from {hive_tb}
    where site_name = '{site_name}'
    and batch = '{batch}';
    """
    # DataFrame.repartition returns a NEW DataFrame; the original code called
    # it as a bare statement and dropped the result, so the repartition never
    # took effect. Chain it before cache() so the cached data is repartitioned.
    df_dwd = spark.sql(sqlQuery=sql1) \
        .repartition(detail_parallelism) \
        .cache()

    # Per-keyword detail columns to attach to the measure rows.
    sql2 = f"""
    select
        keyword,
        results_count,
        sponsored_ads_count,
        page_1_reviews,
        appearance,
        last_seen,
        update_time
    from dwt_merchantwords_st_detail
    where site_name = '{site_name}'
    and batch = '{detail_batch}';
    """
    df_merchantwords_detail = spark.sql(sqlQuery=sql2)
    # Number the rows by ascending keyword and keep rows 1..max_detail_rows.
    # NOTE(review): a window with orderBy but no partitionBy makes Spark move
    # all rows into a single partition to compute row_number; this is kept to
    # preserve the original row selection, but it is a scalability bottleneck.
    df_merchantwords_detail = df_merchantwords_detail \
        .withColumn("row_num", row_number().over(Window.orderBy("keyword"))) \
        .filter(f"row_num BETWEEN 1 AND {max_detail_rows}") \
        .repartition(detail_parallelism) \
        .drop("row_num") \
        .cache()

    # Left join: every measure row is kept; detail columns are null where no
    # detail row shares the keyword.
    df = df_dwd.join(df_merchantwords_detail, 'keyword', 'left')

    # Re-attach the partition columns, which were not selected in sql1.
    df = df.withColumn(
        'site_name',
        F.lit(site_name)
    ).withColumn(
        'batch',
        F.lit(batch)
    )

    # Presumably replaces the target partition of hive_tb with df_save
    # (staging through a temp table) -- verify against CommonUtil.
    CommonUtil.save_or_update_table(spark_session=spark,
                                    hive_tb_name=hive_tb,
                                    partition_dict=partition_dict,
                                    df_save=df,
                                    drop_exist_tmp_flag=True)