"""
author: 汪瑞
description: Based on the dim_asin_detail table, compute product-volume
    (dimension) information at the ASIN level.
table_read_name: dim_asin_detail
table_save_name: dim_asin_volume
table_save_level: dwd
version: 3.0
created_date: 2022-05-12
updated_date: 2022-12-15

NOTE(review): read_data() actually reads dim_cal_asin_history_detail, not
dim_asin_detail as table_read_name states -- confirm which one is intended.
"""
import os
import re
import sys

from pyspark.storagelevel import StorageLevel

sys.path.append(os.path.dirname(sys.path[0]))  # parent directory, so `utils` is importable
from utils.templates import Templates
# Window functions (used for grouped ordering)
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType, MapType, DoubleType


class DimAsinVolume(Templates):
    """Build dim_asin_volume: parse each ASIN's free-text ``asin_volume`` into
    numeric length / width / height plus a unit label, and append audit and
    reserved placeholder columns.
    """

    def __init__(self, site_name="us", date_type="week", date_info="2022-1"):
        """
        :param site_name: site code (default "us"); also written as the partition column.
        :param date_type: period type, e.g. week / 4_week / month / quarter.
        :param date_info: period value, e.g. "2022-1".
        """
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = "dim_asin_volume"
        self.spark = self.create_spark_object(app_name=f"{self.db_save} {self.site_name}, {self.date_info}")
        self.df_date = self.get_year_week_tuple()
        # Placeholder DataFrames; replaced in read_data() / parse_asin_volume().
        self.df_save = self.spark.sql("select 1+1;")
        self.df_asin_detail = self.spark.sql("select 1+1;")
        # NOTE(review): year_week_tuple is presumably populated by get_year_week_tuple()
        # in Templates -- confirm.
        self.week_counts = 1 if self.date_type == 'week' else len(self.year_week_tuple)
        self.partitions_by = ['site_name']
        self.reset_partitions(10)
        # Register the parser as a Spark UDF returning map<string, double>.
        self.u_parse_asin_volume = self.spark.udf.register('u_parse_asin_volume', self.udf_parse_asin_volume,
                                                           MapType(StringType(), DoubleType(), True))

    @staticmethod
    def udf_parse_asin_volume(asin_volume):
        """Parse a free-text dimension string into numeric dimensions.

        A string is parsed only when all of the following hold:
          * it is not "null"/"none" (case-insensitive),
          * it does not contain "A4",
          * it contains " x " or "*" as a separator,
          * it does not mention both "cm" and "inch".
        It must then contain exactly three numeric tokens.

        * If the whole string contains a length label ("D" or "L") plus "W"
          and "H", each token is assigned by its own trailing label, e.g.
          '10"L x 5"W x 2"H' -> length 10, width 5, high 2.
        * Otherwise the three numbers are sorted ascending and assigned as
          high <= width <= length, e.g. "10 x 5 x 2 inches" -> high 2,
          width 5, length 10.

        Dimensions are rounded to 2 decimals. ``asin_element_unit`` is 0.0
        when "cm" occurs anywhere (case-insensitive), otherwise 1.0.

        :param asin_volume: raw dimension text (any value; converted with str()).
        :return: dict with keys asin_length, asin_width, asin_high,
            asin_element_unit -- all floats on success, all None when the
            string cannot be parsed. Always a dict, so the Spark map values are
            either doubles or null (never a mistyped '' string).
        """
        empty_result = {"asin_length": None, "asin_width": None, "asin_high": None, "asin_element_unit": None}
        asin_volume = str(asin_volume)
        # A number, optionally followed by a ' or " mark and a single L/W/H/D label.
        pattern_first = r"\d+\.?\d*[\'\"]?[LWHD]?"
        pattern_sec = r"\d+\.?\d*"

        def first_number(text):
            """Return the first numeric substring of ``text``, or '' if none."""
            numbers = re.findall(pattern_sec, text)
            return numbers[0] if numbers else ''

        parseable = (
            asin_volume.lower() not in ['null', 'none']
            and "A4" not in asin_volume
            and (" x " in asin_volume or "*" in asin_volume)
            # Mixed-unit strings are ambiguous; skip them (case-sensitive check, as before).
            and not ('cm' in asin_volume and 'inch' in asin_volume)
        )
        if not parseable:
            return empty_result

        element_list = re.findall(pattern_first, asin_volume)
        if len(element_list) != 3:
            return empty_result

        has_labels = (("D" in asin_volume or "L" in asin_volume)
                      and "W" in asin_volume and "H" in asin_volume)
        if has_labels:
            asin_length = asin_width = asin_high = ''
            for element in element_list:
                if "D" in element or "L" in element:
                    asin_length = first_number(element)
                elif "W" in element:
                    asin_width = first_number(element)
                elif "H" in element:
                    asin_high = first_number(element)
            raw_dims = (asin_length, asin_width, asin_high)
            if '' in raw_dims:  # some dimension had no label of its own
                return empty_result
            asin_length, asin_width, asin_high = (round(float(value), 2) for value in raw_dims)
        else:
            raw_dims = [first_number(element_list[index].strip()) for index in (0, 1, -1)]
            if '' in raw_dims:
                return empty_result
            # Unlabeled: smallest value is the height, largest the length.
            asin_high, asin_width, asin_length = sorted(round(float(value), 2) for value in raw_dims)

        # 0.0 = centimetres, 1.0 = anything else (treated as inches downstream).
        asin_element_unit = 0.0 if asin_volume.lower().count('cm') > 0 else 1.0
        return {"asin_length": asin_length, "asin_width": asin_width, "asin_high": asin_high,
                "asin_element_unit": asin_element_unit}

    def read_data(self):
        """Load (asin, asin_volume) for this site and replace null volumes with "none"."""
        print("1.读取dim_cal_asin_history_detail表")
        sql = f"select " \
              f"asin, " \
              f"asin_volume " \
              f"from dim_cal_asin_history_detail where site_name='{self.site_name}'"
        print("sql:" + sql)
        self.df_asin_detail = self.spark.sql(sqlQuery=sql).cache()
        # "none" is one of the sentinels the parser treats as unparseable.
        self.df_asin_detail = self.df_asin_detail.na.fill({"asin_volume": "none"})
        self.df_asin_detail.show(10, truncate=False)

    def parse_asin_volume(self):
        """Add asin_length / asin_width / asin_high / asin_element_unit columns
        parsed from asin_volume, then expose the result as ``self.df_save``.

        The UDF's numeric unit code is mapped to a label:
        0 -> 'cm', 1 -> 'inches', anything else -> null.
        """
        try:
            asin_volume_map = self.u_parse_asin_volume(self.df_asin_detail.asin_volume)
            for col_name in ("asin_length", "asin_width", "asin_high"):
                self.df_asin_detail = self.df_asin_detail.withColumn(
                    col_name, F.round(asin_volume_map[col_name], 2))
            self.df_asin_detail = self.df_asin_detail.withColumn(
                "asin_element_unit",
                F.when(asin_volume_map["asin_element_unit"] == 0, 'cm')
                .when(asin_volume_map["asin_element_unit"] == 1, 'inches')
                .otherwise(F.lit(None)))
        except Exception as e:
            # NOTE(review): errors are only printed, and df_save then lacks the
            # parsed columns; consider re-raising instead of saving partial output.
            print(e)
        finally:
            self.df_save = self.df_asin_detail

    def handle_data(self):
        """Parse volumes, then append audit timestamps, reserved placeholder
        columns and the site_name partition column to ``self.df_save``."""
        self.parse_asin_volume()
        self.df_save.show(10, truncate=False)
        # Spark datetime pattern: lower-case 'ss' is second-of-minute
        # (upper-case 'SS' would be fraction-of-second).
        time_pattern = 'yyyy-MM-dd HH:mm:ss'
        self.df_save = self.df_save \
            .withColumn("created_time", F.date_format(F.current_timestamp(), time_pattern)) \
            .withColumn("updated_time", F.date_format(F.current_timestamp(), time_pattern))
        # Reserved columns; the string ones hold the literal text "null", not SQL NULL.
        for index in (1, 2, 3):
            self.df_save = self.df_save.withColumn(f"re_string_field{index}", F.lit("null"))
        for index in (1, 2, 3):
            self.df_save = self.df_save.withColumn(f"re_int_field{index}", F.lit(0))
        self.df_save = self.df_save.withColumn("site_name", F.lit(self.site_name))


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site, e.g. us
    date_type = sys.argv[2]  # arg 2: period type: week/4_week/month/quarter
    date_info = sys.argv[3]  # arg 3: year-week / year-month / year-quarter, e.g. 2022-1
    handle_obj = DimAsinVolume(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()