import ast
import os
import re
import sys

os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from utils.templates import Templates
# from ..utils.templates import Templates
from pyspark.sql.types import StringType, BooleanType, StructType, StructField, DoubleType, FloatType

# Window functions for partition-and-sort deduplication
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from yswg_utils.common_udf import parse_weight_str


class DimAsinStableInfo(Templates):
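    """Build the dim_asin_stable_info dimension table.

    Reads ods_asin_detail, keeps the latest record per asin, then derives
    image-path, title-theme, weight and volume columns and joins them into
    one wide table saved partitioned by site_name. A long-format theme
    table (dim_asin_title_info_vertical) is saved by handle_title.
    """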

    def __init__(self, site_name='us'):
        super().__init__()
        self.site_name = site_name
        self.db_save = 'dim_asin_stable_info'
        self.spark = self.create_spark_object(app_name=f"{self.db_save}: {self.site_name}")
        # Placeholder DataFrames; overwritten in read_data/handle_data
        self.df_asin_detail = self.spark.sql("select 1+1;")
        self.df_theme = self.spark.sql("select 1+1;")
        self.df_asin_img_url = self.spark.sql("select 1+1;")
        self.df_asin_title = self.spark.sql("select 1+1;")
        self.df_asin_weight = self.spark.sql("select 1+1;")
        self.df_asin_weight_new = self.spark.sql("select 1+1;")
        self.df_asin_weight_old = self.spark.sql("select 1+1;")
        self.df_asin_volume = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")
        self.df_save_std = self.spark.sql("select * from ods_search_term_zr limit 0;")
        self.partitions_by = ['site_name']
        self.reset_partitions(100)
        self.window = Window.partitionBy(['asin']).orderBy(F.desc("date_info"))  # partition by asin, order by date_info descending (latest first)
        # self.window = Window.partitionBy(['asin']).orderBy(F.desc("created_time"))

        schema = StructType([
            StructField('weight', FloatType(), True),
            StructField('weight_type', StringType(), True),
        ])
        self.u_get_weight = F.udf(parse_weight_str, schema)
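        # parse_weight_str comes from yswg_utils.common_udf; given the schema above and
        # the call u_get_weight('weight_str', 'site_name') below, it is assumed to take
        # (weight_str, site_name) and return a (weight, weight_type) pair,
        # e.g. ('1.2 pounds', 'us') -> (1.2, 'pounds'). Verify against common_udf if in doubt.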
        self.weight_type = 'pounds' if site_name == 'us' else 'grams'

        self.db_save_vertical = 'dim_asin_title_info_vertical'  # long-format (vertical) theme table

        # Register user-defined functions (UDFs)
        self.u_contains_digit_udf = F.udf(self.udf_contains_digit, BooleanType())
        self.u_asin_volume_type = F.udf(self.udf_asin_volume_type, StringType())
        # Return type for the dimension UDFs: a StructType with three DoubleType fields
        schema = StructType([
            StructField('length', DoubleType(), True),
            StructField('width', DoubleType(), True),
            StructField('height', DoubleType(), True)
        ])
        self.u_extract_dimensions = F.udf(self.udf_extract_dimensions, schema)
        self.u_extract_dimensions_others = F.udf(self.udf_extract_dimensions_others, schema)
        schema = StructType([
            StructField('asin_length', StringType(), True),
            StructField('asin_width', StringType(), True),
            StructField('asin_height', StringType(), True)
        ])
        self.u_volume_sorted = F.udf(self.udf_volume_sorted, schema)

        # Register user-defined functions (UDFs)
        self.u_theme_pattern = F.udf(self.udf_theme_pattern, StringType())
        # Other variables
        # self.pattern = str()  # regex pattern
        self.theme_list_str = str()  # serialized theme list used for whole-word matching

    @staticmethod
    def udf_theme_pattern(title, theme_list_str):
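        """Return the distinct themes found in ``title`` as a comma-separated string.

        ``theme_list_str`` is the ``str()`` of a list of space-padded themes
        (see handle_title_theme), so only whole words match. Illustrative
        example: udf_theme_pattern(' red xmas hat ', "[' xmas ']") -> 'xmas'.
        """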
        found_themes = [theme.strip() for theme in ast.literal_eval(theme_list_str) if theme in title]
        if found_themes:
            return ','.join(set(found_themes))
        else:
            return None

    # Check whether a string contains any digit
    @staticmethod
    def udf_contains_digit(s):
        if s is None:
            return False
        return any(char.isdigit() for char in s)

    # Classify the measurement unit(s) found in an asin_volume string
    @staticmethod
    def udf_asin_volume_type(x):
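        """Classify the measurement unit(s) in a volume string.

        Examples: '5.9 x 3.9 x 1.2 inches' -> 'inches';
        '15 x 10 cm; 5.9 x 3.9 inches' -> 'cm,inches'; '10 x 20' -> 'none'.
        """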
        # pattern = r'\b\w+\b'
        pattern = r'[a-z]+'
        matches = re.findall(pattern, x)

        # Collect matched unit words in a set
        type_set = set()
        for word in matches:
            if word in ['inches', 'inch']:
                type_set.add('inches')
            elif word in ['cm', 'centímetros', 'centimetres']:
                type_set.add('cm')
            elif word in ['milímetros', 'millimeter', 'mm']:
                type_set.add('mm')
            elif word in ['metros']:
                type_set.add('m')

        # Return based on how many distinct units were found
        if len(type_set) == 1:
            return list(type_set)[0]
        elif len(type_set) >= 2:
            return ','.join(sorted(type_set))  # sorted so e.g. 'cm,inches' is deterministic across executors
        else:
            return 'none'

    @staticmethod
    def udf_extract_dimensions(volume_str, asin_volume_type):
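        """Extract (length, width, height) from a volume string.

        For 'cm,inches' strings, only the inches segment is kept first.
        Example: ('5.9 x 3.9 x 1.2 inches', 'inches') -> (5.9, 3.9, 1.2).
        With exactly two numbers and type 'none', l/w/h/d markers in the
        string decide which fields are filled.
        """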
        length, width, height = None, None, None
        if asin_volume_type == 'cm,inches':
            # keep only the inches segment of the string
            num_inches = volume_str.find('inch')
            num_cm = volume_str.find('cm')
            volume_str = volume_str[:num_inches] if num_cm > num_inches else volume_str[num_cm:num_inches]
        dimensions = re.findall(r"(\d+(\.\d+)?)", volume_str)
        dimensions = [float(dim[0]) for dim in dimensions]

        if len(dimensions) == 1:
            length = dimensions[0]
        elif len(dimensions) == 2:
            if asin_volume_type == 'none':
                if "l" in volume_str and "w" in volume_str:
                    length, width = dimensions
                elif "w" in volume_str and "h" in volume_str:
                    width, height = dimensions
                elif "l" in volume_str and "h" in volume_str:
                    length, height = dimensions
                elif "d" in volume_str and "w" in volume_str:
                    length, width = dimensions
                elif "d" in volume_str and "h" in volume_str:
                    length, height = dimensions
            else:
                length, width = dimensions
        elif len(dimensions) == 3:
            length, width, height = dimensions
        elif len(dimensions) >= 4:
            length, width, height = dimensions[:3]
        return (length, width, height)

    @staticmethod
    def udf_extract_dimensions_others(volume_str, asin_volume_type):
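        """Extract (length, width, height) for non-us sites; only 'cm' strings are parsed.

        Example: ('30 x 20 x 10 cm', 'cm') -> (30.0, 20.0, 10.0); any other
        asin_volume_type returns (None, None, None).
        """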
        length, width, height = None, None, None
        if asin_volume_type == 'cm':
            dimensions = re.findall(r"(\d+(\.\d+)?)", volume_str)
            dimensions = [float(dim[0]) for dim in dimensions]
            if len(dimensions) == 1:
                length = dimensions[0]
            elif len(dimensions) == 2:
                length = dimensions[0]
                width = dimensions[1]
            elif len(dimensions) >= 3:
                length, width, height = dimensions[:3]
        return (length, width, height)

    @staticmethod
    def udf_volume_sorted(asin_length, asin_width, asin_height):
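        """Sort the three dimensions in descending order, treating None as 0.

        Example: (3.0, None, 5.0) -> (5.0, 3.0, 0).
        """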
        # Replace None with 0 so the values sort cleanly
        dimensions = [0 if x is None else x for x in [asin_length, asin_width, asin_height]]
        # Sort in descending order
        dimensions.sort(reverse=True)
        return tuple(dimensions)

    def sort_by_latest(self, df):
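        """Keep only the row with the latest date_info per asin (uses self.window)."""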
        df = df.withColumn('row_number', F.row_number().over(self.window))  # number rows within each asin partition, latest date_info first
        df = df.filter(df.row_number == 1).drop('row_number')  # keep only the latest row per asin
        return df

    def read_data(self):
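        """Load asin detail and theme rows.

        For 'us': week data, plus month data for 2023-10, plus month and
        month_week data from 2023-11 onward. Other sites: week data plus
        month and month_week data from 2023-05 onward.
        """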
        if self.site_name == 'us':
            params = f" and (date_type='week' or (date_type='month' and date_info='2023-10') or (date_type in ('month_week', 'month') and date_info>='2023-11'))"
        else:
            params = f" and (date_type='week' or (date_type in ('month_week', 'month') and date_info>='2023-05'))"

        sql = f"select asin, img_url as asin_img_url, title as asin_title, weight, weight_str, volume as asin_volume, date_type, date_info, site_name, created_at as created_time " \
              f"from ods_asin_detail where site_name='{self.site_name}' {params}"  #  and date_info='2023-27' limit 100000
        print("sql:", sql)
        self.df_asin_detail = self.spark.sql(sql).cache()
        self.df_asin_detail.show(10, truncate=False)
        params = "" if self.site_name == 'us' else " limit 10"
        sql = f"select id as theme_id, theme_type_en, theme_en, theme_en_lower, theme_ch from ods_theme where site_name='us' {params};"
        print("sql:", sql)
        self.df_theme = self.spark.sql(sql).cache()
        self.df_theme.show(10, truncate=False)

    def handle_data(self):
        # Deduplicate on created_time, keeping the newest record per asin
        window = Window.partitionBy(['asin']).orderBy(F.desc("created_time"))
        self.df_asin_detail = self.df_asin_detail.withColumn('row_number', F.row_number().over(window))  # number rows within each asin partition
        self.df_asin_detail = self.df_asin_detail.filter(self.df_asin_detail.row_number == 1).drop('row_number', 'created_time')  # keep only the newest row, drop helper columns

        self.handle_img_url()
        if self.site_name == 'usx':  # note: 'usx' matches no site, so the empty-schema branch below always runs
            self.handle_title()
        else:
            sql = f"select asin, asin_title, asin_title_lower, crowd_counts, crowd_ids, element_counts, element_ids, festival_counts, festival_ids, sports_counts, sports_ids, style_counts, style_ids, " \
                  f"theme_counts, theme_ids, material_counts, material_ids, date_info_title from {self.db_save} limit 0"
            self.df_asin_title = self.spark.sql(sql).cache()

        self.handle_weight()
        self.handle_volume()
        self.df_save = self.df_asin_detail.select("asin", "date_info", "site_name")
        self.df_save = self.sort_by_latest(df=self.df_save)
        self.df_save = self.df_save.join(
            self.df_asin_img_url, on='asin', how='left'
        ).join(
            self.df_asin_title, on='asin', how='left'
        ).join(
            self.df_asin_weight, on='asin', how='left'
        ).join(
            self.df_asin_volume, on='asin', how='left'
        )
        # if self.site_name != 'us':
        #     # other sites do not have these theme columns
        #     self.df_save = self.df_save_std.unionByName(self.df_save, allowMissingColumns=True)
        # self.df_save = self.df_save.drop("created_time")

        print("self.df_save.columns:", self.df_save.columns)
        # self.df_save.show(10, truncate=False)

    def handle_img_url(self):
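        """Derive a sharded image path from asin prefixes.

        asin_trun_i holds the first i characters of the asin (i = 1..9 are
        added as columns); the path joins the 1..6-character prefixes,
        e.g. asin 'B01ABCDEFG' -> '/B/B0/B01/B01A/B01AB/B01ABC/'.
        """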
        self.df_asin_img_url = self.df_asin_detail.select("asin", "asin_img_url", "date_info").filter("asin_img_url is not null")
        self.df_asin_img_url = self.df_asin_img_url.filter(self.df_asin_img_url.asin_img_url.contains('amazon'))  # keep only URLs containing 'amazon'
        self.df_asin_img_url = self.sort_by_latest(df=self.df_asin_img_url)
        for i in range(1, 10, 1):
            self.df_asin_img_url = self.df_asin_img_url.withColumn(f"asin_trun_{i}", F.substring(self.df_asin_img_url.asin, 1, i))
        self.df_asin_img_url = self.df_asin_img_url.withColumn(
            "asin_img_path",
            F.concat(
                F.lit("/"), self.df_asin_img_url.asin_trun_1,
                F.lit("/"), self.df_asin_img_url.asin_trun_2,
                F.lit("/"), self.df_asin_img_url.asin_trun_3,
                F.lit("/"), self.df_asin_img_url.asin_trun_4,
                F.lit("/"), self.df_asin_img_url.asin_trun_5,
                F.lit("/"), self.df_asin_img_url.asin_trun_6,
                F.lit("/")
            )
        )
        self.df_asin_img_url = self.df_asin_img_url.withColumnRenamed("date_info", "date_info_img_url")
        print("self.df_asin_img_url.columns:", self.df_asin_img_url.columns)
        # self.df_asin_img_url.show(10, truncate=False)

    def handle_title(self):
        # Filter out nulls and 'none'/'null'/'nan' placeholder strings
        self.df_asin_title = self.df_asin_detail.select("asin", "asin_title", "date_info").filter("asin_title is not null and asin_title not in ('none', 'null', 'nan')")
        # Lower-case the title
        self.df_asin_title = self.df_asin_title.withColumn("asin_title_lower", F.lower(self.df_asin_title["asin_title"]))
        # self.df_asin_title.show(10, truncate=False)
        # Keep the title from the latest date_info
        self.df_asin_title = self.sort_by_latest(df=self.df_asin_title)
        # self.df_asin_title.show(10, truncate=False)
        # Match theme data against the titles
        self.handle_title_theme()
        # Save a long-format (vertical) copy of the theme data
        self.reset_partitions(partitions_num=100)
        self.save_data_common(
            df_save=self.df_save_vertical,
            db_save=self.db_save_vertical,
            partitions_num=self.partitions_num,
            partitions_by=self.partitions_by
        )

    def handle_title_theme(self):
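        """Match themes in titles and build both a long (vertical) and a wide table.

        The wide table pivots theme_type_en into <type>_counts and <type>_ids
        columns (e.g. theme_counts / theme_ids) and joins them back onto the
        titles by asin.
        """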
        pdf_theme = self.df_theme.toPandas()
        theme_list = list(set(pdf_theme.theme_en_lower))
        self.theme_list_str = str([f" {theme} " for theme in theme_list])
        print("self.theme_list_str:", self.theme_list_str[:100])
        # Cache a copy to join the pivoted wide-table columns back onto later
        df_asin_title = self.df_asin_title.cache()

        self.df_asin_title = self.df_asin_title.withColumn("asin_title_lower", F.concat(F.lit(" "), "asin_title_lower", F.lit(" ")))  # pad the title with spaces so themes match as whole words
        self.df_asin_title = self.df_asin_title.withColumn("theme_en_lower", self.u_theme_pattern('asin_title_lower', F.lit(self.theme_list_str)))
        # Split the matched-theme string into an array
        self.df_asin_title = self.df_asin_title.withColumn("theme_en_lower", F.split(self.df_asin_title["theme_en_lower"], ","))
        # Explode the array into one row per theme
        self.df_asin_title = self.df_asin_title.withColumn("theme_en_lower", F.explode(self.df_asin_title["theme_en_lower"]))
        self.df_asin_title = self.df_asin_title.join(
            self.df_theme, on=['theme_en_lower'], how='left'  # consider 'inner' here to avoid keeping inaccurate matches
        )
        # 1. Vertical (long-format) table
        self.df_save_vertical = self.df_asin_title.cache()
        self.df_save_vertical = self.df_save_vertical.withColumn("site_name", F.lit(self.site_name))
        print("self.df_save_vertical.columns:", self.df_save_vertical.columns)
        # print("self.df_save_vertical.count():", self.df_save_vertical.count())
        # self.df_save_vertical.show(30, truncate=False)
        # self.df_save_vertical.filter("theme_en_lower is not null").show(30, truncate=False)

        # 2. Wide table
        self.df_asin_title = self.df_asin_title.drop_duplicates(['asin', 'theme_type_en', 'theme_ch'])
        self.df_asin_title = self.df_asin_title.withColumn("theme_type_en_counts", F.concat("theme_type_en", F.lit("_counts")))
        self.df_asin_title = self.df_asin_title.withColumn("theme_type_en_ids", F.concat("theme_type_en", F.lit("_ids")))
        # self.df_asin_title.filter('theme_type_en_counts is null').show(20, truncate=False)   # no such rows
        self.df_asin_title = self.df_asin_title.filter('theme_type_en_counts is not null')
        pivot_df1 = self.df_asin_title.groupBy("asin").pivot("theme_type_en_counts").agg(
            F.expr("IFNULL(count(*), 0) AS value"))
        pivot_df1 = pivot_df1.na.fill(0)
        pivot_df2 = self.df_asin_title.groupBy("asin").pivot("theme_type_en_ids").agg(
            F.concat_ws(",", F.collect_list("theme_id")))
        # pivot_df1.show(30, truncate=False)
        # pivot_df2.show(30, truncate=False)
        # self.df_save_wide = df_asin_title.join(
        self.df_asin_title = df_asin_title.join(
            pivot_df1, on='asin', how='left'
        ).join(
            pivot_df2, on='asin', how='left'
        )
        # self.df_save_wide.show(30, truncate=False)
        self.df_asin_title = self.df_asin_title.withColumnRenamed("date_info", "date_info_title")
        self.df_asin_title = self.df_asin_title.drop("site_name")
        print("self.df_asin_title.columns:", self.df_asin_title.columns)
        # self.df_asin_title.show(30, truncate=False)

    def handle_weight(self):
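        """Unify weight parsing for new (week 2023-18 onward) and old records, keeping the latest per asin."""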
        self.df_asin_weight_new = self.df_asin_detail.filter("(date_info >= '2023-18' and date_type='week') or (date_type in ('month', 'month_week'))").select("asin", "weight", "weight_str", "date_info", "site_name").cache()
        self.df_asin_weight_old = self.df_asin_detail.filter("date_info < '2023-18' and date_type='week'").select("asin", "weight", "weight_str", "date_info", "site_name").cache()
        self.handle_weight_new()
        self.handle_weight_old()
        print("self.df_asin_weight.columns:", self.df_asin_weight.columns)
        print("self.df_asin_weight_old.columns:", self.df_asin_weight_old.columns)
        self.df_asin_weight = self.df_asin_weight_new.unionByName(self.df_asin_weight_old, allowMissingColumns=True)
        self.df_asin_weight = self.sort_by_latest(df=self.df_asin_weight)
        # Convert 'none' in the weight column to null and cast to float
        self.df_asin_weight = self.df_asin_weight.withColumn("weight", F.when(F.col("weight") == 'none', None).otherwise(
                                                                 F.col("weight").cast(FloatType())))
        # Clamp weight values <= 0.001 up to 0.001
        self.df_asin_weight = self.df_asin_weight.withColumn("weight", F.when(F.col("weight") <= 0.001, 0.001).otherwise(F.col("weight")))
        # Keep 4 decimal places
        self.df_asin_weight = self.df_asin_weight.withColumn("weight", F.round(self.df_asin_weight["weight"], 4))
        # self.df_asin_weight.show(20, truncate=False)

        self.df_asin_weight = self.df_asin_weight.withColumnRenamed(
            "weight_str", "asin_weight_str"
        ).withColumnRenamed(
            "weight", "asin_weight"
        ).withColumnRenamed(
            "weight_type", "asin_weight_type"
        )
        self.df_asin_weight = self.df_asin_weight.withColumnRenamed("date_info", "date_info_weight")
        self.df_asin_weight = self.df_asin_weight.drop("site_name")
        print("self.df_asin_weight.columns:", self.df_asin_weight.columns)
        # self.df_asin_title.show(30, truncate=False)

    def handle_weight_new(self):
        print("Processing weight data: week 2023-18 and later")
        # Cast the column to string and lower-case it
        self.df_asin_weight_new = self.df_asin_weight_new.withColumn("weight_str", F.lower(F.col("weight_str").cast(StringType())))
        # Extract weight and weight_type from the weight string
        self.df_asin_weight_new = self.df_asin_weight_new.withColumn('weight_detail', self.u_get_weight('weight_str', 'site_name'))
        self.df_asin_weight_new = self.df_asin_weight_new \
            .withColumn('weight', self.df_asin_weight_new.weight_detail.getField('weight')) \
            .withColumn('weight_type', self.df_asin_weight_new.weight_detail.getField('weight_type')) \
            .drop('weight_detail')

    def handle_weight_old(self):
        print("Processing weight data: before week 2023-18")
        self.df_asin_weight_old = self.df_asin_weight_old.withColumn("weight_type", F.lit(self.weight_type))
        window = Window.partitionBy(['asin']).orderBy(self.df_asin_weight_old.date_info.desc())
        self.df_asin_weight_old = self.df_asin_weight_old.withColumn('row_number', F.row_number().over(window))  # number rows within each asin partition
        self.df_asin_weight_old = self.df_asin_weight_old.filter(self.df_asin_weight_old.row_number == 1).drop('row_number')  # keep only the latest row per asin

    def handle_volume(self):
        self.df_asin_volume = self.df_asin_detail.select("asin", "asin_volume", "date_info")
        if self.site_name == 'us':
            self.handle_volume_us()
        else:
            self.handle_volume_others()
        self.df_asin_volume = self.df_asin_volume.withColumnRenamed("date_info", "date_info_volume")
        self.df_asin_volume = self.df_asin_volume.drop("site_name")
        self.handle_volume_sorted()
        print("self.df_asin_volume.columns:", self.df_asin_volume.columns)
        # self.df_asin_volume.show(30, truncate=False)

    def handle_volume_us(self):
        self.handle_filter_dirty_data()
        df_inches = self.handle_asin_volume_types_to_dimensions(asin_volume_type='inches')
        df_cm = self.handle_asin_volume_types_to_dimensions(asin_volume_type='cm')
        df_cm_inches = self.handle_asin_volume_types_to_dimensions(asin_volume_type='cm,inches')
        df_none = self.handle_asin_volume_types_to_dimensions(asin_volume_type='none')
        df_none_not_null = df_none.filter(~(df_none.length.isNull() & df_none.width.isNull() & df_none.height.isNull()))
        df_none_null = df_none.filter(df_none.length.isNull() & df_none.width.isNull() & df_none.height.isNull())
        df_none_not_null = df_none_not_null.withColumn("asin_volume_type", F.lit("inches"))
        print("df_none_not_null, df_none_null:", df_none_not_null.count(), df_none_null.count())
        self.df_asin_volume = df_inches.union(df_cm).union(df_cm_inches).union(df_none_not_null).union(df_none_null)
        self.df_asin_volume = self.df_asin_volume.drop("asin_volume_flag")
        self.df_asin_volume = self.df_asin_volume.withColumnRenamed("length", "asin_length"). \
            withColumnRenamed("width", "asin_width"). \
            withColumnRenamed("height", "asin_height")

    def handle_filter_dirty_data(self):
        # Lower-case the asin_volume column
        self.df_asin_volume = self.df_asin_volume.withColumn("asin_volume", F.lower(self.df_asin_volume["asin_volume"]))
        # Flag rows whose asin_volume contains at least one digit
        self.df_asin_volume = self.df_asin_volume.withColumn("asin_volume_flag", self.u_contains_digit_udf(self.df_asin_volume["asin_volume"]))
        # self.df_asin_volume.groupBy('asin_volume_flag').agg(F.count('asin_volume_flag')).show()
        # self.df_asin_volume.show()
        self.df_asin_volume = self.df_asin_volume.filter('asin_volume_flag is True')
        # self.df_asin_volume.groupBy('asin_volume_flag').agg(F.count('asin_volume_flag')).show()
        # self.df_asin_volume.show()
        self.df_asin_volume = self.df_asin_volume.withColumn("asin_volume_type", self.u_asin_volume_type(self.df_asin_volume["asin_volume"]))
        self.df_asin_volume.groupBy('asin_volume_type').agg(F.count('asin_volume_type')).show()
        self.df_asin_volume.show()
        # Keep only the latest record per asin
        window = Window.partitionBy('asin').orderBy(F.desc('date_info'))  # partition by asin, order by date_info descending
        self.df_asin_volume = self.df_asin_volume.withColumn('row_number', F.row_number().over(window))  # number rows within each partition
        self.df_asin_volume = self.df_asin_volume.filter(self.df_asin_volume.row_number == 1).drop('row_number')  # keep only the latest row per asin
        self.df_asin_volume.groupBy('asin_volume_type').agg(F.count('asin_volume_type')).show()
        self.df_asin_volume.show()

    def handle_asin_volume_types_to_dimensions(self, asin_volume_type='inches'):
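        """Extract length/width/height for rows of one asin_volume_type.

        Returns the filtered DataFrame with length/width/height columns added,
        e.g. a row with asin_volume '5.9 x 3.9 x 1.2 inches' yields
        (5.9, 3.9, 1.2).
        """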
        df = self.df_asin_volume.filter(f'asin_volume_type="{asin_volume_type}"').cache()
        # Extract length/width/height with the UDF into a struct column
        df = df.withColumn('dimensions', self.u_extract_dimensions('asin_volume', F.lit(asin_volume_type)))
        # Split the struct into three columns and drop it
        df = df \
            .withColumn('length', df.dimensions.getField('length')) \
            .withColumn('width', df.dimensions.getField('width')) \
            .withColumn('height', df.dimensions.getField('height')) \
            .drop('dimensions')
        df.show(10, truncate=False)
        # df_null = df.filter(df.length.isNull() & df.width.isNull() & df.height.isNull())
        # print("asin_volume_type, df_null:", asin_volume_type, df_null.count())
        # df_null.show(50, truncate=False)
        return df

    def handle_volume_others(self):
        self.handle_filter_dirty_data()
        # Extract length, width and height from the volume string
        self.df_asin_volume = self.df_asin_volume.withColumn('dimensions', self.u_extract_dimensions_others('asin_volume', 'asin_volume_type'))
        self.df_asin_volume = self.df_asin_volume \
            .withColumn('asin_length', self.df_asin_volume.dimensions.getField('length')) \
            .withColumn('asin_width', self.df_asin_volume.dimensions.getField('width')) \
            .withColumn('asin_height', self.df_asin_volume.dimensions.getField('height')) \
            .drop('dimensions')
        self.df_asin_volume = self.df_asin_volume.drop("asin_volume_flag")

    def handle_volume_sorted(self):
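        """Add asin_length/width/height_sorted columns (descending), mapping '0' to null and casting to double."""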
        self.df_asin_volume = self.df_asin_volume.withColumn('dimensions', self.u_volume_sorted('asin_length', 'asin_width', 'asin_height'))
        # Split the struct into three columns and drop it
        self.df_asin_volume = self.df_asin_volume \
            .withColumn('asin_length_sorted', self.df_asin_volume.dimensions.getField('asin_length')) \
            .withColumn('asin_width_sorted', self.df_asin_volume.dimensions.getField('asin_width')) \
            .withColumn('asin_height_sorted', self.df_asin_volume.dimensions.getField('asin_height')) \
            .drop('dimensions')
        # self.df_asin_volume.show(10, truncate=False)
        self.df_asin_volume = self.df_asin_volume.replace({'0': None})

        self.df_asin_volume = self.df_asin_volume.withColumn('asin_length_sorted', F.col('asin_length_sorted').cast('double')) \
            .withColumn('asin_width_sorted', F.col('asin_width_sorted').cast('double')) \
            .withColumn('asin_height_sorted', F.col('asin_height_sorted').cast('double'))
        self.df_asin_volume.show(10, truncate=False)


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site name, e.g. 'us'
    handle_obj = DimAsinStableInfo(site_name=site_name)
    handle_obj.run()