import os
import sys

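# Add the script's parent directory to sys.path so the utils package resolves
# when this file is run directly (assumption: utils lives one level up).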
sys.path.append(os.path.dirname(sys.path[0]))
from utils.spark_util import SparkUtil
from utils.common_util import CommonUtil
from pyspark.sql import functions as F, Window

"""
广告搜索词当前竞价及近期历史dwt表计算
"""


def handle_calc():
    # Merge (compact) the ORC partition files for the previous day's data
    day_end = CommonUtil.format_now("%Y-%m-%d")
    CommonUtil.orctable_concatenate(
        hive_table="dim_st_pcp_history",
        partition_dict={
            "date_info": CommonUtil.get_day_offset(day_end, -1)
        },
        innerFlag=False,
        min_part_num=10,
        max_retry_time=5
    )
    spark = SparkUtil.get_spark_session("dwt_st_pcp_current")
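    # Look back 5 days (day_start .. day_end inclusive) to build the
    # recent-history window.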
    day_start = CommonUtil.get_day_offset(day_end, -5)
    df_all = spark.sql(f"""
            select site_id,
               group_id,
               keyword_id,
               keyword,
               match_type,
               created_at,
               min_bid,
               max_bid,
               suggested_bid,
               date_info
            from dim_st_pcp_history 
            where date_info >= '{day_start}'
            and date_info <= '{day_end}'
            """)
    # Keep every row from the most recent day per keyword; the row with the
    # minimum suggested bid among them is selected below. dense_rank is used
    # instead of row_number so that all rows sharing the latest date_info
    # survive; row_number would keep one arbitrary row and make the
    # suggested_bid filter below a no-op.
    df_save = df_all.withColumn("day_rank",
                                F.dense_rank().over(Window.partitionBy("keyword").orderBy(F.col("date_info").desc())))
    df_save = df_save.where("day_rank == 1")

    df_save = df_save.withColumn("min_row_number",
                                 F.row_number().over(Window.partitionBy(['keyword']).orderBy(F.col("suggested_bid").asc())))

    df_save = df_save.where("min_row_number == 1")

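    # Collect the full window of bid snapshots per keyword into an array of
    # structs to serve as the history payload.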
    df_history = df_all.groupby(F.col("keyword")).agg(
        F.collect_list(F.struct(F.col("min_bid"), F.col("max_bid"), F.col("suggested_bid"), F.col("created_at"))).alias("list")
    )

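    # Only emit history JSON when more than one snapshot exists; a single
    # record carries no history beyond the current row.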
    df_history = df_history.withColumn("history_json",
                                       F.when(F.size(F.col("list")) <= 1, F.lit(None)).otherwise(F.to_json(F.col("list"))))

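    # Attach the history JSON to each keyword's current-bid row and project
    # the final output columns.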
    df_save = df_save.join(df_history, on=['keyword'], how='left').select(
        F.col('site_id'),
        F.col('group_id'),
        F.col('keyword_id'),
        df_save['keyword'],
        F.col('match_type'),
        F.col('created_at'),
        F.col('min_bid'),
        F.col('max_bid'),
        F.col('suggested_bid'),
        F.col('history_json'),
    )
    # Delete-then-insert alternative (unused):
    # CommonUtil.build_hdfs_path()
    # Update (upsert) the result table
    CommonUtil.save_or_update_table(
        spark_session=spark,
        hive_tb_name="dwt_st_pcp_current",
        partition_dict={},
        df_save=df_save
    )
    print("success")


if __name__ == '__main__':
    handle_calc()