import re
from datetime import datetime
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, MapType, StringType

from yswg_utils.udf_util import UdfUtil
import json


def udf_title_number_parse_reg():
    """
    Build a Spark UDF that extracts the package quantity from a title.

    The wrapped function returns ``None`` when ``udf_get_package_quantity``
    finds nothing; otherwise it returns a one-element list holding a map with
    the keys ``match`` and ``label`` (both always ``None``) and ``value``
    (the parsed quantity).

    :return: a UDF declared as ``array<map<string, string>>``
    """

    def udf_title_number_parse(title):
        # Parse the title once and reuse the result for the returned value.
        val = udf_get_package_quantity(title)
        if val is None:
            return None
        return [{
            "match": None,
            "label": None,
            "value": val,
        }]

    return F.udf(udf_title_number_parse, ArrayType(MapType(StringType(), StringType())))


def get_Fba_Fee(longVal: float,
                width: float,
                high: float,
                weight: float,
                ):
    """
    Classify a package into an FBA size tier and look up the matching fee.

    :param longVal: length, in cm
    :param width: width, in cm
    :param high: height, in cm
    :param weight: weight, in g
    :return: tuple ``(fee_type, fba_fee)``; ``(0, 0)`` when no tier matches
    """
    # Each row is (inclusive weight ceiling in g, fee_type, fee).
    small_envelope_tiers = (
        (113.5, 1, 3.22),
        (227, 2, 3.4),
        (340.5, 3, 3.58),
        (454, 4, 3.77),
    )
    standard_envelope_tiers = (
        (113.5, 5, 3.86),
        (227, 6, 4.08),
        (340.5, 7, 4.24),
        (454, 8, 4.75),
        (681, 9, 5.4),
        (908, 10, 5.69),
        (1135, 11, 6.1),
        (1362, 12, 6.39),
        (9080, 13, 7.33),
    )

    def girth():
        # Length plus twice (width + height); evaluated only when a rule needs it.
        return longVal + 2 * (width + high)

    if longVal <= 36 and width <= 28 and high <= 1.6:
        for ceiling, tier, fee in small_envelope_tiers:
            if weight <= ceiling:
                return (tier, fee)

    # A small-envelope package heavier than 454 g falls through to these tiers.
    if longVal <= 43 and width <= 34 and high <= 19:
        for ceiling, tier, fee in standard_envelope_tiers:
            if weight <= ceiling:
                return (tier, fee)

    if longVal <= 152.4 and girth() <= 330.2 and weight <= 31780:
        return (14, 10.15)
    if longVal <= 274.32 and girth() <= 419.1 and weight <= 68100:
        return (15, 19.47)
    if longVal <= 274.32 and girth() > 419.1 and weight <= 68100:
        return (16, 90.81)
    if longVal > 274.32 and girth() > 419.1 and weight > 68100:
        return (17, 159.32)
    return (0, 0)


def udf_get_package_quantity_with_flag(title):
    """
    Extract the package quantity from a product title.

    Quantities are recognised as digits (optionally with thousands commas) or
    the words "one" to "ten", either followed by a unit ("12 pcs") or
    preceded by a phrase such as "pack of" / "count".

    :param title: product title; ``''``, ``None``, ``'null'`` and ``'none'``
        yield no quantity
    :return: tuple ``(quantity, flag)``; ``flag`` is 0 when exactly one
        candidate was found and 1 when several were found (the largest is
        returned, or the smallest when at least two candidates used a
        thousands comma and there are three or more candidates);
        ``(None, None)`` when nothing was found
    """
    # Always return a 2-tuple so callers can unpack the result unconditionally.
    if title == '':
        return None, None
    # Normalise non-breaking spaces: several patterns below need a plain space.
    title = str(title).lower().replace('\xa0', ' ')
    if title in ('null', 'none'):
        return None, None

    word_values = {"one": 1, "two": 2, "three": 3, "four": 4, "five": 5,
                   "six": 6, "seven": 7, "eight": 8, "nine": 9, "ten": 10}
    patterns = [
        r'\b(?<!\d\.\d)(?<!\d\sx\s)((?:\d{1,3})(?:,\d{3})*|\d+)(?!\.\d)(?!%)[-_\s]*(bulk|total|pc|pcs|piece|pieces|set|pack|packs|pairs|pk|pair|count|ct|counts|sets|sheets|sheet|wrap|wraps|roll|rolls|box|boxes|quantity)(?![a-zA-Z])',
        r'\b((?:set|pack|pair|pairs|box|case|carton|quantity) of) ((?:\d{1,3})(?:,\d{3})*|\d+)(?!\.\d)(?!\sx\s\d)(?!%)\b',
        r'\b(total|count|quantity)\s*[-_\s]*\s*((?:\d{1,3})(?:,\d{3})*|\d+)(?!\.)(?!\sx\s\d)\b(?!%)',
        r'\b(one|two|three|four|five|six|seven|eight|nine|ten)(?: +)(bulk|total|pc|pcs|piece|pieces|set|pack|packs|pairs|pk|pair|count|ct|counts|sets|sheets|sheet|wrap|wraps|roll|rolls|box|boxes|quantity)(?![a-zA-Z])',
        r'\b((?:set|pack|pair|pairs|box|case|carton|quantity) of) (one|two|three|four|five|six|seven|eight|nine|ten)\b',
        r'\b(total|count|quantity)(?: +)(one|two|three|four|five|six|seven|eight|nine|ten)\b'
    ]

    quantities = []
    units = []
    for index, pattern in enumerate(patterns):
        # Patterns 0 and 3 capture (quantity, unit); the others (unit, quantity).
        quantity_first = index in (0, 3)
        for groups in re.findall(pattern, title):
            quantity, unit = groups if quantity_first else (groups[1], groups[0])
            quantities.append(quantity)
            units.append(unit)

    # When both "<key>" and "<key> of" were seen, drop the first bare "<key>" hit.
    for key in ('set', 'pack', 'pair', 'box', 'quantity'):
        if key in units and f'{key} of' in units:
            position = units.index(key)
            del units[position]
            del quantities[position]

    parsed = []
    thousands_comma_count = 0
    for token in quantities:
        if token in word_values:
            parsed.append(word_values[token])
            continue
        if ',' in token:
            thousands_comma_count += 1
            token = token.replace(',', '')
        # Skip zero-padded numbers and anything at or above 10000.
        if not token.startswith('0') and 0 <= int(token) < 10000:
            parsed.append(int(token))

    if not parsed:
        return None, None
    if len(parsed) == 1:
        return parsed[0], 0
    if thousands_comma_count >= 2 and len(parsed) >= 3:
        return min(parsed), 1
    return max(parsed), 1


def udf_get_package_quantity(title):
    """
    Extract the package quantity from a product title.

    Quantities are recognised as digits (optionally with thousands commas) or
    the words "one" to "ten", either followed by a unit ("12 pcs") or
    preceded by a phrase such as "pack of" / "count".

    :param title: product title; ``''``, ``None``, ``'null'`` and ``'none'``
        yield no quantity
    :return: the quantity as ``int``, or ``None`` when nothing was found.
        With several candidates the largest is returned, or the smallest when
        at least two candidates used a thousands comma and there are three or
        more candidates
    """
    if title == '':
        return None
    # Normalise non-breaking spaces: several patterns below need a plain space.
    title = str(title).lower().replace('\xa0', ' ')
    if title in ('null', 'none'):
        return None

    word_values = {"one": 1, "two": 2, "three": 3, "four": 4, "five": 5,
                   "six": 6, "seven": 7, "eight": 8, "nine": 9, "ten": 10}
    patterns = [
        r'\b(?<!\d\.\d)(?<!\d\sx\s)((?:\d{1,3})(?:,\d{3})*|\d+)(?!\.\d)(?!%)[-_\s]*(bulk|total|pc|pcs|piece|pieces|set|pack|packs|pairs|pk|pair|count|ct|counts|sets|sheets|sheet|wrap|wraps|roll|rolls|box|boxes|quantity)(?![a-zA-Z])',
        r'\b((?:set|pack|pair|pairs|box|case|carton|quantity) of) ((?:\d{1,3})(?:,\d{3})*|\d+)(?!\.\d)(?!\sx\s\d)(?!%)\b',
        r'\b(total|count|quantity)\s*[-_\s]*\s*((?:\d{1,3})(?:,\d{3})*|\d+)(?!\.)(?!\sx\s\d)\b(?!%)',
        r'\b(one|two|three|four|five|six|seven|eight|nine|ten)(?: +)(bulk|total|pc|pcs|piece|pieces|set|pack|packs|pairs|pk|pair|count|ct|counts|sets|sheets|sheet|wrap|wraps|roll|rolls|box|boxes|quantity)\b',
        r'\b((?:set|pack|pair|pairs|box|case|carton|quantity) of) (one|two|three|four|five|six|seven|eight|nine|ten)\b',
        r'\b(total|count|quantity)(?: +)(one|two|three|four|five|six|seven|eight|nine|ten)\b'
    ]

    quantities = []
    units = []
    for index, pattern in enumerate(patterns):
        # Patterns 0 and 3 capture (quantity, unit); the others (unit, quantity).
        quantity_first = index in (0, 3)
        for groups in re.findall(pattern, title):
            quantity, unit = groups if quantity_first else (groups[1], groups[0])
            quantities.append(quantity)
            units.append(unit)

    # When both "<key>" and "<key> of" were seen, drop the first bare "<key>" hit.
    for key in ('set', 'pack', 'pair', 'box', 'quantity'):
        if key in units and f'{key} of' in units:
            position = units.index(key)
            del units[position]
            del quantities[position]

    parsed = []
    thousands_comma_count = 0
    for token in quantities:
        if token in word_values:
            parsed.append(word_values[token])
            continue
        if ',' in token:
            thousands_comma_count += 1
            token = token.replace(',', '')
        # Skip zero-padded numbers and anything at or above 10000.
        if not token.startswith('0') and 0 <= int(token) < 10000:
            parsed.append(int(token))

    if not parsed:
        return None
    if len(parsed) == 1:
        return parsed[0]
    if thousands_comma_count >= 2 and len(parsed) >= 3:
        return min(parsed)
    return max(parsed)


# Shared helper: map string placeholders for "no value" to None
def udf_handle_string_null_value(value):
    """
    Return ``None`` for null-like values, otherwise the value unchanged.

    A value is null-like when it is ``None`` or when its string form, after
    trimming and lower-casing, is one of ``'null'``, ``'none'``, ``''`` or
    ``'-1'``.
    """
    if value is None:
        return None
    normalized = str(value).strip().lower()
    return None if normalized in ('null', 'none', '', '-1') else value


def parse_best_sellers_href(href: str):
    """
    Parse a best-sellers link into the current and first-level category ids.

    The link is split on "/". When the last segment contains "ref=", the two
    segments before it are used; otherwise the last two segments are used.

    :param href: best-sellers link; ``None`` yields ``(None, None)``
    :return: tuple ``(category_id, category_first_id)``
    """
    if href is None:
        return (None, None)

    segments = href.split("/")
    last_index = len(segments) - 1
    last_val = UdfUtil.safeIndex(segments, last_index, None)

    # Skip the trailing "ref=..." segment when present.
    shift = 1 if "ref=" in last_val else 0
    category_id = UdfUtil.safeIndex(segments, last_index - shift, None)
    category_first_id = UdfUtil.safeIndex(segments, last_index - shift - 1, None)
    return (category_id, category_first_id)


def parse_bsr_url(nodes_num: int, url: str):
    """
    Parse a BSR link into category id, first-level category id and parent id.

    :param nodes_num: node count; a value of 1 marks the root node
    :param url: BSR link, split on "/"
    :return: dict with the keys ``category_id``, ``category_first_id`` and
        ``category_parent_id``
    """
    segments = url.split("/")
    tail = len(segments) - 1
    if "ref=" in url:
        # The last segment is the "ref=..." suffix; ids sit just before it.
        ref_suffix = UdfUtil.safeIndex(segments, tail, None)
        category_first_id = UdfUtil.safeIndex(segments, tail - 2, None)
        category_id = UdfUtil.safeIndex(segments, tail - 1, None)
    else:
        ref_suffix = None
        category_id = UdfUtil.safeIndex(segments, tail, None)
        category_first_id = UdfUtil.safeIndex(segments, tail - 1, None)

    if nodes_num == 1:
        # Root node.
        return {
            "category_id": "0",
            "category_first_id": None,
            "category_parent_id": None
        }

    if url.endswith("_0"):
        # First-level node: it is its own first-level category, parent is root.
        return {
            "category_id": category_id,
            "category_first_id": category_id,
            "category_parent_id": "0"
        }

    if url.endswith(f"{category_first_id}_1"):
        # Direct child of a first-level node.
        return {
            "category_id": category_id,
            "category_first_id": category_first_id,
            "category_parent_id": category_first_id
        }

    # Deeper node: the parent id is the text after the last "_" of the ref suffix.
    if ref_suffix is None:
        category_parent_id = None
    else:
        category_parent_id = ref_suffix[ref_suffix.rfind("_") + 1:]
    return {
        "category_id": category_id,
        "category_first_id": category_first_id,
        "category_parent_id": category_parent_id
    }


def parse_weight_str(weight_str: str, site_name: str):
    """
    Parse a weight string into a numeric weight and its unit.

    The result is expressed in pounds for site ``'us'`` and in grams for any
    other site. Only the first unit keyword found, in the order pounds,
    ounces, kilogram / kg, milligrams, gram, g, is used.

    :param weight_str: raw weight text, may be ``None``
    :param site_name: site code
    :return: tuple ``(value, unit)``; ``value`` is rounded to 3 decimals and
        is ``None`` when nothing could be parsed
    """
    is_us = site_name == 'us'
    weight_type = 'pounds' if is_us else 'grams'
    if weight_str is None:
        return None, weight_type

    # (keywords that select the rule, regex, converter to pounds, converter to grams)
    rules = (
        (('pounds',), r"(\d+\.{0,}\d{0,})\D{0,}pounds",
         lambda x: x, lambda x: x * 1000 * 0.454),
        (('ounces',), r"(\d+\.{0,}\d{0,})\D{0,}ounces",
         lambda x: x / 16, lambda x: x / 16 * 1000 * 0.454),
        (('kilogram', ' kg'), r"(\d+\.{0,}\d{0,})\D{0,}kilogram",
         lambda x: x / 0.454, lambda x: x * 1000),
        (('milligrams',), r"(\d+\.{0,}\d{0,})\D{0,}milligrams",
         lambda x: x / 1000 / 1000 / 0.454, lambda x: x / 1000),
        ((' gram',), r"(\d+\.{0,}\d{0,})\D{0,} gram",
         lambda x: x / 1000 / 0.454, lambda x: x),
        ((' g',), r"(\d+\.{0,}\d{0,})\D{0,} g",
         lambda x: x / 1000 / 0.454, lambda x: x),
    )

    val = None
    for keywords, regex, to_pounds, to_grams in rules:
        if not any(keyword in weight_str for keyword in keywords):
            continue
        if ' kg' in keywords:
            # Spell out the abbreviation so one regex covers both forms.
            weight_str = weight_str.replace(' kg', ' kilogram')
        found = re.search(regex, weight_str)
        if found:
            convert = to_pounds if is_us else to_grams
            val = round(convert(float(found.group(1))), 3)
        # Only the first rule whose keyword is present is ever tried.
        break

    return val, weight_type


def udf_new_asin_flag(launch_time, cal_day):
    """
    Flag whether an ASIN counts as a new product on a given calculation day.

    :param launch_time: ASIN launch date as a ``YYYY-MM-DD`` string
    :param cal_day: calculation date as a ``YYYY-MM-DD`` string (per the
        original note: the last day of the week or month, obtained from the
        ``get_calDay_by_dateInfo`` utility)
    :return: 1 when ``cal_day`` is at most 180 days after ``launch_time``,
        0 otherwise; ``None`` when either argument is ``None`` or not in the
        expected date format
    """
    if launch_time is None or cal_day is None:
        return None

    date_format = "%Y-%m-%d"
    try:
        launched_at = datetime.strptime(launch_time, date_format)
        calculated_at = datetime.strptime(cal_day, date_format)
    except ValueError:
        # One of the strings does not match the expected date format.
        return None

    age_in_days = (calculated_at - launched_at).days
    return 1 if age_in_days <= 180 else 0


def category_craw_flag(category_first_id, asin: str = None):
    """
    Decide whether an ASIN or category should be crawled.

    :param category_first_id: first-level category id
    :param asin: optional ASIN; when given it must start with "B0"
    :return: ``False`` when the ASIN does not start with "B0", when the
        category is ``None``, or when the category is one of the excluded
        categories listed below; ``True`` otherwise
    """
    excluded_categories = (
        "audible",
        "books",
        "digital-text",
        "dmusic",
        "mobile-apps",
        "movies-tv",
        "music",
        "software",
        "videogames",
    )
    asin_allowed = asin is None or asin.startswith("B0")
    return (
        asin_allowed
        and category_first_id is not None
        and category_first_id not in excluded_categories
    )


def sort_volume(val1, val2, val3):
    """
    Order three dimensions from largest to smallest.

    ``None`` values are ranked as 0 while sorting and are returned as
    ``None``.

    :return: tuple ``(l, w, h)`` in descending order
    """
    ordered = sorted(
        [val1, val2, val3],
        key=lambda dim: 0 if dim is None else dim,
        reverse=True,
    )
    return (
        UdfUtil.safeIndex(ordered, 0, None),
        UdfUtil.safeIndex(ordered, 1, None),
        UdfUtil.safeIndex(ordered, 2, None),
    )


def parse_asin_volume_str(volume_str, sortFlag=False):
    """
    Parse a dimension string into length, width, height and unit.

    :param volume_str: dimension text
    :param sortFlag: when true, the numbers are sorted in descending order
        before being assigned to length, width and height
    :return: tuple ``(l, w, h, unit)`` holding the raw numbers found; ``unit``
        is one of ``"inches"``, ``"cm"``, ``"mm"``, ``"m"`` and defaults to
        ``"inches"`` when no unit is present; all ``None`` for a ``None``
        input
    """
    if volume_str is None:
        return None, None, None, None

    units_found = re.findall(r"inches|inch|cm|centímetros|centimetres|milímetros|millimeter|mm|metros", volume_str)
    # With several units present, keep only the text before the first one.
    if len(units_found) >= 2:
        volume_str = volume_str[0:volume_str.find(units_found[0])]

    numbers = [float(number) for number, _fraction in re.findall(r"(\d+(\.\d+)?)", volume_str)]
    if sortFlag:
        numbers.sort(reverse=True)

    l = UdfUtil.safeIndex(numbers, 0, None)
    w = UdfUtil.safeIndex(numbers, 1, None)
    h = UdfUtil.safeIndex(numbers, 2, None)
    raw_unit = UdfUtil.safeIndex(units_found, 0, None)

    canonical_units = {
        'inches': "inches", 'inch': "inches",
        'cm': "cm", 'centímetros': "cm", 'centimetres': "cm",
        'milímetros': "mm", 'millimeter': "mm", 'mm': "mm",
        'metros': "m",
    }
    unit = canonical_units.get(raw_unit)
    if unit is None:
        unit = "inches"
        # Without a unit, pair the numbers with the l/d/w/h letters in the
        # text, in order of appearance ("d" stands in for length).
        axis_letters = re.findall(r"l|d|w|h", volume_str)
        if len(axis_letters) > 0:
            value_by_axis = {
                str(letter): UdfUtil.safeIndex(numbers, position, None)
                for position, letter in enumerate(axis_letters)
            }
            l = value_by_axis.get("l") or value_by_axis.get("d")
            w = value_by_axis.get("w")
            h = value_by_axis.get("h")
    return l, w, h, unit


def udf_rank_and_category(best_sellers_rank):
    """
    Parse best-seller category names and ranks from a rank string.

    Every ``#<rank> in <category>`` occurrence contributes one rank (with
    thousands commas removed) and one trimmed category name.

    :return: tuple ``(ranks, categories)``, each a comma-joined string
    """
    ranks = []
    categories = []
    for rank, category in re.findall(r"#([\d,]+) in ([\w&' ]+)", best_sellers_rank):
        ranks.append(rank.replace(",", ""))
        categories.append(category.strip().replace(",", " "))
    return ",".join(ranks), ",".join(categories)


def udf_ele_mattch(match_text: str, ele_list_str: str):
    """
    Find which keywords occur as whole words in a text (case-insensitive).

    A keyword is not matched when it is directly preceded by one of the
    characters ``+ * - % .``.

    :param match_text: text to search; ``None`` yields ``None``
    :param ele_list_str: keywords, either as a list/tuple/set or as the
        ``str()`` form of a list (e.g. ``"['a', 'b']"``); any other string is
        treated as a single keyword
    :return: the distinct matched words joined with ",", in order of first
        appearance and as spelled in the text; ``None`` when nothing matches
        or there are no keywords
    """
    import ast

    if match_text is None or ele_list_str is None:
        return None

    if isinstance(ele_list_str, str):
        # Iterating a str would yield single characters, so decode the
        # str(list) form documented above instead.
        try:
            decoded = ast.literal_eval(ele_list_str)
        except (ValueError, SyntaxError):
            decoded = ele_list_str
        if isinstance(decoded, (list, tuple, set)):
            keywords = list(decoded)
        else:
            keywords = [ele_list_str]
    else:
        keywords = list(ele_list_str)

    # An empty alternative would match at every word boundary, so drop blanks.
    keywords = [str(keyword) for keyword in keywords if str(keyword)]
    if not keywords:
        return None

    pattern = re.compile(
        r'(?<!\+|\*|\-|\%|\.)\b({})\b'.format('|'.join(re.escape(keyword) for keyword in keywords)),
        flags=re.IGNORECASE,
    )
    found = pattern.findall(match_text)
    if not found:
        return None
    # dict.fromkeys removes duplicates while keeping a stable order.
    return ','.join(dict.fromkeys(found))


# Plugin: normalised volume string extraction
def udf_extract_volume_format(volume_str: str):
    """
    Normalise a dimension string to ``"<l>*<w>*<h><unit>"``.

    At most the first three numbers are kept. Inch values are converted to
    centimetres (x 2.54, rounded to 2 decimals) and suffixed with ``"cm"``;
    for any other unit the numbers are kept and the detected unit is
    appended (``"cm"``, ``"mm"``, ``"m"``, several units joined with ",", or
    nothing when no unit was found).

    :param volume_str: raw dimension text; any value is converted with ``str``
    :return: the normalised string
    """
    volume_str = str(volume_str).lower()

    cm_aliases = ('cm', 'centímetros', 'centimetres')
    unit_aliases = {
        'inches': 'inches', 'inch': 'inches',
        'cm': 'cm', 'centímetros': 'cm', 'centimetres': 'cm',
        'milímetros': 'mm', 'millimeter': 'mm', 'mm': 'mm',
        'metros': 'm',
    }
    # Accented Latin letters are part of a word; otherwise "centímetros"
    # would be split into "cent" + "metros" and be read as metres.
    words = re.findall(r'[a-zà-öø-ÿ]+', volume_str)
    # Sorted so the joined name, and the 'cm,inches' check, is deterministic.
    asin_volume_type = ','.join(sorted({unit_aliases[word] for word in words if word in unit_aliases}))

    if asin_volume_type == 'cm,inches':
        # Keep only the part of the text that carries the inch values.
        num_inches = volume_str.find('inch')
        cm_hits = [pos for pos in (volume_str.find(alias) for alias in cm_aliases) if pos >= 0]
        num_cm = min(cm_hits) if cm_hits else -1
        volume_str = volume_str[:num_inches] if num_cm > num_inches else volume_str[num_cm:num_inches]

    dimensions = [float(number) for number, _fraction in re.findall(r"(\d+(\.\d+)?)", volume_str)][:3]

    if asin_volume_type == "inches":
        # Convert to centimetres; with no numbers this yields just "cm".
        return "*".join(str(round(dim * 2.54, 2)) for dim in dimensions) + "cm"
    return "*".join(f"{dim}" for dim in dimensions) + asin_volume_type


# Big data: return length, width, height and unit
def udf_extract_volume_dimensions(volume_str: str):
    """
    Parse a dimension string into up to three numbers and a unit.

    At most the first three numbers are used; they are returned in descending
    order and missing positions are ``None``.

    :param volume_str: raw dimension text; any value is converted with ``str``
    :return: tuple ``(length, width, height, unit)``
    """
    volume_str = str(volume_str).lower()

    cm_aliases = ('cm', 'centímetros', 'centimetres')
    unit_aliases = {
        'inches': 'inches', 'inch': 'inches',
        'cm': 'cm', 'centímetros': 'cm', 'centimetres': 'cm',
        'milímetros': 'mm', 'millimeter': 'mm', 'mm': 'mm',
        'metros': 'm',
    }
    # Accented Latin letters are part of a word; otherwise "centímetros"
    # would be split into "cent" + "metros" and be read as metres.
    words = re.findall(r'[a-zà-öø-ÿ]+', volume_str)
    # Sorted so the joined name, and the 'cm,inches' check, is deterministic.
    asin_volume_type = ','.join(sorted({unit_aliases[word] for word in words if word in unit_aliases}))

    if asin_volume_type == 'cm,inches':
        # Keep only the part of the text that carries the inch values.
        num_inches = volume_str.find('inch')
        cm_hits = [pos for pos in (volume_str.find(alias) for alias in cm_aliases) if pos >= 0]
        num_cm = min(cm_hits) if cm_hits else -1
        volume_str = volume_str[:num_inches] if num_cm > num_inches else volume_str[num_cm:num_inches]

    dimensions = [float(number) for number, _fraction in re.findall(r"(\d+(\.\d+)?)", volume_str)]

    if len(dimensions) >= 2:
        # NOTE(review): with two or more numbers the unit is always reported
        # as "inches", whatever unit was detected. Kept as-is; confirm this
        # is intended rather than a default for the "no unit" case.
        asin_volume_type = "inches"

    # The values are returned largest first, so which axis letter a number
    # belonged to does not affect the result.
    ordered = sorted(dimensions[:3], reverse=True)
    ordered.extend([None] * (3 - len(ordered)))
    length, width, height = ordered
    return (length, width, height, asin_volume_type)


# Plugin: normalised weight extraction
def udf_extract_weight_format(weight_str: str):
    """
    Convert a weight string to grams and format it as ``"<value>g"``.

    Only the first unit keyword found, in the order pounds, ounces,
    kilogram / kg, milligrams, gram, g, is used.

    :param weight_str: raw weight text, may be ``None``
    :return: the weight in grams rounded to 2 decimals with a ``"g"`` suffix.
        When nothing could be parsed the string ``"None"`` is returned, and a
        parsed weight of zero is returned as ``"0.0"`` without the suffix
    """
    weight_type = 'g'
    grams = None

    if weight_str is not None:
        # (keywords that select the rule, regex, converter to grams)
        rules = (
            (('pounds',), r"(\d+\.{0,}\d{0,})\D{0,}pounds", lambda x: x * 1000 * 0.454),
            (('ounces',), r"(\d+\.{0,}\d{0,})\D{0,}ounces", lambda x: x / 16 * 1000 * 0.454),
            (('kilogram', ' kg'), r"(\d+\.{0,}\d{0,})\D{0,}kilogram", lambda x: x * 1000),
            (('milligrams',), r"(\d+\.{0,}\d{0,})\D{0,}milligrams", lambda x: x / 1000),
            ((' gram',), r"(\d+\.{0,}\d{0,})\D{0,} gram", lambda x: x),
            ((' g',), r"(\d+\.{0,}\d{0,})\D{0,} g", lambda x: x),
        )
        for keywords, regex, to_grams in rules:
            if not any(keyword in weight_str for keyword in keywords):
                continue
            if ' kg' in keywords:
                # Spell out the abbreviation so one regex covers both forms.
                weight_str = weight_str.replace(' kg', ' kilogram')
            found = re.search(regex, weight_str)
            if found:
                grams = round(to_grams(float(found.group(1))), 3)
            # Only the first rule whose keyword is present is ever tried.
            break

    if grams:
        return f"{round(grams, 2)}{weight_type}"
    return f"{grams}"


# Category extraction: first-level / current category id and rank
# See dim_asin_bs_info.py for usage
def udf_parse_bs_category(asin_bs_sellers_rank_lower, last_herf, all_best_sellers_href, cate_current_pattern,
                          cate_1_pattern):
    """
    Parse first-level and current best-seller category ids and ranks.

    :param asin_bs_sellers_rank_lower: rank text from the bottom of the page
    :param last_herf: link of the last category level; used only when
        ``all_best_sellers_href`` is empty
    :param all_best_sellers_href: all category links joined with "&&&&"
    :param cate_current_pattern: regex whose matches are the rank numbers
    :param cate_1_pattern: substring whose presence in the rank text means the
        first rank belongs to the first-level category
    :return: tuple ``(cate_1_id, cate_current_id, cate_1_rank,
        cate_current_rank)``; unavailable parts are ``None``
    """

    def _category_path(href):
        # Text between "bestsellers/" and "/ref", or None when absent.
        found = re.findall('bestsellers/(.*)/ref', href)
        return found[0] if found else None

    # 1. Choose which field the categories are parsed from.
    if str(all_best_sellers_href).lower() not in ['', 'none', 'null']:
        bs_href = all_best_sellers_href
    elif str(last_herf).lower() not in ['', 'none', 'null']:
        bs_href = last_herf
    else:
        bs_href = ''
    href_list = bs_href.replace("?tf=1", "").split("&&&&")

    # Trailing climate-pledge links are dropped so the previous level is used.
    rank_flag = False
    while len(href_list) >= 2 and '/climate-pledge' in href_list[-1]:
        href_list.pop()
        rank_flag = True

    # 2.1 Category ids.
    first_path = _category_path(href_list[0])
    if len(href_list) == 1:
        if first_path is None:
            cate_1_id, cate_current_id = None, None
        else:
            path_parts = first_path.split("/")
            cate_1_id = path_parts[0]
            cate_current_id = path_parts[-1] if len(path_parts) > 1 else None
    else:
        last_path = _category_path(href_list[-1])
        # Either link may lack a bestsellers path; leave that id as None.
        cate_1_id = first_path.split("/")[0] if first_path is not None else None
        cate_current_id = last_path.split("/")[-1] if last_path is not None else None

    # 2.2 Ranks.
    if asin_bs_sellers_rank_lower is not None:
        rank_text = asin_bs_sellers_rank_lower.replace(".", "").replace(",", "").replace(" 100 ", "")
    else:
        rank_text = ''
    rank_list = [int(rank) for rank in re.findall(cate_current_pattern, rank_text)]
    if rank_flag and len(rank_list) > len(href_list):
        # Drop the ranks that belonged to the removed climate-pledge links.
        rank_list = rank_list[:len(href_list)]

    if not rank_list:
        cate_1_rank, cate_current_rank = None, None
    elif cate_1_pattern in asin_bs_sellers_rank_lower:
        cate_1_rank = rank_list[0]
        cate_current_rank = rank_list[-1] if len(rank_list) > 1 else None
    else:
        cate_1_rank, cate_current_rank = None, rank_list[0]

    return cate_1_id, cate_current_id, cate_1_rank, cate_current_rank


# Map an ASIN to a number in 0..999,999,999 so it can select a table partition
def udf_asin_to_number(asin):
    """
    Convert a 10-character ASIN to a number below one billion.

    The ASIN is read as a base-36 number (digits keep their value, other
    characters map to ``ord(char) - 55``, i.e. 'A' -> 10 ... 'Z' -> 35) and
    reduced modulo 1,000,000,000. The mapping is written for uppercase
    letters and digits.

    :raises ValueError: when the ASIN is not exactly 10 characters long
    """
    if len(asin) != 10:
        raise ValueError("ASIN must be 10 characters long")

    def digit_value(symbol):
        return int(symbol) if symbol.isdigit() else ord(symbol) - 55

    # The rightmost character is the least significant base-36 digit.
    total = sum(
        digit_value(symbol) * 36 ** power
        for power, symbol in enumerate(reversed(asin))
    )
    return total % 1000000000


# Determine the buy_box_seller_type
def udf_parse_seller_json(seller_json):
    """
    Classify the buy-box seller from the crawled seller JSON.

    :param seller_json: JSON object text with the keys ``ship_from``,
        ``sold_by``, ``fulfilled_by`` and ``seller_id``
    :return: tuple ``(buy_box_seller_type, sold_by, seller_id)`` where the
        type is 1 = Amazon, 2 = FBA, 3 = FBM, 4 = none of these; an empty
        ``seller_json`` yields ``(0, None, None)``
    """
    if not seller_json:
        return 0, None, None

    seller_info = json.loads(seller_json)
    ship_from = seller_info["ship_from"]
    sold_by = seller_info["sold_by"]
    fulfilled_by = seller_info["fulfilled_by"]
    seller_id = seller_info["seller_id"]

    def _starts_with_amazon(name):
        return name.lower().strip().startswith("amazon")

    handled_by_amazon = (ship_from and _starts_with_amazon(ship_from)) or (
        fulfilled_by and 'amazon' in fulfilled_by.lower())

    if handled_by_amazon:
        if sold_by:
            # Amazon ships: sold by Amazon itself (1) or by a third party (2, FBA).
            seller_type = 1 if _starts_with_amazon(sold_by) else 2
            return seller_type, sold_by, seller_id
    elif (ship_from or fulfilled_by) and sold_by:
        return 3, sold_by, seller_id  # FBM
    return 4, sold_by, seller_id  # Other


def udf_parse_amazon_orders(asin_amazon_orders_str):
    """
    Parse the "bought in past month" text of an ASIN detail page.

    Written for the us, uk and de sites.

    :param asin_amazon_orders_str: e.g. ``'50+ bought in past month'``
    :return: the number in front of "+" as ``int`` (a "k" suffix counts as
        thousands); ``None`` unless exactly one such number is present
    """
    # Raw string: "\d" and "\+" are invalid escapes in a normal string literal.
    matches = re.findall(r"(\d+[k]{0,})\+", str(asin_amazon_orders_str).lower())
    if len(matches) != 1:
        return None
    return int(matches[0].replace("k", "000"))


# Detect the language of an ABA search term
def udf_detect_phrase_reg(lang_word_map):
    """
    Build a Spark UDF that guesses the language of a search phrase.

    :param lang_word_map: mapping from a word to a ``{lang: frequency}`` dict;
        the languages tracked here are ``en``, ``fr``, ``es`` and ``de``
    :return: a UDF declared as ``map<string, string>`` with the keys ``lang``
        and ``hint_word``
    """

    def detect_phrase(phrase: str):
        # "+" separates words in the phrase, so turn it into a space first.
        phrase = re.sub(r'(\+)', ' ', phrase).strip()
        from nltk.tokenize import word_tokenize
        tokens = [token for token in word_tokenize(phrase, "english") if len(token) >= 2]

        stats = {code: {"frequency": 0, "word": []} for code in ("en", "fr", "es", "de")}
        for token in tokens:
            frequency_by_lang: dict = lang_word_map.get(token)
            if frequency_by_lang is None:
                continue
            for code in frequency_by_lang.keys():
                stats[code]["frequency"] = stats[code]["frequency"] + frequency_by_lang[code]
                stats[code]["word"].append(token)

        # Most matched words wins, then the highest summed frequency; on a
        # full tie the earliest language in the table above is kept.
        lang, best = max(stats.items(), key=lambda entry: (len(entry[1]['word']), entry[1]['frequency']))
        if best['frequency'] == 0:
            return {"lang": None, "hint_word": None}

        # English takes priority whenever any English word was recognised.
        if stats['en']['frequency'] > 0:
            lang, best = 'en', stats['en']

        hint_word = " ".join(best['word'])
        if len(hint_word) <= 2:
            return {"lang": None, "hint_word": None}
        return {"lang": lang, "hint_word": hint_word}

    return F.udf(detect_phrase, MapType(StringType(), StringType()))