"""
Build the search-term base-information table (dwt_st_info).

Contents produced:
1. Hot search terms, rising terms, newly appeared terms, number of on-sale products, etc.
2. Estimated sales
3. BS sales and the BS category_id
4. st_ao_val

author: 方星钧(ffman)
description: Based on the DWD-layer tables, compute the base information table at
    search_term and asin granularity (including estimated sales)
table_read_name: dwd_st_counts series, dwd_st_info series, dwd_st_asin_info series, dwd_asin_bs_info
table_save_name: dwt_st_info series
table_save_level: dwt
version: 1.0
created_date: 2022-06-20
updated_date: 2022-06-20
"""
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from utils.templates import Templates
# from AmazonSpider.pyspark_job.utils.templates import Templates
# Window functions used for grouped ordering
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType


class DwtStInfo(Templates):
    """Spark job that enriches ``dwd_st_info`` rows with counts, order sums and BS category.

    NOTE(review): ``run()``, ``create_spark_object()``, ``get_year_week_tuple()`` and
    ``reset_partitions()`` are inherited from ``Templates``, which is not visible in this
    file; presumably ``run()`` calls ``read_data()`` and ``handle_data()`` and then saves
    ``self.df_save`` -- confirm against ``utils/templates.py``.
    """

    # Reference date used to decide whether an ASIN counts as "new".
    DEFAULT_CURRENT_DATE = '2022-10-16'

    # date_type -> partition type; unknown date types leave ``partitions_type`` untouched.
    PARTITIONS_TYPE_BY_DATE_TYPE = {
        "week": "dt",
        "4_week": "dt",
        "month": "dm",
        "quarter": "dq",
    }

    def __init__(self, site_name="us", date_type="week", date_info="2022-1", current_date=None):
        """Create the Spark session, placeholder DataFrames and UDFs.

        Args:
            site_name: Site code used in the ``site_name`` partition filter.
            date_type: One of week / 4_week / month / quarter.
            date_info: Period identifier matching ``date_type``, e.g. ``2022-1``.
            current_date: ``yyyy-MM-dd`` reference date for the new-product check;
                defaults to ``DEFAULT_CURRENT_DATE`` so existing callers are unaffected.
        """
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = "dwt_st_info"
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save} {self.site_name}, {self.date_info}"
        )
        self.df_date = self.get_year_week_tuple()

        # Placeholder DataFrames; the real data is loaded in read_data().
        self.df_save = self.spark.sql(f"select 1+1;")
        self.df_st_info = self.spark.sql(f"select 1+1;")
        self.df_st_counts = self.spark.sql(f"select 1+1;")
        self.df_st_asin_info = self.spark.sql(f"select 1+1;")
        self.df_asin_bs_info = self.spark.sql(f"select 1+1;")
        self.df_asin_detail_info = self.spark.sql(f"select 1+1;")

        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.reset_partitions(1)

        # Only assign when the date type is known, exactly as the original
        # if/elif chain did (no else branch).
        partitions_type = self.PARTITIONS_TYPE_BY_DATE_TYPE.get(self.date_type)
        if partitions_type is not None:
            self.partitions_type = partitions_type

        self.u_get_asin_top = self.spark.udf.register(
            "u_get_asin_top", self.udf_get_asin_top, StringType()
        )
        self.u_year_week = self.spark.udf.register(
            'u_year_week', self.udf_year_week, StringType()
        )

        self.current_date = self.DEFAULT_CURRENT_DATE if current_date is None else current_date
        print(self.current_date)

    @staticmethod
    def udf_year_week(dt):
        """Normalise a ``year-week`` string so the week part is two digits.

        ``"2022-1"`` -> ``"2022-01"``, ``"2022-12"`` -> ``"2022-12"``. A week that is
        already zero-padded stays two digits instead of gaining a second zero.
        ``None`` (SQL NULL) is passed through as ``None``.
        """
        if dt is None:
            return None
        parts = dt.split("-")
        year, week = parts[0], parts[1]
        return f"{year}-{int(week):02d}"

    @staticmethod
    def udf_get_asin_top(asin1, value1, asin2, value2, asin3, value3, flag):
        """Return the ASIN ranked ``flag``-th by conversion share (descending).

        The three (asin, value) pairs are ordered from the largest value to the
        smallest; ties keep the input order (asin1 before asin2 before asin3),
        which matches the original nested ``max`` comparisons. ``None`` values
        (SQL NULL) are ranked last instead of raising ``TypeError``.

        Args:
            asin1, asin2, asin3: Candidate ASINs.
            value1, value2, value3: Conversion share of the matching ASIN.
            flag: 1 -> largest, 2 -> second largest, anything else -> smallest.
        """
        def sort_key(pair):
            value = pair[1]
            # (False, 0) for missing values sorts below every (True, value).
            return (value is not None, 0 if value is None else value)

        candidates = [(asin1, value1), (asin2, value2), (asin3, value3)]
        # sorted() is stable even with reverse=True, so equal values keep input order.
        ranked = sorted(candidates, key=sort_key, reverse=True)

        if flag == 1:
            position = 0
        elif flag == 2:
            position = 1
        else:
            position = 2
        return ranked[position][0]

    def read_data(self):
        """Load the four source tables into cached DataFrames and print a sample of each."""
        partition_filter = (
            f"where site_name='{self.site_name}' and date_type='{self.date_type}' "
            f"and date_info = '{self.date_info}';"
        )

        print("1.1 读取dim_asin_history_info表")
        sql = (
            f"select asin, asin_bs_cate_current_id, asin_bs_orders, "
            f"asin_launch_time, asin_price as asin1_price, asin_rating as asin1_rating, "
            f"asin_total_comments as asin1_total_comments from dim_asin_history_info "
            f"where site_name='{self.site_name}';"
        )
        # NOTE: an earlier version also filtered on ``dt in '{self.year_week_tuple}'``.
        self.df_asin_bs_info = self.spark.sql(sql).cache()
        self.df_asin_bs_info.show(10, truncate=False)

        print("1.2 读取dwd_st_info系列表")
        sql = f"select * from dwd_st_info " + partition_filter
        self.df_st_info = self.spark.sql(sql).cache()
        self.df_st_info.show(10, truncate=False)

        print("1.3 读取dwd_st_counts系列表")
        sql = (
            f"select search_term, st_ao_val, st_ao_val_rank, st_ao_val_rate, st_zr_counts, st_sp_counts, "
            f"st_sb_counts, st_sb1_counts, st_sb2_counts, st_sb3_counts, st_adv_counts, "
            f"st_ac_counts, st_bs_counts, st_er_counts, st_tr_counts "
            f" from dwd_st_counts "
        ) + partition_filter
        print("sql:", sql)
        self.df_st_counts = self.spark.sql(sql).cache()
        self.df_st_counts.show(10, truncate=False)

        print("1.4 读取dwd_st_asin_info系列表")
        sql = (
            f"select search_term, asin, st_asin_zr_orders as st_asin_orders, "
            f"st_asin_zr_orders_sum as st_asin_orders_sum from dwd_st_asin_info "
        ) + partition_filter
        print("sql:", sql)
        self.df_st_asin_info = self.spark.sql(sql).cache()
        self.df_st_asin_info.show(10, truncate=False)

    def handle_data(self):
        """Run every enrichment step in order and expose the result as ``self.df_save``."""
        self.handle_data_st_ao_val()
        self.handle_data_asin_bs()
        self.handle_data_st_orders()
        self.handle_data_st_cate_current_id()
        self.handle_data_asin_detail()
        self.df_save = self.df_st_info
        self.df_save.show(10, truncate=False)

    def handle_data_st_ao_val(self):
        """Left-join the per-search-term counts (st_ao_val, st_*_counts) onto ``df_st_info``."""
        self.df_st_info = self.df_st_info.join(
            self.df_st_counts, on="search_term", how="left"
        )

    def handle_data_asin_bs(self):
        """Add the new-product flag to ``df_asin_bs_info``.

        ``asin_new_flag`` is derived from the days between ``self.current_date`` and
        ``asin_launch_time``:
            0 -> launched more than 180 days ago
            1 -> launched 1..180 days ago (treated as "new" downstream)
            2 -> everything else (launch date missing, today, or in the future)

        NOTE: an earlier version first kept only the latest weekly row per ASIN
        (``row_number`` over ``asin`` ordered by ``asin_bs_cate_current_id`` nulls-last,
        then ``u_year_week(dt)`` descending). That step is disabled; the source table
        is used as-is.
        """
        asin_info = self.df_asin_bs_info.withColumn("current_date", F.lit(self.current_date))
        asin_info = asin_info.withColumn(
            "days_diff", F.datediff("current_date", "asin_launch_time")
        )
        self.df_asin_bs_info = asin_info.withColumn(
            "asin_new_flag",
            F.when(asin_info.days_diff > 180, 0)
            .when(asin_info.days_diff > 0, 1)
            .otherwise(2)
        )
        self.df_asin_bs_info.show(10, truncate=False)

    def handle_data_st_orders(self):
        """Compute per-search-term order sums, ASIN counts and new-product ratios.

        Adds to ``df_st_info``: ``st_asin_bs_orders_sum``, ``st_asin_orders_sum``,
        ``st_asin_counts``, ``st_asin_new_orders_sum``, ``st_asin_new_counts``,
        ``st_asin_new_orders_rate`` and ``st_asin_new_counts_rate``.
        """
        self.df_st_asin_info = self.df_st_asin_info.join(
            self.df_asin_bs_info.select("asin", "asin_bs_orders", "asin_new_flag"),
            on="asin", how="left"
        ).withColumnRenamed("asin_bs_orders", "st_asin_bs_orders")

        # The renames below rely on Spark's auto-generated aggregate column names
        # ("sum(col)", "max(col)", "count(col)").
        df_st_asin_bs_orders_sum = (
            self.df_st_asin_info.groupby(['search_term'])
            .agg({"st_asin_bs_orders": "sum"})
            .withColumnRenamed("sum(st_asin_bs_orders)", "st_asin_bs_orders_sum")
        )

        df_st_asin_orders_sum = (
            self.df_st_asin_info.groupby(['search_term'])
            .agg({"st_asin_orders_sum": "max", "asin": "count"})
            .withColumnRenamed("max(st_asin_orders_sum)", "st_asin_orders_sum")
            .withColumnRenamed("count(asin)", "st_asin_counts")
        )

        # Same aggregation restricted to ASINs flagged as new (asin_new_flag = 1).
        df_st_asin_new_orders_sum = (
            self.df_st_asin_info.filter("asin_new_flag = 1")
            .groupby(['search_term'])
            .agg({"st_asin_orders": "sum", "asin": "count"})
            .withColumnRenamed("sum(st_asin_orders)", "st_asin_new_orders_sum")
            .withColumnRenamed("count(asin)", "st_asin_new_counts")
        )

        self.df_st_info = self.df_st_info.join(
            df_st_asin_bs_orders_sum, on="search_term", how="left"
        ).join(
            df_st_asin_orders_sum, on="search_term", how="left"
        ).join(
            df_st_asin_new_orders_sum, on="search_term", how="left"
        )
        self.df_st_info = self.df_st_info.withColumn(
            "st_asin_new_orders_rate",
            self.df_st_info.st_asin_new_orders_sum / self.df_st_info.st_asin_orders_sum
        )
        self.df_st_info = self.df_st_info.withColumn(
            "st_asin_new_counts_rate",
            self.df_st_info.st_asin_new_counts / self.df_st_info.st_asin_counts
        )

    def handle_data_st_cate_current_id(self):
        """Derive the search term's current BS category id from its top three ASINs.

        The three ASINs of each search term are ranked by conversion share
        (``st_asin_top1..3``). The category is taken from the best-ranked ASIN that
        actually has a ``asin_bs_cate_current_id``; it is NULL only when none of the
        three has one.
        """
        share_columns = (
            "st_asin1", "st_conversion_share1",
            "st_asin2", "st_conversion_share2",
            "st_asin3", "st_conversion_share3",
        )
        for rank in (1, 2, 3):
            self.df_st_info = self.df_st_info.withColumn(
                f"st_asin_top{rank}",
                self.u_get_asin_top(*share_columns, F.lit(rank))
            )

        # One row per (search_term, ranked ASIN); "type" carries the rank.
        df1, df2, df3 = (
            self.df_st_info.select("search_term", f"st_asin_top{rank}")
            .withColumnRenamed(f"st_asin_top{rank}", "asin")
            .withColumn("type", F.lit(rank))
            for rank in (1, 2, 3)
        )
        df = df1.unionByName(df2, allowMissingColumns=True).unionByName(
            df3, allowMissingColumns=True
        )
        df = df.join(
            self.df_asin_bs_info.select("asin", "asin_bs_cate_current_id"),
            on='asin', how="left"
        )

        # Prefer rows that have a category, then the best-ranked ASIN. Ordering by
        # "type" alone always selected the top-1 row, so the top-2/top-3 rows built
        # above could never supply a category when top-1 had none.
        window = Window.partitionBy(["search_term"]).orderBy(
            F.col("asin_bs_cate_current_id").isNull().asc(),
            F.col("type").asc(),
        )
        df = (
            df.withColumn("type_rank", F.row_number().over(window))
            .filter("type_rank=1")
            .select("search_term", "asin_bs_cate_current_id")
        )

        self.df_st_info = self.df_st_info.join(df, on="search_term", how="left")
        self.df_st_info = self.df_st_info.withColumnRenamed(
            "asin_bs_cate_current_id", "st_asin_bs_cate_current_id"
        )

    def handle_data_asin_detail(self):
        """Attach BS orders, price, rating and review count of ``st_asin1`` to ``df_st_info``."""
        asin1_detail = self.df_asin_bs_info.select(
            "asin", "asin_bs_orders", "asin1_price", "asin1_rating", "asin1_total_comments"
        ).withColumnRenamed("asin", "st_asin1")
        self.df_st_info = self.df_st_info.join(
            asin1_detail, on="st_asin1", how="left"
        ).withColumnRenamed("asin_bs_orders", "st_asin1_bs_orders")


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site
    date_type = sys.argv[2]  # arg 2: type: week/4_week/month/quarter
    date_info = sys.argv[3]  # arg 3: year-week / year-month / year-quarter, e.g. 2022-1
    handle_obj = DwtStInfo(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()