""" @Author : HuangJian @Description : 关键词与Asin详情维表 @SourceTable : ①ods_search_term_(zr,sp,sb,ac,bs,er,tr) ②ods_asin_keep_date ③ods_asin_variat ④ods_asin_detail ⑤dwd_bs_category_asin @SinkTable : dim_st_asin_detail @CreateTime : 2022/11/10 9:56 @UpdateTime : 2022/11/10 9:56 """ import os import sys import datetime import traceback from datetime import date, timedelta sys.path.append(os.path.dirname(sys.path[0])) # 上级目录 from utils.templates import Templates # from ..utils.templates import Templates from pyspark.sql.types import IntegerType from pyspark.sql.window import Window from pyspark.sql import functions as F from pyspark.sql.types import StringType, IntegerType class DimStAsinDetail(Templates): def __init__(self, site_name='us', date_type="day", date_info='2022-10-01'): super().__init__() self.site_name = site_name self.date_type = date_type self.date_info = date_info self.db_save = f'dim_st_asin_detail' self.spark = self.create_spark_object( app_name=f"{self.db_save}: {self.site_name},{self.date_type}, {self.date_info}") self.year_week = self.get_year_week() self.year_week_tuple = self.get_last_4_week() self.df_save = self.spark.sql(f"select 1+1;") self.partitions_by = ['site_name', 'date_type', 'date_info'] self.reset_partitions(partitions_num=60) self.data_type_list = ['tr', 'er', 'bs', 'ac', 'sb1', 'sb2', 'sb3', 'sp', 'zr'] # 小表拼大表 self.df_st_asin_info = self.spark.sql( f"select search_term, asin, page, page_row, 'zr' as data_type, updated_time,site_name,date_type,date_info from ods_st_rank_zr limit 0;") self.df_asin_keep_date = self.spark.sql(f"select 1+1;") self.df_asin_variat = self.spark.sql(f"select 1+1;") self.df_asin_detail = self.spark.sql(f"select 1+1;") self.df_bs_category = self.spark.sql("select 1+1;") # 自定义udf函数相关对象 self.u_launch_time = self.spark.udf.register("u_launch_time", self.udf_launch_time, IntegerType()) self.u_days_diff = self.spark.udf.register("u_days_diff", self.udf_days_diff, IntegerType()) self.u_year_week = self.spark.udf.register('u_year_week', self.udf_year_week, StringType()) @staticmethod def udf_page_rank(page, page_1_count, page_2_count, page_row): """ 处理 zr, sp 的page_rank字段 :param page: :param page_1_count: :param page_2_count: :param page_row: :return: page_rank """ if page == 1: return page_row elif page == 2: return page_1_count + page_row else: return page_2_count + page_row def handle_data_page_rank(self, df, data_type): print(f"{data_type}--page_rank计算") u_page_rank = self.spark.udf.register('u_page_rank', self.udf_page_rank, IntegerType()) # 由于zr,sp存在重复值,改成max,而不是使用count df_page_1 = df.filter(f"page=1").groupBy(['search_term']).agg({f"page_row": "max"}) df_page_2 = df.filter(df[f'page'] == 2).groupBy(['search_term']).agg( {f"page_row": "max"}) df_page_1 = df_page_1.withColumnRenamed(f'max(page_row)', 'page_1_count') df_page_2 = df_page_2.withColumnRenamed(f'max(page_row)', 'page_2_count_old') df = df.join(df_page_1, on='search_term', how='left'). 
    def handle_data_page_rank(self, df, data_type):
        print(f"{data_type} -- computing page_rank")
        u_page_rank = self.spark.udf.register('u_page_rank', self.udf_page_rank, IntegerType())
        # zr/sp contain duplicate rows, so take max(page_row) instead of count
        df_page_1 = df.filter("page=1").groupBy(['search_term']).agg({"page_row": "max"})
        df_page_2 = df.filter(df['page'] == 2).groupBy(['search_term']).agg({"page_row": "max"})
        df_page_1 = df_page_1.withColumnRenamed('max(page_row)', 'page_1_count')
        df_page_2 = df_page_2.withColumnRenamed('max(page_row)', 'page_2_count_old')
        df = df.join(df_page_1, on='search_term', how='left'). \
            join(df_page_2, on='search_term', how='left')
        df = df.fillna(0)
        df = df.withColumn("page_2_count", df.page_1_count + df.page_2_count_old)
        df = df.withColumn("page_rank", u_page_rank(
            df['page'], df.page_1_count, df.page_2_count, df['page_row']))
        # df.show(n=10, truncate=False)
        return df

    def get_last_4_week(self):
        # derive the most recent four year-weeks from the current week
        print("get_last_4_week called, current year-week:", self.year_week)
        self.df_week = self.spark.sql("select * from dim_week_20_to_30;")
        df = self.df_week.toPandas()
        self.year, self.week = int(self.year_week.split("-")[0]), int(self.year_week.split("-")[1])
        df_week = df.loc[df.year_week == self.year_week]
        current_id = list(df_week.id)[0] if list(df_week.id) else None
        id_tuple = (current_id, current_id - 1, current_id - 2, current_id - 3)
        df_4_week = df.loc[df.id.isin(id_tuple)]
        df_4_week = tuple(df_4_week.year_week) if tuple(df_4_week.year_week) else ()
        return df_4_week

    def get_year_week(self):
        # resolve the current year-week from the date
        if self.date_type == "day":
            sql = f"select year_week from dim_date_20_to_30 where `date`='{self.date_info}'"
            df = self.spark.sql(sqlQuery=sql).toPandas()
            print(list(df.year_week)[0])
            return list(df.year_week)[0]
        # assumption: for other date_types, date_info is already a year-week string
        return self.date_info

    @staticmethod
    def udf_launch_time(launch_time, date_type, date_info):
        # compute the number of days between launch_time and the current reference date
        if "-" in str(launch_time):
            asin_date_list = str(launch_time).split("-")
            try:
                asin_date = datetime.date(year=int(asin_date_list[0]), month=int(asin_date_list[1]),
                                          day=int(asin_date_list[2]))
                week_date = '2022-10-01'
                if date_type == 'week':
                    cur_year = int(str(date_info).split("-")[0])
                    cur_week = int(str(date_info).split("-")[1])
                    d = date(cur_year, 1, 1)
                    d = d - timedelta(d.weekday())
                    dlt = timedelta(days=cur_week * 7)
                    week_date = d + dlt
                if date_type == 'day':
                    week_date = date_info
                cur_date_list = str(week_date).split("-")
                cur_date = datetime.date(year=int(cur_date_list[0]), month=int(cur_date_list[1]),
                                         day=int(cur_date_list[2]))
                days_diff = (cur_date - asin_date).days
            except Exception as e:
                print(e, traceback.format_exc())
                print(launch_time, asin_date_list)
                days_diff = -2
        else:
            days_diff = -1
        return days_diff

    @staticmethod
    def udf_days_diff(days_diff):
        # an ASIN launched within the last 180 days counts as a new product
        if 0 <= days_diff <= 180:
            return 1
        else:
            return 0

    @staticmethod
    def udf_year_week(dt):
        # zero-pad the week part so year-week strings sort correctly
        year, week = dt.split("-")[0], dt.split("-")[1]
        if int(week) < 10:
            return f"{year}-0{week}"
        else:
            return f"{year}-{week}"
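    # Worked example (assumed values): with date_type='day' and date_info='2022-10-01',
    # launch_time '2022-08-15' gives days_diff = 47  -> is_asin_new = 1 (0 <= 47 <= 180),
    # launch_time '2021-01-01' gives days_diff = 638 -> is_asin_new = 0,
    # and a malformed launch_time such as 'unknown' gives days_diff = -1.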
    def read_data(self):
        # build the search_term -> ASIN mapping from ods_search_term_(zr,sp,sb,ac,bs,er,tr)
        for data_type in self.data_type_list:
            print(f"site_name: {self.site_name}, data_type: {data_type}")
            if data_type in ['zr', 'sp']:
                sql = f"select search_term, asin, page, page_row, '{data_type}' as data_type, created_time, " \
                      f"updated_time, site_name, date_type, date_info from ods_search_term_{data_type} " \
                      f"where site_name='{self.site_name}' and date_type='{self.date_type}' " \
                      f"and date_info='{self.date_info}';"
                df = self.spark.sql(sqlQuery=sql)
                # compute page_rank, then drop the helper columns
                df = self.handle_data_page_rank(df=df, data_type=data_type)
                df = df.drop('page_1_count', 'page_2_count', 'page_2_count_old')
            else:
                if data_type in ['sb1', 'sb2', 'sb3']:
                    sql = f"select search_term, asin, page, '{data_type}' as data_type, created_time, " \
                          f"updated_time, site_name, date_type, date_info from ods_search_term_sb " \
                          f"where site_name='{self.site_name}' and date_type='{self.date_type}' " \
                          f"and date_info='{self.date_info}' and data_type={int(data_type[-1])};"
                else:
                    sql = f"select search_term, asin, page, '{data_type}' as data_type, created_time, " \
                          f"updated_time, site_name, date_type, date_info from ods_search_term_{data_type} " \
                          f"where site_name='{self.site_name}' and date_type='{self.date_type}' " \
                          f"and date_info='{self.date_info}';"
                df = self.spark.sql(sqlQuery=sql)
            # print(f"site_name: {self.site_name}, data_type: {data_type}, partitions: {df.rdd.getNumPartitions()}")
            self.df_st_asin_info = self.df_st_asin_info.unionByName(df, allowMissingColumns=True)
        # add a year_week column so the weekly tables can be deduplicated later
        self.df_st_asin_info = self.df_st_asin_info.withColumn("year_week", F.lit(self.year_week))
        print("self.df_st_asin_info:")
        self.df_st_asin_info.show(10, truncate=False)
        # print("self.df_save.count():", self.df_save.count())

        # read ods_asin_keep_date
        sql = f"select asin, launch_time as keepa_launch_time, site_name from ods_asin_keep_date " \
              f"where state = 3 and site_name='{self.site_name}'"
        self.df_asin_keep_date = self.spark.sql(sqlQuery=sql)
        print("self.df_asin_keep_date:")
        self.df_asin_keep_date.show(10, truncate=False)

        # read the variation info (dim_asin_variation_info)
        sql = f"select asin, color, `size`, style, state as is_sale from dim_asin_variation_info " \
              f"where state is not null and site_name='{self.site_name}'"
        self.df_asin_variat = self.spark.sql(sqlQuery=sql)
        print("self.df_asin_variat:")
        self.df_asin_variat.show(10, truncate=False)

        print("debug print, self.year_week_tuple:", self.year_week_tuple)
        # read ods_asin_detail; the IN clause pulls several weeks at once,
        # so duplicates must be handled afterwards
        sql = f"select asin,title,title_len,price,rating,total_comments,buy_box_seller_type,page_inventory,category," \
              f"volume,weight,`rank` as asin_rank,launch_time,img_num,img_type,category_state,activity_type,one_two_val," \
              f"three_four_val,five_six_val,eight_val,site_name,dt from ods_asin_detail " \
              f"where site_name='{self.site_name}' and dt in {self.year_week_tuple};"
        self.df_asin_detail = self.spark.sql(sqlQuery=sql)
        print("self.df_asin_detail:")
        self.df_asin_detail.show(10, truncate=False)

        # read dwd_bs_category_asin
        sql = f"select asin, cate_1_id as bsr_cate_1_id, dt from selection_off_line.dwd_bs_category_asin " \
              f"where site='{self.site_name}' and dt='{self.year_week}';"
        self.df_bs_category = self.spark.sql(sqlQuery=sql)
        print("self.df_bs_category:")
        self.df_bs_category.show(10, truncate=False)

    def handle_data(self):
        # asin_detail spans several weeks, so deduplicate it first
        self.handle_asin_detail_duplicated()
        # join the prepared st->ASIN mapping with asin_detail
        self.handle_asin_detail_base()
        # flag new products
        self.handle_asin_is_new()
        self.df_asin_detail = self.df_asin_detail.drop("dt", "keepa_launch_time", "days_diff", "site_name")
        self.df_save = self.df_asin_detail.select(
            "search_term", "asin", "page", "page_row", "page_rank", "data_type", "title", "title_len",
            "price", "rating", "total_comments", "buy_box_seller_type", "page_inventory", "category",
            "volume", "weight", "color", "`size`", "style", "is_sale", "asin_rank", "launch_time",
            "is_asin_new", "img_num", "img_type", "category_state", "bsr_cate_1_id", "activity_type",
            "one_two_val", "three_four_val", "five_six_val", "eight_val", "created_time", "updated_time")
        # null handling
        self.handle_empty_column()
        # fill in the partition columns
        self.df_save = self.df_save.withColumn("site_name", F.lit(self.site_name))
        self.df_save = self.df_save.withColumn("date_type", F.lit(self.date_type))
        self.df_save = self.df_save.withColumn("date_info", F.lit(self.date_info))
        self.df_save.show(10, truncate=False)
        print("self.df_save.columns:", self.df_save.columns)

    # deduplicate by asin, keeping the record with the largest dt
    def handle_asin_detail_duplicated(self):
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "dt_sort", self.u_year_week(self.df_asin_detail.dt)
        )
        # order within the window: non-null titles first, then dt descending
        window = Window.partitionBy(['asin']).orderBy(
            self.df_asin_detail.title.asc_nulls_last(),
            self.df_asin_detail.dt_sort.desc()
        )
        self.df_asin_detail = self.df_asin_detail.withColumn("sort_top", F.row_number().over(window=window))
        # keep the first row per asin group, i.e. the latest record after deduplication
        self.df_asin_detail = self.df_asin_detail.filter("sort_top=1")
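    # Illustration (assumed rows): for an asin seen in weeks '2022-38' and '2022-40',
    # dt_sort keeps the zero-padded strings comparable ('2022-38' < '2022-40'),
    # row_number() puts the '2022-40' record first within the asin partition,
    # and filter("sort_top=1") keeps only that latest record.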
    def handle_asin_detail_base(self):
        # join the base attributes; ps: df_asin_detail and df_bs_category are crawled weekly,
        # so consider how to backfill daily data as completely as possible
        self.df_asin_detail = self.df_st_asin_info. \
            join(self.df_asin_detail, on='asin', how='left'). \
            join(self.df_asin_variat, on='asin', how='left'). \
            join(self.df_bs_category, on='asin', how='left')
        print("df_asin_detail:")
        self.df_asin_detail.show(10, truncate=False)
        # where launch_time is null, backfill it from keep_date
        self.df_asin_detail = self.df_asin_detail. \
            join(self.df_asin_keep_date, on='asin', how='left')
        print("df_asin_detail join df_asin_keep_date:")
        self.df_asin_detail.show(10, truncate=False)
        # use keepa_launch_time when launch_time is null, otherwise keep the original launch_time
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "launch_time_new",
            F.when(F.isnull("launch_time"), F.col("keepa_launch_time")).otherwise(F.col("launch_time")))
        # drop the old launch_time and rename launch_time_new back to launch_time
        self.df_asin_detail = self.df_asin_detail.drop("launch_time") \
            .withColumnRenamed("launch_time_new", "launch_time")

    # flag newly launched ASINs
    def handle_asin_is_new(self):
        # derive days_diff as the basis for the is_asin_new flag
        print("computing days_diff as the basis for is_asin_new")
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "days_diff",
            self.u_launch_time(self.df_asin_detail.launch_time, F.lit(self.date_type), F.lit(self.date_info)))
        # run days_diff through the UDF to produce is_asin_new (new-product flag)
        print("computing the is_asin_new flag")
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "is_asin_new", self.u_days_diff(self.df_asin_detail.days_diff))
        print("self.df_asin_detail:")
        self.df_asin_detail.show(10, truncate=False)

    # null handling
    def handle_empty_column(self):
        # numeric columns
        self.df_save = self.df_save. \
            na.fill({"page_row": 0, "page_rank": 0, "title_len": 0, "price": 0.0, "rating": 0.0,
                     "buy_box_seller_type": 0, "page_inventory": 0, "weight": 0.0, "is_sale": -1,
                     "asin_rank": 0, "is_asin_new": -1, "img_num": 0, "bsr_cate_1_id": -999999})
        # string columns
        self.df_save = self.df_save. \
            na.fill({"title": "null", "category": "null", "volume": "null", "color": "null",
                     "size": "null", "style": "null", "launch_time": "1900-01-01",
                     "img_type": "null", "activity_type": "null"})
        # special cases: normalize the literal string "None" to "null"
        self.df_save = self.df_save.withColumn(
            "color", F.when(F.col("color") == "None", F.lit("null")).otherwise(F.col("color")))
        self.df_save = self.df_save.withColumn(
            "size", F.when(F.col("size") == "None", F.lit("null")).otherwise(F.col("size")))
        self.df_save = self.df_save.withColumn(
            "style", F.when(F.col("style") == "None", F.lit("null")).otherwise(F.col("style")))


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site
    date_type = sys.argv[2]  # arg 2: type: week/4_week/month/quarter/day
    date_info = sys.argv[3]  # arg 3: year-week/year-month/year-quarter/year-month-day, e.g. 2022-1
    handle_obj = DimStAsinDetail(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()
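# Example invocation (assumed script name and launcher; adjust to the actual deployment):
#   spark-submit dim_st_asin_detail.py us day 2022-10-01
#   spark-submit dim_st_asin_detail.py us week 2022-40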