# dim_asin_volume_info.py (14.1 KB)
import os
import re
import sys

import pandas as pd

sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录
from pyspark.storagelevel import StorageLevel
from utils.templates import Templates
# from ..utils.templates import Templates
# from AmazonSpider.pyspark_job.utils.templates_test import Templates
from pyspark.sql.types import StringType, BooleanType, StructType, StructField, DoubleType
# 分组排序的udf窗口函数
from pyspark.sql.window import Window
from pyspark.sql import functions as F


class DimAsinVolume(Templates):
    """Build ``dim_asin_volume_info``: per-ASIN length/width/height parsed from the raw
    ``volume`` text of the weekly ``ods_asin_detail`` snapshots, keeping only the latest
    snapshot of each ASIN.

    The US site runs a unit-aware parser (``handle_data_us``). All other sites only parse
    strings classified as 'cm' (``handle_data_others``).
    """

    def __init__(self, site_name='us'):
        super(DimAsinVolume, self).__init__()
        self.site_name = site_name
        self.db_save = 'dim_asin_volume_info'
        self.spark = self.create_spark_object(app_name=f"{self.db_save}: {self.site_name}")
        self.get_date_info_tuple()
        # Placeholder DataFrames; replaced by read_data() and handle_data().
        self.df_asin_volume = self.spark.sql(f"select 1+1;")
        self.df_save = self.spark.sql(f"select 1+1;")
        # Register the Python UDFs.
        self.u_contains_digit_udf = F.udf(self.udf_contains_digit, BooleanType())
        self.u_asin_volume_type = F.udf(self.udf_asin_volume_type, StringType())
        # Return type of the dimension UDFs: a struct of three nullable doubles.
        schema = StructType([
            StructField('length', DoubleType(), True),
            StructField('width', DoubleType(), True),
            StructField('height', DoubleType(), True)
        ])
        self.u_extract_dimensions = F.udf(self.udf_extract_dimensions, schema)
        self.u_extract_dimensions_others = F.udf(self.udf_extract_dimensions_others, schema)
        # Output partitioning.
        self.reset_partitions(partitions_num=10)
        self.partitions_by = ['site_name']

    # NOTE: the udf_* static methods below are shipped to Spark workers. They must stay
    # self-contained (only `re` and builtins), because referencing this class from inside
    # them would force the whole class, including its Templates base, to be serialised.

    @staticmethod
    def udf_contains_digit(s):
        """Return True if *s* contains at least one digit; None yields False."""
        if s is None:
            return False
        return any(char.isdigit() for char in s)

    @staticmethod
    def udf_asin_volume_type_old(x):
        """Legacy unit classifier (inches/cm only); superseded by udf_asin_volume_type."""
        matches = re.findall(r'[a-z]+', x)
        type_set = set()
        for word in matches:
            if 'inches' == word or 'inch' == word:
                type_set.add('inches')
            elif 'cm' == word:
                type_set.add('cm')
        if len(type_set) == 1:
            return list(type_set)[0]
        elif len(type_set) == 2:
            # sorted() keeps the label independent of set iteration order.
            return ','.join(sorted(type_set))
        else:
            return 'none'

    @staticmethod
    def udf_asin_volume_type(x):
        """Classify the unit(s) mentioned in a lower-cased volume string.

        :param x: lower-cased volume text (None is tolerated).
        :return: 'inches', 'cm', 'mm' or 'm' when exactly one unit is found. When several
            units are found, they are comma-joined in alphabetical order (e.g.
            'cm,inches'). Returns 'none' when no known unit word is present.
        """
        if x is None:
            return 'none'
        # Words made of ASCII or Latin-1 letters, so 'centímetros' / 'milímetros' stay
        # whole. With plain [a-z]+ they were split into 'cent'/'mil' + 'metros' and
        # misclassified as metres. The ranges skip '÷' and contain no '²'/'³', so
        # 'cm³' still yields 'cm'.
        words = re.findall(r'[a-zß-öø-ÿ]+', x)
        unit_by_word = {
            'inches': 'inches', 'inch': 'inches',
            'cm': 'cm', 'centímetros': 'cm', 'centimetres': 'cm',
            'milímetros': 'mm', 'millimeter': 'mm', 'mm': 'mm',
            'metros': 'm',
        }
        units = {unit_by_word[word] for word in words if word in unit_by_word}
        if not units:
            return 'none'
        # Python's str hashing can vary per interpreter process, so an unsorted join
        # could produce 'inches,cm' on some workers. Downstream code filters on the
        # exact string 'cm,inches'.
        return ','.join(sorted(units))

    @staticmethod
    def udf_extract_dimensions(volume_str, asin_volume_type):
        """Parse (length, width, height) from a US volume string.

        :param volume_str: lower-cased volume text.
        :param asin_volume_type: label produced by udf_asin_volume_type. For
            'cm,inches' only the inch reading is parsed. For 'none' with exactly two
            numbers, the letters l/w/h/d present in the text decide which two
            dimensions they are (a loose character-level heuristic).
        :return: tuple (length, width, height); missing entries are None. Numbers
            beyond the third are ignored.
        """
        length, width, height = None, None, None
        if volume_str is None:
            return length, width, height
        if asin_volume_type == 'cm,inches':
            # Keep only the inch reading of a string that carries both readings.
            cm_marks = [m.start() for m in re.finditer(r'cm|centímetros|centimetres', volume_str)]
            first_inch = volume_str.find('inch')
            if cm_marks and first_inch >= 0:
                if cm_marks[0] > first_inch:
                    # Inches first: keep the text before the last 'inch' that precedes
                    # the metric part (handles "10 inches x 20 inches; 25 x 50 cm").
                    volume_str = volume_str[:volume_str.rfind('inch', 0, cm_marks[0])]
                else:
                    # Metric first: start at the last cm mark before the first 'inch', so
                    # per-number units ("25 cm x 50 cm; 10 x 20 inches") do not leak.
                    last_cm = max(pos for pos in cm_marks if pos < first_inch)
                    volume_str = volume_str[last_cm:first_inch]
        dimensions = [float(num) for num in re.findall(r"\d+(?:\.\d+)?", volume_str)]

        if len(dimensions) == 1:
            length = dimensions[0]
        elif len(dimensions) == 2:
            if asin_volume_type == 'none':
                # Substring checks on single letters: guess which pair was given.
                if "l" in volume_str and "w" in volume_str:
                    length, width = dimensions
                elif "w" in volume_str and "h" in volume_str:
                    width, height = dimensions
                elif "l" in volume_str and "h" in volume_str:
                    length, height = dimensions
                elif "d" in volume_str and "w" in volume_str:
                    length, width = dimensions
                elif "d" in volume_str and "h" in volume_str:
                    length, height = dimensions
            else:
                length, width = dimensions
        elif len(dimensions) >= 3:
            length, width, height = dimensions[:3]
        return (length, width, height)

    @staticmethod
    def udf_extract_dimensions_others(volume_str, asin_volume_type):
        """Parse (length, width, height) for non-US sites; only 'cm' strings are parsed.

        Both '.' and ',' are accepted as the decimal separator. Comma decimals such as
        "32,5 x 22,8 cm" may occur in non-US listings; previously they were split into
        separate integers.

        :return: tuple (length, width, height); missing entries are None.
        """
        if asin_volume_type != 'cm' or volume_str is None:
            return (None, None, None)
        dimensions = [float(num.replace(',', '.'))
                      for num in re.findall(r"\d+(?:[.,]\d+)?", volume_str)]
        # Pad with None so fewer than three numbers leave the trailing fields empty.
        length, width, height = (dimensions + [None, None, None])[:3]
        return (length, width, height)

    @staticmethod
    def _split_dimensions(df, column_names=('length', 'width', 'height')):
        """Expand the struct column ``dimensions`` of *df* into three columns and drop it."""
        for field, column_name in zip(('length', 'width', 'height'), column_names):
            df = df.withColumn(column_name, F.col('dimensions').getField(field))
        return df.drop('dimensions')

    def read_data(self):
        """Load asin / volume / date_info of every weekly snapshot for this site (cached)."""
        sql = f"select asin, volume as asin_volume, date_info from ods_asin_detail where site_name='{self.site_name}' and date_type='week'"
        print("sql:", sql)
        self.df_asin_volume = self.spark.sql(sqlQuery=sql).cache()

    def handle_data_us(self):
        """US pipeline: parse each unit type separately, then union the results into df_save.

        Only the 'inches', 'cm', 'cm,inches' and 'none' types are kept. Rows
        classified as any other type (e.g. 'mm', 'm') are not written.
        """
        self.handle_filter_dirty_data()
        df_inches = self.handle_asin_volume_types_to_dimensions(asin_volume_type='inches')
        df_cm = self.handle_asin_volume_types_to_dimensions(asin_volume_type='cm')
        df_cm_inches = self.handle_asin_volume_types_to_dimensions(asin_volume_type='cm,inches')
        df_none = self.handle_asin_volume_types_to_dimensions(asin_volume_type='none')
        all_null = df_none.length.isNull() & df_none.width.isNull() & df_none.height.isNull()
        # Unit-less strings that still yielded at least one number are treated as inches.
        df_none_not_null = df_none.filter(~all_null).withColumn("asin_volume_type", F.lit("inches"))
        df_none_null = df_none.filter(all_null)
        print("df_none_not_null, df_none_null:", df_none_not_null.count(), df_none_null.count())
        self.df_save = df_inches.unionByName(df_cm) \
            .unionByName(df_cm_inches) \
            .unionByName(df_none_not_null) \
            .unionByName(df_none_null)
        self.df_save = self.df_save.withColumn("site_name", F.lit(self.site_name))
        self.df_save = self.df_save.drop("asin_volume_flag")
        self.df_save = self.df_save.withColumnRenamed("length", "asin_length"). \
            withColumnRenamed("width", "asin_width"). \
            withColumnRenamed("height", "asin_height")

    def handle_data_others(self):
        """Non-US pipeline: parse 'cm' strings directly into asin_length/width/height."""
        self.handle_filter_dirty_data()
        # The struct column is also left on self.df_asin_volume, as before.
        self.df_asin_volume = self.df_asin_volume.withColumn(
            'dimensions', self.u_extract_dimensions_others('asin_volume', 'asin_volume_type'))
        self.df_save = self._split_dimensions(
            self.df_asin_volume, ('asin_length', 'asin_width', 'asin_height'))
        self.df_save = self.df_save.withColumn("site_name", F.lit(self.site_name))
        self.df_save = self.df_save.drop("asin_volume_flag")

    def handle_data(self):
        """Dispatch to the US or the generic pipeline based on site_name."""
        if self.site_name == 'us':
            self.handle_data_us()
        else:
            self.handle_data_others()

    def handle_filter_dirty_data(self):
        """Lower-case volumes, drop digit-free ones, add asin_volume_type and keep the
        latest snapshot per asin. Prints type counts and samples as it goes."""
        self.df_asin_volume = self.df_asin_volume.withColumn("asin_volume", F.lower(F.col("asin_volume")))
        # A volume without any digit cannot contain dimensions.
        self.df_asin_volume = self.df_asin_volume.withColumn(
            "asin_volume_flag", self.u_contains_digit_udf(F.col("asin_volume")))
        self.df_asin_volume = self.df_asin_volume.filter(F.col('asin_volume_flag'))
        self.df_asin_volume = self.df_asin_volume.withColumn(
            "asin_volume_type", self.u_asin_volume_type(F.col("asin_volume")))
        self.df_asin_volume.groupBy('asin_volume_type').agg(F.count('asin_volume_type')).show()
        self.df_asin_volume.show()
        # One window per asin, newest date_info first. asin_volume breaks ties so the
        # row kept for duplicate snapshots is deterministic.
        window = Window.partitionBy('asin').orderBy(F.desc('date_info'), F.asc('asin_volume'))
        self.df_asin_volume = self.df_asin_volume.withColumn('row_number', F.row_number().over(window))
        # Keep the first row of each window (the latest snapshot) and drop the helper column.
        self.df_asin_volume = self.df_asin_volume.filter(F.col('row_number') == 1).drop('row_number')
        self.df_asin_volume.groupBy('asin_volume_type').agg(F.count('asin_volume_type')).show()
        self.df_asin_volume.show()

    def handle_asin_volume_types_to_dimensions(self, asin_volume_type='inches'):
        """Return the cached rows of one asin_volume_type with length/width/height columns."""
        df = self.df_asin_volume.filter(F.col('asin_volume_type') == asin_volume_type).cache()
        df = df.withColumn('dimensions', self.u_extract_dimensions('asin_volume', F.lit(asin_volume_type)))
        df = self._split_dimensions(df)
        df.show(10, truncate=False)
        return df

    def _show_type_dimensions(self, asin_volume_type):
        """Debug helper: parse one asin_volume_type and show the result."""
        df = self.df_asin_volume.filter(F.col('asin_volume_type') == asin_volume_type).cache()
        # The UDF takes two arguments; passing only 'asin_volume' failed at execution time.
        df = df.withColumn('dimensions', self.u_extract_dimensions('asin_volume', F.lit(asin_volume_type)))
        self._split_dimensions(df).show()

    def handle_type_inches(self):
        """Debug helper: show parsed dimensions of 'inches' rows."""
        self._show_type_dimensions('inches')

    def handle_type_cm(self):
        """Debug helper: show parsed dimensions of 'cm' rows."""
        self._show_type_dimensions('cm')

    def handle_type_none(self):
        """Debug helper: show parsed dimensions of 'none' rows."""
        self._show_type_dimensions('none')


if __name__ == '__main__':
    # Argument 1: site name (e.g. 'us'); fail with a usage line instead of an IndexError.
    if len(sys.argv) < 2:
        sys.exit(f"usage: {sys.argv[0]} <site_name>")
    site_name = sys.argv[1]
    handle_obj = DimAsinVolume(site_name=site_name)
    handle_obj.run()