import re

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType


def udf_get_package_quantity(title):
    if title != '':
        title = str(title).lower()
        # Collapse doubled spaces so numeric patterns like "48  pack" still match.
        title = title.replace('  ', ' ')
        eligible_list = []
        thousand_separator_count = 0
        eligible_value_map = {"one": 1, "two": 2, "three": 3, "four": 4, "five": 5,
                              "six": 6, "seven": 7, "eight": 8, "nine": 9, "ten": 10}
        if title not in ['null', 'none']:
            # A "special" title carries one comma-separated run of numbers followed
            # directly by a quantity keyword, e.g. "10,20,30pcs".
            special_pattern = r'\b(\d+(,\d*){2,})(bulk|total|pc|pcs|piece|pieces|set|pack|packs|pairs|pk|pair|count|ct|counts|sets|sheets|sheet|wrap|wraps|roll|rolls|box|boxes|quantity)\b'
            keyword_pattern = r'(pc|pcs|piece|pieces|pack|packs|pk|pack of|pairs|pair|pair of|pairs of|count|ct|counts|set of|set|sets|box|boxes|box of|case of|carton of|bulk|total|sheets|sheet|wrap|wraps|roll|rolls)\b'
            special_mate_list = re.findall(special_pattern, title)
            keyword_mate_list = re.findall(keyword_pattern, title)
            if len(special_mate_list) == 1 and len(keyword_mate_list) < 2:
                # Treat each comma-separated number in the run as a candidate quantity.
                mate_elements = str(special_mate_list[0][0]).split(',')
                for element in mate_elements:
                    if element.isnumeric() and int(element[0]) > 0 and int(element) < 10000:
                        eligible_list.append(int(element))
            else:
                patterns = [
                    # "<number/word> pack", "<number>-pc", "<number>s sets", ...
                    r'\b(\d{1,3}(,\d{3})*|one|two|three|four|five|six|seven|eight|nine|ten)(( )+|(-|s| -|s | s)?)(bulk|total|pc|pcs|piece|pieces|set|pack|packs|pairs|pk|pair|count|ct|counts|sets|sheets|sheet|wrap|wraps|roll|rolls|box|boxes|quantity)\b',
                    # "pack of <number/word>", "case of <number/word>", ...
                    r'\b(set|pack|pair|pairs|box|case|carton|quantity) of (\d{1,3}(,\d{3})*|one|two|three|four|five|six|seven|eight|nine|ten)\b',
                    # r'\b([1-9][0-9,]*|one|two|three|four|five|six|seven|eight|nine|ten)-(pc|pack|piece|pcs|pieces|pack|pk|pairs|pair|count|ct|sheets)\b',
                    # r'\b([1-9][0-9,]*|one|two|three|four|five|six|seven|eight|nine|ten)(pcs|pc|piece|pieces|pk|packs|pack|pair|pairs|ct)\b',
                    # "total: <number>", "count (<number>", "quantity <number>", ...
                    r'\b(total|count|quantity)(( )+|(-|s| -|s | s|:|\()?)(\d{1,3}(,\d{3})*|one|two|three|four|five|six|seven|eight|nine|ten)\b'
                ]
                for i, pattern in enumerate(patterns):
                    title_list = re.findall(pattern, title)
                    for title_element in title_list:
                        # The quantity sits in a different capture group per pattern.
                        if i == 0:
                            eligible_element = title_element[0]
                        elif i == 1:
                            eligible_element = title_element[1]
                        else:
                            eligible_element = title_element[4]
                        if eligible_element in eligible_value_map:
                            eligible_list.append(eligible_value_map[eligible_element])
                        else:
                            if str(eligible_element).count(',') > 0:
                                # A thousands separator ("1,000 pcs") usually signals a
                                # bulk figure rather than the package quantity.
                                thousand_separator_count += 1
                                eligible_element = str(eligible_element).replace(',', '')
                            if (not str(eligible_element).startswith('0')) and \
                                    int(eligible_element) < 10000 and \
                                    int(eligible_element) >= 0:
                                eligible_list.append(int(eligible_element))
        if len(eligible_list) > 0:
            # If two or more candidates carried thousands separators, prefer the
            # smallest remaining value; otherwise take the largest candidate.
            if thousand_separator_count >= 2 and len(eligible_list) - thousand_separator_count > 0:
                return min(eligible_list)
            return max(eligible_list)


if __name__ == '__main__':
    title = ("Washing Machine Cleaner Descaler 48 Pack - Value Size Deep Cleaning Tablets "
             "HE Front Loader & Top Load Washer, Septic Safe Eco-Friendly Deodorizer, "
             "Clean Inside Laundry Drum Tub Seal - 48 Count")
    variat_attribute = ""

    spark = SparkSession.builder \
        .appName("test") \
        .getOrCreate()

    data = [("B0BDP6GN3W", title, variat_attribute)]
    columns = ["asin", "asin_title", "variat_attribute"]
    # data is already a list of rows, so pass it directly rather than wrapping
    # it in another list.
    df = spark.createDataFrame(data, columns)
    df.show()

    # Register the plain Python function as a Spark UDF before applying it to columns.
    get_package_quantity_udf = F.udf(udf_get_package_quantity, IntegerType())

    # withColumn returns a new DataFrame, so reassign df at each step; the variant
    # column gets the same UDF applied to variat_attribute.
    df = df.withColumn("title_package_quantity", get_package_quantity_udf(df["asin_title"]))
    df = df.withColumn("variat_package_quantity", get_package_quantity_udf(df["variat_attribute"]))
    # Fall back to the variant-attribute quantity only when the title yields nothing
    # (or 1); the second branch assumes the title-derived quantity takes precedence.
    df = df.withColumn("package_quantity", F.expr("""
        CASE
            WHEN (title_package_quantity IS NULL OR title_package_quantity = 1)
                 AND variat_package_quantity > 1 THEN variat_package_quantity
            WHEN title_package_quantity > 1 THEN title_package_quantity
            ELSE 1
        END
    """))
    df.show()
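
    # Expected result for the sample row (a sketch, not verified against a real
    # pipeline run): pattern 0 matches both "48 pack" and "48 count" in the title,
    # so title_package_quantity = 48; variat_attribute is empty, so
    # variat_package_quantity is NULL and package_quantity resolves to 48.
    # The extraction function also works standalone, e.g.
    # udf_get_package_quantity("pack of 6 towels") returns 6 via pattern 1.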