"""
author: wangrui
description: 根据dim_st_asin_info表得到新增的zr类型asin信息并导出到pg数据库存储
table_read_name: dim_st_asin_info
table_save_name: dwt_zr_asin_info
table_save_level: dwt
version: 2.0
created_date: 2023-04-23
updated_date: 2023-04-23
"""

import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录
from utils.templates import Templates
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
import re
from pyspark.storagelevel import StorageLevel


class DwtZrAsinInfo(Templates):
    """Build per-ASIN history arrays of flow metrics and export them.

    Reads monthly rows from ``dwt_flow_asin`` for one site, aggregates each
    ASIN's metric history into parallel arrays (one value per month), converts
    the arrays to Postgres-style ``{a,b,c}`` strings, and saves the result to
    ``dwt_zr_asin_info`` partitioned by ``site_name`` and ``asin_type``.

    ``asin_type`` buckets ASINs by their "B0x" prefix range so downstream
    consumers can split the export by ASIN generation.
    """

    def __init__(self, site_name="us"):
        super().__init__()
        self.site_name = site_name
        self.db_save = "dwt_zr_asin_info"
        self.spark = self.create_spark_object(app_name=f"{self.db_save} {self.site_name}")
        self.df_date = self.get_year_week_tuple()
        self.get_date_info_tuple()
        # Placeholder frames; real data is filled in by read_data()/handle_data()
        # (driven by the inherited Templates.run()).
        self.df_save = self.spark.sql("select 1+1;")
        self.df_asin_info = self.spark.sql("select 1+1;")
        self.partitions_by = ['site_name', 'asin_type']
        self.reset_partitions(30)
        # Register the python helpers as Spark UDFs; both return strings.
        self.u_get_zr_asin_type = self.spark.udf.register(
            "u_get_zr_asin_type", self.udf_get_zr_asin_type, StringType())
        self.u_change_data_type = self.spark.udf.register(
            "u_change_data_type", self.udf_change_data_type, StringType())

    @staticmethod
    def udf_get_zr_asin_type(asin):
        """Map an ASIN to its prefix bucket label (e.g. "B09A_B09N").

        Comparison is case-insensitive (the ASIN is upper-cased first).
        Returns "OTHER" when no known prefix matches, and None when *asin*
        itself is None so nulls pass through the UDF unchanged.
        """
        pattern_list = [
            ("^B0[1-6][A-Z0-9]*", "B01_B06"), ("^B07[0-9][A-Z0-9]*", "B070_B079"),
            ("^B07[A-Z][A-Z0-9]*", "B07A_B07Z"), ("^B08[0-9][A-Z0-9]*", "B080_B089"),
            ("^B08[A-Z][A-Z0-9]*", "B08A_B08Z"), ("^B09[0-9][A-Z0-9]*", "B090_B099"),
            ("^B09[A-N][A-Z0-9]*", "B09A_B09N"), ("^B09[O-Z][A-Z0-9]*", "B09O_B09Z"),
            ("^B0B[0-9][A-Z0-9]*", "B0B0_B0B9"), ("^B0B[A-G][A-Z0-9]*", "B0BA_B0BG"),
            ("^B0B[H-K][A-Z0-9]*", "B0BH_B0BK"), ("^B0B[L-N][A-Z0-9]*", "B0BL_B0BN"),
            ("^B0B[O-R][A-Z0-9]*", "B0BO_B0BR"), ("^B0B[S-V][A-Z0-9]*", "B0BS_B0BV"),
            ("^B0B[W-Z][A-Z0-9]*", "B0BW_B0BZ")
        ]
        if asin is None:
            return None
        asin = str(asin).upper()
        for pattern, type_value in pattern_list:
            if re.match(pattern, asin):
                return type_value
        return 'OTHER'

    @staticmethod
    def udf_change_data_type(column):
        """Rewrite a python-list string to pg-array braces: "[a, b]" -> "{a, b}".

        Single C-level pass via str.translate instead of two chained
        .replace() calls; output is identical.
        """
        return str(column).translate(str.maketrans("[]", "{}"))

    def read_data(self):
        """Load monthly per-ASIN flow metrics for this site from dwt_flow_asin."""
        print("1. 读取dwt_flow_asin表")
        sql = f"""select asin, asin_zr_counts, asin_sp_counts, asin_sb_counts, round(asin_ao_val,3) as asin_ao_val, asin_price, 
            bsr_orders as asin_bsr_orders, round(sales, 2) as bsr_orders_sale, concat(date_info, '-15') as cur_time from dwt_flow_asin 
            where site_name='{self.site_name}' and date_type='month'
        """
        print("sql:", sql)
        self.df_asin_info = self.spark.sql(sqlQuery=sql)
        # One row per (asin, month); cur_time is the month anchored to day 15.
        self.df_asin_info = self.df_asin_info.drop_duplicates(['asin', 'cur_time'])
        self.df_asin_info = self.df_asin_info.repartition(100).persist(StorageLevel.DISK_ONLY)
        self.df_asin_info.show(10, truncate=False)

    def get_asin_info_list(self):
        """Collapse each ASIN's monthly rows into time-ordered metric arrays.

        Missing metrics are sentinel-filled with -1/-1.0 so the arrays stay
        aligned; sort_array orders the structs by cur_time (first struct
        field), so every *_list column is in chronological order.
        """
        self.df_asin_info = self.df_asin_info.na.fill(
            {"asin_ao_val": -1.0, "asin_zr_counts": -1, "asin_sp_counts": -1, "asin_sb_counts": -1,
             "asin_bsr_orders": -1, "asin_price": -1.0, "bsr_orders_sale": -1})
        df_asin_info_agg = self.df_asin_info.groupby(['asin']).agg(F.sort_array(
            F.collect_list(F.struct("cur_time", "asin_ao_val", "asin_zr_counts", "asin_sp_counts", "asin_sb_counts",
                                    "asin_price", "asin_bsr_orders", "bsr_orders_sale"))).alias("values"))
        df_asin_info_agg = df_asin_info_agg.\
            select("asin", F.expr("transform(values, x -> x.cur_time)").alias("times_array"),
                   F.expr("transform(values, x -> x.asin_ao_val)").alias("ao_array"),
                   F.expr("transform(values, x -> x.asin_zr_counts)").alias("zr_count_array"),
                   F.expr("transform(values, x -> x.asin_sp_counts)").alias("sp_count_array"),
                   F.expr("transform(values, x -> x.asin_sb_counts)").alias("sb_count_array"),
                   F.expr("transform(values, x -> x.asin_price)").alias("price_array"),
                   F.expr("transform(values, x -> x.asin_bsr_orders)").alias("bsr_orders_array"),
                   F.expr("transform(values, x -> x.bsr_orders_sale)").alias("bsr_orders_sale_array"))
        # Render each array column as a pg-style "{...}" string under a *_list name.
        for column in ['times_array', 'ao_array', 'zr_count_array',
                       'sp_count_array', 'sb_count_array', 'price_array', 'bsr_orders_array', 'bsr_orders_sale_array']:
            new_column = column.replace("array", "list")
            df_asin_info_agg = df_asin_info_agg.withColumn(
                new_column, self.u_change_data_type(df_asin_info_agg[column])).drop(column)
        # Persist once, after all column rewrites. The original persisted inside
        # the loop, marking every throwaway intermediate frame for disk caching.
        df_asin_info_agg = df_asin_info_agg.persist(StorageLevel.DISK_ONLY)
        df_asin_info_agg.show(10, truncate=False)
        # orders_list is reserved (always null) — presumably filled by another
        # job downstream; TODO confirm the sink tolerates an untyped null column.
        self.df_save = df_asin_info_agg.withColumn("orders_list", F.lit(None))

    def handle_data_group(self):
        """Attach asin_type bucket, audit timestamps and the site_name partition key."""
        # 'ss' is the seconds field; the original pattern used 'SS', which in
        # Spark's datetime patterns means fraction-of-second, not seconds.
        audit_ts = F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss')
        self.df_save = self.df_save \
            .withColumn("asin_type", self.u_get_zr_asin_type(self.df_save.asin)) \
            .withColumn("created_time", audit_ts) \
            .withColumn("updated_time", audit_ts) \
            .withColumn("site_name", F.lit(self.site_name))

    def handle_data(self):
        """Template hook: build the arrays, then decorate with partition columns."""
        self.get_asin_info_list()
        self.handle_data_group()


# Usage: python <script> <site_name>
# Builds dwt_zr_asin_info for one site via the inherited Templates.run()
# pipeline (read_data -> handle_data -> save).
if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site, e.g. "us"
    handle_obj = DwtZrAsinInfo(site_name=site_name)
    handle_obj.run()