"""
   @Author      : HuangJian
   @Description : Keyword-to-ASIN detail dimension table
   @SourceTable :
                  ①ods_search_term_(zr,sp,sb,ac,bs,er,tr)
                  ②ods_asin_keep_date
                  ③ods_asin_variat
                  ④ods_asin_detail
                  ⑤dwd_bs_category_asin
   @SinkTable   : dim_st_asin_detail
   @CreateTime  : 2022/11/10 9:56
   @UpdateTime  : 2022/11/10 9:56
"""

import os
import sys
import datetime
import traceback
from datetime import date, timedelta

sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from utils.templates import Templates
# from ..utils.templates import Templates
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType


class DimStAsinDetail(Templates):

    def __init__(self, site_name='us', date_type="day", date_info='2022-10-01'):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = 'dim_st_asin_detail'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name},{self.date_type}, {self.date_info}")
        self.year_week = self.get_year_week()
        self.year_week_tuple = self.get_last_4_week()
        self.df_save = self.spark.sql("select 1+1;")  # placeholder, overwritten in handle_data
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.reset_partitions(partitions_num=60)
        self.data_type_list = ['tr', 'er', 'bs', 'ac', 'sb1', 'sb2', 'sb3', 'sp', 'zr']  # union the small per-type tables into one big table
        self.df_st_asin_info = self.spark.sql(
            f"select search_term, asin, page, page_row, 'zr' as data_type, updated_time,site_name,date_type,date_info from ods_st_rank_zr limit 0;")
        # placeholder DataFrames, populated in read_data
        self.df_asin_keep_date = self.spark.sql("select 1+1;")
        self.df_asin_variat = self.spark.sql("select 1+1;")
        self.df_asin_detail = self.spark.sql("select 1+1;")
        self.df_bs_category = self.spark.sql("select 1+1;")

        # register custom UDFs
        self.u_launch_time = self.spark.udf.register("u_launch_time", self.udf_launch_time, IntegerType())
        self.u_days_diff = self.spark.udf.register("u_days_diff", self.udf_days_diff, IntegerType())
        self.u_year_week = self.spark.udf.register('u_year_week', self.udf_year_week, StringType())
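        # Note: spark.udf.register returns the UDF as a callable, so each function can
        # be used both from Spark SQL (by its registered name) and from the DataFrame
        # API via self.u_launch_time(...) and friends.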

    @staticmethod
    def udf_page_rank(page, page_1_count, page_2_count, page_row):
        """
        Compute the global page_rank for zr/sp records: rows on page 1 keep their
        row number; rows on later pages add the cumulative row counts of the
        preceding pages.
        :param page: page number the record appeared on
        :param page_1_count: max page_row seen on page 1
        :param page_2_count: cumulative max page_row across pages 1 and 2
        :param page_row: row position within the page
        :return: page_rank
        """
        if page == 1:
            return page_row
        elif page == 2:
            return page_1_count + page_row
        else:
            return page_2_count + page_row
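    # Illustrative values (assumed page sizes, not taken from real data):
    #   udf_page_rank(1, 48, 98, 10) -> 10   (page 1 keeps its own row)
    #   udf_page_rank(2, 48, 98, 10) -> 58   (48 rows on page 1 + row 10)
    #   udf_page_rank(3, 48, 98, 10) -> 108  (98 rows on pages 1-2 + row 10)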

    def handle_data_page_rank(self, df, data_type):
        print(f"{data_type} -- computing page_rank")
        u_page_rank = self.spark.udf.register('u_page_rank', self.udf_page_rank, IntegerType())
        # zr/sp contain duplicate rows, so use max(page_row) rather than count
        df_page_1 = df.filter("page=1").groupBy(['search_term']).agg({"page_row": "max"})
        df_page_2 = df.filter(df['page'] == 2).groupBy(['search_term']).agg({"page_row": "max"})
        df_page_1 = df_page_1.withColumnRenamed('max(page_row)', 'page_1_count')
        df_page_2 = df_page_2.withColumnRenamed('max(page_row)', 'page_2_count_old')
        df = df.join(df_page_1, on='search_term', how='left'). \
            join(df_page_2, on='search_term', how='left')
        df = df.fillna(0)
        # page_2_count is the cumulative row count of pages 1 and 2
        df = df.withColumn("page_2_count", df.page_1_count + df.page_2_count_old)
        df = df.withColumn("page_rank", u_page_rank(
            df['page'], df.page_1_count, df.page_2_count, df['page_row']))
        # df.show(n=10, truncate=False)
        return df

    def get_last_4_week(self):
        # derive the most recent four year-weeks (inclusive) from the current year-week
        print("get_last_4_week called, current year-week:", self.year_week)
        self.df_week = self.spark.sql("select * from dim_week_20_to_30;")
        df = self.df_week.toPandas()
        self.year, self.week = int(self.year_week.split("-")[0]), int(self.year_week.split("-")[1])
        df_week = df.loc[df.year_week == self.year_week]
        if not list(df_week.id):
            return ()
        current_id = list(df_week.id)[0]
        id_tuple = (current_id, current_id - 1, current_id - 2, current_id - 3)
        df_4_week = df.loc[df.id.isin(id_tuple)]
        return tuple(df_4_week.year_week)
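    # Example (hypothetical ids): if self.year_week is '2022-45' and its id in
    # dim_week_20_to_30 is 150, id_tuple is (150, 149, 148, 147) and the method
    # returns the four year-weeks '2022-42' through '2022-45'.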

    def get_year_week(self):
        # look up the year-week for the given date; only date_type == "day" is
        # handled here, so other date types return None
        if self.date_type == "day":
            sql = f"select year_week from dim_date_20_to_30 where `date`='{self.date_info}'"
            df = self.spark.sql(sqlQuery=sql).toPandas()
            print(list(df.year_week)[0])
            return list(df.year_week)[0]

    @staticmethod
    def udf_launch_time(launch_time, date_type, date_info):
        # compute the number of days between launch_time and the current partition date
        if "-" in str(launch_time):
            asin_date_list = str(launch_time).split("-")
            try:
                asin_date = datetime.date(year=int(asin_date_list[0]),
                                          month=int(asin_date_list[1]),
                                          day=int(asin_date_list[2]))
                week_date = '2022-10-01'  # default fallback
                if date_type == 'week':
                    # date_info looks like '2022-45': anchor on Jan 1st's week, then add the week offset
                    cur_year = int(str(date_info).split("-")[0])
                    cur_week = int(str(date_info).split("-")[1])
                    d = date(cur_year, 1, 1)
                    d = d - timedelta(d.weekday())
                    dlt = timedelta(days=cur_week * 7)
                    week_date = d + dlt

                if date_type == 'day':
                    week_date = date_info
                cur_date_list = str(week_date).split("-")
                cur_date = datetime.date(year=int(cur_date_list[0]),
                                         month=int(cur_date_list[1]),
                                         day=int(cur_date_list[2]))
                days_diff = (cur_date - asin_date).days
            except Exception as e:
                print(e, traceback.format_exc())
                print(launch_time, asin_date_list)
                days_diff = -2  # unparseable launch_time
        else:
            days_diff = -1  # launch_time missing or not a date
        return days_diff
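    # Sketch of the contract (example dates for illustration):
    #   udf_launch_time('2022-09-01', 'day', '2022-10-01') -> 30
    #   udf_launch_time('not-a-date', 'day', '2022-10-01') -> -2  (parse failure)
    #   udf_launch_time(None, 'day', '2022-10-01')         -> -1  (no date present)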

    @staticmethod
    def udf_days_diff(days_diff):
        # mark an ASIN as a new product if it launched within the last 180 days
        if 0 <= days_diff <= 180:
            return 1
        else:
            return 0

    @staticmethod
    def udf_year_week(dt):
        # zero-pad the week so year-week strings sort correctly, e.g. '2022-5' -> '2022-05'
        year, week = dt.split("-")[0], dt.split("-")[1]
        if int(week) < 10:
            return f"{year}-0{week}"
        else:
            return f"{year}-{week}"

    def read_data(self):
        # build the search-term-to-ASIN mapping from ods_search_term_(zr,sp,sb,ac,bs,er,tr)
        for data_type in self.data_type_list:
            print(f"site_name: {self.site_name}, data_type: {data_type}")
            if data_type in ['zr', 'sp']:
                sql = f"select search_term, asin, page, page_row, '{data_type}' as data_type,created_time, updated_time, site_name,date_type,date_info from ods_search_term_{data_type} " \
                      f"where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';"
                df = self.spark.sql(sqlQuery=sql)
                # compute page_rank for zr/sp
                df = self.handle_data_page_rank(df=df, data_type=data_type)
                df = df.drop('page_1_count', 'page_2_count', 'page_2_count_old')
            else:
                if data_type in ['sb1', 'sb2', 'sb3']:
                    sql = f"select search_term, asin, page, '{data_type}' as data_type,created_time, updated_time, site_name,date_type,date_info from ods_search_term_sb " \
                          f"where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}' and data_type={int(data_type[-1])};"
                else:
                    sql = f"select search_term, asin, page, '{data_type}' as data_type,created_time, updated_time, site_name,date_type,date_info from ods_search_term_{data_type} " \
                          f"where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';"
                df = self.spark.sql(sqlQuery=sql)
            # print(f"site_name: {self.site_name}, data_type: {data_type}, partitions: {df.rdd.getNumPartitions()}")
            self.df_st_asin_info = self.df_st_asin_info.unionByName(df, allowMissingColumns=True)
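        # Note: 'sb1'/'sb2'/'sb3' all read ods_search_term_sb filtered by the numeric
        # suffix, and only 'zr'/'sp' carry page_row/page_rank; unionByName with
        # allowMissingColumns=True fills whatever columns a source lacks with null.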
        # attach year_week so the weekly tables can be deduplicated later
        self.df_st_asin_info = self.df_st_asin_info.withColumn("year_week", F.lit(self.year_week))
        print("self.df_st_asin_info", self.df_st_asin_info.show(10, truncate=False))
        # print("self.df_save.count():", self.df_save.count())

        # read ods_asin_keep_date (keepa launch dates)
        sql = f"select asin, launch_time as keepa_launch_time, site_name from ods_asin_keep_date " \
              f"where state = 3 and site_name='{self.site_name}'"
        self.df_asin_keep_date = self.spark.sql(sqlQuery=sql)
        print("self.df_asin_keep_date", self.df_asin_keep_date.show(10, truncate=False))

        # read variation info (color/size/style/sale state) from dim_asin_variation_info
        sql = f"select asin,color,`size`,style,state as is_sale from dim_asin_variation_info " \
              f"where state is not null and site_name='{self.site_name}'"
        self.df_asin_variat = self.spark.sql(sqlQuery=sql)
        print("self.df_asin_variat", self.df_asin_variat.show(10, truncate=False))

        print("debug, self.year_week_tuple:", self.year_week_tuple)
        # read ods_asin_detail; the IN clause pulls several weeks at once, so duplicates must be handled later
        sql = f"select asin,title,title_len,price,rating,total_comments,buy_box_seller_type,page_inventory,category," \
              f"volume,weight,`rank` as asin_rank,launch_time,img_num,img_type,category_state,activity_type,one_two_val," \
              f"three_four_val,five_six_val,eight_val,site_name,dt from ods_asin_detail " \
              f"where site_name='{self.site_name}' and dt in {self.year_week_tuple} ;"
        self.df_asin_detail = self.spark.sql(sqlQuery=sql)
        print("self.df_asin_detail", self.df_asin_detail.show(10, truncate=False))

        # read dwd_bs_category_asin (BSR top-level category id per ASIN)
        sql = f"select asin, cate_1_id as bsr_cate_1_id, dt from selection_off_line.dwd_bs_category_asin " \
              f"where site='{self.site_name}' and dt= '{self.year_week}';"
        self.df_bs_category = self.spark.sql(sqlQuery=sql)
        print("self.df_bs_category", self.df_bs_category.show(10, truncate=False))


    def handle_data(self):
        # several weeks of asin_detail were read, so deduplicate by ASIN first
        self.handle_asin_detail_duplicated()

        # join the search-term-to-ASIN mapping with asin_detail
        self.handle_asin_detail_base()

        # tag whether each ASIN is a new product
        self.handle_asin_is_new()

        self.df_asin_detail = self.df_asin_detail.drop("dt").drop("keepa_launch_time").drop("days_diff").drop("site_name")
        self.df_save = self.df_asin_detail.select("search_term", "asin", "page", "page_row", "page_rank",
                                                         "data_type", "title", "title_len", "price", "rating",
                                                         "total_comments", "buy_box_seller_type", "page_inventory",
                                                         "category", "volume", "weight", "color", "`size`", "style",
                                                         "is_sale", "asin_rank", "launch_time", "is_asin_new",
                                                         "img_num", "img_type", "category_state","bsr_cate_1_id", "activity_type",
                                                         "one_two_val", "three_four_val", "five_six_val", "eight_val",
                                                         "created_time", "updated_time")
        # null handling
        self.handle_empty_column()

        # fill in the partition columns
        self.df_save = self.df_save.withColumn("site_name", F.lit(self.site_name))
        self.df_save = self.df_save.withColumn("date_type", F.lit(self.date_type))
        self.df_save = self.df_save.withColumn("date_info", F.lit(self.date_info))
        self.df_save.show(10, truncate=False)
        print("self.df_save.columns:",self.df_save.columns)

    # deduplicate by ASIN, keeping the record with the largest dt
    def handle_asin_detail_duplicated(self):
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "dt_sort", self.u_year_week(self.df_asin_detail.dt)
        )

        # rank within each ASIN partition: rows with a non-null title first, then the most recent dt
        window = Window.partitionBy(['asin']).orderBy(
            self.df_asin_detail.title.asc_nulls_last(),
            self.df_asin_detail.dt_sort.desc()
        )
        self.df_asin_detail = self.df_asin_detail.withColumn("sort_top", F.row_number().over(window=window))
        # keep the first row of each ASIN group, i.e. the deduplicated latest record
        self.df_asin_detail = self.df_asin_detail.filter("sort_top=1")
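    # Example (made-up rows): if asin 'B0EXAMPLE1' appears in weeks '2022-44' and
    # '2022-45', row_number() over (partition by asin order by title nulls last,
    # dt_sort desc) assigns sort_top=1 to the '2022-45' row, so only it survives.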

    def handle_asin_detail_base(self):
        # join the base attributes on; nb: asin_detail and bs_category are crawled weekly, so daily runs may still have gaps to fill
        self.df_asin_detail = self.df_st_asin_info. \
            join(self.df_asin_detail, on='asin', how='left'). \
            join(self.df_asin_variat, on='asin', how='left'). \
            join(self.df_bs_category, on='asin', how='left')
        print("df_asin_detail:", self.df_asin_detail.show(10, truncate=False))

        # for ASINs whose launch_time is null, backfill it from keep_date
        self.df_asin_detail = self.df_asin_detail. \
            join(self.df_asin_keep_date, on='asin',how='left')
        print("df_asin_detail join df_asin_keep_date: ", self.df_asin_detail.show(10, truncate=False))
        # if launch_time is null, fill it from keepa_launch_time; otherwise keep the original value
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "launch_time_new",
            F.when(F.isnull("launch_time"), F.col("keepa_launch_time")).otherwise(F.col("launch_time")))

        # drop the old launch_time and rename launch_time_new back to launch_time
        self.df_asin_detail = self.df_asin_detail.drop("launch_time").withColumnRenamed("launch_time_new","launch_time")
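        # The when/otherwise above is equivalent to the more compact
        #   F.coalesce(F.col("launch_time"), F.col("keepa_launch_time"))
        # which likewise takes the first non-null of the two columns.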

    # flag newly launched ASINs
    def handle_asin_is_new(self):
        # compute days_diff as the input for the is_asin_new flag
        print("computing days_diff for the is_asin_new flag")
        self.df_asin_detail = self.df_asin_detail.withColumn("days_diff", self.u_launch_time(
            self.df_asin_detail.launch_time, F.lit(self.date_type), F.lit(self.date_info)))
        # run days_diff through the UDF to produce is_asin_new (new-product flag)
        print("computing the is_asin_new flag")
        self.df_asin_detail = self.df_asin_detail.withColumn("is_asin_new", self.u_days_diff(
            self.df_asin_detail.days_diff))
        print("self.df_asin_detail:", self.df_asin_detail.show(10, truncate=False))

    # null handling
    def handle_empty_column(self):
        # numeric columns: fill nulls with sentinel defaults
        self.df_save = self.df_save.\
            na.fill({"page_row": 0, "page_rank": 0, "title_len": 0, "price": 0.0, "rating": 0.0,
                     "buy_box_seller_type": 0, "page_inventory": 0, "weight": 0.0, "is_sale": -1,
                     "asin_rank": 0, "is_asin_new": -1, "img_num": 0, "bsr_cate_1_id": -999999})

        # string columns: fill nulls with placeholder values
        self.df_save = self.df_save.\
            na.fill({"title": "null", "category": "null", "volume": "null", "color": "null",
                     "size": "null", "style": "null", "launch_time": "1900-01-01",
                     "img_type": "null", "activity_type": "null"})

        # special case: normalize the literal string "None" to the "null" placeholder
        for col_name in ["color", "size", "style"]:
            self.df_save = self.df_save.withColumn(
                col_name, F.when(F.col(col_name) == "None", F.lit("null")).otherwise(F.col(col_name)))

if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site
    date_type = sys.argv[2]  # arg 2: date granularity: week/4_week/month/quarter/day
    date_info = sys.argv[3]  # arg 3: year-week/year-month/year-quarter/year-month-day, e.g. 2022-1
    handle_obj = DimStAsinDetail(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()
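
# Example invocation (spark-submit flags and paths are deployment-specific assumptions):
#   spark-submit dim_st_asin_detail.py us day 2022-10-01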