# dim_asin_weight_info.py 9.46 KB
import os
import sys
import re
import numpy as np

sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录
from utils.templates import Templates
# from ..utils.templates import Templates
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, FloatType, StructType, StructField
from pyspark.sql.window import Window


class DimAsinWeightInfo(Templates):

    def __init__(self, site_name='us'):
        """Create the Spark session, placeholder DataFrames and the weight-parsing UDF.

        Args:
            site_name: Site code. 'us' selects 'pounds' as the weight unit,
                any other value selects 'grams'.
        """
        super().__init__()
        self.site_name = site_name
        self.db_save = 'dim_asin_weight_info'

        # Initialise self.spark, naming the application after the target table
        # and the run parameters.
        # NOTE(review): date_type / date_info are not assigned in this method;
        # presumably they are provided by Templates -- confirm.
        app_name = f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}"
        self.spark = self.create_spark_object(app_name=app_name)

        # Placeholder DataFrames; the first two are replaced in read_data()
        # and df_save is replaced in handle_data().
        placeholder_sql = "select 1+1;"
        self.df_asin_weight = self.spark.sql(placeholder_sql)
        self.df_asin_weight_old = self.spark.sql(placeholder_sql)
        self.df_save = self.spark.sql(placeholder_sql)

        # The UDF returns a (weight, weight_type) struct.
        weight_struct = StructType([
            StructField('weight', FloatType(), True),
            StructField('weight_type', StringType(), True),
        ])
        self.u_get_weight = F.udf(self.udf_get_weight, weight_struct)

        # Partition parameters.
        self.partitions_by = ['site_name']
        self.partitions_num = 20

        # Weight unit attached to the rows dated before 2023-18.
        if site_name == 'us':
            self.weight_type = 'pounds'
        else:
            self.weight_type = 'grams'

    @staticmethod
    def udf_get_weight(weight_str, site_name):
        if weight_str is None:
            # Return some default value or raise an exception.
            return (None, None)
        weight_type = 'pounds' if site_name == 'us' else 'grams'
        if 'pounds' in weight_str:
            match = re.search(r"(\d+\.{0,}\d{0,})\D{0,}pounds", weight_str)
            val = round(float(match.group(1)), 3) if site_name == 'us' and match else round(
                float(match.group(1)) * 1000 * 0.454, 3) if match else np.nan
        elif 'ounces' in weight_str:
            match = re.search(r"(\d+\.{0,}\d{0,})\D{0,}ounces", weight_str)
            val = round(float(match.group(1)) / 16, 3) if site_name == 'us' and match else round(
                float(match.group(1)) / 16 * 1000 * 0.454, 3) if match else np.nan
        # elif 'kilograms' in weight_str or ' kilogramos' in weight_str:
        elif any(substring in weight_str for substring in ['kilogram', ' kg']):
            weight_str = weight_str.replace(' kg', ' kilogram')
            match = re.search(r"(\d+\.{0,}\d{0,})\D{0,}kilogram", weight_str)
            val = round(float(match.group(1)) / 0.454, 3) if site_name == 'us' and match else round(
                float(match.group(1)) * 1000, 3) if match else np.nan
        # elif 'milligrams' in weight_str:
        elif any(substring in weight_str for substring in ['milligrams']):
            match = re.search(r"(\d+\.{0,}\d{0,})\D{0,}milligrams", weight_str)
            val = round(float(match.group(1)) / 1000 / 1000 / 0.454, 3) if site_name == 'us' and match else round(
                float(match.group(1)) / 1000, 3) if match else np.nan
        elif ' gram' in weight_str:
            match = re.search(r"(\d+\.{0,}\d{0,})\D{0,} gram", weight_str)
            val = round(float(match.group(1)) / 1000 / 0.454, 3) if site_name == 'us' and match else round(
                float(match.group(1)), 3) if match else np.nan
        elif ' g' in weight_str:
            match = re.search(r"(\d+\.{0,}\d{0,})\D{0,} g", weight_str)
            # val = round(float(match.group(1)) / 1000 / 0.454, 3) if site_name == 'us' else round(float(match.group(1)), 3) if match else np.nan
            val = round(float(match.group(1)) / 1000 / 0.454, 3) if site_name == 'us' and match else round(
                float(match.group(1)), 3) if match else np.nan
        else:
            val = 'none'
            weight_type = 'none'
        # val = val * 1000 * 0.454 if site_name != 'us' and val != 'none' else val
        weight = val
        return (weight, weight_type)

    def read_data(self):
        """Load this site's weekly rows from ods_asin_detail into two cached DataFrames.

        * ``self.df_asin_weight``: rows with ``date_info >= '2023-18'`` and a
          non-null ``weight_str``.
        * ``self.df_asin_weight_old``: rows with ``date_info <= '2023-17'`` and
          a non-null ``weight``, tagged with ``self.weight_type`` as a literal
          ``weight_type`` column.

        Each query is printed and the first 10 rows of its result are shown.
        """
        site_week_filter = f"site_name='{self.site_name}' and date_type='week'"

        recent_sql = (
            "select asin, weight, weight_str, site_name, date_info "
            f"from ods_asin_detail where {site_week_filter} "
            "and date_info>='2023-18' and weight_str is not null"
        )
        print("sql:", recent_sql)
        self.df_asin_weight = self.spark.sql(sqlQuery=recent_sql).cache()
        self.df_asin_weight.show(10, truncate=False)

        legacy_sql = (
            f"select asin, weight, weight_str, site_name, date_info, '{self.weight_type}' as weight_type "
            f"from ods_asin_detail where {site_week_filter} "
            "and date_info<='2023-17' and weight is not null"
        )
        print("sql:", legacy_sql)
        self.df_asin_weight_old = self.spark.sql(sqlQuery=legacy_sql).cache()
        self.df_asin_weight_old.show(10, truncate=False)

    def handle_asin_weight(self):
        """Derive numeric ``weight`` and ``weight_type`` from ``weight_str``.

        Operates on ``self.df_asin_weight`` (the rows loaded with
        ``date_info >= '2023-18'``) and stores the result back on the same
        attribute:

        1. ``weight_str`` is cast to string and lower-cased.
        2. ``self.u_get_weight`` parses it into a (weight, weight_type) struct,
           whose fields replace / add the ``weight`` and ``weight_type`` columns.
        3. Weights less than or equal to 0.001 are raised to 0.001.
        4. A ``weight_str`` equal to 'none' is replaced with null.
        """
        print("开始处理重量数据: 2023-18周之后")

        # Lower-case the text so the UDF's case-sensitive unit detection works.
        df = self.df_asin_weight.withColumn(
            "weight_str", F.lower(F.col("weight_str").cast(StringType()))
        )

        # Parse the text, then unpack the struct into plain columns.
        df = df.withColumn('weight_detail', self.u_get_weight('weight_str', 'site_name'))
        df = (
            df.withColumn('weight', F.col('weight_detail').getField('weight'))
            .withColumn('weight_type', F.col('weight_detail').getField('weight_type'))
            .drop('weight_detail')
        )

        # The UDF's struct schema declares `weight` as FloatType, so the column
        # can never contain the string 'none'. The former
        # `weight == 'none'` -> null step was dead code and is dropped; such a
        # float-vs-string comparison may also fail when ANSI SQL mode is on.
        # NOTE(review): the UDF can return NaN for unparseable text; NaN is not
        # null and passes through the clamp below unchanged -- confirm that
        # this is what downstream consumers expect.

        # Clamp tiny weights to 0.001.
        df = df.withColumn(
            "weight",
            F.when(F.col("weight") <= 0.001, 0.001).otherwise(F.col("weight")),
        )

        # Treat the literal text 'none' as a missing weight string.
        df = df.withColumn(
            "weight_str",
            F.when(F.col("weight_str") == 'none', None).otherwise(F.col("weight_str")),
        )

        self.df_asin_weight = df

    def handle_asin_weight_old(self):
        """Keep one row per ASIN in ``self.df_asin_weight_old``.

        Rows are numbered within each ``asin`` by ``date_info`` descending and
        only the first one (the greatest ``date_info``) is kept.
        """
        print("开始处理重量数据: 2023-18周之前")
        latest_first = Window.partitionBy(['asin']).orderBy(F.col('date_info').desc())
        # The row number is computed once (the original added the same column
        # twice); row_number == 1 is the row with the greatest date_info.
        self.df_asin_weight_old = (
            self.df_asin_weight_old
            .withColumn('row_number', F.row_number().over(latest_first))
            .filter(F.col('row_number') == 1)
            .drop('row_number')
        )

    def handle_data(self):
        """Build ``self.df_save``: one weight row per ASIN across both data sets.

        Runs both handlers, unions their results by column name, keeps for each
        ``asin`` the row with the greatest ``date_info``, rounds ``weight`` to 4
        decimals and prefixes the weight columns with ``asin_``.
        """
        self.handle_asin_weight()
        self.handle_asin_weight_old()
        print("self.df_asin_weight.columns:", self.df_asin_weight.columns)
        print("self.df_asin_weight_old.columns:", self.df_asin_weight_old.columns)

        combined = self.df_asin_weight.unionByName(self.df_asin_weight_old, allowMissingColumns=True)

        # Number rows per ASIN, newest date_info first, and keep only the first.
        # The row number is computed once (the original added the same column
        # twice).
        latest_first = Window.partitionBy(['asin']).orderBy(F.col('date_info').desc())
        combined = (
            combined
            .withColumn('row_number', F.row_number().over(latest_first))
            .filter(F.col('row_number') == 1)
            .drop('row_number')
        )

        combined = combined.withColumn("weight", F.round(combined["weight"], 4))

        # Rename to the output column names.
        renames = (
            ("weight_str", "asin_weight_str"),
            ("weight", "asin_weight"),
            ("weight_type", "asin_weight_type"),
        )
        for old_name, new_name in renames:
            combined = combined.withColumnRenamed(old_name, new_name)

        self.df_save = combined


if __name__ == '__main__':
    # Argument 1: site name. Exit with a usage message instead of a bare
    # IndexError when it is missing.
    if len(sys.argv) < 2:
        sys.exit(f"usage: {sys.argv[0]} <site_name>")
    site_name = sys.argv[1]
    handle_obj = DimAsinWeightInfo(site_name=site_name)
    # NOTE(review): run() is not defined in this class; presumably inherited
    # from Templates -- confirm.
    handle_obj.run()