import os
import re
import sys

import pandas as pd

sys.path.append(os.path.dirname(sys.path[0]))  # add parent directory so `utils` is importable
from pyspark.storagelevel import StorageLevel
from utils.templates import Templates
from pyspark.sql.types import StringType, BooleanType, StructType, StructField, DoubleType
# Window is used for the per-ASIN "keep latest record" dedup
from pyspark.sql.window import Window
from pyspark.sql import functions as F


class DimAsinVolume(Templates):
    """Build the ``dim_asin_volume_info`` table.

    Reads raw ASIN volume strings from ``ods_asin_detail`` (e.g.
    ``"10 x 5 x 2 inches"``), classifies each string by its unit
    (inches / cm / mm / m / mixed / none), parses out numeric
    length/width/height via UDFs, keeps only the most recent record per
    ASIN, and saves partitioned by ``site_name``.
    """

    def __init__(self, site_name='us'):
        super(DimAsinVolume, self).__init__()
        self.site_name = site_name
        self.db_save = 'dim_asin_volume_info'
        self.spark = self.create_spark_object(app_name=f"{self.db_save}: {self.site_name}")
        self.get_date_info_tuple()
        # Placeholder DataFrames; replaced in read_data() / handle_data().
        self.df_asin_volume = self.spark.sql(f"select 1+1;")
        self.df_save = self.spark.sql(f"select 1+1;")
        # Register UDFs.
        self.u_contains_digit_udf = F.udf(self.udf_contains_digit, BooleanType())
        self.u_asin_volume_type = F.udf(self.udf_asin_volume_type, StringType())
        # Return type for the dimension-extraction UDFs: three nullable doubles.
        schema = StructType([
            StructField('length', DoubleType(), True),
            StructField('width', DoubleType(), True),
            StructField('height', DoubleType(), True),
        ])
        self.u_extract_dimensions = F.udf(self.udf_extract_dimensions, schema)
        self.u_extract_dimensions_others = F.udf(self.udf_extract_dimensions_others, schema)
        # Partition info for the save step.
        self.reset_partitions(partitions_num=10)
        self.partitions_by = ['site_name']

    @staticmethod
    def udf_contains_digit(s):
        """Return True if *s* contains at least one digit; False for None.

        Used to filter out dirty volume strings with nothing to parse.
        """
        if s is None:
            return False
        return any(char.isdigit() for char in s)

    @staticmethod
    def udf_asin_volume_type_old(x):
        """Legacy classifier kept for reference: detects only inches/cm.

        Returns 'inches', 'cm', 'cm,inches' (order unspecified — set join),
        or 'none' when no unit token is found.
        """
        pattern = r'[a-z]+'
        matches = re.findall(pattern, x)
        type_set = set()
        for word in matches:
            if word in ('inches', 'inch'):
                type_set.add('inches')
            elif word == 'cm':
                type_set.add('cm')
        if len(type_set) == 1:
            return list(type_set)[0]
        elif len(type_set) == 2:
            return ','.join(type_set)
        else:
            return 'none'

    @staticmethod
    def udf_asin_volume_type(x):
        """Classify a (lower-cased) volume string by the unit words it contains.

        Recognizes inches, cm, mm and m (including Spanish spellings, since
        non-US sites reuse this). Returns the single unit, a comma-joined
        combination (set order, so e.g. 'cm,inches' or 'inches,cm'),
        or 'none' when no unit token is found.
        """
        pattern = r'[a-z]+'
        matches = re.findall(pattern, x)
        type_set = set()
        for word in matches:
            if word in ['inches', 'inch']:
                type_set.add('inches')
            elif word in ['cm', 'centímetros', 'centimetres']:
                type_set.add('cm')
            elif word in ['milímetros', 'millimeter', 'mm']:
                type_set.add('mm')
            elif word in ['metros']:
                type_set.add('m')
        if len(type_set) == 1:
            return list(type_set)[0]
        elif len(type_set) >= 2:
            return ','.join(type_set)
        else:
            return 'none'

    @staticmethod
    def udf_extract_dimensions(volume_str, asin_volume_type):
        """Extract (length, width, height) floats from *volume_str*.

        For mixed 'cm,inches' strings, only the inches portion is kept
        before parsing. For unlabeled ('none') strings with exactly two
        numbers, letter hints (l/w/h/d) decide which fields they fill;
        with no hint all three stay None so the caller can detect failure.
        Returns a 3-tuple of floats/None matching the registered schema.
        """
        length, width, height = None, None, None
        dimensions = []
        if asin_volume_type == 'cm,inches':
            # Keep only the inches segment of a mixed-unit string.
            num_inches = volume_str.find('inch')
            num_cm = volume_str.find('cm')
            volume_str = volume_str[:num_inches] if num_cm > num_inches else volume_str[num_cm:num_inches]
        dimensions = re.findall(r"(\d+(\.\d+)?)", volume_str)
        dimensions = [float(dim[0]) for dim in dimensions]
        if len(dimensions) == 1:
            length = dimensions[0]
        elif len(dimensions) == 2:
            if asin_volume_type == 'none':
                # Unlabeled: use letter hints to decide which two fields we have.
                if "l" in volume_str and "w" in volume_str:
                    length, width = dimensions
                elif "w" in volume_str and "h" in volume_str:
                    width, height = dimensions
                elif "l" in volume_str and "h" in volume_str:
                    length, height = dimensions
                elif "d" in volume_str and "w" in volume_str:
                    # 'd' (depth) treated as length.
                    length, width = dimensions
                elif "d" in volume_str and "h" in volume_str:
                    length, height = dimensions
            else:
                length, width = dimensions
        elif len(dimensions) == 3:
            length, width, height = dimensions
        elif len(dimensions) >= 4:
            # More than three numbers: assume the first three are L x W x H.
            length, width, height = dimensions[:3]
        return (length, width, height)

    @staticmethod
    def udf_extract_dimensions_others(volume_str, asin_volume_type):
        """Simpler extractor for non-US sites: parses 'cm' strings only.

        Numbers are assigned positionally (length, width, height); any
        other unit type returns (None, None, None).
        """
        length, width, height = None, None, None
        if asin_volume_type == 'cm':
            dimensions = re.findall(r"(\d+(\.\d+)?)", volume_str)
            dimensions = [float(dim[0]) for dim in dimensions]
            if len(dimensions) == 1:
                length = dimensions[0]
            elif len(dimensions) == 2:
                length = dimensions[0]
                width = dimensions[1]
            elif len(dimensions) >= 3:
                length, width, height = dimensions[:3]
        return (length, width, height)

    def read_data(self):
        """Load weekly ASIN volume rows for this site into df_asin_volume."""
        sql = f"select asin, volume as asin_volume, date_info from ods_asin_detail where site_name='{self.site_name}' and date_type='week'"
        print("sql:", sql)
        self.df_asin_volume = self.spark.sql(sqlQuery=sql).cache()

    def handle_data_us(self):
        """US pipeline: strings may be inches, cm, both, or unlabeled ('none')."""
        self.handle_filter_dirty_data()
        df_inches = self.handle_asin_volume_types_to_dimensions(asin_volume_type='inches')
        df_cm = self.handle_asin_volume_types_to_dimensions(asin_volume_type='cm')
        df_cm_inches = self.handle_asin_volume_types_to_dimensions(asin_volume_type='cm,inches')
        df_none = self.handle_asin_volume_types_to_dimensions(asin_volume_type='none')
        # Unlabeled strings that still yielded numbers are assumed to be inches.
        df_none_not_null = df_none.filter(~(df_none.length.isNull() & df_none.width.isNull() & df_none.height.isNull()))
        df_none_null = df_none.filter(df_none.length.isNull() & df_none.width.isNull() & df_none.height.isNull())
        df_none_not_null = df_none_not_null.withColumn("asin_volume_type", F.lit("inches"))
        print("df_none_not_null, df_none_null:", df_none_not_null.count(), df_none_null.count())
        self.df_save = df_inches.union(df_cm).union(df_cm_inches).union(df_none_not_null).union(df_none_null)
        self.df_save = self.df_save.withColumn("site_name", F.lit(self.site_name))
        self.df_save = self.df_save.drop("asin_volume_flag")
        self.df_save = self.df_save.withColumnRenamed("length", "asin_length"). \
            withColumnRenamed("width", "asin_width"). \
            withColumnRenamed("height", "asin_height")

    def handle_data_others(self):
        """Non-US pipeline: only 'cm' strings are parsed (positional L/W/H)."""
        self.handle_filter_dirty_data()
        # Extract length/width/height from the volume string.
        self.df_asin_volume = self.df_asin_volume.withColumn(
            'dimensions', self.u_extract_dimensions_others('asin_volume', 'asin_volume_type'))
        self.df_save = self.df_asin_volume \
            .withColumn('asin_length', self.df_asin_volume.dimensions.getField('length')) \
            .withColumn('asin_width', self.df_asin_volume.dimensions.getField('width')) \
            .withColumn('asin_height', self.df_asin_volume.dimensions.getField('height')) \
            .drop('dimensions')
        self.df_save = self.df_save.withColumn("site_name", F.lit(self.site_name))
        self.df_save = self.df_save.drop("asin_volume_flag")

    def handle_data(self):
        """Dispatch to the site-specific pipeline."""
        if self.site_name == 'us':
            self.handle_data_us()
        else:
            self.handle_data_others()

    def handle_filter_dirty_data(self):
        """Normalize, filter and dedup the raw volume rows in place.

        Lower-cases the strings, drops rows with no digits, tags each row
        with its unit type, then keeps only the most recent (by date_info)
        row per ASIN.
        """
        # Lower-case so unit-word matching in the UDFs is case-insensitive.
        self.df_asin_volume = self.df_asin_volume.withColumn(
            "asin_volume", F.lower(self.df_asin_volume["asin_volume"]))
        # Flag rows that contain at least one digit (parseable candidates).
        self.df_asin_volume = self.df_asin_volume.withColumn(
            "asin_volume_flag", self.u_contains_digit_udf(self.df_asin_volume["asin_volume"]))
        self.df_asin_volume = self.df_asin_volume.filter('asin_volume_flag is True')
        # Classify each remaining string by its unit words.
        self.df_asin_volume = self.df_asin_volume.withColumn(
            "asin_volume_type", self.u_asin_volume_type(self.df_asin_volume["asin_volume"]))
        self.df_asin_volume.groupBy('asin_volume_type').agg(F.count('asin_volume_type')).show()
        self.df_asin_volume.show()
        # Keep only the latest record per ASIN (descending date_info, rank 1).
        window = Window.partitionBy('asin').orderBy(F.desc('date_info'))
        self.df_asin_volume = self.df_asin_volume.withColumn('row_number', F.row_number().over(window))
        self.df_asin_volume = self.df_asin_volume.filter(self.df_asin_volume.row_number == 1).drop('row_number')
        self.df_asin_volume.groupBy('asin_volume_type').agg(F.count('asin_volume_type')).show()
        self.df_asin_volume.show()

    def handle_asin_volume_types_to_dimensions(self, asin_volume_type='inches'):
        """Parse the rows of one unit type into length/width/height columns.

        :param asin_volume_type: one of 'inches', 'cm', 'cm,inches', 'none'
        :return: DataFrame with 'length', 'width', 'height' columns added
        """
        df = self.df_asin_volume.filter(f'asin_volume_type="{asin_volume_type}"').cache()
        # Extract the three dimensions via the UDF, then unpack the struct.
        df = df.withColumn('dimensions', self.u_extract_dimensions('asin_volume', F.lit(asin_volume_type)))
        df = df \
            .withColumn('length', df.dimensions.getField('length')) \
            .withColumn('width', df.dimensions.getField('width')) \
            .withColumn('height', df.dimensions.getField('height')) \
            .drop('dimensions')
        df.show(10, truncate=False)
        return df

    def handle_type_inches(self):
        """Debug helper: parse and show rows classified as 'inches'."""
        df_asin_inches = self.df_asin_volume.filter('asin_volume_type="inches"').cache()
        # BUGFIX: u_extract_dimensions requires the unit type as 2nd argument.
        df_asin_inches = df_asin_inches.withColumn(
            'dimensions', self.u_extract_dimensions('asin_volume', F.lit('inches')))
        df_asin_inches = df_asin_inches \
            .withColumn('length', df_asin_inches.dimensions.getField('length')) \
            .withColumn('width', df_asin_inches.dimensions.getField('width')) \
            .withColumn('height', df_asin_inches.dimensions.getField('height')) \
            .drop('dimensions')
        df_asin_inches.show()

    def handle_type_cm(self):
        """Debug helper: parse and show rows classified as 'cm'."""
        df_asin_cm = self.df_asin_volume.filter('asin_volume_type="cm"').cache()
        # BUGFIX: u_extract_dimensions requires the unit type as 2nd argument.
        df_asin_cm = df_asin_cm.withColumn(
            'dimensions', self.u_extract_dimensions('asin_volume', F.lit('cm')))
        df_asin_cm = df_asin_cm \
            .withColumn('length', df_asin_cm.dimensions.getField('length')) \
            .withColumn('width', df_asin_cm.dimensions.getField('width')) \
            .withColumn('height', df_asin_cm.dimensions.getField('height')) \
            .drop('dimensions')
        df_asin_cm.show()

    def handle_type_none(self):
        """Debug helper: parse and show rows with no recognized unit."""
        df_asin_none = self.df_asin_volume.filter('asin_volume_type="none"').cache()
        # BUGFIX: u_extract_dimensions requires the unit type as 2nd argument.
        df_asin_none = df_asin_none.withColumn(
            'dimensions', self.u_extract_dimensions('asin_volume', F.lit('none')))
        df_asin_none = df_asin_none \
            .withColumn('length', df_asin_none.dimensions.getField('length')) \
            .withColumn('width', df_asin_none.dimensions.getField('width')) \
            .withColumn('height', df_asin_none.dimensions.getField('height')) \
            .drop('dimensions')
        df_asin_none.show()


if __name__ == '__main__':
    # Guard against a missing CLI argument instead of a bare IndexError.
    if len(sys.argv) < 2:
        print("usage: python dim_asin_volume.py <site_name>")
        sys.exit(1)
    site_name = sys.argv[1]  # argument 1: site
    handle_obj = DimAsinVolume(site_name=site_name)
    handle_obj.run()