import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.templates import Templates
from utils.common_util import CommonUtil
from datetime import datetime, timedelta
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
from utils.db_util import DBUtil


class DwtUserStoreCollectionsInfo(Templates):
    """Build the per-store summary ``dwt_user_store_collections_info``.

    Reads the ASIN-level detail of collected stores for the current period and the
    previous one, derives price / BSR-rank change flags per ASIN, aggregates to one
    row per ``seller_id`` and saves the result:

    * ``run_type == 'real_time'``: upsert into the PostgreSQL table
      ``user_store_collections_info`` through a staging table;
    * any other ``run_type``: ``Templates.save_data`` into ``self.db_save``,
      partitioned by site_name / date_type / date_info.
    """

    # Columns copied from the staging table by the real-time upsert. seller_id is the
    # conflict key; every other column is overwritten on conflict. Single source of
    # truth for the INSERT column list, the SELECT list and the UPDATE SET list.
    _UPSERT_COLUMNS = (
        "seller_id", "store_asin_total_num", "store_page20_asin_total_num",
        "store_page20_variant_rate", "store_page20_new_asin_total_num", "store_page20_new_asin_num_percent",
        "store_page20_old_asin_total_num", "store_page20_old_asin_num_percent", "store_page20_asin_avg_price",
        "store_page20_asin_avg_rating", "store_page20_asin_avg_comments", "store_page20_raise_asin_num",
        "store_page20_raise_asin", "store_page20_popular_asin_num", "store_page20_popular_asin",
        "store_page20_high_quantity_asin_num", "store_page20_high_quantity_asin",
        "store_page20_new_asin_sales_surge_num", "store_page20_new_asin_sales_surge",
        "store_page20_old_asin_sales_surge_num", "store_page20_old_asin_sales_surge", "store_location",
        "store_crawl_time", "store_first_category_percent", "store_current_category_percent", "store_label_type",
        "store_name", "store_page20_price_raise_asin_num", "store_page20_price_raise_asin",
        "store_page20_price_decline_asin_num", "store_page20_price_decline_asin",
        "store_page20_rank_raise_asin_num", "store_page20_rank_raise_asin",
        "store_page20_rank_decline_asin_num", "store_page20_rank_decline_asin", "store_new_flag",
    )

    def __init__(self, site_name, date_type, date_info, run_type, seller_id_tuple):
        """Set up the Spark session, resolve the previous period and register the UDF.

        :param site_name: site partition to process.
        :param date_type: date_type partition to process.
        :param date_info: date_info partition to process; must exist as a ``date``
            in ``dim_date_20_to_30`` (see ``get_previous_date_info``).
        :param run_type: ``'real_time'`` for the PostgreSQL upsert path; anything
            else selects the batch path.
        :param seller_id_tuple: in real-time mode, a comma-separated string of seller
            ids to refresh; ignored otherwise.
        """
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.run_type = run_type
        self.seller_id_tuple = seller_id_tuple
        self.db_save = "dwt_user_store_collections_info"
        if self.run_type == 'real_time':
            # "id1,id2,..." -> ['id1', 'id2', ...]. Surrounding whitespace and empty items
            # (e.g. a trailing comma) are dropped; None yields an empty list instead of ['None'].
            raw_ids = '' if self.seller_id_tuple is None else str(self.seller_id_tuple)
            self.seller_id_tuple = [sid.strip() for sid in raw_ids.split(',') if sid.strip()]
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}:{self.site_name}-{self.date_type}-{self.date_info}")
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.reset_partitions(100)
        self.previous_date_info = self.get_previous_date_info()

        # Placeholder DataFrames; replaced in read_data / handle_asin_change / handle_data_group.
        self.df_store_asin_detail = self.spark.sql("select 1+1;")
        self.df_previous_store_asin_detail = self.spark.sql("select 1+1;")
        self.df_user_store_collections_info = self.spark.sql("select 1+1;")

        # Register the store-label UDF (also callable from SQL as u_judge_store_label).
        self.u_judge_store_label = self.spark.udf.register('u_judge_store_label', self.udf_judge_store_label,
                                                           StringType())

    @staticmethod
    def udf_judge_store_label(high_quantity_num, standard_ao_num, total_num, new_asin_sales_surge_num,
                              old_asin_sales_surge_num):
        """Grade a store 'A'..'E' from three ratios over its ASINs.

        Ratios: high_quantity_num / total_num, standard_ao_num / total_num and
        new_asin_sales_surge_num / old_asin_sales_surge_num. Grades are tried from
        'A' to 'D'; the first one whose three minimums are all met is returned:
        A (0.8, 0.5, 0.5), B (0.5, 0.4, 0.4), C (0.3, 0.3, 0.3), D (0.1, 0.2, 0.2).

        :returns: the grade, or 'E' when no grade matches or when ``total_num`` or
            ``old_asin_sales_surge_num`` is not positive (a ratio would be undefined).
        """
        if not (total_num > 0 and old_asin_sales_surge_num > 0):
            return 'E'
        high_quantity_rate = high_quantity_num / total_num
        standard_ao_rate = standard_ao_num / total_num
        sales_surge_rate = new_asin_sales_surge_num / old_asin_sales_surge_num
        # Kept local (not a module/class constant) so the pickled UDF needs no globals.
        thresholds = (
            ('A', 0.8, 0.5, 0.5),
            ('B', 0.5, 0.4, 0.4),
            ('C', 0.3, 0.3, 0.3),
            ('D', 0.1, 0.2, 0.2),
        )
        for grade, min_high_quantity, min_standard_ao, min_sales_surge in thresholds:
            if (high_quantity_rate >= min_high_quantity and standard_ao_rate >= min_standard_ao
                    and sales_surge_rate >= min_sales_surge):
                return grade
        return 'E'

    def get_previous_date_info(self):
        """Return the ``date`` of the row in ``dim_date_20_to_30`` whose id precedes ``date_info``'s.

        Also stores the full date table DataFrame on ``self.df_date``.

        :raises ValueError: if ``date_info`` or its predecessor id is not in the table.
        """
        self.df_date = self.spark.sql("select * from dim_date_20_to_30 ;")
        df = self.df_date.toPandas()
        current_rows = df.loc[df.date == str(self.date_info)]
        if current_rows.empty:
            raise ValueError(f"date_info {self.date_info} not found in dim_date_20_to_30")
        previous_date_id = int(current_rows.id.iloc[0]) - 1
        previous_rows = df.loc[df.id == previous_date_id]
        if previous_rows.empty:
            raise ValueError(f"no dim_date_20_to_30 row with id {previous_date_id} "
                             f"(the one before date_info {self.date_info})")
        return previous_rows.date.iloc[0]

    def read_data(self):
        """Load the current and previous-period ASIN detail of the collected stores.

        Current period: in real-time mode from the parquet files under a fixed HDFS tmp
        path, otherwise from ``dws_user_collect_store_asin_detail`` for this partition.
        Previous period: the same dws table at ``previous_date_info``, restricted in
        real-time mode to ``self.seller_id_tuple``. Both results are cached.
        """
        print("1. 读取店铺收藏下asin详情")
        if self.run_type == 'real_time':
            hdfs_path = "/home/big_data_selection/tmp/user_collect_store_asin_detail_tmp/*.parquet"
            self.df_store_asin_detail = self.spark.read.parquet(hdfs_path).cache()
        else:
            sql = f"""
                select seller_id, store_name, store_location, store_crawl_time, store_asin_total_num, asin, asin_price, 
                asin_rating, asin_total_comments, asin_ao_val, is_standard_ao, asin_bsr_rank, asin_bsr_orders, 
                category_first_id, category_first_name, category_id, category_name, parent_asin, asin_type, is_raise_asin, 
                is_popular_asin, is_high_quantity_asin, is_sales_surge_asin 
                from dws_user_collect_store_asin_detail where site_name='{self.site_name}' and date_type='{self.date_type}'
                and date_info='{self.date_info}'
            """
            print("sql=", sql)
            self.df_store_asin_detail = self.spark.sql(sqlQuery=sql).cache()

        print("2. 读取店铺收藏上个维度的asin详情")
        sql = f"""
                select seller_id, asin, asin_price as previous_asin_price, asin_bsr_rank as previous_asin_bsr_rank
                from dws_user_collect_store_asin_detail where site_name='{self.site_name}' and date_type='{self.date_type}'
                and date_info='{self.previous_date_info}'
            """
        print("sql=", sql)
        df_previous = self.spark.sql(sqlQuery=sql)
        if self.run_type == 'real_time':
            # Filter with a Column predicate rather than splicing command-line ids into
            # the SQL text, so ids containing quotes cannot alter the query.
            df_previous = df_previous.filter(F.col("seller_id").isin(self.seller_id_tuple))
        self.df_previous_store_asin_detail = df_previous.cache()

    def handle_asin_change(self):
        """Left-join the previous period onto the current ASIN detail and derive 0/1 change flags.

        Added columns (each 0 when the ASIN has no previous row or a needed value is null):

        * ``is_asin_price_raise`` / ``is_asin_price_decline``: current price above / below
          the previous price.
        * ``is_asin_rank_raise``: the BSR rank number fell by at least 50% relative to the
          previous rank (e.g. #100 -> #50).
        * ``is_asin_rank_decline``: the BSR rank number grew by at least 50%.

        NOTE(review): the original comments described the rank flags as a "more than 2x"
        move, but the thresholds are +/-50% of the previous rank; confirm the intent.
        The joined frame is cached because several aggregations reuse it.
        """
        df_detail = self.df_store_asin_detail.join(
            self.df_previous_store_asin_detail, on=['seller_id', 'asin'], how='left'
        )
        price_diff = F.col("asin_price") - F.col("previous_asin_price")
        has_previous_rank = F.col("previous_asin_bsr_rank").isNotNull()
        relative_rank_change = (
            (F.col("asin_bsr_rank") - F.col("previous_asin_bsr_rank")) / F.col("previous_asin_bsr_rank")
        )
        df_detail = df_detail \
            .withColumn("is_asin_price_raise", F.when(price_diff > 0, F.lit(1)).otherwise(F.lit(0))) \
            .withColumn("is_asin_price_decline", F.when(price_diff < 0, F.lit(1)).otherwise(F.lit(0))) \
            .withColumn("is_asin_rank_raise",
                        F.when(has_previous_rank & (relative_rank_change <= -0.5), F.lit(1)).otherwise(F.lit(0))) \
            .withColumn("is_asin_rank_decline",
                        F.when(has_previous_rank & (relative_rank_change >= 0.5), F.lit(1)).otherwise(F.lit(0)))
        self.df_store_asin_detail = df_detail.drop("previous_asin_price", "previous_asin_bsr_rank").cache()

    def _aggregate_store_metrics(self):
        """Aggregate the ASIN detail to one row per seller_id: counts, averages, label and new/old shares."""
        is_new = F.col("asin_type") == 1
        is_old = F.col("asin_type") == 2
        is_sales_surge = F.col("is_sales_surge_asin") == 1
        df_store = self.df_store_asin_detail.groupby(['seller_id']).agg(
            F.first("store_name").alias("store_name"),
            F.first("store_asin_total_num").alias("store_asin_total_num"),
            F.count("asin").alias("store_page20_asin_total_num"),
            F.count(F.when(is_new, True)).alias("store_page20_new_asin_total_num"),
            F.count(F.when(is_old, True)).alias("store_page20_old_asin_total_num"),
            F.round(F.avg("asin_price"), 4).alias("store_page20_asin_avg_price"),
            F.round(F.avg("asin_rating"), 4).alias("store_page20_asin_avg_rating"),
            F.round(F.avg("asin_total_comments"), 4).alias("store_page20_asin_avg_comments"),
            F.count(F.when(F.col("is_raise_asin") == 1, True)).alias("store_page20_raise_asin_num"),
            F.count(F.when(F.col("is_popular_asin") == 1, True)).alias("store_page20_popular_asin_num"),
            F.count(F.when(F.col("is_high_quantity_asin") == 1, True)).alias("store_page20_high_quantity_asin_num"),
            F.count(F.when(is_sales_surge & is_new, True)).alias("store_page20_new_asin_sales_surge_num"),
            F.count(F.when(is_sales_surge & is_old, True)).alias("store_page20_old_asin_sales_surge_num"),
            F.first("store_location").alias("store_location"),
            F.first("store_crawl_time").alias("store_crawl_time"),
            F.count(F.when(F.col("is_standard_ao") == 1, True)).alias("standard_ao_num"),
            F.sum("is_asin_price_raise").alias("store_page20_price_raise_asin_num"),
            F.sum("is_asin_price_decline").alias("store_page20_price_decline_asin_num"),
            F.sum("is_asin_rank_raise").alias("store_page20_rank_raise_asin_num"),
            F.sum("is_asin_rank_decline").alias("store_page20_rank_decline_asin_num")
        )
        df_store = df_store.withColumn(
            "store_label_type", self.u_judge_store_label(F.col("store_page20_high_quantity_asin_num"),
                                                         F.col("standard_ao_num"),
                                                         F.col("store_page20_asin_total_num"),
                                                         F.col("store_page20_new_asin_sales_surge_num"),
                                                         F.col("store_page20_old_asin_sales_surge_num"))
        ).drop("standard_ao_num")
        total = F.col("store_page20_asin_total_num")
        return df_store \
            .withColumn("store_page20_new_asin_num_percent",
                        F.round(F.col("store_page20_new_asin_total_num") / total, 4)) \
            .withColumn("store_page20_old_asin_num_percent",
                        F.round(F.col("store_page20_old_asin_total_num") / total, 4))

    def _build_variant_ratio(self):
        """Per seller_id: share of parent ASINs (an ASIN is its own parent when parent_asin is null)
        that have more than one child ASIN, as ``store_page20_variant_rate``."""
        df_parent = self.df_store_asin_detail.select(
            "seller_id", "asin", F.coalesce(F.col("parent_asin"), F.col("asin")).alias("parent_asin"))
        df_parent = df_parent.groupby(['seller_id', 'parent_asin']).agg(
            F.count('asin').alias("asin_son_count"))
        # otherwise(0): without it a store with no multi-variant parent summed to null
        # and got a null rate instead of 0.
        df_parent = df_parent.withColumn(
            "is_variant_flag", F.when(F.col("asin_son_count") > 1, F.lit(1)).otherwise(F.lit(0)))
        df_ratio = df_parent.groupby(['seller_id']).agg(
            F.sum("is_variant_flag").alias("store_more_variant_num"),
            F.count("parent_asin").alias("store_variant_asin_total"))
        return df_ratio.select(
            "seller_id",
            F.round(F.col("store_more_variant_num") / F.col("store_variant_asin_total"), 4)
            .alias("store_page20_variant_rate"))

    def _build_asin_lists(self):
        """Per seller_id: comma-joined ASINs matching each flag, or null when none match.

        One aggregation pass instead of one filter/groupby/join per list; collect_list
        skips the nulls produced by ``when`` for non-matching rows.
        """
        is_sales_surge = F.col("is_sales_surge_asin") == 1
        list_specs = [
            (F.col("is_raise_asin") == 1, "store_page20_raise_asin"),
            (F.col("is_popular_asin") == 1, "store_page20_popular_asin"),
            (F.col("is_high_quantity_asin") == 1, "store_page20_high_quantity_asin"),
            (is_sales_surge & (F.col("asin_type") == 1), "store_page20_new_asin_sales_surge"),
            (is_sales_surge & (F.col("asin_type") == 2), "store_page20_old_asin_sales_surge"),
            (F.col("is_asin_price_raise") == 1, "store_page20_price_raise_asin"),
            (F.col("is_asin_price_decline") == 1, "store_page20_price_decline_asin"),
            (F.col("is_asin_rank_raise") == 1, "store_page20_rank_raise_asin"),
            (F.col("is_asin_rank_decline") == 1, "store_page20_rank_decline_asin"),
        ]
        df_lists = self.df_store_asin_detail.groupby(['seller_id']).agg(
            *[F.collect_list(F.when(condition, F.col("asin"))).alias(name) for condition, name in list_specs]
        )
        # Empty list -> null, matching the old left join against a per-flag filtered frame.
        return df_lists.select(
            "seller_id",
            *[F.when(F.size(name) > 0, F.concat_ws(',', name)).alias(name) for _, name in list_specs]
        )

    def _build_category_percent(self, df_seller_num, category_id_col, category_name_col, output_col):
        """Per seller_id: JSON array of {<category_id_col>, en_name, asin_percent, asin_count}.

        :param df_seller_num: (seller_id, store_page20_asin_total_num), the denominator of asin_percent.
        :param category_id_col: category id column to group by; rows where it is null are skipped.
        :param category_name_col: column whose first value per category becomes ``en_name``.
        :param output_col: name of the resulting JSON string column.
        """
        df_category = self.df_store_asin_detail.filter(F.col(category_id_col).isNotNull()).groupby(
            ['seller_id', category_id_col]).agg(
            F.count("asin").alias("asin_count"),
            F.first(category_name_col).alias("en_name"))
        df_category = df_category.join(df_seller_num, on=['seller_id'], how='left').withColumn(
            "asin_percent", F.round(F.col("asin_count") / F.col("store_page20_asin_total_num"), 4))
        return df_category.groupby(['seller_id']).agg(
            F.to_json(F.collect_list(F.struct(
                F.col(category_id_col), F.col("en_name"), F.col("asin_percent"), F.col("asin_count")
            ))).alias(output_col))

    def handle_data_group(self):
        """Build ``self.df_user_store_collections_info``: one row per seller_id.

        The column order is the metrics aggregate, then the variant rate, the nine ASIN
        lists, the current- and first-level category JSON, and ``store_new_flag``.
        Downstream writes depend on this order.
        """
        df_info = self._aggregate_store_metrics()
        df_info = df_info.join(self._build_variant_ratio(), on=['seller_id'], how='left')
        df_info = df_info.join(self._build_asin_lists(), on=['seller_id'], how='left')

        df_seller_num = df_info.select("seller_id", "store_page20_asin_total_num")
        df_info = df_info.join(
            self._build_category_percent(df_seller_num, "category_id", "category_name",
                                         "store_current_category_percent"),
            on=['seller_id'], how='left'
        ).join(
            self._build_category_percent(df_seller_num, "category_first_id", "category_first_name",
                                         "store_first_category_percent"),
            on=['seller_id'], how='left'
        )
        # 1 = mostly new ASINs, 2 = mostly old ASINs, 0 = neither share reaches 50%.
        self.df_user_store_collections_info = df_info.withColumn(
            "store_new_flag",
            F.when(F.col("store_page20_new_asin_num_percent") >= 0.5, F.lit(1))
            .when(F.col("store_page20_old_asin_num_percent") >= 0.5, F.lit(2))
            .otherwise(F.lit(0)))

    def handle_data_complete(self):
        """Set ``self.df_save``; in batch mode add timestamps and partition columns."""
        self.df_save = self.df_user_store_collections_info
        if self.run_type != 'real_time':
            # 'ss' is second-of-minute; the previous 'SS' emitted fraction-of-second digits.
            now_str = F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss')
            self.df_save = self.df_save \
                .withColumn("created_time", now_str) \
                .withColumn("updated_time", now_str) \
                .withColumn("site_name", F.lit(self.site_name)) \
                .withColumn("date_type", F.lit(self.date_type)) \
                .withColumn("date_info", F.lit(self.date_info))

    def handle_data(self):
        """Run the transformation steps in order."""
        self.handle_asin_change()
        self.handle_data_group()
        self.handle_data_complete()

    def save_data(self):
        """Persist ``self.df_save``.

        Batch mode delegates to ``Templates.save_data``. Real-time mode truncates the
        staging table, writes the frame to it over JDBC, then upserts into the target
        table keyed on seller_id.
        """
        if self.run_type != 'real_time':
            Templates.save_data(self)
            return

        engine = DBUtil.get_db_engine("postgresql", self.site_name)
        pg_con_info = DBUtil.get_connection_info("postgresql", self.site_name)
        export_tmp_tb = 'user_store_collections_info_tmp'
        export_tb = 'user_store_collections_info'
        DBUtil.engine_exec_sql(engine, f"truncate table {export_tmp_tb};")
        connection_properties = {
            "user": pg_con_info["username"],
            "password": pg_con_info["pwd"],
            "driver": "org.postgresql.Driver"
        }
        self.df_save.write.jdbc(url=pg_con_info["url"], table=export_tmp_tb, mode="overwrite",
                                properties=connection_properties)

        column_list = ", ".join(self._UPSERT_COLUMNS)
        update_list = ",\n                ".join(
            f"{col} = excluded.{col}" for col in self._UPSERT_COLUMNS if col != "seller_id")
        # NOTE(review): created_time is reset to now() on every conflict update, so it ends
        # up equal to updated_time after any refresh; confirm this is intended.
        after_sql = f"""
            insert into {export_tb}({column_list})
            select {column_list}
            from {export_tmp_tb}
            ON CONFLICT (seller_id)
            DO UPDATE SET
                {update_list},
                created_time = now(),
                updated_time = now();
        """
        DBUtil.engine_exec_sql(engine, after_sql)

    def run(self):
        """Execute the job: read, transform, save."""
        self.read_data()
        self.handle_data()
        self.save_data()


if __name__ == '__main__':
    # Usage: <script> site_name date_type date_info [run_type [seller_ids]]
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    # Arg 3: year-week / year-month / year-quarter / year-month-day, e.g. 2022-1
    date_info = CommonUtil.get_sys_arg(3, None)
    # Args 4 and 5 are optional for batch runs; indexing sys.argv directly raised
    # IndexError when they were omitted.
    run_type = sys.argv[4] if len(sys.argv) > 4 else None
    # Comma-separated seller ids, only used when run_type == 'real_time'.
    seller_id_tuple = sys.argv[5] if len(sys.argv) > 5 else None
    assert site_name is not None, "site_name 不能为空!"
    assert date_type is not None, "date_type 不能为空!"
    assert date_info is not None, "date_info 不能为空!"
    if run_type == 'real_time':
        assert seller_id_tuple, "seller_id_tuple is required when run_type is real_time!"
    obj = DwtUserStoreCollectionsInfo(site_name=site_name, date_type=date_type, date_info=date_info, run_type=run_type,
                                      seller_id_tuple=seller_id_tuple)
    obj.run()