# dim_st_asin_detail.py
"""
   @Author      : HuangJian
   @Description : 关键词与Asin详情维表
   @SourceTable :
                  ①ods_search_term_(zr,sp,sb,ac,bs,er,tr)
                  ②ods_asin_keep_date
                  ③ods_asin_variat
                  ④ods_asin_detail
                  ⑤dwd_bs_category_asin
   @SinkTable   : dim_st_asin_detail
   @CreateTime  : 2022/11/10 9:56
   @UpdateTime  : 2022/11/10 9:56
"""

import os
import sys
import datetime
import traceback
from datetime import date, timedelta

sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from utils.templates import Templates
# from ..utils.templates import Templates
from pyspark.sql.types import StringType, IntegerType
from pyspark.sql.window import Window
from pyspark.sql import functions as F


class DimStAsinDetail(Templates):

    def __init__(self, site_name='us', date_type="day", date_info='2022-10-01'):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = f'dim_st_asin_detail'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name},{self.date_type}, {self.date_info}")
        self.year_week = self.get_year_week()
        self.year_week_tuple = self.get_last_4_week()
        self.df_save = self.spark.sql(f"select 1+1;")
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.reset_partitions(partitions_num=60)
        self.data_type_list = ['tr', 'er', 'bs', 'ac', 'sb1', 'sb2', 'sb3', 'sp', 'zr']  # union the small per-type tables into one large table
        self.df_st_asin_info = self.spark.sql(
            f"select search_term, asin, page, page_row, 'zr' as data_type, updated_time,site_name,date_type,date_info from ods_st_rank_zr limit 0;")
        self.df_asin_keep_date = self.spark.sql(f"select 1+1;")
        self.df_asin_variat = self.spark.sql(f"select 1+1;")
        self.df_asin_detail = self.spark.sql(f"select 1+1;")
        self.df_bs_category = self.spark.sql("select 1+1;")

        # register custom UDFs
        self.u_launch_time = self.spark.udf.register("u_launch_time", self.udf_launch_time, IntegerType())
        self.u_days_diff = self.spark.udf.register("u_days_diff", self.udf_days_diff, IntegerType())
        self.u_year_week = self.spark.udf.register('u_year_week', self.udf_year_week, StringType())

    @staticmethod
    def udf_page_rank(page, page_1_count, page_2_count, page_row):
        """
        处理 zr, sp 的page_rank字段
        :param page:
        :param page_1_count:
        :param page_2_count:
        :param page_row:
        :return: page_rank
        """
        if page == 1:
            return page_row
        elif page == 2:
            return page_1_count + page_row
        else:
            return page_2_count + page_row
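        # Illustrative example (assumed values): with page_1_count=48 and
        # page_2_count=48+50=98, (page=1, page_row=10) -> 10,
        # (page=2, page_row=5) -> 53, (page=3, page_row=7) -> 105.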

    def handle_data_page_rank(self, df, data_type):
        print(f"{data_type}--page_rank计算")
        u_page_rank = self.spark.udf.register('u_page_rank', self.udf_page_rank, IntegerType())
        # zr/sp contain duplicate rows, so take max(page_row) instead of count
        df_page_1 = df.filter(f"page=1").groupBy(['search_term']).agg({f"page_row": "max"})
        df_page_2 = df.filter(df[f'page'] == 2).groupBy(['search_term']).agg(
            {f"page_row": "max"})
        df_page_1 = df_page_1.withColumnRenamed(f'max(page_row)', 'page_1_count')
        df_page_2 = df_page_2.withColumnRenamed(f'max(page_row)', 'page_2_count_old')
        df = df.join(df_page_1, on='search_term', how='left'). \
            join(df_page_2, on='search_term', how='left')
        df = df.fillna(0)
        df = df.withColumn("page_2_count", df.page_1_count + df.page_2_count_old)
        df = df.withColumn(f"page_rank", u_page_rank(
            df[f'page'], df.page_1_count, df.page_2_count, df[f'page_row']))
        # df.show(n=10, truncate=False)
        return df

    def get_last_4_week(self):
        # Get the most recent 4 weeks based on the current week
        print("calling get_last_4_week, current year-week:", self.year_week)
        self.df_week = self.spark.sql(f"select * from dim_week_20_to_30;")
        df = self.df_week.toPandas()
        self.year, self.week = int(self.year_week.split("-")[0]), int(self.year_week.split("-")[1])
        df_week = df.loc[df.year_week == self.year_week]
        current_id = list(df_week.id)[0] if list(df_week.id) else None
        id_tuple = (current_id, current_id - 1, current_id - 2, current_id - 3)
        df_4_week = df.loc[df.id.isin(id_tuple)]
        return tuple(df_4_week.year_week)
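        # Illustrative result (assuming the current week is '2022-40'):
        # ('2022-37', '2022-38', '2022-39', '2022-40'), order follows the dim_week table.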

    def get_year_week(self):
        # Get the year_week corresponding to the given date
        if self.date_type == "day":
            sql = f"select year_week from dim_date_20_to_30 where `date`='{self.date_info}'"
            df = self.spark.sql(sqlQuery=sql).toPandas()
            print(list(df.year_week)[0])
            return list(df.year_week)[0]
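        # Note: only date_type == 'day' is handled here; for other date types the
        # method falls through and returns None, which get_last_4_week cannot handle.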

    @staticmethod
    def udf_launch_time(launch_time, date_type, date_info):
        # Compute the number of days between launch_time and the current reporting date
        if "-" in str(launch_time):
            asin_date_list = str(launch_time).split("-")
            try:
                asin_date = datetime.date(year=int(asin_date_list[0]),
                                          month=int(asin_date_list[1]),
                                          day=int(asin_date_list[2]))
                week_date = '2022-10-01'
                if date_type == 'week':
                    cur_year = int(str(date_info).split("-")[0])
                    cur_week = int(str(date_info).split("-")[1])
                    d = date(cur_year, 1, 1)
                    d = d - timedelta(days=d.weekday())
                    dlt = timedelta(days=cur_week * 7)
                    week_date = d + dlt

                if date_type == 'day':
                    week_date = date_info
                cur_date_list = str(week_date).split("-")
                cur_date = datetime.date(year=int(cur_date_list[0]),
                                         month=int(cur_date_list[1]),
                                         day=int(cur_date_list[2]))
                days_diff = (cur_date - asin_date).days
            except Exception as e:
                print(e, traceback.format_exc())
                print(launch_time, asin_date_list)
                days_diff = -2
        else:
            days_diff = -1
        return days_diff
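        # Illustrative example (assumed values): launch_time='2022-08-15',
        # date_type='day', date_info='2022-10-01' -> 47 days.
        # -1 is returned when launch_time contains no '-', -2 on parse errors.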

    @staticmethod
    def udf_days_diff(days_diff):
        # Flag whether days_diff is within 180 days, i.e. whether the asin is a new product
        if 0 <= days_diff <= 180:
            return 1
        else:
            return 0

    @staticmethod
    def udf_year_week(dt):
        year, week = dt.split("-")[0], dt.split("-")[1]
        if int(week) < 10:
            return f"{year}-0{week}"
        else:
            return f"{year}-{week}"

    def read_data(self):
        # Build the search-term -> asin mapping from the ODS tables ods_search_term_(zr,sp,sb,ac,bs,er,tr)
        for data_type in self.data_type_list:
            print(f"site_name: {self.site_name}, data_type: {data_type}")
            if data_type in ['zr', 'sp']:
                sql = f"select search_term, asin, page, page_row, '{data_type}' as data_type,created_time, updated_time, site_name,date_type,date_info from ods_search_term_{data_type} " \
                      f"where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';"
                df = self.spark.sql(sqlQuery=sql)
                # compute page_rank
                df = self.handle_data_page_rank(df=df, data_type=data_type)
                df = df.drop('page_1_count', 'page_2_count', 'page_2_count_old')
            else:
                if data_type in ['sb1', 'sb2', 'sb3']:
                    sql = f"select search_term, asin, page, '{data_type}' as data_type,created_time, updated_time, site_name,date_type,date_info from ods_search_term_sb " \
                          f"where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}' and data_type={int(data_type[-1])};"
                else:
                    sql = f"select search_term, asin, page, '{data_type}' as data_type,created_time, updated_time, site_name,date_type,date_info from ods_search_term_{data_type} " \
                          f"where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';"
                df = self.spark.sql(sqlQuery=sql)
            # print(f"site_name: {self.site_name}, data_type: {data_type}, partitions: {df.rdd.getNumPartitions()}")
            self.df_st_asin_info = self.df_st_asin_info.unionByName(df, allowMissingColumns=True)
        # Add the year_week column to make deduplication against the weekly tables easier later
        self.df_st_asin_info = self.df_st_asin_info.withColumn("year_week", F.lit(self.year_week))
        print("self.df_st_asin_info", self.df_st_asin_info.show(10, truncate=False))
        # print("self.df_save.count():", self.df_save.count ())

        # Read ods_asin_keep_date from the ODS layer
        sql = f"select asin, launch_time as keepa_launch_time, site_name from ods_asin_keep_date " \
              f"where state = 3 and site_name='{self.site_name}'"
        self.df_asin_keep_date = self.spark.sql(sqlQuery=sql)
        print("self.df_asin_keep_date", self.df_asin_keep_date.show(10, truncate=False))

        # Read asin variation info (dim_asin_variation_info)
        sql = f"select asin,color,`size`,style,state as is_sale from dim_asin_variation_info " \
              f"where state is not null and site_name='{self.site_name}'"
        self.df_asin_variat = self.spark.sql(sqlQuery=sql)
        print("self.df_asin_variat", self.df_asin_variat.show(10, truncate=False))

        print("测试打印,self.year_week_tuple:",self.year_week_tuple)
        # Read ods_asin_detail; the IN clause pulls multiple weeks at once, so deduplication has to be handled afterwards
        sql = f"select asin,title,title_len,price,rating,total_comments,buy_box_seller_type,page_inventory,category," \
              f"volume,weight,`rank` as asin_rank,launch_time,img_num,img_type,category_state,activity_type,one_two_val," \
              f"three_four_val,five_six_val,eight_val,site_name,dt from ods_asin_detail " \
              f"where site_name='{self.site_name}' and dt in {self.year_week_tuple} ;"
        self.df_asin_detail = self.spark.sql(sqlQuery=sql)
        print("self.df_asin_detail", self.df_asin_detail.show(10, truncate=False))

        # Read the dwd_bs_category_asin table
        sql = f"select asin, cate_1_id as bsr_cate_1_id, dt from selection_off_line.dwd_bs_category_asin " \
              f"where site='{self.site_name}' and dt= '{self.year_week}';"
        self.df_bs_category = self.spark.sql(sqlQuery=sql)
        print("self.df_bs_category", self.df_bs_category.show(10, truncate=False))


    def handle_data(self):
        # asin_detail spans multiple weeks, so deduplicate it first
        self.handle_asin_detail_duplicated()

        # Join the prepared search-term/asin mapping with asin_detail
        self.handle_asin_detail_base()

        # Derive the flag that marks whether an asin is a new product
        self.handle_asin_is_new()

        self.df_asin_detail = self.df_asin_detail.drop("dt").drop("keepa_launch_time").drop("days_diff").drop("site_name")
        self.df_save = self.df_asin_detail.select("search_term", "asin", "page", "page_row", "page_rank",
                                                         "data_type", "title", "title_len", "price", "rating",
                                                         "total_comments", "buy_box_seller_type", "page_inventory",
                                                         "category", "volume", "weight", "color", "`size`", "style",
                                                         "is_sale", "asin_rank", "launch_time", "is_asin_new",
                                                         "img_num", "img_type", "category_state","bsr_cate_1_id", "activity_type",
                                                         "one_two_val", "three_four_val", "five_six_val", "eight_val",
                                                         "created_time", "updated_time")
        # Null handling
        self.handle_empty_column()

        # Fill in the partition columns
        self.df_save = self.df_save.withColumn("site_name", F.lit(self.site_name))
        self.df_save = self.df_save.withColumn("date_type", F.lit(self.date_type))
        self.df_save = self.df_save.withColumn("date_info", F.lit(self.date_info))
        self.df_save.show(10, truncate=False)
        print("self.df_save.columns:",self.df_save.columns)

    # Deduplicate by asin, keeping the record with the largest dt
    def handle_asin_detail_duplicated(self):
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "dt_sort", self.u_year_week(self.df_asin_detail.dt)
        )

        # Order within the window: title ascending with nulls last (prefer rows that have a title), then dt descending
        window = Window.partitionBy(['asin']).orderBy(
            self.df_asin_detail.title.asc_nulls_last(),
            self.df_asin_detail.dt_sort.desc()
        )
        self.df_asin_detail = self.df_asin_detail.withColumn("sort_top", F.row_number().over(window=window))
        # Take the first row of each asin group as the deduplicated record
        self.df_asin_detail = self.df_asin_detail.filter("sort_top=1")
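        # Illustrative example (assumed data): if the same asin appears with the
        # same title in dt '2022-38' and '2022-40', the '2022-40' row is kept.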

    def handle_asin_detail_base(self):
        # Join the base attributes onto the mapping; note: df_asin_detail and df_bs_category are crawled weekly, so how to backfill daily data as completely as possible still needs thought
        self.df_asin_detail = self.df_st_asin_info. \
            join(self.df_asin_detail, on='asin', how='left'). \
            join(self.df_asin_variat, on='asin', how='left'). \
            join(self.df_bs_category, on='asin', how='left')
        print("df_asin_detail:", self.df_asin_detail.show(10, truncate=False))

        # For asins whose launch_time is null, backfill launch_time from keep_date
        self.df_asin_detail = self.df_asin_detail. \
            join(self.df_asin_keep_date, on='asin',how='left')
        # If launch_time is null, fill it with keepa_launch_time; otherwise keep the original launch_time
        print("df_asin_detail join df_asin_keep_date: ", self.df_asin_detail.show(10, truncate=False))
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "launch_time_new",
            F.when(F.isnull("launch_time"), F.col("keepa_launch_time")).otherwise(F.col("launch_time")))

        # Drop the old launch_time and rename launch_time_new to launch_time
        self.df_asin_detail = self.df_asin_detail.drop("launch_time").withColumnRenamed("launch_time_new","launch_time")
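        # Equivalent sketch: the when/otherwise above could also be written as
        # F.coalesce(F.col("launch_time"), F.col("keepa_launch_time")).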

    # Logic for flagging whether an asin is newly launched
    def handle_asin_is_new(self):
        # Generate the days_diff column in preparation for the is_asin_new flag
        print("computing days_diff in preparation for the is_asin_new flag")
        self.df_asin_detail = self.df_asin_detail.withColumn("days_diff", self.u_launch_time(
            self.df_asin_detail.launch_time, F.lit(self.date_type), F.lit(self.date_info)))
        # Run days_diff through the UDF to generate is_asin_new (new-product flag)
        print("computing the is_asin_new flag")
        self.df_asin_detail = self.df_asin_detail.withColumn("is_asin_new", self.u_days_diff(
            self.df_asin_detail.days_diff))
        print("self.df_asin_detail:", self.df_asin_detail.show(10, truncate=False))

    # Null handling
    def handle_empty_column(self):
        # Fill nulls in numeric columns
        self.df_save = self.df_save.\
            na.fill({"page_row":0,"page_rank":0, "title_len":0,"price":0.0,"rating":0.0,"buy_box_seller_type":0,"page_inventory":0,
                     "weight":0.0,"is_sale":-1,"asin_rank":0,"is_asin_new":-1,"img_num":0,"bsr_cate_1_id":-999999})

        # Fill nulls in string columns
        self.df_save = self.df_save.\
            na.fill({"title":"null","category":"null","volume":"null","color":"null","size":"null","style":"null",
                     "launch_time":"1900-01-01","img_type":"null","activity_type":"null"})

        # Special handling: normalise the literal string "None" to "null"
        self.df_save = self.df_save.withColumn("color", F.when(F.col("color") == "None", F.lit("null")).otherwise(F.col("color")))
        self.df_save = self.df_save.withColumn("size", F.when(F.col("size") == "None", F.lit("null")).otherwise(F.col("size")))
        self.df_save = self.df_save.withColumn("style", F.when(F.col("style") == "None", F.lit("null")).otherwise(F.col("style")))

if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site
    date_type = sys.argv[2]  # arg 2: date type: week/4_week/month/quarter/day
    date_info = sys.argv[3]  # arg 3: year-week / year-month / year-quarter / year-month-day, e.g. 2022-1
    handle_obj = DimStAsinDetail(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()
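
# Illustrative invocation (assuming the job is launched from the command line,
# e.g. via spark-submit):
#   spark-submit dim_st_asin_detail.py us day 2022-10-01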