"""
   @Author      : wangrui
   @Description : 流量选品
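   @SourceTable :
                  dwd_asin_measure          (us only)
                  dwd_asin_counts           (non-us sites)
                  dwd_st_asin_info          (non-us sites)
                  ods_one_category_report   (non-us sites)
                  dim_asin_detail
                  ods_self_asin
                  ods_bsr_end
                  dim_asin_bs_category
                  dim_fd_asin_info
                  ods_asin_variat
                  dim_asin_volume
                  dwt_flow_asin             (previous period's result)
                  dim_date_20_to_30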


   @SinkTable   : dwt_flow_asin
   @CreateTime  : 2023/01/10 07:55
   @UpdateTime  : 2023/01/10 07:55
"""

import os
import sys
import re
from functools import reduce

sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录
from utils.templates import Templates
# from ..utils.templates import Templates
# 分组排序的udf窗口函数
from pyspark.sql.window import Window
from pyspark.sql import functions as F
import datetime
from pyspark.sql.types import StringType, IntegerType, DoubleType, MapType
# from utils import common_udf
# common_udf.udf_get_package_quantity(title='xxpiec')
# quit()

from utils.db_util import DBUtil
from utils.spark_util import SparkUtil
from sqlalchemy import create_engine
import pandas as pd


class DwtFlowAsin(Templates):

    def __init__(self, site_name="us", date_type="week", date_info="2022-1"):
        """Set up the Spark session, date context, output partitioning and UDFs for one run.

        :param site_name: site code; "us" reads dwd_asin_measure, every other site reads
                          dwd_asin_counts / dwd_st_asin_info / ods_one_category_report.
        :param date_type: period granularity; values handled in this class include
                          "day", "week", "4_week", "month", "last30day" and "quarter".
        :param date_info: period identifier interpreted according to ``date_type``.
        """
        # utils.common_udf provides the package-quantity UDF registered below. Its
        # module-level import is commented out, so bring it into scope here; without
        # it the registration raises NameError.
        from utils import common_udf

        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = "dwt_flow_asin"
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name},{self.date_type}, {self.date_info}")
        # The date helpers are declared static but take the instance explicitly.
        self.previous_date = self.udf_get_previous_last_30_day(self)
        self.current_date = self.udf_get_current_time(self)
        # Inherited helpers; presumably these set self.year / self.month and
        # self.year_month_days_dict (all read below) -- confirm in Templates.
        self.get_date_info_tuple()
        self.get_year_month_days_dict(year=int(self.year))
        self.orders_transform_rate = self.get_orders_transform_rate()
        # Output DataFrame placeholder and partition layout.
        self.df_save = self.spark.sql(f"select 1+1;")
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        # Longer periods produce more data, so they get more output partitions.
        if self.date_type in ["week"]:
            self.reset_partitions(80)
        elif self.date_type in ["month", "last30day", '4_week']:
            self.reset_partitions(140)
        elif self.date_type in ["quarter"]:
            self.reset_partitions(200)
        # Placeholder DataFrames, replaced in read_data().
        self.df_asin_detail = self.spark.sql(f"select 1+1;")
        self.df_asin_measure = self.spark.sql(f"select 1+1;")
        self.df_bsr_end = self.spark.sql(f"select 1+1;")
        self.df_one_category_report = self.spark.sql(f"select 1+1;")
        self.df_fd_asin_info = self.spark.sql(f"select 1+1;")
        self.df_asin_variat = self.spark.sql(f"select 1+1;")
        self.df_flow_asin_last = self.spark.sql(f"select 1+1;")
        self.df_asin_volume = self.spark.sql(f"select 1+1;")
        self.df_asin_counts = self.spark.sql(f"select 1+1;")
        self.df_st_asin_info = self.spark.sql(f"select 1+1;")
        self.df_bs_report = self.spark.sql(f"select 1+1;")
        # UDF registrations (attribute names are kept as-is; other methods use them).
        self.u_judge_us_size_type = self.spark.udf.register('u_judge_us_size_type', self.udf_get_us_size_type,
                                                            IntegerType())
        self.u_judge_other_size_type = self.spark.udf.register('u_judge_other_size_type', self.udf_get_other_size_type,
                                                               IntegerType())
        self.u_judge_rating_type = self.spark.udf.register('u_judge_rating_type', self.udf_get_rating_type,
                                                           IntegerType())
        self.u_judge_site_name_type = self.spark.udf.register('u_judge_site_name_type', self.udf_get_site_name_type,
                                                              IntegerType())
        self.u_judge_weight_type = self.spark.udf.register('u_judge_weight_type', self.udf_get_weight_type,
                                                           IntegerType())
        self.u_judge_quantity_variartion_type = self.spark.udf.register('u_judge_quantity_variation_type',
                                                                        self.udf_get_quantity_variation_type,
                                                                        IntegerType())
        self.u_judget_bsr_type = self.spark.udf.register('u_judge_bsr_type', self.udf_get_bsr_type, IntegerType())
        self.u_year_week = self.spark.udf.register('u_year_week', self.udf_year_week, StringType())
        self.u_judge_package_quantity = self.spark.udf.register('u_judge_package_quantity',
                                                                common_udf.udf_get_package_quantity,
                                                                IntegerType())
        self.u_judge_sp_type = self.spark.udf.register('u_judge_sp_type',
                                                       self.udf_get_sp_type,
                                                       MapType(StringType(), IntegerType(), False))
        self.u_judge_brand_type = self.spark.udf.register('u_judge_brand_type', self.udf_get_brand_type, IntegerType())

    def get_orders_transform_rate(self):
        month_days = self.year_month_days_dict[int(self.month)]
        if self.date_type in ['day', 'week']:
            if self.date_type == 'day':
                return 1 / month_days
            if self.date_type == 'week':
                return 7 / month_days
        else:
            return 1

    @staticmethod
    def udf_year_month(self):
        """Resolve the year-month string for the job's date context.

        Declared static but called with the instance passed explicitly.
        Side effect: stores the dim_date_20_to_30 table on ``self.df_date``.

        :return: for "day"/"last30day", the ``year_month`` of the first row whose
                 ``date`` equals ``date_info`` (also printed); for "month",
                 ``date_info`` itself; otherwise None.
        """
        self.df_date = self.spark.sql(f"select * from dim_date_20_to_30;")
        calendar = self.df_date.toPandas()
        if self.date_type in ('day', 'last30day'):
            same_day = calendar.loc[calendar.date == f'{self.date_info}']
            resolved = list(same_day.year_month)[0]
            print(resolved)
            return resolved
        if self.date_type == 'month':
            return self.date_info
        return None

    @staticmethod
    def udf_year_week(dt):
        year, week = dt.split("-")[0], dt.split("-")[1]
        if int(week) < 10:
            return f"{year}-0{week}"
        else:
            return f"{year}-{week}"

    @staticmethod
    def udf_get_previous_last_30_day(self):
        """Return the year-month preceding the month the job's window starts in.

        Declared static but called with the instance passed explicitly.
        Side effect: stores the dim_date_20_to_30 table on ``self.df_date``.

        Anchor month per date_type:
          * "last30day": the month of the row whose id is 30 below the id of ``date_info``;
          * "month": ``date_info`` itself;
          * "4_week": the month of the row whose id is 21 below the week_day == 1 row of
            the ``date_info`` week.
        The result is the ``year_month`` of the row just before day 1 of that anchor
        month. Any other date_type returns None. Raises IndexError if a lookup matches
        no row.
        """
        self.df_date = self.spark.sql(f"select * from dim_date_20_to_30 ;")
        calendar = self.df_date.toPandas()

        def month_before(anchor_year_month):
            # Step back one id from day 1 of the anchor month. Presumably ids are
            # consecutive per calendar day -- confirm against dim_date_20_to_30.
            first_day = calendar.loc[(calendar.year_month == anchor_year_month) & (calendar.day == 1)]
            first_day_id = int(list(first_day.id)[0])
            prior_day = calendar.loc[calendar.id == first_day_id - 1]
            return list(prior_day.year_month)[0]

        if self.date_type == 'last30day':
            today = calendar.loc[calendar.date == f'{self.date_info}']
            window_start_id = int(list(today.id)[0]) - 30
            window_start = calendar.loc[calendar.id == window_start_id]
            return month_before(list(window_start.year_month)[0])
        if self.date_type == 'month':
            return month_before(f'{self.date_info}')
        if self.date_type == '4_week':
            week_start = calendar.loc[(calendar.year_week == f'{self.date_info}') & (calendar.week_day == 1)]
            anchor = calendar.loc[calendar.id == int(list(week_start.id)[0]) - 21]
            return month_before(list(anchor.year_month)[0])
        return None

    @staticmethod
    def udf_get_current_time(self):
        """Return the reference date string for the job's period.

        Declared static but called with the instance passed explicitly.
        Side effect: stores the dim_date_20_to_30 table on ``self.df_date``.

        * "month": the ``date`` of day 25 of the ``date_info`` month;
        * "week" / "4_week": the ``date`` of the week_day == 7 row of the ``date_info`` week;
        * "last30day" / "day": ``date_info`` unchanged;
        * anything else: None.
        """
        self.df_date = self.spark.sql(f"select * from dim_date_20_to_30 ;")
        calendar = self.df_date.toPandas()

        def date_of_first_match(mask):
            # Take the id of the first matching row, then read the date of the row(s) with that id.
            row_id = list(calendar.loc[mask].id)[0]
            return list(calendar.loc[calendar.id == row_id].date)[0]

        if self.date_type == 'month':
            return date_of_first_match((calendar.year_month == f'{self.date_info}') & (calendar.day == 25))
        if self.date_type in ['week', '4_week']:
            return date_of_first_match((calendar.year_week == f'{self.date_info}') & (calendar.week_day == 7))
        if self.date_type in ['last30day', 'day']:
            return self.date_info
        return None

    @staticmethod
    def udf_get_us_size_type(asin_weight, asin_length, asin_width, asin_high, asin_element_unit):
        if (asin_weight is not None) and (asin_length is not None) and (asin_width is not None) and (
                asin_high is not None):
            if asin_element_unit == 'cm':
                asin_length = asin_length * 0.39
                asin_width = asin_width * 0.39
                asin_high = asin_high * 0.39
            asin_girths = asin_length + (asin_width + asin_high) * 2
            if (asin_weight > 0 and asin_weight * 16 <= 16) and (asin_length > 0 and asin_length <= 15) and (
                    asin_width > 0 and asin_width <= 12) and (asin_high > 0 and asin_high <= 0.75):
                return 1
            elif (asin_weight * 16 > 16 and asin_weight <= 20) and (asin_length > 15 and asin_length <= 18) and (
                    asin_width > 12 and asin_width <= 14) and (asin_high > 0.75 and asin_high <= 8):
                return 2
            elif (asin_weight > 20 and asin_weight <= 70) and (asin_length > 18 and asin_length <= 60) and (
                    asin_width > 14 and asin_width <= 30) and (asin_girths <= 130):
                return 3
            elif (asin_weight > 70 and asin_weight <= 150) and (asin_length > 60 and asin_length <= 108) and (
                    asin_girths <= 130):
                return 4
            elif (asin_weight > 70 and asin_weight <= 150) and (asin_length > 60 and asin_length <= 108) and (
                    asin_girths > 130) and (asin_girths <= 165):
                return 5
            elif (asin_weight > 150) and (asin_length > 108) and (asin_girths > 165):
                return 6
            else:
                return 0
        else:
            return 0

    @staticmethod
    def udf_get_other_size_type(asin_weight, asin_length, asin_width, asin_high, asin_element_unit):
        if (asin_weight is not None) and (asin_length is not None) and (asin_width is not None) and (
                asin_high is not None):
            if asin_element_unit == 'inches':
                asin_length = asin_length * 2.54
                asin_width = asin_width * 2.54
                asin_high = asin_high * 2.54
            asin_girths = asin_length + (asin_width + asin_high) * 2
            if (asin_weight > 0 and asin_weight <= 100) and (asin_length > 0 and asin_length <= 20) and (
                    asin_width > 0 and asin_width <= 15) and (asin_high > 0 and asin_high <= 1):
                return 1
            elif (asin_weight > 100 and asin_weight <= 500) and (asin_length > 20 and asin_length <= 33) and (
                    asin_width > 15 and asin_width <= 23) and (asin_high > 1 and asin_high <= 2.5):
                return 2
            elif (asin_weight > 500 and asin_weight <= 1000) and (asin_length > 20 and asin_length <= 33) and (
                    asin_width > 15 and asin_width <= 23) and (asin_high > 2.5 and asin_high <= 5):
                return 3
            elif (asin_weight > 1000 and asin_weight <= 12000) and (asin_length > 33 and asin_length <= 45) and (
                    asin_width > 23 and asin_width <= 34) and (asin_high > 5 and asin_high <= 26):
                return 4
            elif (asin_weight > 1000 and asin_weight <= 2000) and (asin_length > 45 and asin_length <= 61) and (
                    asin_width > 34 and asin_width <= 46) and (asin_high > 26 and asin_high <= 46):
                return 5
            elif (asin_length > 0 and asin_length <= 150) and (asin_girths > 0 and asin_girths <= 300):
                return 6
            elif (asin_length > 150) and (asin_girths > 300):
                return 7
            else:
                return 0
        else:
            return 0

    @staticmethod
    def udf_get_rating_type(asin_rating):
        if asin_rating is not None:
            if asin_rating >= 4.5:
                return 1
            elif (asin_rating >= 4 and asin_rating < 4.5):
                return 2
            elif (asin_rating >= 3.5 and asin_rating < 4):
                return 3
            elif (asin_rating >= 3 and asin_rating < 3.5):
                return 4
            elif (asin_rating < 3):
                return 5
            else:
                return 0
        else:
            return 0

    @staticmethod
    def udf_get_quantity_variation_type(size):
        size = str(size).lower()
        if size not in ['null', 'none']:
            sign_word = 'quantity'
            if sign_word in size:
                return 1
            else:
                return 0
        else:
            return 0

    @staticmethod
    def udf_get_site_name_type(asin_buy_box_seller_type, seller_country_name):
        if str(seller_country_name).lower() not in ['none', 'null']:
            if asin_buy_box_seller_type == 1:
                return 4
            elif asin_buy_box_seller_type != 1 and str(seller_country_name).upper().find('US') != -1:
                return 1
            elif asin_buy_box_seller_type != 1 and str(seller_country_name).upper().find('CN') != -1:
                return 2
            else:
                return 3
        else:
            return 0

    @staticmethod
    def udf_get_weight_type(asin_weight):
        if asin_weight is not None:
            if (asin_weight > 0 and asin_weight < 0.2):
                return 1
            if (asin_weight >= 0.2 and asin_weight < 0.4):
                return 2
            if (asin_weight >= 0.4 and asin_weight < 0.6):
                return 3
            if (asin_weight >= 0.6 and asin_weight < 1):
                return 4
            if (asin_weight >= 1 and asin_weight < 2):
                return 5
            if (asin_weight >= 2):
                return 6
            else:
                return 0
        return 0

    @staticmethod
    def udf_get_bsr_type(asin_rank, limit_rank):
        if asin_rank is not None:
            if asin_rank <= limit_rank:
                return 1
            else:
                return 0
        else:
            return 0

    # @staticmethod
    # def udf_get_package_quantity(title):
    #     if title != '':
    #         title = str(title).lower()
    #         eligible_list = []
    #         if title not in ['null', 'none']:
    #             pattern1 = r'\b[^\d\s]*(\d+)[^\d\s]* (pcs|piece|pieces|set|pack|packs|pairs|pk)\b'
    #             pattern2 = r'\b(set|pack) of [^\d\s]*(\d+)[^\d\s]*\b'
    #             pattern3 = r'\b[^\d\s]*(\d+)[^\d\s]*-(pack|piece)\b'
    #             pattern4 = r'\b[^\d\s]*(\d+)pcs\b'
    #             title_list1 = re.findall(pattern1, title)
    #             title_list2 = re.findall(pattern2, title)
    #             title_list3 = re.findall(pattern3, title)
    #             title_list4 = re.findall(pattern4, title)
    #             if len(title_list1) > 0:
    #                 for title_element in title_list1:
    #                     eligible_element = title_element[0]
    #                     if (not str(eligible_element).startswith('0')) and (int(eligible_element) < 100000) and (
    #                             int(eligible_element) >= 0):
    #                         eligible_list.append(int(eligible_element))
    #             if len(title_list2) > 0:
    #                 for title_element in title_list2:
    #                     eligible_element = title_element[1]
    #                     if (not str(eligible_element).startswith('0')) and (int(eligible_element) < 100000) and (
    #                             int(eligible_element) >= 0):
    #                         eligible_list.append(int(eligible_element))
    #             if len(title_list3) > 0:
    #                 for title_element in title_list3:
    #                     eligible_element = title_element[0]
    #                     if (not str(eligible_element).startswith('0')) and (int(eligible_element) < 100000) and (
    #                             int(eligible_element) >= 0):
    #                         eligible_list.append(int(eligible_element))
    #             if len(title_list4) > 0:
    #                 for title_element in title_list4:
    #                     if (not str(title_element).startswith('0')) and (int(title_element) < 100000) and (
    #                             int(title_element) >= 0):
    #                         eligible_list.append(int(title_element))
    #             if len(eligible_list) > 0:
    #                 max_eligible_element = max(eligible_list)
    #                 min_eligible_element = min(eligible_list)
    #                 if max_eligible_element - min_eligible_element >= 1000:
    #                     return min_eligible_element
    #                 else:
    #                     return max_eligible_element

    @staticmethod
    def udf_get_sp_type(sp_num):
        if sp_num is not None:
            sp_type_num_list = str(sp_num).split(",")
            if len(sp_type_num_list) == 3:
                return {"sp_type1": int(sp_type_num_list[0]), "sp_type2": int(sp_type_num_list[1]),
                        "sp_type3": int(sp_type_num_list[2])}
            elif len(sp_type_num_list) == 1:
                return {"sp_type1": int(sp_type_num_list[0]), "sp_type2": 0, "sp_type3": 0}

    @staticmethod
    def udf_get_brand_type(asin_brand_name):
        if asin_brand_name is not None:
            asin_brand_name = str(asin_brand_name).lower()
            if asin_brand_name not in ['null', 'none']:
                return 1

    def read_data(self):
        """Load every source table this job needs into instance DataFrames.

        Populates ``df_asin_measure`` (us), or ``df_asin_counts`` / ``df_st_asin_info`` /
        ``df_bs_report`` (other sites). It then populates ``df_asin_detail``,
        ``df_bsr_end``, ``df_one_category_report``, ``df_fd_asin_info``,
        ``df_asin_variat``, ``df_flow_asin_last`` and ``df_asin_volume``.
        Every query is printed before it runs.
        """
        # 1. Per-ASIN counts by type, asin_ao_val, and estimated / BSR orders.
        if self.site_name == 'us':
            sql = f"select " \
                  f"asin, " \
                  f"asin_zr_counts, " \
                  f"asin_sp_counts, " \
                  f"asin_sb_counts, " \
                  f"asin_sb1_counts, " \
                  f"asin_ac_counts, " \
                  f"asin_bs_counts, " \
                  f"asin_er_counts, " \
                  f"asin_tr_counts, " \
                  f"asin_ao_val, " \
                  f"asin_zr_orders_sum, " \
                  f"asin_bsr_orders " \
                  f"from dwd_asin_measure " \
                  f"where site_name = '{self.site_name}' and date_type = '{self.date_type}' and date_info = '{self.date_info}'"
            print("sql:" + sql)
            self.df_asin_measure = self.spark.sql(sqlQuery=sql).cache()
            self.df_asin_measure = self.df_asin_measure.withColumnRenamed("asin_sb1_counts", "asin_vi_counts")
            self.df_asin_measure = self.df_asin_measure.withColumnRenamed("asin_zr_orders_sum", "orders")
            self.df_asin_measure = self.df_asin_measure.withColumnRenamed("asin_bsr_orders", "bsr_orders")
        else:
            # Non-us sites: counts, orders and BSR orders come from three separate tables.
            sql = f"select " \
                  f"asin, " \
                  f"asin_zr_counts, " \
                  f"asin_sp_counts, " \
                  f"asin_sb_counts, " \
                  f"asin_sb1_counts, " \
                  f"asin_ac_counts, " \
                  f"asin_bs_counts, " \
                  f"asin_er_counts, " \
                  f"asin_tr_counts, " \
                  f"asin_ao_val " \
                  f"from dwd_asin_counts " \
                  f"where site_name = '{self.site_name}' and date_type = '{self.date_type}' and date_info = '{self.date_info}'"
            print("sql:" + sql)
            self.df_asin_counts = self.spark.sql(sqlQuery=sql).cache()
            self.df_asin_counts = self.df_asin_counts.withColumnRenamed("asin_sb1_counts", "asin_vi_counts")
            sql = f"select " \
                  f"asin, " \
                  f"asin_st_zr_orders_sum as orders " \
                  f"from dwd_st_asin_info " \
                  f"where site_name = '{self.site_name}' and date_type = '{self.date_type}' and date_info = '{self.date_info}'"
            print("sql:" + sql)
            self.df_st_asin_info = self.spark.sql(sqlQuery=sql).cache()
            self.df_st_asin_info = self.df_st_asin_info.drop_duplicates(['asin'])
            # Monthly category report, scaled to the job's period by orders_transform_rate.
            # NOTE(review): periods before 2022-03 read the 2022-12 report as a stand-in;
            # presumably earlier reports are unavailable -- confirm.
            if int(self.year) == 2022 and int(self.month) < 3:
                sql = f"select " \
                      f"cate_1_id, " \
                      f"rank as asin_rank, " \
                      f"ceil(orders*{self.orders_transform_rate}) as bsr_orders " \
                      f"from ods_one_category_report " \
                      f"where site_name='{self.site_name}' and date_type='month' and date_info='2022-12';"
            else:
                sql = f"select " \
                      f"cate_1_id, " \
                      f"rank as asin_rank, " \
                      f"ceil(orders*{self.orders_transform_rate}) as bsr_orders " \
                      f"from ods_one_category_report " \
                      f"where site_name='{self.site_name}' and date_type='month' and date_info='{self.year}-{self.month}';"
            print("orders_transform_rate:" + str(self.orders_transform_rate))
            print("sql:" + sql)
            self.df_bs_report = self.spark.sql(sqlQuery=sql).cache()

        # 2. dim_asin_detail: ASIN attributes, excluding the site's own ASINs (ods_self_asin).
        sql = f"select " \
              f"asin, " \
              f"bsr_cate_1_id, " \
              f"bsr_cate_current_id, " \
              f"asin_img_url, " \
              f"asin_title, " \
              f"asin_title_len, " \
              f"asin_price, " \
              f"asin_rating, " \
              f"asin_total_comments, " \
              f"asin_buy_box_seller_type, " \
              f"asin_page_inventory, " \
              f"asin_category_desc, " \
              f"asin_volume, " \
              f"asin_weight, " \
              f"asin_color, " \
              f"asin_size, " \
              f"asin_style, " \
              f"asin_is_sale, " \
              f"asin_rank, " \
              f"if(((lower(asin_launch_time) == 'none') or (lower(asin_launch_time) == 'null')), '1970-01-01 00:00:00', asin_launch_time) as asin_launch_time, " \
              f"asin_is_new, " \
              f"asin_img_num, " \
              f"asin_img_type, " \
              f"asin_material, " \
              f"asin_brand_name, " \
              f"asin_activity_type, " \
              f"act_one_two_val, " \
              f"act_three_four_val, " \
              f"act_five_six_val, " \
              f"act_eight_val, " \
              f"qa_num, " \
              f"one_star," \
              f"two_star, " \
              f"three_star, " \
              f"four_star, " \
              f"five_star, " \
              f"low_star, " \
              f"together_asin, " \
              f"ac_name, " \
              f"parent_asin," \
              f"sp_num," \
              f"asin_label_type as is_movie_label " \
              f"from dim_asin_detail " \
              f"where site_name = '{self.site_name}' and date_type = '{self.date_type}' and date_info = '{self.date_info}' " \
              f"and asin not in " \
              f"(select asin from ods_self_asin " \
              f"where site_name = '{self.site_name}' )"
        print("sql:" + sql)
        self.df_asin_detail = self.spark.sql(sqlQuery=sql).cache()
        self.df_asin_detail = self.df_asin_detail.withColumnRenamed("bsr_cate_1_id", "cate_1_id")
        self.df_asin_detail = self.df_asin_detail.withColumnRenamed("bsr_cate_current_id", "cate_current_id")
        # asin_label_type is already aliased to is_movie_label in the query above, so no
        # rename is needed. A rename of "asin_label_type" here would silently do nothing.

        # 3. ods_bsr_end: rank limits per category name.
        sql = f"select rank,bsr_name " \
              f"from ods_bsr_end " \
              f"where site_name = '{self.site_name}'"
        print("sql:" + sql)
        self.df_bsr_end = self.spark.sql(sqlQuery=sql).cache()
        self.df_bsr_end = self.df_bsr_end.withColumnRenamed("bsr_name", "name")
        self.df_bsr_end = self.df_bsr_end.withColumnRenamed("rank", "limit_rank")

        # 4. dim_asin_bs_category: top-level category names.
        sql = f"select  asin_bs_cate_1_id as cate_1_id, asin_bs_cate_1_en_name as name, " \
              f" date_info as dt from dim_asin_bs_category " \
              f" where site_name='{self.site_name}';"
        print("sql:" + sql)
        self.df_one_category_report = self.spark.sql(sqlQuery=sql)

        # 5. dim_fd_asin_info: seller account and country, one row per asin.
        sql = f"select " \
              f"fd_account_id, " \
              f"fd_account_name, " \
              f"fd_country_name, " \
              f"asin " \
              f" from dim_fd_asin_info " \
              f" where site_name = '{self.site_name}'"
        print("sql:" + sql)
        self.df_fd_asin_info = self.spark.sql(sqlQuery=sql).cache()
        self.df_fd_asin_info = self.df_fd_asin_info.drop_duplicates(['asin'])
        self.df_fd_asin_info = self.df_fd_asin_info.withColumnRenamed("fd_account_id", "account_id")
        self.df_fd_asin_info = self.df_fd_asin_info.withColumnRenamed("fd_account_name", "account_name")
        self.df_fd_asin_info = self.df_fd_asin_info.withColumnRenamed("fd_country_name", "seller_country_name")
        self.df_fd_asin_info = self.df_fd_asin_info.na.fill({"seller_country_name": 'none'})

        # 6. ods_asin_variat: variation info, keeping the most recently updated row per asin.
        sql = f"select " \
              f"asin, " \
              f"parent_asin, " \
              f"size, " \
              f"updated_time " \
              f" from ods_asin_variat " \
              f" where site_name = '{self.site_name}'"
        print("sql:" + sql)
        self.df_asin_variat = self.spark.sql(sqlQuery=sql)
        # dropDuplicates after orderBy does not guarantee which row survives, so rank rows
        # per asin explicitly (latest updated_time first, nulls last) and keep rank 1.
        latest_first = Window.partitionBy("asin").orderBy(F.col("updated_time").desc_nulls_last())
        self.df_asin_variat = self.df_asin_variat.withColumn("variat_rank", F.row_number().over(latest_first))
        self.df_asin_variat = self.df_asin_variat.filter(F.col("variat_rank") == 1)
        self.df_asin_variat = self.df_asin_variat.drop("variat_rank")
        self.df_asin_variat = self.df_asin_variat.drop("updated_time")
        self.df_asin_variat = self.df_asin_variat.na.fill({"size": "none"})

        # 7. Previous period's month-level dwt_flow_asin result (date_info = self.previous_date).
        sql = f"select " \
              f"asin, " \
              f"asin_rank as previous_asin_rank, " \
              f"asin_ao_val as previous_asin_ao_val, " \
              f"asin_price as previous_asin_price, " \
              f"orders as previous_orders, " \
              f"bsr_orders as previous_bsr_orders, " \
              f"asin_rating as previous_asin_rating, " \
              f"asin_total_comments as previous_asin_total_comments, " \
              f"sales as previous_sales, " \
              f"variation_num as previous_variation_num " \
              f"from dwt_flow_asin " \
              f"where site_name='{self.site_name}' and date_type = 'month' and date_info = '{self.previous_date}'"
        print("sql:" + sql)
        self.df_flow_asin_last = self.spark.sql(sqlQuery=sql)

        # 8. dim_asin_volume: dimensions and unit; missing dimensions default to 0.0.
        sql = f"select " \
              f"asin, " \
              f"asin_length, " \
              f"asin_width, " \
              f"asin_high," \
              f"asin_element_unit " \
              f"from dim_asin_volume where site_name = '{self.site_name}'"
        print("sql:" + sql)
        self.df_asin_volume = self.spark.sql(sqlQuery=sql)
        self.df_asin_volume = self.df_asin_volume.na.fill({"asin_length": 0.0, "asin_width": 0.0, "asin_high": 0.0})

    def get_asin_detail(self):
        """Left-join metrics, seller info and volume onto ``df_asin_detail``; derive sales and brand flag.

        us joins ``df_asin_measure`` on asin. Other sites join ``df_asin_counts`` and
        ``df_st_asin_info`` on asin, then ``df_bs_report`` on (asin_rank, cate_1_id).
        Every site then joins ``df_fd_asin_info`` and ``df_asin_volume`` on asin.
        Adds ``sales`` = bsr_orders * asin_price and ``is_brand_label`` from asin_brand_name.
        """
        detail = self.df_asin_detail
        if self.site_name == 'us':
            detail = detail.join(self.df_asin_measure, on=['asin'], how='left')
        else:
            detail = detail.join(self.df_asin_counts, on=['asin'], how='left')
            detail = detail.join(self.df_st_asin_info, on=['asin'], how='left')
            detail = detail.join(self.df_bs_report, on=['asin_rank', 'cate_1_id'], how='left')
        for side_table in (self.df_fd_asin_info, self.df_asin_volume):
            detail = detail.join(side_table, on=['asin'], how='left')
        detail = detail.withColumn("sales", F.col("bsr_orders") * F.col("asin_price"))
        self.df_asin_detail = detail.withColumn("is_brand_label", self.u_judge_brand_type(detail.asin_brand_name))

    def get_package_quantity(self):
        """Add ``package_quantity``, derived from ``asin_title`` by the package-quantity UDF.

        The stray ``quit()`` that used to follow this step is removed; it ended the
        interpreter and aborted the rest of the job.
        """
        self.df_asin_detail = self.df_asin_detail.withColumn("package_quantity", self.u_judge_package_quantity(
            self.df_asin_detail.asin_title))

    def get_volume_detail(self):
        """Add ``asin_size_type`` and then drop the raw dimension columns.

        us uses the US size-tier UDF and every other site uses the other-site UDF. Both
        receive weight, length, width, height and element unit.
        """
        classify_size = self.u_judge_us_size_type if self.site_name == 'us' else self.u_judge_other_size_type
        detail = self.df_asin_detail
        detail = detail.withColumn("asin_size_type", classify_size(
            detail.asin_weight,
            detail.asin_length,
            detail.asin_width,
            detail.asin_high,
            detail.asin_element_unit))
        for raw_column in ("asin_length", "asin_width", "asin_high", "asin_element_unit"):
            detail = detail.drop(raw_column)
        self.df_asin_detail = detail

    def get_rating_type(self):
        """Add ``asin_rating_type``, the rating bucket code computed from ``asin_rating``."""
        detail = self.df_asin_detail
        rating_bucket = self.u_judge_rating_type(detail.asin_rating)
        self.df_asin_detail = detail.withColumn("asin_rating_type", rating_bucket)

    def get_site_name_type(self):
        """Add ``asin_site_name_type`` from the buy-box seller type and the seller country."""
        detail = self.df_asin_detail
        site_code = self.u_judge_site_name_type(detail.asin_buy_box_seller_type, detail.seller_country_name)
        self.df_asin_detail = detail.withColumn("asin_site_name_type", site_code)

    def get_weight_type(self):
        """Add ``asin_weight_type``, the weight bucket code computed from ``asin_weight``."""
        detail = self.df_asin_detail
        weight_bucket = self.u_judge_weight_type(detail.asin_weight)
        self.df_asin_detail = detail.withColumn("asin_weight_type", weight_bucket)

    def get_launch_time_type(self):
        """Add ``asin_launch_time_type``: how long before ``current_date`` the ASIN launched.

        With boundaries b30, b90, b180, b360, b720 and b1080 (``current_date`` minus
        that many days):
          1: launch >= b30
          2: b90 <= launch < b30
          3: b180 <= launch < b90
          4: b360 <= launch < b180
          5: b720 <= launch < b360
          6: b1080 <= launch < b720
          7: launch < b1080
          0: otherwise (e.g. null launch time)

        NOTE(review): ``current_date`` must be a 'YYYY-MM-DD' string. udf_get_current_time
        returns None for date_types it does not handle (e.g. "quarter"), and strptime
        would then raise TypeError.
        """
        reference = datetime.datetime.strptime(self.current_date, '%Y-%m-%d')
        bounds = [reference - datetime.timedelta(days=days) for days in (30, 90, 180, 360, 720, 1080)]
        launch = F.col("asin_launch_time")
        bucket = F.when(launch >= bounds[0], F.lit(1))
        # Codes 2..6 cover [older bound, newer bound) between consecutive boundaries.
        for code, (older, newer) in enumerate(zip(bounds[1:], bounds[:-1]), start=2):
            bucket = bucket.when((launch >= older) & (launch < newer), F.lit(code))
        bucket = bucket.when(launch < bounds[-1], F.lit(7)).otherwise(F.lit(0))
        self.df_asin_detail = self.df_asin_detail.withColumn('asin_launch_time_type', bucket)

    def get_ao_val_type(self):
        """Bucket `asin_ao_val` into the integer column `asin_ao_val_type`.

        Codes: (0, 0.1) -> 1, [0.1, 0.2) -> 2, [0.2, 0.4) -> 3, [0.4, 0.8) -> 4,
        [0.8, 1.2) -> 5, [1.2, 2) -> 6, [2, +inf) -> 7.
        Null, zero and negative values map to 0.
        """
        ao_val = F.col("asin_ao_val")
        edges = [0, 0.1, 0.2, 0.4, 0.8, 1.2, 2]

        # The first bucket is open at its lower edge, so a value of exactly 0 stays in bucket 0.
        bucket = F.when((ao_val > edges[0]) & (ao_val < edges[1]), F.lit(1))
        for code in range(2, len(edges)):
            bucket = bucket.when((ao_val >= edges[code - 1]) & (ao_val < edges[code]), F.lit(code))
        bucket = bucket.when(ao_val >= edges[-1], F.lit(len(edges))).otherwise(F.lit(0))

        self.df_asin_detail = self.df_asin_detail.withColumn(
            "asin_ao_val_type",
            F.when(ao_val.isNotNull(), bucket).otherwise(F.lit(0)),
        )

    def get_rank_type(self):
        """Bucket `asin_rank` into the integer column `asin_rank_type`.

        Codes: (0, 1000) -> 1, [1000, 5000) -> 2, [5000, 10000) -> 3,
        [10000, 20000) -> 4, [20000, 30000) -> 5, [30000, 50000) -> 6,
        [50000, 70000) -> 7, [70000, +inf) -> 8.
        Null, zero and negative ranks map to 0.
        """
        rank = F.col("asin_rank")
        edges = [0, 1000, 5000, 10000, 20000, 30000, 50000, 70000]

        # Bucket 1 excludes its lower edge, so rank 0 falls through to the default 0.
        bucket = F.when((rank > edges[0]) & (rank < edges[1]), F.lit(1))
        for code in range(2, len(edges)):
            bucket = bucket.when((rank >= edges[code - 1]) & (rank < edges[code]), F.lit(code))
        bucket = bucket.when(rank >= edges[-1], F.lit(len(edges))).otherwise(F.lit(0))

        self.df_asin_detail = self.df_asin_detail.withColumn(
            "asin_rank_type",
            F.when(rank.isNotNull(), bucket).otherwise(F.lit(0)),
        )

    def get_price_type(self):
        """Bucket `asin_price` into the integer column `asin_price_type`.

        Codes: (0, 10) -> 1, [10, 15) -> 2, [15, 20) -> 3, [20, 30) -> 4,
        [30, 50) -> 5, [50, +inf) -> 6.
        Null, zero and negative prices map to 0.
        """
        price = F.col("asin_price")
        edges = [0, 10, 15, 20, 30, 50]

        # Bucket 1 excludes its lower edge, so a price of 0 falls through to 0.
        bucket = F.when((price > edges[0]) & (price < edges[1]), F.lit(1))
        for code in range(2, len(edges)):
            bucket = bucket.when((price >= edges[code - 1]) & (price < edges[code]), F.lit(code))
        bucket = bucket.when(price >= edges[-1], F.lit(len(edges))).otherwise(F.lit(0))

        self.df_asin_detail = self.df_asin_detail.withColumn(
            "asin_price_type",
            F.when(price.isNotNull(), bucket).otherwise(F.lit(0)),
        )

    def get_asin_index_change(self):
        """Compare current metrics with the previous period's values.

        Left-joins `self.df_flow_asin_last` on `asin`. That frame supplies the
        `previous_*` columns. For each tracked metric, two columns are added:
            <x>_rise   = current - previous
            <x>_change = (current - previous) / previous,
                         or null when previous is null or 0.
        The `previous_*` helper columns are dropped afterwards.

        Metrics, in output-column order: rank, AO value, price, estimated orders,
        rating, comment count, BSR orders, sales, variation count.
        """
        # (current column, previous column, rise column, change column, zero literal for the guard)
        metrics = (
            ("asin_rank", "previous_asin_rank", "asin_rank_rise", "asin_rank_change", 0),
            ("asin_ao_val", "previous_asin_ao_val", "asin_ao_rise", "asin_ao_change", 0.0),
            ("asin_price", "previous_asin_price", "asin_price_rise", "asin_price_change", 0.0),
            ("orders", "previous_orders", "asin_orders_rise", "asin_orders_change", 0),
            ("asin_rating", "previous_asin_rating", "asin_rating_rise", "asin_rating_change", 0.0),
            ("asin_total_comments", "previous_asin_total_comments",
             "asin_comments_rise", "asin_comments_change", 0),
            ("bsr_orders", "previous_bsr_orders", "asin_bsr_orders_rise", "asin_bsr_orders_change", 0),
            ("sales", "previous_sales", "asin_sales_rise", "asin_sales_change", 0),
            ("variation_num", "previous_variation_num", "asin_variation_rise", "asin_variation_change", 0),
        )

        df = self.df_asin_detail.join(self.df_flow_asin_last, on=['asin'], how='left')
        for current_name, previous_name, rise_name, change_name, zero in metrics:
            current = F.col(current_name)
            previous = F.col(previous_name)
            df = df.withColumn(rise_name, current - previous)
            # The relative change is only defined against a known, non-zero baseline.
            has_baseline = (previous != zero) & previous.isNotNull()
            df = df.withColumn(change_name,
                               F.when(has_baseline, (current - previous) / previous).otherwise(F.lit(None)))

        self.df_asin_detail = df.drop(*[previous_name for _, previous_name, _, _, _ in metrics])

    def get_asin_bsr_type(self):
        """Attach the latest category report and BSR rank limit, then derive `bsr_type`.

        1. In `df_one_category_report`, keep only the newest row per `cate_1_id`.
           Rows are ordered by `dt_sort`, which is computed from `dt` via
           `self.u_year_week`. The deduplicated frame is stored back on self.
        2. Left-join it onto `df_asin_detail` by `cate_1_id`, then left-join
           `df_bsr_end` by `name`.
        3. Default a missing `limit_rank` to 500000. Compute `bsr_type` with
           `self.u_judget_bsr_type(asin_rank, limit_rank)`.
        4. Drop the helper columns `name` and `limit_rank`.
        """
        report = self.df_one_category_report
        report = report.withColumn("dt_sort", self.u_year_week(report.dt))
        newest_first = Window.partitionBy(['cate_1_id']).orderBy(report.dt_sort.desc())
        report = report.withColumn("asin_dt_top", F.row_number().over(window=newest_first))
        # Row number 1 inside each cate_1_id partition is the most recent report row.
        report = report.filter("asin_dt_top=1").drop("asin_dt_top", "dt", "dt_sort")
        self.df_one_category_report = report

        detail = self.df_asin_detail.join(report, on=['cate_1_id'], how='left')
        detail = detail.join(self.df_bsr_end, on=['name'], how='left')
        detail = detail.na.fill({"limit_rank": 500000})
        detail = detail.withColumn("bsr_type",
                                   self.u_judget_bsr_type(detail.asin_rank, detail.limit_rank))
        self.df_asin_detail = detail.drop("name").drop("limit_rank")

    def get_asin_zr_best_orders_type(self):
        """Flag the top-selling child of each parent ASIN in `zr_best_orders_type`.

        Within each non-null `parent_asin` group, rows are ranked by `orders`
        descending, with nulls last. The rank-1 row gets 1. Every other row gets 0,
        including rows whose `parent_asin` is null.
        """
        children = self.df_asin_detail.filter("parent_asin is not null")
        by_parent = Window.partitionBy(["parent_asin"]).orderBy(children["orders"].desc_nulls_last())
        ranked = (children
                  .withColumn("orders_rank", F.row_number().over(window=by_parent))
                  .select("asin", "orders_rank"))

        is_best = F.when(F.col("orders_rank") == 1, F.lit(1)).otherwise(F.lit(0))
        self.df_asin_detail = (self.df_asin_detail
                               .join(ranked, on='asin', how='left')
                               .withColumn("zr_best_orders_type", is_best)
                               .drop("orders_rank"))

    def get_asin_bsr_best_orders_type(self):
        """Flag the best BSR-selling child per parent ASIN in `bsr_best_orders_type`.

        Rows without a `parent_asin` are always flagged 1. Rows with a parent are
        ranked by `bsr_orders` descending (nulls last) within their parent group.
        Only the rank-1 row gets 1; the rest get 0. The two subsets are recombined
        with `unionByName`, so the result follows the parented subset's column order.
        """
        detail = self.df_asin_detail
        orphans = detail.filter("parent_asin is null").withColumn("bsr_best_orders_type", F.lit(1))

        children = detail.filter("parent_asin is not null")
        by_parent = Window.partitionBy(["parent_asin"]).orderBy(children["bsr_orders"].desc_nulls_last())
        is_best = F.when(F.col("bsr_orders_rank") == 1, F.lit(1)).otherwise(F.lit(0))
        children = (children
                    .withColumn("bsr_orders_rank", F.row_number().over(window=by_parent))
                    .withColumn("bsr_best_orders_type", is_best)
                    .drop("bsr_orders_rank"))

        self.df_asin_detail = children.unionByName(orphans)

    def get_asin_variation_number(self):
        """Attach `variation_num`, the number of variant ASINs sharing each `parent_asin`.

        Counts come from `self.df_asin_variat` and are left-joined by `parent_asin`.
        Rows that do not match (including a null parent) get null here.
        """
        variation_counts = (self.df_asin_variat
                            .groupby(["parent_asin"])
                            .agg(F.count("asin").alias("variation_num")))
        self.df_asin_detail = self.df_asin_detail.join(variation_counts, on=['parent_asin'], how='left')

    def get_asin_quantity_variation_type(self):
        """Attach `asin_quantity_variation_type` per ASIN.

        The value is computed by `self.u_judge_quantity_variartion_type` (defined
        elsewhere; presumably a Spark UDF) from the `size` column of
        `self.df_asin_variat`. It is left-joined onto df_asin_detail by `asin`,
        after the `parent_asin` and `size` columns are removed.
        """
        variants = self.df_asin_variat
        quantity_types = (variants
                          .withColumn("asin_quantity_variation_type",
                                      self.u_judge_quantity_variartion_type(variants["size"]))
                          .drop("parent_asin")
                          .drop("size"))
        self.df_asin_detail = self.df_asin_detail.join(quantity_types, on=['asin'], how='left')

    def get_asin_sp_type_num(self):
        """Expand `sp_num` into the columns `sp_type1`, `sp_type2` and `sp_type3`.

        `self.u_judge_sp_type` (defined elsewhere) returns a map-valued column.
        Each key is extracted into its own column, and `sp_num` is then dropped.
        """
        df = self.df_asin_detail
        sp_type_map = self.u_judge_sp_type(df.sp_num)
        for key in ("sp_type1", "sp_type2", "sp_type3"):
            df = df.withColumn(key, sp_type_map[key])
        self.df_asin_detail = df.drop("sp_num")

    def get_alarm_brand_type(self):
        """Flag ASINs whose brand is on the alarm-brand list (`is_alarm_brand`).

        For the 'us' site, brand names are read from the PostgreSQL table
        `brand_alert_erp` and left-joined on `asin_brand_name`. Matching rows get
        `is_alarm_brand = 1`; all other rows get null.

        When the list is not read (a non-'us' site, or no connection info), an
        all-null int `is_alarm_brand` column is added instead, unless one already
        exists. `handle_data_group` passes `is_alarm_brand` to `na.fill`, and
        `na.fill` with a dict raises AnalysisException for a column that does not
        exist. The column must therefore be present on every code path.
        """
        df_alarm_brand = None
        if self.site_name == 'us':
            pg_sql = f"select brand_name as asin_brand_name, 1 as is_alarm_brand from brand_alert_erp where brand_name is not null group by brand_name"
            db_type = "postgresql"
            con_info = DBUtil.get_connection_info(db_type=db_type, site_name=self.site_name)
            if con_info is not None:
                df_alarm_brand = SparkUtil.read_jdbc_query(session=self.spark, url=con_info['url'], pwd=con_info['pwd'],
                                                           username=con_info['username'], query=pg_sql)

        if df_alarm_brand is not None:
            self.df_asin_detail = self.df_asin_detail.join(
                df_alarm_brand, on=['asin_brand_name'], how='left'
            )
        elif "is_alarm_brand" not in self.df_asin_detail.columns:
            # Same null-for-no-match semantics as the left join; na.fill later turns it into 0.
            self.df_asin_detail = self.df_asin_detail.withColumn("is_alarm_brand", F.lit(None).cast("int"))

    def handle_data_group(self):
        """Assemble `self.df_save` from `self.df_asin_detail` for writing.

        Adds, in this order:
          * `created_time` / `updated_time`: the current timestamp formatted as
            'yyyy-MM-dd HH:mm:ss';
          * reserved placeholder columns: `re_string_field1..3` hold the literal
            text "null", and `re_int_field1..3` hold 0;
          * the partition columns `site_name`, `date_type` and `date_info`
            (matching `self.partitions_by`).
        Finally, it fills nulls in the count, type and flag columns with their
        defaults.
        """
        # Spark datetime pattern: 'ss' is second-of-minute. The previous 'SS' is
        # fraction-of-second, which wrote sub-second digits in place of the seconds.
        now_str = F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss')
        df = self.df_asin_detail.withColumn("created_time", now_str).withColumn("updated_time", now_str)

        for name in ("re_string_field1", "re_string_field2", "re_string_field3"):
            df = df.withColumn(name, F.lit("null"))
        for name in ("re_int_field1", "re_int_field2", "re_int_field3"):
            df = df.withColumn(name, F.lit(0))

        df = df.withColumn("site_name", F.lit(self.site_name))
        df = df.withColumn("date_type", F.lit(self.date_type))
        df = df.withColumn("date_info", F.lit(self.date_info))

        self.df_save = df.na.fill(
            {"asin_zr_counts": 0, "asin_sp_counts": 0, "asin_sb_counts": 0, "asin_vi_counts": 0,
             "asin_bs_counts": 0, "asin_ac_counts": 0, "asin_tr_counts": 0, "asin_er_counts": 0,
             "asin_title_len": 0, "asin_total_comments": 0, "variation_num": 0, "asin_img_num": 0,
             "act_one_two_val": 0.0, "act_three_four_val": 0.0, "act_five_six_val": 0.0, "act_eight_val": 0.0,
             "qa_num": 0, "one_star": 0, "two_star": 0, "three_star": 0, "four_star": 0, "five_star": 0, "low_star": 0,
             "asin_size_type": 0, "asin_rating_type": 0, "asin_site_name_type": 0, "asin_weight_type": 0,
             "asin_launch_time_type": 0,
             "asin_ao_val_type": 0, "asin_rank_type": 0, "asin_price_type": 0, "asin_quantity_variation_type": 0,
             "package_quantity": 1, "is_movie_label": 0, "is_brand_label": 1, "is_alarm_brand": 0})

    def handle_data(self):
        """Run every transformation step in order, then build `df_save`.

        Order matters: later steps read columns produced by earlier ones. For
        example, `get_asin_index_change` uses `variation_num` from
        `get_asin_variation_number` and `bsr_orders`, and the final step
        assembles the output frame.
        """
        steps = (
            self.get_asin_detail,
            self.get_asin_sp_type_num,
            self.get_package_quantity,
            self.get_volume_detail,
            self.get_site_name_type,
            self.get_rating_type,
            self.get_weight_type,
            self.get_launch_time_type,
            self.get_ao_val_type,
            self.get_rank_type,
            self.get_price_type,
            self.get_asin_bsr_type,
            self.get_asin_zr_best_orders_type,
            self.get_asin_variation_number,
            self.get_asin_quantity_variation_type,
            self.get_asin_bsr_best_orders_type,
            self.get_alarm_brand_type,
            self.get_asin_index_change,
            self.handle_data_group,
        )
        for step in steps:
            step()


if __name__ == '__main__':
    # Positional CLI arguments:
    #   1: site name, e.g. us
    #   2: date granularity: week / 4_week / month / quarter
    #   3: date value: year-week / year-month / year-quarter, e.g. 2022-1
    site_name = sys.argv[1]
    date_type = sys.argv[2]
    date_info = sys.argv[3]
    DwtFlowAsin(site_name=site_name, date_type=date_type, date_info=date_info).run()