"""
@Author      : HuangJian
@Description : ABA - search-term / ASIN pre-aggregation intermediate table (ASIN dimension)
@SourceTable : 1. dwd_st_asin_info
               2. dim_st_asin_info
               3. dim_asin_detail
               4. dim_seller_asin_info
@SinkTable   : dws_aba_st_analytics_day
@CreateTime  : 2022/11/21 15:56
@UpdateTime  : 2022/11/21 15:56
"""
import os
import sys
from datetime import date, timedelta
import re
from functools import reduce

sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from utils.templates import Templates
# from ..utils.templates import Templates
from pyspark.sql.types import IntegerType
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType, DoubleType


class DwsAbaStAnalyticsDay(Templates):
    """Aggregate per-(search_term, asin) rows into one metrics row per search term.

    The job reads four source tables (see ``read_data``), joins them on
    ``asin`` / ``search_term``, derives counting flags and totals, adds order
    totals (overall, new ASINs, top-3 brands, top-3 sellers) and stores the
    result in ``self.df_save``.

    ``create_spark_object``, ``get_year_week_tuple``, ``reset_partitions`` and
    ``run`` are inherited from ``Templates`` (not visible in this file).
    """

    # Metric columns selected for the sink table, in output order.  The same
    # list drives the null-filling step so the two can never drift apart.
    SAVE_COLUMNS = (
        "search_term", "st_num", "orders", "bsr_orders", "search_volume",
        "quantity_being_sold", "new_asin_num", "total_asin_num",
        "new_asin_orders", "new_asin_bsr_orders", "st_adv_num", "st_zr_num",
        "title_st_one_num", "title_page_one_total", "asin_price_total",
        "having_price_num", "asin_comment_total", "having_comment_num",
        "asin_rating_total", "having_rating_num", "asin_weight_total",
        "having_weight_num", "asin_volume_total", "having_volume_num",
        "asin_title_len_total", "having_title_num", "is_A_num", "is_video_num",
        "is_FBM_num", "is_CN_num", "is_Amazon_num", "asin_brand_num",
        "asin_account_num", "top3_seller_orders", "top3_seller_bsr_orders",
        "top3_brand_orders", "top3_brand_bsr_orders",
    )

    def __init__(self, site_name="us", date_type="week", date_info="2022-1"):
        """Create the Spark session, placeholder DataFrames and UDFs.

        Args:
            site_name: value used for the ``site_name`` partition filter.
            date_type: value used for the ``date_type`` partition filter.
            date_info: value used for the ``date_info`` partition filter.
        """
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = "dws_aba_st_analytics_day"
        self.spark = self.create_spark_object(app_name=f"{self.db_save} {self.site_name}, {self.date_info}")
        self.df_date = self.get_year_week_tuple()
        self.year_week = self.get_year_week()
        # Sink DataFrame and partition initialisation.
        self.df_save = self.spark.sql("select 1+1;")
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.reset_partitions(partitions_num=10)
        # Placeholder DataFrames; each is replaced in read_data / handle_*.
        self.df_aba_st_analytics = self.spark.sql("select 1+1;")
        self.df_st_asin_info = self.spark.sql("select 1+1;")
        self.df_asin_detail = self.spark.sql("select 1+1;")
        self.df_seller_asin_info = self.spark.sql("select 1+1;")
        self.df_st_asin_join = self.spark.sql("select 1+1;")
        self.df_st_asin_orders_info = self.spark.sql("select 1+1;")
        # Register the custom UDFs.
        self.u_year_week = self.spark.udf.register('u_year_week', self.udf_year_week, StringType())
        self.u_get_volume = self.spark.udf.register('u_get_volume', self.udf_get_volume, DoubleType())
        self.u_get_image_type = self.spark.udf.register('u_get_image_type', self.udf_get_image_type, IntegerType())
        self.u_title_contains = self.spark.udf.register('u_title_contains', self.udf_title_contains, IntegerType())

    def get_year_week(self):
        """Look up the ``year_week`` of ``self.date_info`` in ``dim_date_20_to_30``.

        Returns:
            The first ``year_week`` value found when ``date_type == "day"``;
            ``None`` for every other ``date_type``.

        Raises:
            IndexError: if the lookup returns no row.
        """
        # NOTE(review): for non-"day" runs this returns None, so read_data
        # filters dim_seller_asin_info on date_info = 'None' -- confirm that
        # only "day" runs are expected for this job.
        if self.date_type == "day":
            sql = f"select year_week from dim_date_20_to_30 where `date`='{self.date_info}'"
            df = self.spark.sql(sqlQuery=sql).toPandas()
            return list(df.year_week)[0]

    @staticmethod
    def udf_year_week(dt):
        """Normalise ``"<year>-<week>"`` so the week part is zero-padded to two digits.

        Uses integer formatting so an already padded week ("2022-01") is left
        as "2022-01" instead of gaining a second leading zero.
        """
        parts = dt.split("-")
        year, week = parts[0], parts[1]
        return f"{year}-{int(week):02d}"

    @staticmethod
    def udf_get_volume(volume):
        """Multiply together every number found in a dimension string.

        Example: ``5.12"D x 6.69"W x 1.38"H`` -> 5.12 * 6.69 * 1.38.

        Returns:
            The product as a float, or 0.0 when the text contains no number
            (this covers None and the literal string "null").
        """
        numbers = re.findall(r"\d+\.?\d*", str(volume))
        if not numbers:
            return 0.0
        return reduce(lambda x, y: x * y, map(float, numbers))

    @staticmethod
    def udf_get_image_type(type_flag, image_type):
        """Return 1 when ``str(type_flag)`` occurs inside ``str(image_type)``, else 0."""
        return 1 if str(type_flag) in str(image_type) else 0

    @staticmethod
    def udf_title_contains(search_term, title):
        """Return 1 when ``title`` contains ``search_term`` (case-insensitive), else 0.

        A missing search term or title never matches; without this guard
        ``str(None)`` ("None") would match search terms such as "one" or "no".
        """
        if search_term is None or title is None:
            return 0
        return 1 if str(search_term).lower() in str(title).lower() else 0

    @staticmethod
    def _has_value(column_name):
        """Condition: the column is neither SQL NULL nor the literal string 'null'."""
        return F.col(column_name).isNotNull() & (F.col(column_name) != 'null')

    @staticmethod
    def _flag(condition):
        """Return a 1/0 column: 1 where ``condition`` holds, 0 otherwise (incl. NULL)."""
        return F.when(condition, F.lit(1)).otherwise(F.lit(0))

    @staticmethod
    def _top3_orders_total(df_orders, entity_col, orders_col, total_alias):
        """Sum ``orders_col`` per search term over its three best-selling entities.

        Orders are first summed per (search_term, entity); the entities are
        ranked by that sum in descending order and the top three are totalled.
        """
        df_entity = df_orders.groupby(['search_term', entity_col]).agg(
            F.sum(orders_col).alias("entity_orders"))
        # Descending: "top 3" means the highest-selling entities.
        window = Window.partitionBy(["search_term"]).orderBy(
            F.col("entity_orders").desc_nulls_last())
        df_top3 = df_entity.withColumn("entity_rank", F.row_number().over(window)) \
            .filter(F.col("entity_rank") <= 3)
        return df_top3.groupby(["search_term"]).agg(F.sum("entity_orders").alias(total_alias))

    def read_data(self):
        """Load and cache the four source tables for the configured partition."""
        # 1. Order figures per (search_term, asin) from dwd_st_asin_info.
        # NOTE(review): both order columns are hard-coded constants (2 and 10),
        # which look like test placeholders -- confirm before relying on them.
        sql = f"select search_term, asin, 2 as st_asin_zr_orders, 10 as st_asin_bs_orders from dwd_st_asin_info " \
              f" where site_name = '{self.site_name}' and date_type = '{self.date_type}' and date_info = '{self.date_info}' " \
              f"and st_asin_zr_page is not null"
        self.df_st_asin_orders_info = self.spark.sql(sqlQuery=sql).cache()
        # 2. Search-term / ASIN placement rows from dim_st_asin_info.
        sql = f"select search_term, asin,page,page_row,page_rank,data_type from dim_st_asin_info " \
              f" where site_name = '{self.site_name}' and date_type = '{self.date_type}' and date_info = '{self.date_info}'"
        self.df_st_asin_info = self.spark.sql(sqlQuery=sql).cache()
        # 3. ASIN detail base table dim_asin_detail.
        sql = f"select asin,asin_title,asin_title_len,asin_total_comments,asin_price,asin_rating,asin_buy_box_seller_type," \
              f"asin_volume,asin_weight,asin_img_type,asin_brand_name,asin_is_new,asin_is_sale,asin_rank from dim_asin_detail " \
              f" where site_name = '{self.site_name}' and date_type = '{self.date_type}' and date_info = '{self.date_info}'"
        self.df_asin_detail = self.spark.sql(sqlQuery=sql).cache()
        # 4. Seller information from dim_seller_asin_info; always read from the
        #    weekly partition identified by self.year_week, one row per asin.
        sql = f" select asin,account_name,country_name from dim_seller_asin_info " \
              f" where site_name = '{self.site_name}' and date_type = 'week' and date_info = '{self.year_week}' "
        self.df_seller_asin_info = self.spark.sql(sqlQuery=sql)
        self.df_seller_asin_info = self.df_seller_asin_info.drop_duplicates(['asin']).cache()

    def handle_data(self):
        """Run the transformation steps in order and print the output columns."""
        # Join the source tables into the base table.
        self.handle_st_asin_join()
        # Aggregate atomic ASIN indicators into derived indicators.
        self.handle_asin_agg()
        # Aggregate order / estimated-order indicators.
        self.handle_st_orders_agg()
        # Final column handling before the data is stored.
        self.handle_column()
        print(self.df_save.columns)

    def handle_st_asin_join(self):
        """Build ``df_st_asin_join``: one row per (search_term, asin), left-joined
        with ASIN detail, seller info and order figures."""
        self.df_st_asin_info = self.df_st_asin_info.drop_duplicates(['search_term', 'asin'])
        self.df_st_asin_join = self.df_st_asin_info.join(
            self.df_asin_detail, on='asin', how='left'
        ).join(
            self.df_seller_asin_info, on='asin', how='left'
        ).join(
            self.df_st_asin_orders_info, on=['search_term', 'asin'], how='left'
        )
        self.df_st_asin_join.cache()

    def handle_asin_agg(self):
        """Compute the per-search-term ASIN metrics and store them in
        ``df_aba_st_analytics``."""
        # Only the columns that take part in the calculation.
        df_st_asin_agg = self.df_st_asin_join.select(
            "search_term", "asin", "page", "asin_title", "asin_title_len", "asin_total_comments", "asin_price",
            "asin_rating", "asin_volume", "asin_weight", "asin_buy_box_seller_type", "asin_img_type",
            "country_name", "asin_is_new", "asin_is_sale"
        )
        # Tag each row with 1/0 flags so the metrics become plain sums.
        df_st_asin_agg = self.handle_asin_agg_flag(df_st_asin_agg)
        # Aggregate per search term and rename.
        df_st_asin_agg = self.handle_asin_group_agg(df_st_asin_agg)
        # Distinct brand / seller counts.
        df_st_asin_agg = self.handle_asin_agg_brand_seller(df_st_asin_agg)
        self.df_aba_st_analytics = df_st_asin_agg

    def handle_st_orders_agg(self):
        """Join order totals onto ``df_aba_st_analytics``.

        Adds ``bsr_orders``/``orders`` (all ASINs), ``new_asin_bsr_orders``/
        ``new_asin_orders`` (rows with ``asin_is_new = 1``) and the top-3 brand
        and top-3 seller totals for both order columns.
        """
        df_orders = self.df_st_asin_join.select(
            "search_term", "asin", "asin_is_new", "asin_brand_name", "account_name", "st_asin_zr_orders",
            "st_asin_bs_orders"
        ).cache()

        # Overall totals.
        df_total_orders = df_orders.groupby(['search_term']).agg(
            F.sum("st_asin_bs_orders").alias("bsr_orders"),
            F.sum("st_asin_zr_orders").alias("orders"))
        # Totals restricted to new ASINs.
        df_new_orders = df_orders.filter("asin_is_new=1").groupby(['search_term']).agg(
            F.sum("st_asin_bs_orders").alias("new_asin_bsr_orders"),
            F.sum("st_asin_zr_orders").alias("new_asin_orders"))

        # Rows without a real brand / seller must not form a pseudo-entity
        # that competes for a top-3 slot.
        df_brand_orders = df_orders.filter(self._has_value("asin_brand_name"))
        df_seller_orders = df_orders.filter(self._has_value("account_name"))

        metric_frames = [
            df_total_orders,
            df_new_orders,
            self._top3_orders_total(df_brand_orders, "asin_brand_name", "st_asin_bs_orders",
                                    "top3_brand_bsr_orders"),
            self._top3_orders_total(df_brand_orders, "asin_brand_name", "st_asin_zr_orders",
                                    "top3_brand_orders"),
            self._top3_orders_total(df_seller_orders, "account_name", "st_asin_bs_orders",
                                    "top3_seller_bsr_orders"),
            self._top3_orders_total(df_seller_orders, "account_name", "st_asin_zr_orders",
                                    "top3_seller_orders"),
        ]
        # Attach every order metric to the per-search-term table.
        for df_metric in metric_frames:
            self.df_aba_st_analytics = self.df_aba_st_analytics.join(
                df_metric, on=['search_term'], how='left')

    def handle_column(self):
        """Shape ``df_save``: placeholder metrics, column selection, null
        filling, reserved fields and partition columns."""
        # NOTE(review): the original comment marks these as test fill values.
        self.df_aba_st_analytics = self.df_aba_st_analytics.withColumn("st_num", F.lit(1))
        for column_name in ("quantity_being_sold", "search_volume", "st_adv_num", "st_zr_num"):
            self.df_aba_st_analytics = self.df_aba_st_analytics.withColumn(column_name, F.lit(999))
        # Column selection.
        self.df_save = self.df_aba_st_analytics.select(*self.SAVE_COLUMNS)
        # Null handling: every selected column gets 0 as its replacement value.
        self.df_save = self.df_save.na.fill(dict.fromkeys(self.SAVE_COLUMNS, 0))
        # Reserved fields.
        for column_name in ("re_double_field1", "re_double_field2", "re_double_field3"):
            self.df_save = self.df_save.withColumn(column_name, F.lit(0.0))
        for column_name in ("re_int_field1", "re_int_field2", "re_int_field3"):
            self.df_save = self.df_save.withColumn(column_name, F.lit(0))
        # Partition fields.
        self.df_save = self.df_save.withColumn("site_name", F.lit(self.site_name))
        self.df_save = self.df_save.withColumn("date_type", F.lit(self.date_type))
        self.df_save = self.df_save.withColumn("date_info", F.lit(self.date_info))

    def handle_asin_agg_flag(self, df_st_asin_agg):
        """Add the 1/0 flag columns and ``asin_volume_val`` used by the group aggregation.

        Args:
            df_st_asin_agg: DataFrame with the columns selected in ``handle_asin_agg``.

        Returns:
            The DataFrame with the extra flag / value columns.
        """
        flag = self._flag
        seller_type = F.col("asin_buy_box_seller_type")
        on_first_page = F.col("page") == 1
        has_title = F.col("asin_title_len") > 0.0

        return (
            df_st_asin_agg
            # ASIN is on page 1 and its title contains the search term.
            .withColumn("title_st_one_flag",
                        F.when(on_first_page,
                               self.u_title_contains(F.col("search_term"), F.col("asin_title")))
                        .otherwise(F.lit(0)))
            # A+ product: image type contains '3'.
            .withColumn("is_A_flag", self.u_get_image_type(F.lit('3'), F.col("asin_img_type")))
            # Video product: image type contains '2'.
            .withColumn("is_video_flag", self.u_get_image_type(F.lit('2'), F.col("asin_img_type")))
            # FBM product: buy-box seller type 3.
            .withColumn("is_FBM_flag", flag(seller_type == 3))
            # Amazon self-operated product: buy-box seller type 1.
            .withColumn("is_Amazon_flag", flag(seller_type == 1))
            # Chinese seller.
            .withColumn("is_CN_flag", flag(F.col("country_name") == 'CN'))
            # Volume computed from text such as 5.12"D x 6.69"W x 1.38"H.
            .withColumn("asin_volume_val", self.u_get_volume(F.col("asin_volume")))
            # New product.
            .withColumn("asin_is_new_flag", flag(F.col("asin_is_new") == 1))
            # Page-1 ASIN that has a title.
            .withColumn("title_page_one_flag", flag(on_first_page & has_title))
            # Denominators for price / rating / weight / comments / volume:
            # only rows whose value is > 0 are counted, to avoid inflating the error.
            .withColumn("price_flag", flag(F.col("asin_price") > 0.0))
            .withColumn("rating_flag", flag(F.col("asin_rating") > 0.0))
            .withColumn("weight_flag", flag(F.col("asin_weight") > 0.0))
            .withColumn("comments_flag", flag(F.col("asin_total_comments") > 0.0))
            .withColumn("volume_flag", flag(F.col("asin_volume_val") > 0.0))
            # ASIN has a title.
            .withColumn("title_flag", flag(has_title))
        )

    def handle_asin_group_agg(self, df_st_asin_agg):
        """Group by ``search_term`` and turn the flags / values into totals and counts.

        Args:
            df_st_asin_agg: output of ``handle_asin_agg_flag``.

        Returns:
            One row per search term with ``total_asin_num`` plus the summed columns.
        """
        # (source column, output column) pairs, all aggregated with sum().
        summed_columns = (
            ("asin_title_len", "asin_title_len_total"),
            ("asin_total_comments", "asin_comment_total"),
            ("asin_price", "asin_price_total"),
            ("asin_rating", "asin_rating_total"),
            ("asin_volume_val", "asin_volume_total"),
            ("asin_weight", "asin_weight_total"),
            ("is_A_flag", "is_A_num"),
            ("is_video_flag", "is_video_num"),
            ("is_FBM_flag", "is_FBM_num"),
            ("is_Amazon_flag", "is_Amazon_num"),
            ("is_CN_flag", "is_CN_num"),
            ("title_st_one_flag", "title_st_one_num"),
            ("asin_is_new_flag", "new_asin_num"),
            ("title_page_one_flag", "title_page_one_total"),
            ("price_flag", "having_price_num"),
            ("rating_flag", "having_rating_num"),
            ("weight_flag", "having_weight_num"),
            ("comments_flag", "having_comment_num"),
            ("volume_flag", "having_volume_num"),
            ("title_flag", "having_title_num"),
        )
        aggregations = [F.count("asin").alias("total_asin_num")]
        aggregations.extend(F.sum(source).alias(target) for source, target in summed_columns)
        return df_st_asin_agg.groupby(['search_term']).agg(*aggregations)

    def handle_asin_agg_brand_seller(self, df_st_asin_agg):
        """Add the distinct brand and seller counts per search term.

        Rows whose brand / account name is NULL or the literal string 'null'
        are excluded from the respective count.

        Args:
            df_st_asin_agg: per-search-term aggregate to extend.

        Returns:
            The aggregate with ``asin_brand_num`` and ``asin_account_num`` joined in.
        """
        # Number of brands.
        df_brand_count = self.df_st_asin_join.select("search_term", "asin_brand_name") \
            .filter(self._has_value("asin_brand_name")) \
            .groupby(['search_term']) \
            .agg(F.count_distinct("asin_brand_name").alias("asin_brand_num"))
        # Number of sellers.
        df_account_count = self.df_st_asin_join.select("search_term", "account_name") \
            .filter(self._has_value("account_name")) \
            .groupby(['search_term']) \
            .agg(F.count_distinct("account_name").alias("asin_account_num"))
        # Join both counts onto the aggregate.
        return df_st_asin_agg. \
            join(df_brand_count, on=['search_term'], how='left'). \
            join(df_account_count, on=['search_term'], how='left')


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site
    date_type = sys.argv[2]  # arg 2: type: week/4_week/month/quarter
    date_info = sys.argv[3]  # arg 3: year-week / year-month / year-quarter, e.g. 2022-1
    handle_obj = DwsAbaStAnalyticsDay(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()