import os
import re
import sys

import numpy as np

sys.path.append(os.path.dirname(sys.path[0]))  # add the parent directory to the import path
from utils.templates import Templates
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, FloatType, StructType, StructField
from pyspark.sql.window import Window


class DimAsinWeightInfo(Templates):

    def __init__(self, site_name='us'):
        super().__init__()
        self.site_name = site_name
        self.db_save = 'dim_asin_weight_info'
        # initialize the Spark session object
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        self.df_asin_weight = self.spark.sql("select 1+1;")
        self.df_asin_weight_old = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")
        schema = StructType([
            StructField('weight', FloatType(), True),
            StructField('weight_type', StringType(), True),
        ])
        self.u_get_weight = F.udf(self.udf_get_weight, schema)
        # partition parameters
        self.partitions_by = ['site_name']
        self.partitions_num = 20
        # weight unit applied to records before week 2023-18
        self.weight_type = 'pounds' if site_name == 'us' else 'grams'

    @staticmethod
    def udf_get_weight(weight_str, site_name):
        if weight_str is None:
            return (None, None)
        # target unit: pounds for the US site, grams for all other sites
        weight_type = 'pounds' if site_name == 'us' else 'grams'
        if 'pounds' in weight_str:
            match = re.search(r"(\d+\.?\d*)\D*pounds", weight_str)
            val = round(float(match.group(1)), 3) if site_name == 'us' and match else round(
                float(match.group(1)) * 1000 * 0.454, 3) if match else np.nan
        elif 'ounces' in weight_str:
            match = re.search(r"(\d+\.?\d*)\D*ounces", weight_str)
            val = round(float(match.group(1)) / 16, 3) if site_name == 'us' and match else round(
                float(match.group(1)) / 16 * 1000 * 0.454, 3) if match else np.nan
        elif any(substring in weight_str for substring in ['kilogram', ' kg']):
            weight_str = weight_str.replace(' kg', ' kilogram')
            match = re.search(r"(\d+\.?\d*)\D*kilogram", weight_str)
            val = round(float(match.group(1)) / 0.454, 3) if site_name == 'us' and match else round(
                float(match.group(1)) * 1000, 3) if match else np.nan
        elif 'milligrams' in weight_str:
            match = re.search(r"(\d+\.?\d*)\D*milligrams", weight_str)
            val = round(float(match.group(1)) / 1000 / 1000 / 0.454, 3) if site_name == 'us' and match else round(
                float(match.group(1)) / 1000, 3) if match else np.nan
        elif ' gram' in weight_str:
            match = re.search(r"(\d+\.?\d*)\D* gram", weight_str)
            val = round(float(match.group(1)) / 1000 / 0.454, 3) if site_name == 'us' and match else round(
                float(match.group(1)), 3) if match else np.nan
        elif ' g' in weight_str:
            match = re.search(r"(\d+\.?\d*)\D* g", weight_str)
            val = round(float(match.group(1)) / 1000 / 0.454, 3) if site_name == 'us' and match else round(
                float(match.group(1)), 3) if match else np.nan
        else:
            # no recognizable unit in the string; return a null weight so the
            # value matches the FloatType declared in the UDF schema
            val = None
            weight_type = 'none'
        return (val, weight_type)
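    # Illustrative parse results for udf_get_weight (hypothetical inputs,
    # documentation only; values follow the conversion rules above):
    #   udf_get_weight('1.2 pounds', 'us')   -> (1.2, 'pounds')
    #   udf_get_weight('1.2 pounds', 'de')   -> (544.8, 'grams')   # 1.2 lb * 1000 * 0.454
    #   udf_get_weight('8 ounces', 'us')     -> (0.5, 'pounds')    # 8 oz / 16
    #   udf_get_weight('2 kilograms', 'us')  -> (4.405, 'pounds')  # 2 kg / 0.454
    #   udf_get_weight('no unit here', 'us') -> (None, 'none')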

    def read_data(self):
        # weight_str is only available from week 2023-18 onward
        sql = f"select asin, weight, weight_str, site_name, date_info from ods_asin_detail " \
              f"where site_name='{self.site_name}' and date_type='week' and date_info>='2023-18' and weight_str is not null"
        print("sql:", sql)
        self.df_asin_weight = self.spark.sql(sqlQuery=sql).cache()
        self.df_asin_weight.show(10, truncate=False)
        # older records only carry a numeric weight in the site's default unit
        sql = f"select asin, weight, weight_str, site_name, date_info, '{self.weight_type}' as weight_type " \
              f"from ods_asin_detail where site_name='{self.site_name}' and date_type='week' and date_info<='2023-17' and weight is not null"
        print("sql:", sql)
        self.df_asin_weight_old = self.spark.sql(sqlQuery=sql).cache()
        self.df_asin_weight_old.show(10, truncate=False)

    def handle_asin_weight(self):
        print("Processing weight data: week 2023-18 and after")
        # cast weight_str to string and lower-case it
        self.df_asin_weight = self.df_asin_weight.withColumn(
            "weight_str", F.lower(F.col("weight_str").cast(StringType())))
        # extract weight and weight_type from the raw weight string
        self.df_asin_weight = self.df_asin_weight.withColumn(
            'weight_detail', self.u_get_weight('weight_str', 'site_name'))
        self.df_asin_weight = self.df_asin_weight \
            .withColumn('weight', self.df_asin_weight.weight_detail.getField('weight')) \
            .withColumn('weight_type', self.df_asin_weight.weight_detail.getField('weight_type')) \
            .drop('weight_detail')
        # cast weight to float (rows without a recognizable unit are already null)
        self.df_asin_weight = self.df_asin_weight.withColumn(
            "weight", F.col("weight").cast(FloatType()))
        # clamp weights at a floor of 0.001
        self.df_asin_weight = self.df_asin_weight.withColumn(
            "weight", F.when(F.col("weight") <= 0.001, 0.001).otherwise(F.col("weight")))
        # normalize the literal string 'none' in weight_str to null
        self.df_asin_weight = self.df_asin_weight.withColumn(
            "weight_str", F.when(F.col("weight_str") == 'none', None).otherwise(F.col("weight_str")))

    def handle_asin_weight_old(self):
        print("Processing weight data: before week 2023-18")
        # number the rows per asin, most recent date_info first
        window = Window.partitionBy(['asin']).orderBy(self.df_asin_weight_old.date_info.desc())
        self.df_asin_weight_old = self.df_asin_weight_old.withColumn(
            "row_number", F.row_number().over(window)
        )
        # keep only the most recent record per asin
        self.df_asin_weight_old = self.df_asin_weight_old.filter(
            self.df_asin_weight_old.row_number == 1).drop('row_number')

    def handle_data(self):
        self.handle_asin_weight()
        self.handle_asin_weight_old()
        print("self.df_asin_weight.columns:", self.df_asin_weight.columns)
        print("self.df_asin_weight_old.columns:", self.df_asin_weight_old.columns)
        self.df_save = self.df_asin_weight.unionByName(self.df_asin_weight_old, allowMissingColumns=True)
        # number the rows per asin, most recent date_info first
        window = Window.partitionBy(['asin']).orderBy(
            self.df_save.date_info.desc(),
        )
        self.df_save = self.df_save.withColumn(
            "row_number", F.row_number().over(window)
        )
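        # Illustrative dedup semantics (hypothetical rows, documentation only):
        #   asin    date_info  row_number
        #   B0001   2023-20    1   <- kept: most recent record per asin
        #   B0001   2023-19    2   <- dropped by the filter below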
        # keep only the most recent record per asin
        self.df_save = self.df_save.filter(self.df_save.row_number == 1).drop('row_number')
        self.df_save = self.df_save.withColumn("weight", F.round(self.df_save["weight"], 4))
        self.df_save = self.df_save.withColumnRenamed(
            "weight_str", "asin_weight_str"
        ).withColumnRenamed(
            "weight", "asin_weight"
        ).withColumnRenamed(
            "weight_type", "asin_weight_type"
        )


if __name__ == '__main__':
    site_name = sys.argv[1]  # argument 1: site name
    handle_obj = DimAsinWeightInfo(site_name=site_name)
    handle_obj.run()
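# Example invocation (assuming the job is launched via spark-submit with the
# utils package importable from the parent directory; the script name below
# is hypothetical):
#   spark-submit dim_asin_weight_info.py us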