import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from pyspark.sql import functions as F

from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from utils.templates import Templates

"""
获取搜索词对应asin标题最大最小数量等其他指标
依赖 dwd_asin_title_number 表 dwd_st_asin_measure 表
输出为 Dws_st_num_stats
"""


class DwsStNumStats(Templates):

    def __init__(self, site_name, date_type, date_info):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        app_name = f"{self.__class__.__name__}:{site_name}:{date_type}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.reset_partitions(partitions_num=1)
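        # Placeholder DataFrames; each is overwritten with real data in read_data()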
        self.asin_title_num = self.spark.sql("select 1+1;")
        self.st_asin_count = self.spark.sql("select 1+1;")
        self.self_asin = self.spark.sql("select 1+1;")
        self.search_term_id = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")
        self.db_save = "dws_st_num_stats"
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dws/{self.db_save}/site_name={self.site_name}/date_type={self.date_type}/date_info={self.date_info}"
        print(f"清除hdfs目录中.....{hdfs_path}")
        HdfsUtils.delete_hdfs_file(hdfs_path)

    def read_data(self):
        print("读取dwd_asin_title_number:")
        sql1 = f"""
            select 
                asin,
                value
            from dwd_asin_title_number
            where site_name = '{self.site_name}'
            and date_type = '{self.date_type}'
            and date_info = '{self.date_info}'
        """
        self.asin_title_num = self.spark.sql(sql1).repartition(40, 'asin').cache()
        self.asin_title_num.show(10, truncate=True)

        print("读取dwd_st_asin_measure:")
        sql2 = f"""
            select 
                search_term,
                asin,
                count(asin) over (partition by search_term) as asin_count
            from dwd_st_asin_measure
            where site_name = '{self.site_name}'
            and date_type = '{self.date_type}'
            and date_info = '{self.date_info}'
            group by search_term, asin
        """
        self.st_asin_count = self.spark.sql(sql2).repartition(40, 'search_term', 'asin').cache()
        self.st_asin_count.show(10, truncate=True)

        print("读取ods_self_asin:")
        sql3 = f"""
            select 
                asin as max_num_asin,
                1 as is_self_max_num_asin
            from ods_self_asin 
            where site_name = '{self.site_name}'
        """
        self.self_asin = self.spark.sql(sql3).repartition(40, 'max_num_asin').cache()
        self.self_asin.show(10, truncate=True)

        print("读取ods_st_key:")
        sql4 = f"""
            select 
                cast(st_key as integer) as search_term_id,
                search_term
            from ods_st_key
            where site_name = '{self.site_name}'
        """
        self.search_term_id = self.spark.sql(sql4).repartition(40, 'search_term').cache()
        self.search_term_id.show(10, truncate=True)

    def handle_data(self):
        # Join title pack quantity onto the (search_term, asin) level
        self.df_save = self.st_asin_count.join(
            self.asin_title_num, on='asin', how='left'
        ).fillna(
            {'value': 1}
        )
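        # asins without a title pack quantity default to a single-pack value of 1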
        # Flag asins whose pack quantity is greater than 1, to count them per search term
        self.df_save = self.df_save.withColumn(
            'num_flag',
            F.when(F.col('value') > 1, 1).otherwise(0)
        )
        # Aggregate per search_term: asin_count, multi-pack count, and the max-quantity row
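        # F.max over a struct compares field by field (value first, then asin),
        # so max_row keeps the asin with the largest pack quantity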
        self.df_save = self.df_save.groupby('search_term').agg(
            F.max('asin_count').alias('asin_count'),
            F.sum('num_flag').alias('num_count'),
            F.max(F.struct('value', 'asin')).alias('max_row')
        ).repartition(30, 'search_term')
        # Join the search term id (inner join drops terms without an st_key)
        self.df_save = self.df_save.join(
            self.search_term_id, on='search_term', how='inner'
        )
        # Extract the max-quantity asin/value and compute the multi-pack proportion
        self.df_save = self.df_save.withColumn(
            'max_num_asin',
            self.df_save.max_row.asin
        ).withColumn(
            'max_num',
            self.df_save.max_row.value
        ).withColumn(
            'most_proportion',
            F.round(F.col('num_count')/F.col('asin_count'), 3)
        )
        # Left join internal (self) asins; non-matches default to 0
        self.df_save = self.df_save.join(
            self.self_asin, on='max_num_asin', how='left'
        ).fillna(
            {'is_self_max_num_asin': 0}
        )
        # Final column selection before writing
        self.df_save = self.df_save.select(
            'search_term',
            'asin_count',
            'num_count',
            'max_num',
            'max_num_asin',
            'most_proportion',
            'search_term_id',
            'is_self_max_num_asin',
            F.lit(self.site_name).alias('site_name'),
            F.lit(self.date_type).alias('date_type'),
            F.lit(self.date_info).alias('date_info')
        )


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    obj = DwsStNumStats(site_name, date_type, date_info)
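    # run() (inherited from Templates) is assumed to drive read_data()/handle_data()
    # and persist df_save into db_save, partitioned by partitions_by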
    obj.run()