import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # parent directory

from pyspark.sql.functions import row_number
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from utils.common_util import CommonUtil
from utils.spark_util import SparkUtil

if __name__ == '__main__':
    spark = SparkUtil.get_spark_session("UpdateMerchantwords")
    hive_tb = 'dwd_merchantwords_measure'
    partition_dict = {
        "site_name": 'us',
        "batch": '2023-01'
    }

    # Read the existing measure partition that will be rebuilt.
    sql1 = """
select keyword, lang, st_ao_val, st_zr_flow_proportion, min_bid, max_bid,
       suggested_bid, volume, avg_3m, avg_12m, asin_total_num, asin_num,
       self_asin_num, self_asin_proportion, st_sp_counts, st_zr_counts,
       st_monthly_sales, listing_sales_avg, reviews_avg, rating_avg,
       price_avg, depth
from dwd_merchantwords_measure
where site_name = 'us'
  and batch = '2023-01'
"""
    # Assign the repartitioned DataFrame back; repartition() returns a new
    # DataFrame and is a no-op if the result is discarded.
    df_dwd = spark.sql(sqlQuery=sql1).repartition(80).cache()

    # Read the search-term detail columns for the same site/batch.
    sql2 = """
select keyword, results_count, sponsored_ads_count, page_1_reviews,
       appearance, last_seen, update_time
from dwt_merchantwords_st_detail
where site_name = 'us'
  and batch = '2023-01'
"""
    df_merchantwords_detail = spark.sql(sqlQuery=sql2)
    # Cap the detail set at 12M rows. Note that a window with orderBy but no
    # partitionBy pulls all rows into a single partition to number them, so
    # this step is a potential bottleneck on large batches.
    df_merchantwords_detail = df_merchantwords_detail \
        .withColumn("row_num", row_number().over(Window.orderBy("keyword"))) \
        .filter("row_num BETWEEN 1 AND 12000000") \
        .repartition(80) \
        .drop("row_num") \
        .cache()

    # Left join so every measure row is kept even without detail data,
    # then stamp the partition columns expected by the target table.
    df = df_dwd.join(df_merchantwords_detail, 'keyword', 'left')
    df = df.withColumn(
        'site_name', F.lit('us')
    ).withColumn(
        'batch', F.lit('2023-01')
    )

    CommonUtil.save_or_update_table(spark_session=spark,
                                    hive_tb_name=hive_tb,
                                    partition_dict=partition_dict,
                                    df_save=df,
                                    drop_exist_tmp_flag=True)