"""Parsing helpers and Spark UDF factories for listing titles, dimensions,
weights, best-seller categories and seller information."""
import ast
import json
import re
from datetime import datetime

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, MapType, StringType

from yswg_utils.udf_util import UdfUtil


def udf_title_number_parse_reg():
    """Build a Spark UDF that extracts the package quantity from a title.

    The wrapped function returns ``None`` when ``udf_get_package_quantity``
    finds nothing; otherwise a one-element list holding a map with the keys
    ``match`` and ``label`` (both always ``None``) and ``value`` (the
    quantity).
    """

    def udf_title_number_parse(title):
        # Parse once and reuse the result (the title used to be parsed twice).
        val = udf_get_package_quantity(title)
        if val is None:
            return None
        return [{
            "match": None,
            "label": None,
            "value": val,
        }]

    return F.udf(udf_title_number_parse, ArrayType(MapType(StringType(), StringType())))


# (upper weight bound, fee_type, fba_fee) for items within 36 x 28 x 1.6.
_FBA_SMALL_STANDARD_TIERS = (
    (113.5, 1, 3.22),
    (227, 2, 3.4),
    (340.5, 3, 3.58),
    (454, 4, 3.77),
)

# (upper weight bound, fee_type, fba_fee) for items within 43 x 34 x 19.
_FBA_LARGE_STANDARD_TIERS = (
    (113.5, 5, 3.86),
    (227, 6, 4.08),
    (340.5, 7, 4.24),
    (454, 8, 4.75),
    (681, 9, 5.4),
    (908, 10, 5.69),
    (1135, 11, 6.1),
    (1362, 12, 6.39),
    (9080, 13, 7.33),
)


def get_Fba_Fee(longVal: float, width: float, high: float, weight: float, ):
    """Classify an item into an FBA size tier and return its fee.

    :param longVal: length, in cm
    :param width: width, in cm
    :param high: height, in cm
    :param weight: weight, in g
    :return: ``(fee_type, fba_fee)``; ``(0, 0)`` when no tier matches
    """
    # Standard tiers: the first weight band whose upper bound is not exceeded.
    if longVal <= 36 and width <= 28 and high <= 1.6:
        for max_weight, fee_type, fba_fee in _FBA_SMALL_STANDARD_TIERS:
            if weight <= max_weight:
                return (fee_type, fba_fee)
    if longVal <= 43 and width <= 34 and high <= 19:
        for max_weight, fee_type, fba_fee in _FBA_LARGE_STANDARD_TIERS:
            if weight <= max_weight:
                return (fee_type, fba_fee)

    # Oversize tiers are bounded by length plus girth.
    length_plus_girth = longVal + 2 * (width + high)
    if longVal <= 152.4 and length_plus_girth <= 330.2 and weight <= 31780:
        return (14, 10.15)
    if longVal <= 274.32 and length_plus_girth <= 419.1 and weight <= 68100:
        return (15, 19.47)
    if longVal <= 274.32 and length_plus_girth > 419.1 and weight <= 68100:
        return (16, 90.81)
    if longVal > 274.32 and length_plus_girth > 419.1 and weight > 68100:
        return (17, 159.32)
    # NOTE(review): some combinations match no tier at all (for example a
    # length above 274.32 with a weight of at most 68100) and end up here --
    # confirm that (0, 0) is the intended result for them.
    return (0, 0)


_NUMBER_WORDS = {"one": 1, "two": 2, "three": 3, "four": 4, "five": 5,
                 "six": 6, "seven": 7, "eight": 8, "nine": 9, "ten": 10}

# A unit is dropped once when it appears both as "<n> <unit>" and "<unit> of <n>".
_DUPLICATED_UNIT_KEYS = ('set', 'pack', 'pair', 'box', 'quantity')

_QTY_NUMBER = r'(?:\d{1,3})(?:,\d{3})*|\d+'
_QTY_NUMBER_WORDS = 'one|two|three|four|five|six|seven|eight|nine|ten'
_QTY_UNITS = ('bulk|total|pc|pcs|piece|pieces|set|pack|packs|pairs|pk|pair|count|ct|counts'
              '|sets|sheets|sheet|wrap|wraps|roll|rolls|box|boxes|quantity')
_QTY_CONTAINERS = 'set|pack|pair|pairs|box|case|carton|quantity'


def _build_quantity_patterns(word_unit_suffix):
    """Compile the six title patterns as ``(pattern, quantity_is_first_group)``.

    ``word_unit_suffix`` is the assertion placed after the unit in the
    "<number word> <unit>" pattern -- the only difference between the two
    public parsers.
    """
    return (
        (re.compile(r'\b(?<!\d\.\d)(?<!\d\sx\s)(' + _QTY_NUMBER + r')(?!\.\d)(?!%)[-_\s]*('
                    + _QTY_UNITS + r')(?![a-zA-Z])'), True),
        (re.compile(r'\b((?:' + _QTY_CONTAINERS + r') of) (' + _QTY_NUMBER
                    + r')(?!\.\d)(?!\sx\s\d)(?!%)\b'), False),
        (re.compile(r'\b(total|count|quantity)\s*[-_\s]*\s*(' + _QTY_NUMBER
                    + r')(?!\.)(?!\sx\s\d)\b(?!%)'), False),
        (re.compile(r'\b(' + _QTY_NUMBER_WORDS + r')(?: +)(' + _QTY_UNITS + r')'
                    + word_unit_suffix), True),
        (re.compile(r'\b((?:' + _QTY_CONTAINERS + r') of) (' + _QTY_NUMBER_WORDS + r')\b'), False),
        (re.compile(r'\b(total|count|quantity)(?: +)(' + _QTY_NUMBER_WORDS + r')\b'), False),
    )


_QUANTITY_PATTERNS_WITH_FLAG = _build_quantity_patterns(r'(?![a-zA-Z])')
_QUANTITY_PATTERNS = _build_quantity_patterns(r'\b')


def _collect_package_quantities(title, patterns):
    """Return ``(quantities, thousands_separator_count)`` found in ``title``.

    ``quantities`` holds every accepted candidate as an int and is empty for
    null-like titles or when nothing matches.
    """
    title = str(title).lower()
    # NOTE(review): as written this swaps a space for a space, i.e. a no-op;
    # presumably it once normalised a non-breaking or doubled space -- confirm.
    title = title.replace(' ', ' ')
    if title in ('null', 'none'):
        return [], 0

    raw_quantities = []
    units = []
    for pattern, quantity_first in patterns:
        for groups in pattern.findall(title):
            if quantity_first:
                quantity, unit = groups
            else:
                unit, quantity = groups
            raw_quantities.append(quantity)
            units.append(unit)

    # NOTE(review): the collapsed source does not show whether this step ran
    # after all patterns or after each one; it is applied once here -- confirm.
    if raw_quantities and units:
        for key in _DUPLICATED_UNIT_KEYS:
            if key in units and f'{key} of' in units:
                index = units.index(key)
                units.pop(index)
                raw_quantities.pop(index)

    quantities = []
    thousands_separator_count = 0
    for raw in raw_quantities:
        if raw in _NUMBER_WORDS:
            quantities.append(_NUMBER_WORDS[raw])
            continue
        if ',' in raw:
            thousands_separator_count += 1
            raw = raw.replace(',', '')
        # Values with a leading zero and values of 10000 or more are rejected.
        if not raw.startswith('0') and int(raw) < 10000:
            quantities.append(int(raw))
    return quantities, thousands_separator_count


def _pick_package_quantity(quantities, thousands_separator_count):
    """Choose among several candidates: the smallest when at least two used a
    thousands separator and there are three or more, otherwise the largest."""
    if thousands_separator_count >= 2 and len(quantities) >= 3:
        return min(quantities)
    return max(quantities)


def udf_get_package_quantity_with_flag(title):
    """Extract the package quantity from a title, plus an ambiguity flag.

    :param title: product title; it is stringified and lower-cased first
    :return: ``(quantity, flag)``. ``flag`` is 0 when exactly one candidate
        was found and 1 when several were; ``(None, None)`` when the title is
        null-like ('null' / 'none') or contains no quantity. Null-like titles
        used to return a bare ``None``; the result is now always a 2-tuple.
    """
    quantities, thousands_separator_count = _collect_package_quantities(
        title, _QUANTITY_PATTERNS_WITH_FLAG)
    if not quantities:
        return None, None
    if len(quantities) == 1:
        return quantities[0], 0
    return _pick_package_quantity(quantities, thousands_separator_count), 1


def udf_get_package_quantity(title):
    """Extract the package quantity from a title.

    :param title: product title; it is stringified and lower-cased first
    :return: the quantity as an int, or ``None`` when the title is null-like
        ('null' / 'none') or contains no quantity
    """
    quantities, thousands_separator_count = _collect_package_quantities(
        title, _QUANTITY_PATTERNS)
    if not quantities:
        return None
    if len(quantities) == 1:
        return quantities[0]
    return _pick_package_quantity(quantities, thousands_separator_count)


# Shared helper: map null-like string values to None.
def udf_handle_string_null_value(value):
    """Return ``None`` for ``None`` and for values whose trimmed, lower-cased
    string form is 'null', 'none', '' or '-1'; otherwise return ``value``
    unchanged."""
    if value is None:
        return None
    if str(value).strip().lower() in ('null', 'none', '', '-1'):
        return None
    return value


def parse_best_sellers_href(href: str):
    """Read the current and first-level category ids from a best-sellers href.

    The href is split on "/". If its last segment contains "ref=" that
    segment is skipped; the two segments before it (or the last two) are then
    used.

    :param href: best-sellers link
    :return: ``(category_id, category_first_id)``; ``(None, None)`` for a
        ``None`` href
    """
    if href is None:
        return (None, None)
    arr = href.split("/")
    last_val = UdfUtil.safeIndex(arr, len(arr) - 1, None)
    # Skip the trailing "ref=..." segment when there is one.
    skip = 1 if "ref=" in last_val else 0
    category_id = UdfUtil.safeIndex(arr, len(arr) - 1 - skip, None)
    category_first_id = UdfUtil.safeIndex(arr, len(arr) - 2 - skip, None)
    return (category_id, category_first_id)


def parse_bsr_url(nodes_num: int, url: str):
    """Parse a BSR link into category id, first-level id and parent id.

    :param nodes_num: node count; the value 1 marks the root node
    :param url: category link
    :return: dict with the keys ``category_id``, ``category_first_id`` and
        ``category_parent_id``
    """
    arr = url.split("/")
    if "ref=" not in url:
        ref_suffix = None
        category_id = UdfUtil.safeIndex(arr, len(arr) - 1, None)
        category_first_id = UdfUtil.safeIndex(arr, len(arr) - 2, None)
    else:
        ref_suffix = UdfUtil.safeIndex(arr, len(arr) - 1, None)
        category_first_id = UdfUtil.safeIndex(arr, len(arr) - 3, None)
        category_id = UdfUtil.safeIndex(arr, len(arr) - 2, None)

    if nodes_num == 1:
        # Root node.
        category_id = "0"
        category_first_id = None
        category_parent_id = None
    elif url.endswith("_0"):
        # First-level node: it is its own first-level category.
        category_first_id = category_id
        category_parent_id = "0"
    elif url.endswith(f"{category_first_id}_1"):
        # Node directly below a first-level node.
        category_parent_id = category_first_id
    elif ref_suffix is not None:
        # Deeper node: the parent id is the text after the last "_" of the ref segment.
        category_parent_id = ref_suffix[ref_suffix.rfind("_") + 1:]
    else:
        category_parent_id = None

    return {
        "category_id": category_id,
        "category_first_id": category_first_id,
        "category_parent_id": category_parent_id
    }


# (marker substrings, number pattern, value -> pounds, value -> grams).
# The first rule with a marker present in the text decides the unit.
_WEIGHT_RULES = (
    (('pounds',), re.compile(r"(\d+\.{0,}\d{0,})\D{0,}pounds"),
     lambda v: v, lambda v: v * 1000 * 0.454),
    (('ounces',), re.compile(r"(\d+\.{0,}\d{0,})\D{0,}ounces"),
     lambda v: v / 16, lambda v: v / 16 * 1000 * 0.454),
    (('kilogram', ' kg'), re.compile(r"(\d+\.{0,}\d{0,})\D{0,}kilogram"),
     lambda v: v / 0.454, lambda v: v * 1000),
    (('milligrams',), re.compile(r"(\d+\.{0,}\d{0,})\D{0,}milligrams"),
     lambda v: v / 1000 / 1000 / 0.454, lambda v: v / 1000),
    ((' gram',), re.compile(r"(\d+\.{0,}\d{0,})\D{0,} gram"),
     lambda v: v / 1000 / 0.454, lambda v: v),
    ((' g',), re.compile(r"(\d+\.{0,}\d{0,})\D{0,} g"),
     lambda v: v / 1000 / 0.454, lambda v: v),
)


def _convert_weight(weight_str, to_pounds):
    """Return the weight in ``weight_str`` as pounds or grams, rounded to 3
    places, or ``None`` when no unit or number is recognised."""
    for markers, pattern, as_pounds, as_grams in _WEIGHT_RULES:
        if not any(marker in weight_str for marker in markers):
            continue
        if ' kg' in markers:
            # The kilogram pattern only knows the long spelling.
            weight_str = weight_str.replace(' kg', ' kilogram')
        match = pattern.search(weight_str)
        if not match:
            return None
        try:
            raw_value = float(match.group(1))
        except ValueError:
            # The pattern also accepts text such as "1..2", which float() rejects.
            return None
        convert = as_pounds if to_pounds else as_grams
        return round(convert(raw_value), 3)
    return None


def parse_weight_str(weight_str: str, site_name: str):
    """Parse a weight string into a value and its unit.

    :param weight_str: text containing a number followed by a weight unit
    :param site_name: 'us' yields pounds, any other site yields grams
    :return: ``(value, weight_type)`` where ``weight_type`` is 'pounds' or
        'grams'; ``value`` is ``None`` when nothing could be parsed
    """
    is_us = site_name == 'us'
    weight_type = 'pounds' if is_us else 'grams'
    val = None
    if weight_str is not None:
        val = _convert_weight(weight_str, to_pounds=is_us)
    return val, weight_type


def udf_new_asin_flag(launch_time, cal_day):
    """Flag whether an ASIN counts as a new product on the calculation date.

    :param launch_time: launch date, formatted ``%Y-%m-%d``
    :param cal_day: calculation date, formatted ``%Y-%m-%d`` (for weekly data
        the last day of the week, for monthly data the last day of the month;
        obtain it via the utility ``get_calDay_by_dateInfo``)
    :return: 1 when ``cal_day`` is at most 180 days after ``launch_time``,
        otherwise 0; ``None`` when either value is missing or unparsable
    """
    if launch_time is None or cal_day is None:
        return None
    date_format = "%Y-%m-%d"
    try:
        launched_at = datetime.strptime(launch_time, date_format)
        calculated_at = datetime.strptime(cal_day, date_format)
    except (ValueError, TypeError):
        # Wrong date format, or a value that is not a string.
        return None
    return 1 if (calculated_at - launched_at).days <= 180 else 0


# First-level categories for which category_craw_flag returns False.
_EXCLUDED_FIRST_CATEGORIES = (
    "audible",
    "books",
    "digital-text",
    "dmusic",
    "mobile-apps",
    "movies-tv",
    "music",
    "software",
    "videogames",
)


def category_craw_flag(category_first_id, asin: str = None):
    """Decide whether an ASIN or category should be crawled.

    :param category_first_id: first-level category id
    :param asin: optional ASIN; when given it must start with "B0"
    :return: ``False`` when the ASIN does not start with "B0", or when the
        category is missing or on the excluded list; otherwise ``True``
    """
    if asin is not None and not asin.startswith("B0"):
        return False
    if category_first_id is None or category_first_id in _EXCLUDED_FIRST_CATEGORIES:
        return False
    return True


def sort_volume(val1, val2, val3):
    """Return the three values in descending order; ``None`` is ranked as 0."""
    arr = [val1, val2, val3]
    arr.sort(key=lambda item: 0 if item is None else item, reverse=True)
    length = UdfUtil.safeIndex(arr, 0, None)
    width = UdfUtil.safeIndex(arr, 1, None)
    height = UdfUtil.safeIndex(arr, 2, None)
    return length, width, height


_VOLUME_UNIT_PATTERN = re.compile(
    r"inches|inch|cm|centímetros|centimetres|milímetros|millimeter|mm|metros")
_VOLUME_NUMBER_PATTERN = re.compile(r"(\d+(\.\d+)?)")
# Lower-case words, including Latin-1 accented letters so that words such as
# "centímetros" stay in one piece.
_VOLUME_WORD_PATTERN = re.compile(r'[a-z\u00e0-\u00f6\u00f8-\u00ff]+')
_VOLUME_UNIT_ALIASES = {
    'inches': 'inches',
    'inch': 'inches',
    'cm': 'cm',
    'centímetros': 'cm',
    'centimetres': 'cm',
    'milímetros': 'mm',
    'millimeter': 'mm',
    'mm': 'mm',
    'metros': 'm',
}


def parse_asin_volume_str(volume_str, sortFlag=False):
    """Parse a dimension string into length, width, height and unit.

    :param volume_str: dimension text
    :param sortFlag: when true, sort the numbers in descending order before
        assigning them
    :return: ``(l, w, h, type)`` with the numbers as found (no unit
        conversion); ``type`` is 'inches', 'cm', 'mm' or 'm' and defaults to
        'inches' when no unit is recognised. All four are ``None`` for a
        ``None`` input.
    """
    if volume_str is None:
        return None, None, None, None
    types = _VOLUME_UNIT_PATTERN.findall(volume_str)
    # With several units, keep only the text before the first unit.
    if len(types) >= 2:
        volume_str = volume_str[0:volume_str.find(types[0])]
    values = [float(found[0]) for found in _VOLUME_NUMBER_PATTERN.findall(volume_str)]
    if sortFlag:
        values.sort(reverse=True)
    length = UdfUtil.safeIndex(values, 0, None)
    width = UdfUtil.safeIndex(values, 1, None)
    height = UdfUtil.safeIndex(values, 2, None)
    unit = _VOLUME_UNIT_ALIASES.get(UdfUtil.safeIndex(types, 0, None), "inches")

    # Axis labels (l/d/w/h) reassign the numbers by position. Unit words are
    # removed first: the "h" in "inches" was previously read as a height
    # label, which wiped out length and width for plain "a x b x c inches".
    labels = re.findall(r"l|d|w|h", _VOLUME_UNIT_PATTERN.sub(" ", volume_str))
    if len(labels) > 0:
        by_label = {
            str(key): UdfUtil.safeIndex(values, i, None)
            for i, key in enumerate(labels)
        }
        length = by_label.get("l") or by_label.get("d")
        width = by_label.get("w")
        height = by_label.get("h")
    return length, width, height, unit


def udf_rank_and_category(best_sellers_rank):
    """Extract best-seller ranks and category names.

    Every "#<rank> in <category>" occurrence is collected.

    :param best_sellers_rank: best-sellers rank text
    :return: ``(ranks, categories)`` as comma-joined strings; thousands
        separators are stripped from the ranks. Both are empty strings when
        nothing matches or the input is ``None``.
    """
    if best_sellers_rank is None:
        return "", ""
    matches = re.findall(r"#([\d,]+) in ([\w&' ]+)", best_sellers_rank)
    bs_rank_str = ",".join(rank.replace(",", "") for rank, _ in matches)
    bs_category_str = ",".join(category.strip().replace(",", " ") for _, category in matches)
    return bs_rank_str, bs_category_str


def udf_ele_mattch(match_text: str, ele_list_str: str):
    """Find which of several keywords occur in a text as whole words.

    :param match_text: text to search
    :param ele_list_str: keywords, either as a list or as the ``str(list)``
        form of one; any other string is treated as a single keyword
    :return: the distinct matches joined with ",", in order of first
        appearance, or ``None`` when there is nothing to match or no match.
        Matching is case-insensitive and skips words preceded by one of
        ``+ * - % .``
    """
    if match_text is None or not ele_list_str:
        return None
    if isinstance(ele_list_str, str):
        # Iterating the string itself would yield single characters.
        try:
            parsed = ast.literal_eval(ele_list_str)
        except (ValueError, TypeError, SyntaxError):
            parsed = None
        keywords = list(parsed) if isinstance(parsed, (list, tuple, set)) else [ele_list_str]
    else:
        keywords = list(ele_list_str)
    # An empty alternative would match at every word boundary.
    keywords = [str(keyword) for keyword in keywords if keyword]
    if not keywords:
        return None

    pattern = re.compile(r'(?<!\+|\*|\-|\%|\.)\b({})\b'.format('|'.join([re.escape(x) for x in keywords])),
                         flags=re.IGNORECASE)
    ele_list = re.findall(pattern, match_text)
    if ele_list:
        return ','.join(dict.fromkeys(ele_list))
    return None


def _detect_volume_units(volume_str):
    """Return the canonical units named in ``volume_str``, sorted and
    comma-joined ('' when none). Sorting keeps the result deterministic so it
    can be compared with 'cm,inches'."""
    units = {
        _VOLUME_UNIT_ALIASES[word]
        for word in _VOLUME_WORD_PATTERN.findall(volume_str)
        if word in _VOLUME_UNIT_ALIASES
    }
    return ','.join(sorted(units))


def _select_inch_segment(volume_str):
    """For text naming both cm and inches, return the part holding the
    numbers that precede "inch"."""
    num_inches = volume_str.find('inch')
    num_cm = volume_str.find('cm')
    # "cm" may be absent as a literal (e.g. "centimetres") or come after the inches.
    if num_cm < 0 or num_cm > num_inches:
        return volume_str[:num_inches]
    return volume_str[num_cm:num_inches]


def _volume_numbers(volume_str):
    """Return every number in ``volume_str`` as a float, in order."""
    return [float(found[0]) for found in _VOLUME_NUMBER_PATTERN.findall(volume_str)]


# Plugin: standard volume extraction.
def udf_extract_volume_format(volume_str: str):
    """Format a dimension string as "<a>*<b>*<c><unit>".

    At most the first three numbers are used. Pure-inch input is converted to
    cm (x 2.54, rounded to 2 places) and labelled "cm"; otherwise the numbers
    are kept as found and the detected unit string is appended. Without any
    number only the unit part is returned.
    """
    volume_str = str(volume_str).lower()
    asin_volume_type = _detect_volume_units(volume_str)
    if asin_volume_type == 'cm,inches':
        # NOTE(review): the inch numbers are kept but the result is labelled
        # "cm,inches" without conversion -- confirm this is intended.
        volume_str = _select_inch_segment(volume_str)
    dimensions = _volume_numbers(volume_str)[:3]

    if asin_volume_type == "inches":
        # Convert to cm.
        return "*".join(str(round(dim * 2.54, 2)) for dim in dimensions) + "cm"
    result = "*".join(str(dim) for dim in dimensions)
    return f"{result}{asin_volume_type}"


# Big data: return length + width + height + unit.
def udf_extract_volume_dimensions(volume_str: str):
    """Extract up to three dimensions, sorted in descending order, and a unit.

    :return: ``(length, width, height, asin_volume_type)``; missing
        dimensions are ``None`` and come last
    """
    volume_str = str(volume_str).lower()
    asin_volume_type = _detect_volume_units(volume_str)
    if asin_volume_type == 'cm,inches':
        volume_str = _select_inch_segment(volume_str)
    dimensions = _volume_numbers(volume_str)

    length, width, height = None, None, None
    if len(dimensions) == 1:
        length = dimensions[0]
    elif len(dimensions) == 2:
        # The former l/w/h/d label checks only chose which two slots to fill;
        # the sort below made that choice irrelevant.
        length, width = dimensions
        if asin_volume_type != '':
            # NOTE(review): a detected unit is replaced by "inches" here and
            # below; the collapsed source leaves the nesting of these
            # assignments ambiguous -- confirm.
            asin_volume_type = "inches"
    elif len(dimensions) >= 3:
        length, width, height = dimensions[:3]
        asin_volume_type = "inches"

    # Descending order; the key ranks None below every number.
    length, width, height = sorted(
        [length, width, height], key=lambda x: (x is not None, x), reverse=True)
    return (length, width, height, asin_volume_type)


# Plugin: standard weight extraction.
def udf_extract_weight_format(weight_str: str):
    """Format a weight string as grams.

    :param weight_str: text containing a number followed by a weight unit
    :return: "<grams>g" with the value rounded to 2 places. When nothing is
        parsed the string "None" is returned, and a zero weight is returned
        as "0.0" without the unit.
    """
    weight_type = 'g'
    val = None
    if weight_str is not None:
        val = _convert_weight(weight_str, to_pounds=False)
    if val:
        return f"{round(val, 2)}{weight_type}"
    return f"{val}"


_BS_CATEGORY_PATH_PATTERN = re.compile('bestsellers/(.*)/ref')


def _bestsellers_path(href):
    """Return the text between "bestsellers/" and "/ref", or ``None``."""
    found = _BS_CATEGORY_PATH_PATTERN.findall(href)
    return found[0] if found else None


# Category extraction: first-level / current category id and rank.
# See dim_asin_bs_info.py for usage.
def udf_parse_bs_category(asin_bs_sellers_rank_lower, last_herf, all_best_sellers_href, cate_current_pattern,
                          cate_1_pattern):
    """Extract first-level and current category ids and ranks.

    :param asin_bs_sellers_rank_lower: rank text from the bottom of the page
    :param last_herf: link of the last category level
    :param all_best_sellers_href: all category links, joined with "&&&&"
    :param cate_current_pattern: regex that captures the rank numbers
    :param cate_1_pattern: substring marking a first-level category rank
    :return: ``(cate_1_id, cate_current_id, cate_1_rank, cate_current_rank)``
    """
    # 1. Choose the field the categories are parsed from.
    if str(all_best_sellers_href).lower() not in ['', 'none', 'null']:
        bs_href = all_best_sellers_href
    elif str(last_herf).lower() not in ['', 'none', 'null']:
        bs_href = last_herf
    else:
        bs_href = ''
    href_list = bs_href.replace("?tf=1", "").split("&&&&")

    # Trailing climate-pledge links are dropped so the level before them is used.
    rank_flag = None
    while len(href_list) >= 2 and '/climate-pledge' in href_list[-1]:
        href_list.pop()
        rank_flag = True

    # 2.1 Category ids.
    if len(href_list) == 1:
        path = _bestsellers_path(href_list[0])
        if path is None:
            cate_1_id, cate_current_id = None, None
        else:
            cate_1_id = path.split("/")[0]
            cate_current_id = path.split("/")[-1] if "/" in path else None
    else:
        cate_1_id = _bestsellers_path(href_list[0])
        cate_current_id = _bestsellers_path(href_list[-1])
        # Either id can be None when its link has no bestsellers path.
        if cate_1_id is not None:
            cate_1_id = cate_1_id.split("/")[0]
        if cate_current_id is not None:
            cate_current_id = cate_current_id.split("/")[-1]

    # 2.2 Ranks.
    if asin_bs_sellers_rank_lower is not None:
        rank_text = asin_bs_sellers_rank_lower.replace(".", "").replace(",", "").replace(" 100 ", "")
    else:
        rank_text = ''
    rank_list = [int(rank) for rank in re.findall(cate_current_pattern, rank_text)]
    if rank_flag and len(rank_list) > len(href_list):
        # Links were dropped above, so drop the surplus ranks too.
        rank_list = rank_list[:len(href_list)]

    if not rank_list:
        cate_1_rank, cate_current_rank = None, None
    elif cate_1_pattern in asin_bs_sellers_rank_lower:
        cate_1_rank = rank_list[0]
        cate_current_rank = rank_list[-1] if len(rank_list) > 1 else None
    else:
        cate_1_rank, cate_current_rank = None, rank_list[0]
    return cate_1_id, cate_current_id, cate_1_rank, cate_current_rank


# Map an ASIN to a number below one billion, e.g. to pick a partition.
def udf_asin_to_number(asin):
    """Convert a 10-character ASIN to a number in ``[0, 1_000_000_000)``.

    The ASIN is read as a base-36 number (digits keep their value, 'A' is 10
    ... 'Z' is 35) and reduced modulo one billion. It is assumed to consist
    of digits and upper-case letters.

    :raises ValueError: when the ASIN is not 10 characters long
    """

    def char_to_number(char):
        if char.isdigit():
            return int(char)
        return ord(char) - 55  # 'A' -> 10, 'B' -> 11, ..., 'Z' -> 35

    if len(asin) != 10:
        raise ValueError("ASIN must be 10 characters long")

    asin_number = 0
    for char in asin:
        asin_number = asin_number * 36 + char_to_number(char)
    return asin_number % 1000000000


def _starts_with_amazon(value):
    """True when ``value`` is non-empty and, trimmed and lower-cased, starts
    with "amazon"."""
    return bool(value) and value.lower().strip().startswith("amazon")


# Determine the buy_box_seller_type.
def udf_parse_seller_json(seller_json):
    """Classify the buy-box seller described by a JSON string.

    The JSON object is read for the keys ``ship_from``, ``sold_by``,
    ``fulfilled_by`` and ``seller_id`` (fields crawled from the ASIN detail
    page); missing keys count as empty.

    :return: ``(buy_box_seller_type, sold_by, seller_id)`` with the type
        1 = amazon, 2 = fba, 3 = fbm, 4 = default (no type).
        ``(0, None, None)`` is returned for empty input and for input that
        is not a JSON object.
    """
    if not seller_json:
        return 0, None, None
    try:
        seller_info = json.loads(seller_json)
    except (ValueError, TypeError):
        return 0, None, None
    if not isinstance(seller_info, dict):
        return 0, None, None

    ship_from = seller_info.get("ship_from")
    sold_by = seller_info.get("sold_by")
    fulfilled_by = seller_info.get("fulfilled_by")
    seller_id = seller_info.get("seller_id")

    handled_by_amazon = _starts_with_amazon(ship_from) or (
        fulfilled_by and 'amazon' in fulfilled_by.lower())
    if handled_by_amazon:
        if sold_by:
            if _starts_with_amazon(sold_by):
                return 1, sold_by, seller_id  # Amazon
            return 2, sold_by, seller_id  # FBA
    elif (ship_from or fulfilled_by) and sold_by:
        return 3, sold_by, seller_id  # FBM
    return 4, sold_by, seller_id  # Other


def udf_parse_amazon_orders(asin_amazon_orders_str):
    """Parse the "bought in past month" figure from an ASIN detail page.

    Example input: '50+ bought in past month'. A trailing "k" is expanded to
    three zeros. Intended for the us, uk and de sites.

    :return: the number as an int, or ``None`` unless exactly one
        "<digits>[k]+" occurrence is found
    """
    results_list = re.findall(r"(\d+[k]{0,})\+", str(asin_amazon_orders_str).lower())
    if len(results_list) == 1:
        return int(results_list[0].replace("k", "000"))
    return None


# Detect the language of an ABA search term.
def udf_detect_phrase_reg(lang_word_map):
    """Build a Spark UDF that guesses the language of a phrase.

    :param lang_word_map: maps a word to a ``{language: frequency}`` dict
    :return: UDF producing ``{"lang": ..., "hint_word": ...}``; both values
        are ``None`` when nothing conclusive is found
    """

    def detect_phrase(phrase: str):
        if phrase is None:
            return {"lang": None, "hint_word": None}
        # "+" is replaced by a space so the tokenizer splits on it.
        phrase = re.sub(r'(\+)', ' ', phrase).strip()
        # Imported here so nltk is only needed where the UDF actually runs.
        from nltk.tokenize import word_tokenize
        word_list = [word for word in word_tokenize(phrase, "english") if len(word) >= 2]

        tmp_map = {
            "en": {"frequency": 0, "word": []},
            "fr": {"frequency": 0, "word": []},
            "es": {"frequency": 0, "word": []},
            "de": {"frequency": 0, "word": []},
        }
        for word in word_list:
            lang_rank_map = lang_word_map.get(word)
            if lang_rank_map is None:
                continue
            for lang, frequency in lang_rank_map.items():
                # Languages beyond the four defaults get their own entry.
                stats = tmp_map.setdefault(lang, {"frequency": 0, "word": []})
                stats["frequency"] = stats["frequency"] + frequency
                stats["word"].append(word)

        # Rank by number of matched words, then by summed frequency.
        lang, hint_word_map = max(
            tmp_map.items(), key=lambda it: (len(it[1]['word']), it[1]['frequency']))
        if hint_word_map['frequency'] == 0:
            return {"lang": None, "hint_word": None}

        # English wins whenever it scored at all.
        if tmp_map['en']['frequency'] > 0:
            lang = 'en'
            hint_word_map = tmp_map['en']
        hint_word = " ".join(hint_word_map['word'])
        if len(hint_word) <= 2:
            return {"lang": None, "hint_word": None}
        return {"lang": lang, "hint_word": hint_word}

    return F.udf(detect_phrase, MapType(StringType(), StringType()))