"""
   @Author      : HuangJian
   @Description : 找到月关键词下zr类型的top10asin和ac类型asin
   @SourceTable :
                  1.ods_st_key
                  2.dim_st_asin_info
                  3.dwd_st_measure

   @SinkTable   : dwt_st_top_asin_info
   @CreateTime  : 2023/02/13 17:01
   @UpdateTime  : 2023/02/13 17:01
"""

import os
import sys
import re
from functools import reduce

sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录
from utils.templates import Templates
# from ..utils.templates import Templates
# 分组排序的udf窗口函数
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType, DoubleType
from utils.common_util import CommonUtil, DateTypes


class DwtStTopAsinInfo(Templates):
    def __init__(self, site_name="us", date_type="week", date_info="2022-1"):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = f"dwt_st_top_asin_info"
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}, {self.site_name}, {self.date_type}, {self.date_info}")
        self.partitions_num = 10
        self.reset_partitions(partitions_num=self.partitions_num)
        self.partitions_by = ['site_name', 'date_type', 'date_info']

        self.get_date_info_tuple()
        self.get_year_month_days_dict(year=int(self.year))
        # 获取周流程的周tuple整合数据
        self.complete_date_info_tuple = CommonUtil.transform_week_tuple(self.spark, self.date_type, self.date_info)
        self.df_st_key = self.spark.sql(f"select 1+1;")
        self.df_st_asin_zr = self.spark.sql(f"select 1+1;")
        self.df_st_asin_ac = self.spark.sql(f"select 1+1;")
        self.df_st_zr = self.spark.sql(f"select 1+1;")
        self.df_st_ac = self.spark.sql(f"select 1+1;")
        self.df_zr_info = self.spark.sql(f"select 1+1;")
        self.df_ac_info = self.spark.sql(f"select 1+1;")
        self.df_save = self.spark.sql(f"select 1+1;")
        self.date_sql = self.date_sql_padding()

    def date_sql_padding(self):
        if self.site_name == 'us':
            if self.date_type == DateTypes.month_week.name:
                date_sql = f" and date_type='{self.date_type}' and date_info = '{self.date_info}'"
            elif self.date_type == DateTypes.month.name and self.date_info >= '2023-10':
                date_sql = f" and date_type='{self.date_type}' and date_info = '{self.date_info}'"
            else:
                date_sql = f"and date_type='week' and date_info in {self.complete_date_info_tuple}"
        elif self.site_name in ['uk', 'de'] and self.date_type == DateTypes.month.name and self.date_info >= '2024-05':
            date_sql = f" and date_type='{self.date_type}' and date_info = '{self.date_info}'"
        else:
            date_sql = f" and date_type='week' and date_info in {self.complete_date_info_tuple}"
        print(date_sql)
        return date_sql

    def read_data(self):
        print("1.1 读取st的key: ods_st_key表")
        sql = f"select search_term,cast(st_key as int) as search_term_id from ods_st_key where site_name = '{self.site_name}' "
        print("sql:", sql)
        self.df_st_key = self.spark.sql(sqlQuery=sql)

        print("1.3 读取关键词的zr类型数据: ods_search_term_zr")
        sql = f"""select 
            search_term,
            asin,
            page_row as zr_rank,
            'zr' as data_type,date_info from ods_search_term_zr 
            where
            site_name='{self.site_name}' {self.date_sql} and page_row <=10 ;"""
        print("sql:", sql)
        self.df_st_asin_zr = self.spark.sql(sqlQuery=sql).cache()

        print("1.4 读取关键词的ac类型数据: ods_search_term_ac")
        sql = f"""select search_term,
                         asin,
                         null as zr_rank,
                         'ac' as data_type
                  from (
                           select
                                  search_term,
                                  asin,
                                  row_number() over (partition by search_term order by date_info desc, updated_time asc) as ac_rank
                           from ods_search_term_ac
                           where site_name = '{self.site_name}' {self.date_sql} and page = 1) t1
                  where t1.ac_rank = 1;"""
        print("sql:", sql)
        self.df_st_asin_ac = self.spark.sql(sqlQuery=sql).cache()

    def handle_data(self):
        self.handle_st_asin_duplicated()
        self.handle_st_asin_join()

        # 日期字段补全
        self.df_save = self.df_save.withColumn("created_time",
                                               F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:SS'))
        self.df_save = self.df_save.withColumn("updated_time",
                                               F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:SS'))
        # 补全分区字段
        self.df_save = self.df_save.withColumn("site_name", F.lit(self.site_name))
        self.df_save = self.df_save.withColumn("date_type", F.lit(self.date_type))
        self.df_save = self.df_save.withColumn("date_info", F.lit(self.date_info))

    def handle_st_asin_duplicated(self):
        self.df_zr_info = self.df_st_asin_zr.select("search_term", "date_info")

        # 对zr类型搜索词进行去重:按搜索词取日期最大 得到最后出现该搜索词日期
        window_zr = Window.partitionBy(['search_term']).orderBy(
            self.df_zr_info.date_info.desc()
        )
        self.df_zr_info = self.df_zr_info \
            .withColumn("zr_rank_row", F.row_number().over(window=window_zr))
        self.df_zr_info = self.df_zr_info.filter("zr_rank_row=1")
        self.df_zr_info = self.df_zr_info.drop("zr_rank_row")
        # self.df_st_asin_zr = self.df_st_asin_zr.cache()

        # 得到最近日期zr类型的st-asin
        self.df_st_asin_zr = self.df_zr_info.join(
            self.df_st_asin_zr, on=['search_term', 'date_info'], how="inner")
        self.df_st_asin_zr = self.df_st_asin_zr.drop("date_info")
        # 对zr类型的同周期下进行去重按照st-asin-rank

        self.df_st_asin_zr = self.df_st_asin_zr.drop_duplicates(['search_term', 'asin', 'zr_rank'])

        # 对zr词进行二次排序
        self.df_st_asin_zr = self.df_st_asin_zr.groupby(['search_term', 'asin', 'data_type']).agg(
            F.max('zr_rank').alias('max_zr_rank')
        )

        print(self.df_st_asin_zr.columns)

        window_zr_rank = Window.partitionBy(['search_term']).orderBy(
            self.df_st_asin_zr.max_zr_rank.asc()
        )
        self.df_st_asin_zr = self.df_st_asin_zr.withColumn('zr_rank', F.row_number().over(window=window_zr_rank))
        self.df_st_asin_zr = self.df_st_asin_zr.filter("zr_rank <= 10")
        self.df_st_asin_zr = self.df_st_asin_zr.drop('max_zr_rank')

    def handle_st_asin_join(self):
        # 将zr类型关键词和ac类型关键词union
        df_st_asin_union = self.df_st_asin_zr.unionByName(self.df_st_asin_ac, allowMissingColumns=False)

        # st关联获取search_term_id
        self.df_save = df_st_asin_union.join(
            self.df_st_key, on=['search_term'], how="inner"
        )


if __name__ == '__main__':
    site_name = sys.argv[1]  # 参数1:站点
    date_type = sys.argv[2]  # 参数2:类型:week/4_week/month/quarter
    date_info = sys.argv[3]  # 参数3:年-周/年-月/年-季, 比如: 2022-1
    handle_obj = DwtStTopAsinInfo(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()