"""
author: 汪瑞
description: 基于dim_asin_detail表,计算出asin维度下的产品体积相关信息
table_read_name: dim_asin_detail
table_save_name: dim_asin_volume
table_save_level: dwd
version: 3.0
created_date: 2022-05-12
updated_date: 2022-12-15
"""

import os
import re
import sys

from pyspark.storagelevel import StorageLevel

sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录
from utils.templates import Templates
# from ..utils.templates import Templates
# from AmazonSpider.pyspark_job.utils.templates import Templates
# 分组排序的udf窗口函数
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType, MapType, DoubleType


class DimAsinVolume(Templates):
    """Spark job that derives per-ASIN package dimensions from free text.

    Reads ``asin`` and ``asin_volume`` from ``dim_cal_asin_history_detail``
    for one site, parses the free-text ``asin_volume`` value into
    ``asin_length`` / ``asin_width`` / ``asin_high`` plus a unit label
    (``asin_element_unit``), and leaves the result in ``self.df_save``.

    NOTE(review): session creation, the ``run()`` entry point and the actual
    write of ``self.df_save`` come from ``Templates``, which is not visible
    here -- confirm their contract against that base class.
    """

    def __init__(self, site_name="us", date_type="week", date_info="2022-1"):
        """Store the job parameters, create the Spark session and register the UDF.

        Args:
            site_name: Site identifier; used as a filter on the source table
                and written back as the ``site_name`` column.
            date_type: Period type (the CLI documents week/4_week/month/quarter).
            date_info: Period value such as ``"2022-1"``.
        """
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = "dim_asin_volume"
        self.spark = self.create_spark_object(app_name=f"{self.db_save} {self.site_name}, {self.date_info}")
        self.df_date = self.get_year_week_tuple()
        # Placeholder DataFrames; replaced by read_data() / parse_asin_volume().
        self.df_save = self.spark.sql("select 1+1;")
        self.df_asin_detail = self.spark.sql("select 1+1;")
        # presumably self.year_week_tuple is populated by get_year_week_tuple() -- TODO confirm
        self.week_counts = 1 if self.date_type == 'week' else len(self.year_week_tuple)
        self.partitions_by = ['site_name']
        self.reset_partitions(10)
        self.u_parse_asin_volume = self.spark.udf.register('u_parse_asin_volume', self.udf_parse_asin_volume,
                                                           MapType(StringType(), DoubleType(), True))

    @staticmethod
    def udf_parse_asin_volume(asin_volume):
        """Parse a free-text dimension string into length, width, height and unit.

        The text is only parsed when it is not ``null``/``none``, does not
        mention ``A4``, contains a ``" x "`` or ``"*"`` separator, does not
        contain both ``cm`` and ``inch``, and yields exactly three numeric
        tokens. In every other case all four values are ``None``.

        When the text carries ``L``/``D``, ``W`` and ``H`` markers, each token
        is assigned by the marker attached to it; otherwise the three numbers
        are sorted and assigned smallest -> height, middle -> width,
        largest -> length.

        Args:
            asin_volume: Raw dimension text (any value; it is passed through ``str``).

        Returns:
            dict with keys ``asin_length``, ``asin_width``, ``asin_high`` and
            ``asin_element_unit``. Dimensions are floats rounded to 2 decimals;
            the unit is ``0.0`` when the text mentions ``cm`` (case-insensitive)
            and ``1.0`` otherwise. Unparsed values are ``None`` rather than ``''``
            so they match the ``DoubleType`` map values declared at registration.
        """
        # NOTE(review): this function is registered as a Spark UDF, so it is kept
        # self-contained (only `re` and builtins) instead of calling class helpers.
        unparsed = {"asin_length": None, "asin_width": None, "asin_high": None,
                    "asin_element_unit": None}

        asin_volume = str(asin_volume)
        has_separator = " x " in asin_volume or "*" in asin_volume
        mixes_units = "cm" in asin_volume and "inch" in asin_volume
        if (asin_volume.lower() in ('null', 'none') or "A4" in asin_volume
                or not has_separator or mixes_units):
            return unparsed

        # A number, optionally followed by a quote mark and one L/W/H/D marker.
        tokens = re.findall(r"\d+\.?\d*[\'\"]?[LWHD]?", asin_volume)
        if len(tokens) != 3:
            return unparsed

        def leading_number(token):
            # First numeric run of the token, rounded to 2 decimals.
            found = re.search(r"\d+\.?\d*", token)
            return round(float(found.group()), 2) if found else None

        is_labelled = (("D" in asin_volume or "L" in asin_volume)
                       and "W" in asin_volume and "H" in asin_volume)
        if is_labelled:
            length = width = high = None
            for token in tokens:
                if "D" in token or "L" in token:
                    length = leading_number(token)
                elif "W" in token:
                    width = leading_number(token)
                elif "H" in token:
                    high = leading_number(token)
            # The markers may appear in the text without being attached to the
            # numeric tokens; in that case nothing can be assigned reliably.
            if length is None or width is None or high is None:
                return unparsed
        else:
            numbers = [leading_number(token) for token in tokens]
            if any(number is None for number in numbers):
                return unparsed
            high, width, length = sorted(numbers)

        # Floats (not ints) to match the DoubleType map value type.
        unit_flag = 0.0 if "cm" in asin_volume.lower() else 1.0
        return {"asin_length": length, "asin_width": width, "asin_high": high,
                "asin_element_unit": unit_flag}

    def read_data(self):
        """Load ``asin`` / ``asin_volume`` for the current site into ``self.df_asin_detail``.

        Missing ``asin_volume`` values are replaced with the string ``"none"``,
        which the parsing UDF treats as "no data".
        """
        print("1.读取dim_cal_asin_history_detail表")
        sql = f"select " \
              f"asin, " \
              f"asin_volume " \
              f"from dim_cal_asin_history_detail where site_name='{self.site_name}'"
        print("sql:" + sql)
        self.df_asin_detail = self.spark.sql(sqlQuery=sql).cache()
        self.df_asin_detail = self.df_asin_detail.na.fill({"asin_volume": "none"})
        self.df_asin_detail.show(10, truncate=False)

    def parse_asin_volume(self):
        """Add the parsed dimension columns and publish the result as ``self.df_save``.

        Adds ``asin_length``, ``asin_width`` and ``asin_high`` (rounded to two
        decimals) and ``asin_element_unit`` (``'cm'``, ``'inches'`` or null).
        Errors raised while building the columns are printed, not re-raised;
        ``self.df_save`` is always set to whatever ``self.df_asin_detail`` holds
        at that point.
        """
        try:
            asin_volume_map = self.u_parse_asin_volume(self.df_asin_detail.asin_volume)
            for dimension in ("asin_length", "asin_width", "asin_high"):
                self.df_asin_detail = self.df_asin_detail.withColumn(
                    dimension, F.round(asin_volume_map[dimension], 2))
            # The UDF encodes the unit as a number: 0 -> cm, 1 -> inches.
            unit_label = F.when(asin_volume_map["asin_element_unit"] == 0, 'cm') \
                .when(asin_volume_map["asin_element_unit"] == 1, 'inches') \
                .otherwise(F.lit(None))
            self.df_asin_detail = self.df_asin_detail.withColumn("asin_element_unit", unit_label)
        except Exception as e:
            # Best effort: report the problem and continue with the columns built so far.
            print(e)
        finally:
            self.df_save = self.df_asin_detail

    def handle_data(self):
        """Parse the volume text and append the audit, reserve and partition columns.

        Adds ``created_time`` / ``updated_time`` (current timestamp formatted as
        ``yyyy-MM-dd HH:mm:ss``), three reserve string columns set to the literal
        ``"null"``, three reserve int columns set to ``0``, and ``site_name``.
        """
        self.parse_asin_volume()
        self.df_save.show(10, truncate=False)
        # 'ss' is second-of-minute; the previous 'SS' pattern emitted a
        # fraction-of-second instead of the seconds.
        now_text = F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss')
        self.df_save = self.df_save.withColumn("created_time", now_text). \
            withColumn("updated_time", now_text)
        for index in (1, 2, 3):
            self.df_save = self.df_save.withColumn(f"re_string_field{index}", F.lit("null"))
        for index in (1, 2, 3):
            self.df_save = self.df_save.withColumn(f"re_int_field{index}", F.lit(0))
        self.df_save = self.df_save.withColumn("site_name", F.lit(self.site_name))

if __name__ == '__main__':
    # Positional command-line arguments, in order:
    #   1. site
    #   2. period type: week/4_week/month/quarter
    #   3. period value: year-week / year-month / year-quarter, e.g. 2022-1
    site_name, date_type, date_info = sys.argv[1], sys.argv[2], sys.argv[3]
    handle_obj = DimAsinVolume(
        site_name=site_name,
        date_type=date_type,
        date_info=date_info,
    )
    handle_obj.run()