import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from utils.common_util import CommonUtil
from pyspark.sql import functions as F, Window

"""
Builds the dwt table of current bids and recent bid history for ad search terms (PCP).
"""


def handle_calc():
    # Concatenate (merge small files in) the previous day's partition of dim_st_pcp_history
    day_end = CommonUtil.format_now("%Y-%m-%d")
    CommonUtil.orctable_concatenate(
        hive_table="dim_st_pcp_history",
        partition_dict={
            "date_info": CommonUtil.get_day_offset(day_end, -1)
        },
        innerFlag=False,
        min_part_num=10,
        max_retry_time=5
    )
    spark = SparkUtil.get_spark_session("dwt_st_pcp_current")
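    # Query bid history for the window [day_end - 5 days, day_end]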
    day_start = CommonUtil.get_day_offset(day_end, -5)
    df_all = spark.sql(f"""
            select site_id,
               group_id,
               keyword_id,
               keyword,
               match_type,
               created_at,
               min_bid,
               max_bid,
               suggested_bid,
               date_info
            from dim_st_pcp_history 
            where date_info >= '{day_start}'
            and date_info <= '{day_end}'
            """)
    # Keep all rows from the most recent day per keyword (dense_rank keeps ties),
    # then take the row with the smallest suggested bid on that day
    df_save = df_all.withColumn("day_row_number",
                                F.dense_rank().over(Window.partitionBy(['keyword']).orderBy(F.col("date_info").desc())))
    df_save = df_save.where("day_row_number == 1")

    df_save = df_save.withColumn("min_row_number",
                                 F.row_number().over(Window.partitionBy(['keyword']).orderBy(F.col("suggested_bid").asc())))
    df_save = df_save.where("min_row_number == 1")

    df_history = df_all.groupby(F.col("keyword")).agg(
        F.collect_list(F.struct(F.col("min_bid"), F.col("max_bid"), F.col("suggested_bid"), F.col("created_at"))).alias("list")
    )

    # Only emit history_json when the window contains more than one record
    df_history = df_history.withColumn("history_json",
                                       F.when(F.size(F.col("list")) <= 1, F.lit(None)).otherwise(F.to_json(F.col("list"))))

    df_save = df_save.join(df_history, on=['keyword'], how='left').select(
        F.col('site_id'),
        F.col('group_id'),
        F.col('keyword_id'),
        df_save['keyword'],
        F.col('match_type'),
        F.col('created_at'),
        F.col('min_bid'),
        F.col('max_bid'),
        F.col('suggested_bid'),
        F.col('history_json'),
    )
    # Delete-then-insert path (commented out)
    # CommonUtil.build_hdfs_path()
    # Update the target table
    CommonUtil.save_or_update_table(
        spark_session=spark,
        hive_tb_name="dwt_st_pcp_current",
        partition_dict={},
        df_save=df_save
    )
    print("success")


def handle_calc_new():
    day_end = CommonUtil.format_now("%Y-%m-%d")
    CommonUtil.orctable_concatenate(
        hive_table="dim_st_pcp_history",
        partition_dict={
            "date_info": CommonUtil.get_day_offset(day_end, -1)
        },
        innerFlag=False,
        min_part_num=10,
        max_retry_time=5
    )
    spark = SparkUtil.get_spark_session("dwt_st_pcp_current")
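    # Query bid history for the window [day_end - 90 days, day_end]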
    day_start = CommonUtil.get_day_offset(day_end, -90)
    df_all = spark.sql(f"""
            select site_id,
               group_id,
               keyword_id,
               keyword,
               match_type,
               created_at,
               min_bid,
               max_bid,
               suggested_bid,
               date_info
            from dim_st_pcp_history 
            where date_info >= '{day_start}'
            and date_info <= '{day_end}'
            """)

    window = Window.partitionBy(['site_id', 'match_type', 'keyword'])
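    # Drop rows with a missing site_id or created_at before ranking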
    df_all = df_all.where("site_id is not null and created_at is not null")
    # Deduplicate by site, match type, keyword and date
    df_all = df_all.dropDuplicates(['site_id', 'match_type', 'keyword', 'date_info'])

    # Keep the row from the most recent day
    df_save = df_all.withColumn("day_row_number",
                                F.row_number().over(window.orderBy(F.col("date_info").desc())))
    df_save = df_save.where("day_row_number == 1")

    # Within the most recent day, keep the row with the smallest suggested bid
    df_save = df_save.withColumn("min_row_number",
                                 F.row_number().over(window.orderBy(F.col("suggested_bid").asc())))

    df_save = df_save.where("min_row_number == 1")
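    # At this point df_save has one row per (site_id, match_type, keyword): the most recent day's record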

    df_history = df_all.groupby([F.col("site_id"), F.col("keyword"), F.col("match_type")]).agg(
        F.collect_list(F.struct(F.col("min_bid"), F.col("max_bid"), F.col("suggested_bid"), F.col("created_at"))).alias("list")
    )

    df_history = df_history.withColumn("history_json",
                                       F.when(F.size(F.col("list")) <= 1, F.lit(None)).otherwise(F.to_json(F.col("list"))))

    df_save = df_save.join(df_history, on=['site_id', 'keyword', 'match_type'], how='left').select(
        df_save['site_id'],
        F.col('group_id'),
        F.col('keyword_id'),
        df_save['keyword'],
        df_save['match_type'],
        F.col('created_at'),
        F.col('min_bid'),
        F.col('max_bid'),
        F.col('suggested_bid'),
        F.col('history_json'),
        F.lit("90").alias("day")
    )
    # Update the 90-day partition of the v2 table
    CommonUtil.save_or_update_table(
        spark_session=spark,
        hive_tb_name="dwt_st_pcp_current_v2",
        partition_dict={
            "day": "90"
        },
        df_save=df_save
    )
    print("success")


if __name__ == '__main__':
    handle_calc_new()