dim_asin_volume.py 10.3 KB
Newer Older
chenyuanjie committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194
"""
author: 汪瑞
description: 基于dim_asin_detail表,计算出asin维度下的产品体积相关信息
table_read_name: dim_asin_detail
table_save_name: dim_asin_volume
table_save_level: dwd
version: 3.0
created_date: 2022-05-12
updated_date: 2022-12-15
"""

import os
import re
import sys

from pyspark.storagelevel import StorageLevel

sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录
from utils.templates import Templates
# from ..utils.templates import Templates
# from AmazonSpider.pyspark_job.utils.templates import Templates
# 分组排序的udf窗口函数
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType, MapType, DoubleType


class DimAsinVolume(Templates):
    """Spark job that derives per-ASIN dimensions from the free-text volume field.

    The job reads ``asin`` / ``asin_volume`` for one site, parses the volume
    text into length / width / height plus a unit, and leaves the result in
    ``self.df_save``. ``self.db_save`` names the target (``dim_asin_volume``);
    the actual write is presumably performed by the ``Templates`` base class
    (not visible in this file).
    """

    def __init__(self, site_name="us", date_type="week", date_info="2022-1"):
        """Store the job parameters, create the Spark session and register the UDF.

        Args:
            site_name: Site code; filters the source table and is stamped on
                every output row.
            date_type: Period type (the script entry point documents
                week/4_week/month/quarter).
            date_info: Period identifier, e.g. ``2022-1``.
        """
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = "dim_asin_volume"
        self.spark = self.create_spark_object(app_name=f"{self.db_save} {self.site_name}, {self.date_info}")
        self.df_date = self.get_year_week_tuple()
        # Placeholder DataFrames; replaced by read_data() / parse_asin_volume().
        self.df_save = self.spark.sql("select 1+1;")
        self.df_asin_detail = self.spark.sql("select 1+1;")
        # NOTE(review): year_week_tuple is not assigned in this class; presumably
        # it is populated by the base class (get_year_week_tuple) -- confirm.
        self.week_counts = 1 if self.date_type == 'week' else len(self.year_week_tuple)
        self.partitions_by = ['site_name']
        self.reset_partitions(10)
        self.u_parse_asin_volume = self.spark.udf.register('u_parse_asin_volume', self.udf_parse_asin_volume,
                                                           MapType(StringType(), DoubleType(), True))

    @staticmethod
    def udf_parse_asin_volume(asin_volume):
        """Parse a free-text volume such as ``10 x 5 x 3 inches`` into numbers.

        The text is only parsed when it is not ``null``/``none``, does not
        mention ``A4``, contains a `` x `` or ``*`` separator, does not contain
        both ``cm`` and ``inch``, and yields exactly three dimension tokens.

        When the text carries D/L, W and H labels, each labelled token is
        mapped to length / width / height respectively. Otherwise the three
        numbers are sorted and assigned smallest -> height, middle -> width,
        largest -> length.

        Args:
            asin_volume: Raw volume value; it is converted with ``str()`` first.

        Returns:
            dict with keys ``asin_length``, ``asin_width``, ``asin_high`` and
            ``asin_element_unit``. Dimensions are floats rounded to 2 decimals;
            the unit is ``0.0`` when the text mentions ``cm`` (case-insensitive)
            and ``1.0`` otherwise. Every value is ``None`` when the text cannot
            be parsed, so the result always fits the registered
            ``MapType(StringType(), DoubleType())`` return type.
        """
        # Everything the UDF needs stays local to the function so that it remains
        # self-contained when it is serialized for execution on the workers.
        pattern_first = r"\d+\.?\d*[\'\"]?[LWHD]?"
        pattern_sec = r"\d+\.?\d*"
        unparsed = {"asin_length": None, "asin_width": None, "asin_high": None,
                    "asin_element_unit": None}

        text = str(asin_volume)
        if text.lower() in ('null', 'none') or "A4" in text:
            return unparsed
        if " x " not in text and "*" not in text:
            return unparsed
        # Mixed-unit descriptions are ambiguous (this check is case-sensitive).
        if 'cm' in text and 'inch' in text:
            return unparsed

        tokens = re.findall(pattern_first, text)
        if len(tokens) != 3:
            # Previously this path returned empty strings, which do not match
            # the declared DoubleType values; return explicit nulls instead.
            return unparsed

        # First bare number of every token, already rounded. pattern_first always
        # starts with digits, so a miss is not expected, but stay defensive.
        values = []
        for token in tokens:
            found = re.findall(pattern_sec, token)
            values.append(round(float(found[0]), 2) if found else None)

        labelled = ("D" in text or "L" in text) and "W" in text and "H" in text
        if labelled:
            length = width = high = None
            for token, value in zip(tokens, values):
                if value is None:
                    continue
                if "D" in token or "L" in token:
                    length = value
                elif "W" in token:
                    width = value
                elif "H" in token:
                    high = value
            if length is None or width is None or high is None:
                return unparsed
        else:
            if any(value is None for value in values):
                return unparsed
            high, width, length = sorted(values)

        unit = 0.0 if 'cm' in text.lower() else 1.0
        return {"asin_length": length, "asin_width": width, "asin_high": high,
                "asin_element_unit": unit}

    def read_data(self):
        """Load ``asin`` and ``asin_volume`` for the configured site.

        The result is cached, missing ``asin_volume`` values are replaced with
        the string ``"none"`` (which the parsing UDF treats as unparsable),
        and the first 10 rows are printed.
        """
        print("1.读取dim_cal_asin_history_detail表")
        sql = (
            "select asin, asin_volume "
            f"from dim_cal_asin_history_detail where site_name='{self.site_name}'"
        )
        print("sql:" + sql)
        self.df_asin_detail = self.spark.sql(sqlQuery=sql).cache()
        self.df_asin_detail = self.df_asin_detail.na.fill({"asin_volume": "none"})
        self.df_asin_detail.show(10, truncate=False)

    def parse_asin_volume(self):
        """Add the parsed dimension columns to ``self.df_asin_detail``.

        Adds ``asin_length``, ``asin_width``, ``asin_high`` (rounded to two
        decimals) and ``asin_element_unit`` (``'cm'``, ``'inches'`` or null).
        Errors raised while building the columns are printed and otherwise
        ignored (best effort); ``self.df_save`` is always set to the current
        ``self.df_asin_detail``, which may then lack some of the columns.
        """
        dimension_columns = ("asin_length", "asin_width", "asin_high")
        try:
            parsed = self.u_parse_asin_volume(self.df_asin_detail.asin_volume)
            for column_name in dimension_columns:
                self.df_asin_detail = self.df_asin_detail.withColumn(column_name, parsed[column_name])
            # The UDF encodes the unit numerically: 0 -> cm, 1 -> inches.
            unit_label = (
                F.when(parsed["asin_element_unit"] == 0, 'cm')
                .when(parsed["asin_element_unit"] == 1, 'inches')
                .otherwise(F.lit(None))
            )
            self.df_asin_detail = self.df_asin_detail.withColumn("asin_element_unit", unit_label)
            for column_name in dimension_columns:
                self.df_asin_detail = self.df_asin_detail.withColumn(column_name,
                                                                     F.round(F.col(column_name), 2))
        except Exception as e:
            # Deliberately best-effort: report the problem and keep whatever
            # columns were added before the failure.
            print(e)
        finally:
            self.df_save = self.df_asin_detail

    def handle_data(self):
        """Parse the volumes and append the bookkeeping columns to ``self.df_save``.

        Appends ``created_time`` / ``updated_time`` (current timestamp as
        ``yyyy-MM-dd HH:mm:ss``), three reserved string fields (``"null"``),
        three reserved integer fields (``0``) and ``site_name``.
        """
        self.parse_asin_volume()
        self.df_save.show(10, truncate=False)
        # 'ss' is second-of-minute; the former 'SS' is fraction-of-second in
        # Spark datetime patterns and produced wrong values in the seconds slot.
        now_text = F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss')
        self.df_save = self.df_save.withColumn("created_time", now_text). \
            withColumn("updated_time", now_text)
        for index in (1, 2, 3):
            self.df_save = self.df_save.withColumn(f"re_string_field{index}", F.lit("null"))
        for index in (1, 2, 3):
            self.df_save = self.df_save.withColumn(f"re_int_field{index}", F.lit(0))
        self.df_save = self.df_save.withColumn("site_name", F.lit(self.site_name))


if __name__ == '__main__':
    # Command line: <site_name> <date_type> <date_info>
    cli = sys.argv
    job = DimAsinVolume(
        site_name=cli[1],  # argument 1: site
        date_type=cli[2],  # argument 2: period type: week/4_week/month/quarter
        date_info=cli[3],  # argument 3: year-week / year-month / year-quarter, e.g. 2022-1
    )
    job.run()