import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))

from utils.spark_util import SparkUtil
from utils.common_util import CommonUtil
from pyspark.sql import functions as F, Window

"""
Compute the dwt table of current bids and recent bid history for advertising search terms.
"""


def handle_calc():
    # Concatenate (compact) the previous day's partition of the source table.
    day_end = CommonUtil.format_now("%Y-%m-%d")
    CommonUtil.orctable_concatenate(
        hive_table="dim_st_pcp_history",
        partition_dict={
            "date_info": CommonUtil.get_day_offset(day_end, -1)
        },
        innerFlag=False,
        min_part_num=10,
        max_retry_time=5
    )

    spark = SparkUtil.get_spark_session("dwt_st_pcp_current")

    # Read the last 5 days of bid history.
    day_start = CommonUtil.get_day_offset(day_end, -5)
    df_all = spark.sql(f"""
        select site_id,
               group_id,
               keyword_id,
               keyword,
               match_type,
               created_at,
               min_bid,
               max_bid,
               suggested_bid,
               date_info
        from dim_st_pcp_history
        where date_info >= '{day_start}'
          and date_info <= '{day_end}'
    """)

    # For each keyword, keep the row with the minimum suggested bid on the most recent day.
    # dense_rank (rather than row_number) keeps every row of the latest date_info,
    # so the min-bid filter below can pick the true minimum for that day.
    df_save = df_all.withColumn(
        "day_row_number",
        F.dense_rank().over(Window.partitionBy("keyword").orderBy(F.col("date_info").desc()))
    )
    df_save = df_save.where("day_row_number == 1")
    df_save = df_save.withColumn(
        "min_row_number",
        F.row_number().over(Window.partitionBy("keyword").orderBy(F.col("suggested_bid").asc()))
    )
    df_save = df_save.where("min_row_number == 1")

    # Collect the per-keyword bid history over the whole window as JSON;
    # leave it null when there is only a single record.
    df_history = df_all.groupBy(F.col("keyword")).agg(
        F.collect_list(
            F.struct(F.col("min_bid"), F.col("max_bid"), F.col("suggested_bid"), F.col("created_at"))
        ).alias("list")
    )
    df_history = df_history.withColumn(
        "history_json",
        F.when(F.size(F.col("list")) <= 1, F.lit(None)).otherwise(F.to_json(F.col("list")))
    )

    df_save = df_save.join(df_history, on=['keyword'], how='left').select(
        F.col('site_id'),
        F.col('group_id'),
        F.col('keyword_id'),
        df_save['keyword'],
        F.col('match_type'),
        F.col('created_at'),
        F.col('min_bid'),
        F.col('max_bid'),
        F.col('suggested_bid'),
        F.col('history_json'),
    )

    # Delete then insert
    # CommonUtil.build_hdfs_path()
    # Update the (non-partitioned) target table.
    CommonUtil.save_or_update_table(
        spark_session=spark,
        hive_tb_name="dwt_st_pcp_current",
        partition_dict={},
        df_save=df_save
    )
    print("success")


if __name__ == '__main__':
    handle_calc()