import os import re import sys sys.path.append(os.path.dirname(sys.path[0])) # 上级目录 from pyspark.storagelevel import StorageLevel from utils.templates import Templates # from ..utils.templates import Templates # from AmazonSpider.pyspark_job.utils.templates_test import Templates from pyspark.sql.types import StringType, IntegerType # 分组排序的udf窗口函数 from pyspark.sql.window import Window from pyspark.sql import functions as F from textblob import Word class DwdStMeasure(Templates): def __init__(self, site_name='us', date_type="month", date_info='2022-01'): super().__init__() self.site_name = site_name self.date_type = date_type self.date_info = date_info self.db_save_st_asin = f'dwd_st_asin_measure' self.db_save_st = f'dwd_st_measure' self.db_save_asin = f'dwd_asin_measure' self.spark = self.create_spark_object( app_name=f"{self.db_save_st_asin}, {self.db_save_st}, {self.db_save_asin}: {self.site_name}, {self.date_type}, {self.date_info}") # self.df_date = self.get_year_week_tuple() # pandas的df对象 self.get_date_info_tuple() self.get_year_week_tuple() self.get_year_month_days_dict(year=int(self.year)) self.orders_transform_rate = self.get_orders_transform_rate() # 获取月销-->日销,周销 self.df_st_asin = self.spark.sql(f"select 1+1;") self.df_st_asin_flow = self.spark.sql(f"select 1+1;") self.df_st = self.spark.sql(f"select 1+1;") self.df_brand_analytics = self.spark.sql(f"select 1+1;") self.df_st_rate = self.spark.sql(f"select 1+1;") self.df_st_quantity = self.spark.sql(f"select 1+1;") self.df_asin_bs = self.spark.sql(f"select 1+1;") self.df_asin_detail = self.spark.sql(f"select 1+1;") self.df_bs_report = self.spark.sql(f"select 1+1;") self.df_st_asin_duplicated = self.spark.sql(f"select 1+1;") self.df_save_st_asin = self.spark.sql(f"select 1+1;") self.df_save_asin = self.spark.sql(f"select 1+1;") self.df_save_st = self.spark.sql(f"select 1+1;") self.df_asin_stable = self.spark.sql(f"select 1+1;") self.df_asin_price_weight = self.spark.sql(f"select 1+1;") self.df_st_templates = self.spark.sql("select st_zr_counts, st_sp_counts, st_sb1_counts,st_sb2_counts,st_sb3_counts,st_ac_counts,st_bs_counts,st_er_counts,st_tr_counts from dwd_st_measure limit 0") self.df_asin_templates = self.spark.sql("select asin_zr_counts, asin_sp_counts, asin_sb1_counts,asin_sb2_counts,asin_sb3_counts,asin_ac_counts,asin_bs_counts,asin_er_counts,asin_tr_counts from dwd_asin_measure limit 0") self.partitions_by = ['site_name', 'date_type', 'date_info'] self.u_is_title_appear = self.spark.udf.register("u_is_title_appear", self.udf_is_title_appear, IntegerType()) def get_orders_transform_rate(self): month_days = self.year_month_days_dict[int(self.month)] if self.date_type in ['day', 'week']: if self.date_type == 'day': return 1 / month_days if self.date_type == 'week': return 7 / month_days else: return 1 @staticmethod def udf_is_title_appear(search_term, title): english_prepositions = ["aboard", "about", "above", "across", "after", "against", "along", "amid", "among", "around", "as", "at", "before", "behind", "below", "beneath", "beside", "between", "beyond", "but", "by", "concerning", "considering", "despite", "down", "during", "except", "for", "from", "in", "inside", "into", "like", "near", "of", "off", "on", "onto", "out", "outside", "over", "past", "regarding", "round", "since", "through", "to", "toward", "under", "underneath", "until", "unto", "up", "upon", "with", "within", "without"] symbol_list = [',', '。', '?', '!', ':', '?', '!', '-', '%', '|', ';', '·', '…', '~', '&', '@', '#', '、', '…', '~', '&', '@', '#', '“', '”', '‘', '’', '〝', '〞', '"', "'", '"', ''', '´', ''', '(', ')', '【', '】', '《', '》', '<', '>', '﹝', '﹞', '<', '>', '«', '»', '‹', '›', '〔', '〕', '〈', '〉', '{', '}', '[', ']', '「', '」', '{', '}', '〖', '〗', '『', '』', '︵', '︷', '︹', '︿', '︽', '﹁', '﹃', '︻', '︗', '/', '\\', '︶', '︸', '︺', '﹀', '︾', '﹂', '﹄', '︼', '︘', '/', '|', '\', '_', '_', '﹏', '﹍', '﹎', '``', '¦', '¡', '^', '\xad', '¨', 'ˊ', '¯', ' ̄', '﹋', '﹉', '﹊', 'ˋ', '︴', '¿', 'ˇ'] # 小写 search_term = str(search_term).lower() title = f" {str(title).lower()} " # 1. 去掉特殊符号 # search_term = re.sub(r'[,:()]', '', search_term) # 去掉特殊符号 # title = re.sub(r'[,:()]', '', title) # 去掉特殊符号 # for symbol in symbol_list: # if symbol in title: # title = title.replace(symbol, "") # if symbol in search_term: # search_term = search_term.replace(symbol, "") # 改成正则去掉特殊符号 symbols = "".join(symbol_list) # 将列表中的所有字符连接成一个字符串 search_term = re.sub('[' + symbols + ']', '', search_term) # 去掉特殊符号 title = re.sub('[' + symbols + ']', '', title) # 去掉特殊符号 # 2. 去掉介词(关键词去掉就行) st_list = [f" {st} " for st in search_term.split(" ") if st not in english_prepositions] # 去掉介词 # 3. 复数一起匹配 for st in st_list: # if st in symbol_list: # st = st.replace(symbol, "") if st not in title: if Word(st) not in title: return 0 return 1 # 旧版 # if str(search_term).lower() in str(title).lower(): # return 1 # else: # return 0 def read_data(self): print("self.year, self.month:", self.year, self.month) print("1 读取st+asin两个维度: dim_st_asin_info表和ods_rank_flow表") print("1.1 读取dim_st_asin_info表") if self.date_type == 'month_old': # self.get_year_week_tuple() if int(self.month) <= 9 and int(self.year) <= 2022: sql = f"select * from dim_st_asin_info where site_name='{self.site_name}' and date_type='month' and date_info ='{self.date_info}'" else: sql = f"select * from dim_st_asin_info where site_name='{self.site_name}' and date_type='week' and date_info in {self.year_week_tuple}" else: sql = f"select * from dim_st_asin_info where site_name='{self.site_name}' and date_type='week' and date_info in {self.year_week_tuple}" # else: # if (int(self.year) == 2022 and int(self.month) < 10) or int(self.year) <= 2021: # sql = f"select * from dim_st_asin_info where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'" # else: # sql = f"select * from dim_st_asin_info where site_name='{self.site_name}' and date_type='day' and date_info in {self.date_info_tuple}" print("sql:", sql) self.df_st_asin = self.spark.sql(sqlQuery=sql).cache() self.df_st_asin.show(10, truncate=False) # self.df_st_asin.filter("search_term='abiie high chair'").show(100, truncate=False) # quit() # self.df_st_asin = self.df_st_asin.drop_duplicates(["search_term", "asin", "data_type", "date_info"]).cache() # self.df_st_asin_duplicated = self.df_st_asin.drop_duplicates(['search_term', 'asin']).cache() # print("self.df_st_asin:", self.df_st_asin.count()) # print("self.df_st_asin_duplicated:", self.df_st_asin_duplicated.count()) # self.df_st_asin.show(10, truncate=False) # self.df_asin = self.df_st_asin.select("asin").drop_duplicates(["asin"]) # self.df_st = self.df_st_asin.select("search_term").drop_duplicates(["search_term"]) print("1.2 读取ods_rank_flow表") sql = f"select rank as page_rank, flow from ods_rank_flow " \ f"where site_name='{self.site_name}'" self.df_st_asin_flow = self.spark.sql(sql).cache() # self.df_st_asin_flow.persist(StorageLevel.MEMORY_ONLY) self.df_st_asin_flow.show(10, truncate=False) print("2 读取st维度: dim_st_detail表和ods_brand_analytics表") print("self.year, self.month:", self.year, self.month) print("2.1 读取dim_st_detail和ods_brand_analytics表") sql = f"select search_term, st_rank, st_search_sum from dim_st_detail where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info ='{self.date_info}';" print("sql:", sql) self.df_st = self.spark.sql(sqlQuery=sql) self.df_st.persist(StorageLevel.MEMORY_ONLY) self.df_st.show(10, truncate=False) # 统计词频 print("2.2 读取ods_brand_analytics表") # sql = f"select search_term, date_info from ods_brand_analytics where site_name='{self.site_name}' and date_type='day' and date_info in {self.date_info_tuple}" sql = f"select search_term, date_info from ods_brand_analytics where site_name='{self.site_name}' and date_type='week' and date_info in {self.year_week_tuple}" print("sql:", sql) self.df_brand_analytics = self.spark.sql(sqlQuery=sql) self.df_brand_analytics.persist(StorageLevel.MEMORY_ONLY) self.df_brand_analytics.show(10, truncate=False) print("3 读取asin维度: dim_asin_bs_info+dim_asin_detail表") print("3.1 读取dim_asin_bs_info表") sql = f"select asin, asin_bs_cate_1_rank, asin_bs_cate_1_id " \ f"from dim_asin_bs_info where site_name='{self.site_name}' and date_type='{self.date_type.replace('_old', '')}' and date_info='{self.date_info}';" self.df_asin_bs = self.spark.sql(sql).cache() self.df_asin_bs.show(10) sql = f"select asin, asin_title, asin_price " \ f"from dim_asin_detail where site_name='{self.site_name}' and date_type='{self.date_type.replace('_old', '')}' and date_info='{self.date_info}';" self.df_asin_detail = self.spark.sql(sql).cache() self.df_asin_detail.show(10) print("4 读取bsr维度: ods_one_category_report表") print("4.1 读取ods_one_category_report表") if int(self.year) == 2022 and int(self.month) < 3: sql = f"select category_id as asin_bs_cate_1_id, rank as asin_bs_cate_1_rank, ceil(orders*{self.orders_transform_rate}) as asin_bsr_orders from ods_one_category_report " \ f"where site_name='{self.site_name}' and date_type='month' and date_info='2022-12';" else: sql = f"select category_id as asin_bs_cate_1_id, rank as asin_bs_cate_1_rank, ceil(orders*{self.orders_transform_rate}) as asin_bsr_orders from ods_one_category_report " \ f"where site_name='{self.site_name}' and date_type='month' and date_info='{self.year}-{self.month}';" print("sql:", sql) self.df_bs_report = self.spark.sql(sqlQuery=sql) self.df_bs_report.persist(StorageLevel.MEMORY_ONLY) self.df_bs_report.show(10, truncate=False) print("5 读取asin维度-体积信息: dim_asin_volume_info表") sql = f"select asin, asin_length * asin_width * asin_height as asin_volume, asin_weight from dim_asin_stable_info where site_name='{self.site_name}'" print("sql:", sql) self.df_asin_stable = self.spark.sql(sqlQuery=sql).cache() self.df_asin_stable.show(10, truncate=False) def save_data(self): self.reset_partitions(partitions_num=50) self.save_data_common( df_save=self.df_save_st_asin, db_save=self.db_save_st_asin, partitions_num=self.partitions_num, partitions_by=self.partitions_by ) self.reset_partitions(partitions_num=5) self.save_data_common( df_save=self.df_save_st, db_save=self.db_save_st, partitions_num=self.partitions_num, partitions_by=self.partitions_by ) self.reset_partitions(partitions_num=10) self.save_data_common( df_save=self.df_save_asin, db_save=self.db_save_asin, partitions_num=self.partitions_num, partitions_by=self.partitions_by ) def handle_data(self): self.handle_join() self.df_save_asin = self.handle_st_asin_counts(cal_type="asin", df_templates=self.df_asin_templates) self.df_save_st = self.handle_st_asin_counts(cal_type="st", df_templates=self.df_st_templates) self.handle_st_zr_page1_title_rate() self.handle_st_asin_orders() # 预估销量和bsr销量 self.handle_st_asin_ao() self.handle_st_num() self.handle_st_weight_price_volume() # self.df_save_st.filter("search_term='abiie high chair'").show(10, truncate=False) # self.df_save_st_asin.filter("search_term='abiie high chair'").show(100, truncate=False) # self.df_save_st_asin.show(10, truncate=False) # self.df_save_st.show(10, truncate=False) del self.df_st_asin_duplicated del self.df_st_asin # self.df_save_asin.show(10, truncate=False) # quit() def handle_st_attributes(self, df, attributes_type='asin_volume'): # 定义窗口函数 window = Window.partitionBy(['search_term']).orderBy(F.desc(f"{attributes_type}")) # 计算百分比排名并筛选 <= 0.25 的记录 df = df.select("search_term", f"{attributes_type}").filter(f'{attributes_type} is not null') \ .withColumn(f"{attributes_type}_percent_rank", F.percent_rank().over(window)) \ .filter(f'{attributes_type}_percent_rank <= 0.25') \ # 使用 row_number() 方法获取每个 search_term 的最大百分比排名记录 window = Window.partitionBy(['search_term']).orderBy(F.desc(f"{attributes_type}_percent_rank")) df = df.withColumn(f"{attributes_type}_row_number", F.row_number().over(window)) \ .filter(f'{attributes_type}_row_number = 1') # 显示结果 df = df.drop(f"{attributes_type}_percent_rank", f"{attributes_type}_row_number") df = df.withColumnRenamed(f"{attributes_type}", f"{attributes_type.replace('asin', 'st')}_25_percent") df.show(10, truncate=False) return df def handle_st_weight_price_volume(self): # self.df_st_asin_duplicated = self.df_st_asin_duplicated.drop_duplicates(['search_term', 'asin']).cache() df_st_asin = self.df_st_asin_duplicated.select('search_term', 'asin').cache() df_asin_label = self.df_asin_detail.select("asin", "asin_price", "asin_weight", "asin_volume").cache() df_st_asin = df_st_asin.join( df_asin_label, on='asin', how='inner' ) df_st_volume = self.handle_st_attributes(df=df_st_asin, attributes_type='asin_volume') df_st_price = self.handle_st_attributes(df=df_st_asin, attributes_type='asin_price') df_st_weight = self.handle_st_attributes(df=df_st_asin, attributes_type='asin_weight') df_st_min = df_st_asin.groupby(['search_term']).agg( F.min("asin_volume").alias('st_volume_min'), F.min("asin_price").alias('st_price_min'), F.min("asin_weight").alias('st_weight_min') ) df_st_min = df_st_min.join( df_st_volume, on='search_term', how='left' ).join( df_st_price, on='search_term', how='left' ).join( df_st_weight, on='search_term', how='left' ) df_st_min = df_st_min.withColumn( "st_volume_avg", 1.5 * (df_st_min.st_volume_25_percent - df_st_min.st_volume_min) + df_st_min.st_volume_min ).withColumn( "st_price_avg", 1.5 * (df_st_min.st_price_25_percent - df_st_min.st_price_min) + df_st_min.st_price_min ).withColumn( "st_weight_avg", 1.5 * (df_st_min.st_weight_25_percent - df_st_min.st_weight_min) + df_st_min.st_weight_min ) df_st_min.show(10, truncate=False) self.df_save_st = self.df_save_st.join( df_st_min, on='search_term', how='left' ) def handle_join(self): # st+asin self.df_st_asin = self.df_st_asin.join( self.df_st_asin_flow, on=['page_rank'], how='left' ) # st -- dim_st_detail已经有 # asin self.df_asin_bs = self.df_asin_bs.join( self.df_bs_report, on=['asin_bs_cate_1_rank', 'asin_bs_cate_1_id'], how='left' ) self.df_asin_detail = self.df_asin_detail.join( self.df_asin_bs, on='asin', how='left' ).join( self.df_asin_stable, on='asin', how='left' ) # 合并 self.df_st_asin = self.df_st_asin.join( self.df_st, on=['search_term'], how='left' ).join( self.df_asin_detail, on=['asin'], how='left' ) # self.df_st_asin.show(10, truncate=False) # self.df_st_asin = self.df_st_asin.drop_duplicates(['search_term', 'asin', 'data_type']) self.df_st_asin = self.df_st_asin.cache() self.df_st_asin_duplicated = self.df_st_asin.drop_duplicates(['search_term', 'asin', 'data_type']).cache() # self.df_st_asin.persist(StorageLevel.MEMORY_ONLY) def handle_st_asin_counts(self, cal_type="asin", df_templates=None): print(f"计算{cal_type}_counts") cal_type_complete = "search_term" if cal_type == "st" else cal_type self.df_st_asin_duplicated = self.df_st_asin_duplicated.withColumn( f"{cal_type}_data_type", F.concat(F.lit(f"{cal_type}_"), self.df_st_asin_duplicated.data_type, F.lit(f"_counts")) ) df = self.df_st_asin_duplicated.groupby([f'{cal_type_complete}']). \ pivot(f"{cal_type}_data_type").count() df = df_templates.unionByName(df, allowMissingColumns=True) # 防止爬虫数据没有导致程序运行出错 df = df.fillna(0) # df.show(10, truncate=False) df = df.withColumn( f"{cal_type}_sb_counts", df[f"{cal_type}_sb1_counts"] + df[f"{cal_type}_sb2_counts"] + df[f"{cal_type}_sb3_counts"] ) df = df.withColumn( f"{cal_type}_adv_counts", df[f"{cal_type}_sb_counts"] + df[f"{cal_type}_sp_counts"] ) df = df.withColumn(f"site_name", F.lit(self.site_name)) df = df.withColumn(f"date_type", F.lit(self.date_type)) df = df.withColumn(f"date_info", F.lit(self.date_info)) # df.show(10, truncate=False) return df def handle_st_zr_page1_title_rate(self): print("计算关键词的zr类型page=1的去重asin的标题密度") df_zr_page1 = self.df_st_asin.filter( "data_type='zr' and page=1" ) df_zr_page1 = df_zr_page1.select("search_term", "asin", "asin_title").drop_duplicates(["search_term", "asin"]) df_zr_page1 = df_zr_page1.withColumn( "st_asin_in_title_flag", self.u_is_title_appear(df_zr_page1.search_term, df_zr_page1.asin_title) ) # df_zr_page1.show(10, truncate=False) df_zr_page1 = df_zr_page1.groupby(['search_term']).agg( { "search_term": "count", "st_asin_in_title_flag": "sum", } ) df_zr_page1 = df_zr_page1.withColumnRenamed( "sum(st_asin_in_title_flag)", "st_zr_page1_title_appear_counts" ).withColumnRenamed( "count(search_term)", "st_zr_page1_title_counts" ) df_zr_page1 = df_zr_page1.withColumn( "st_zr_page1_title_appear_rate", df_zr_page1.st_zr_page1_title_appear_counts / df_zr_page1.st_zr_page1_title_counts ) self.df_save_st = self.df_save_st.join( df_zr_page1, on=['search_term'], how='left' ) # df_zr_page1.show(10, truncate=False) # quit() del df_zr_page1 def handle_st_asin_orders(self): # 预估销量+bsr销量 print("1. 预估销量:zr, sp的销量") # 1.1 st+asin self.df_st_asin = self.df_st_asin.withColumn( "st_asin_orders", F.ceil(self.df_st_asin.flow * self.df_st_asin.st_search_sum * self.orders_transform_rate) ) self.df_save_st_asin = self.df_st_asin.withColumn( "st_asin_orders_data_type", F.concat(F.lit("st_asin_"), self.df_st_asin.data_type, F.lit("_orders")) ) self.df_save_st_asin = self.df_save_st_asin.groupby(["search_term", "asin"]). \ pivot("st_asin_orders_data_type").agg(F.mean(f"st_asin_orders")) self.df_save_st_asin = self.df_save_st_asin.select( "search_term", "asin", "st_asin_zr_orders", "st_asin_sp_orders" ) # self.df_save_st_asin = self.df_save_st_asin.cache() self.df_save_st_asin.persist(StorageLevel.MEMORY_ONLY) self.df_save_st_asin = self.df_save_st_asin.withColumn(f"site_name", F.lit(self.site_name)) self.df_save_st_asin = self.df_save_st_asin.withColumn(f"date_type", F.lit(self.date_type)) self.df_save_st_asin = self.df_save_st_asin.withColumn(f"date_info", F.lit(self.date_info)) # self.df_save_st_asin.show(10, truncate=False) # 1.2 st维度的zr和sp预估销量 df_st_orders = self.df_save_st_asin.groupby(['search_term']).agg( F.sum('st_asin_zr_orders').alias("st_zr_orders"), F.sum('st_asin_sp_orders').alias("st_sp_orders"), # F.sum('st_asin_zr_orders').alias("st_zr_orders"), # F.sum('st_asin_sp_orders').alias("st_zr_orders"), ) # df_st_orders = df_st_orders.withColumnRenamed( # "sum(st_asin_zr_orders)", "st_zr_orders" # ).withColumnRenamed( # "sum(st_asin_sp_orders)", "st_sp_orders" # ) self.df_save_st = self.df_save_st.join( df_st_orders, on=['search_term'], how='left' ) # 1.3 asin维度的zr和sp预估销量 df_asin_orders = self.df_save_st_asin.groupby(['asin']).agg( F.mean('st_asin_zr_orders').alias("asin_zr_orders"), F.mean('st_asin_sp_orders').alias("asin_sp_orders"), F.sum('st_asin_zr_orders').alias("asin_zr_orders_sum"), F.sum('st_asin_sp_orders').alias("asin_sp_orders_sum"), ) # df_asin_orders = df_asin_orders.withColumnRenamed( # "avg(st_asin_zr_orders)", "asin_zr_orders" # ).withColumnRenamed( # "avg(st_asin_sp_orders)", "asin_sp_orders" # ).withColumnRenamed( # "sum(st_asin_zr_orders)", "asin_zr_orders_sum" # ).withColumnRenamed( # "sum(st_asin_sp_orders)", "asin_sp_orders_sum" # ) self.df_save_asin = self.df_save_asin.join( df_asin_orders, on=['asin'], how='left' ) # 向上取整 self.df_save_asin = self.df_save_asin.withColumn( "asin_zr_orders", F.ceil(self.df_save_asin.asin_zr_orders) ).withColumn( "asin_sp_orders", F.ceil(self.df_save_asin.asin_sp_orders) ).withColumn( "asin_zr_orders_sum", F.ceil(self.df_save_asin.asin_zr_orders_sum) ).withColumn( "asin_sp_orders_sum", F.ceil(self.df_save_asin.asin_sp_orders_sum) ) print("2. bsr销量") # 2.1 st_bsr_orders df_st_bsr_orders = self.df_st_asin.select("search_term", "asin", "asin_bsr_orders").drop_duplicates(["search_term", "asin"]) df_st_bsr_orders = df_st_bsr_orders.groupby(['search_term']).agg({"asin_bsr_orders": "sum"}) df_st_bsr_orders = df_st_bsr_orders.withColumnRenamed( "sum(asin_bsr_orders)", "st_bsr_orders" ) # 2.2 asin_bsr_orders df_asin_bsr_orders = self.df_st_asin.select("asin", "asin_bsr_orders").drop_duplicates(['asin']) # df_st_bsr_orders.show(10, truncate=False) # df_asin_bsr_orders.show(10, truncate=False) self.df_save_st = self.df_save_st.join( df_st_bsr_orders, on='search_term', how='left' ) self.df_save_asin = self.df_save_asin.join( df_asin_bsr_orders, on='asin', how='left' ) def handle_st_asin_ao(self): print("计算st和asin各自维度的ao") # asin_ao_val和asin_ao_val_rate self.df_save_asin = self.df_save_asin.withColumn( "asin_ao_val", self.df_save_asin.asin_adv_counts / self.df_save_asin.asin_zr_counts ) # self.df_save_asin = self.df_save_asin.fillna({"asin_ao_val": 0}) # 不要把null置为0, null值产生原因是zr类型没有搜到对应的搜索词 window = Window.orderBy(self.df_save_asin.asin_ao_val.asc_nulls_last()) self.df_save_asin = self.df_save_asin.withColumn("asin_ao_val_rate", F.percent_rank().over(window=window)) # st_ao_val和st_ao_val_rate df_asin_ao = self.df_save_asin.select("asin", "asin_ao_val") df_st_ao = self.df_st_asin_duplicated.filter("data_type='zr'").select("search_term", "asin").join( df_asin_ao, on=['asin'], how='left' ) # 新增关键词对应asin的ao升序排序,前4——20的均值 window = Window.partitionBy(['search_term']).orderBy(df_st_ao.asin_ao_val.asc_nulls_last()) # df_st_ao = df_st_ao.withColumn("asin_ao_val_rank", F.dense_rank().over(window=window)) df_st_ao = df_st_ao.withColumn("asin_ao_val_rank", F.row_number().over(window=window)) df_st_ao_4_20 = df_st_ao.filter("asin_ao_val_rank between 4 and 20").select("search_term", "asin_ao_val") df_st_ao_4_20 = df_st_ao_4_20.groupby(["search_term"]).agg(F.mean(df_st_ao_4_20.asin_ao_val).alias("st_4_20_ao_avg")) # df_st_ao.filter("search_term='donna reed'").show(100, truncate=False) # df_st_ao_4_20.filter("search_term='donna reed'").show(100, truncate=False) # quit() df_st_ao = df_st_ao.groupby(["search_term"]).agg({"asin_ao_val": "mean"}) df_st_ao = df_st_ao.withColumnRenamed("avg(asin_ao_val)", "st_ao_val") # 新增关键词对应zr排名在4-20asin的ao均值 # df_st_ao_4_20 = self.df_save_asin.filter("asin_ao_val_rank between 4 and 20").select("search_term", "asin_ao_val") # df_st_ao_4_20 = df_st_ao_4_20.groupby(["search_term"]).agg(F.mean(df_st_ao_4_20.asin_ao_val).alias("st_4_20_ao_avg")) # df_st_ao_4_20 = self.df_st_asin_duplicated.filter("data_type='zr' and page_rank<=20").select("search_term", "asin").join( # df_asin_ao, on=['asin'], how='left' # ) # df_st_ao_4_20 = df_st_ao_4_20.groupby(["search_term"]).agg(F.mean(df_st_ao_4_20.asin_ao_val).alias("st_4_20_ao_avg")) self.df_save_st = self.df_save_st.join( df_st_ao, on=['search_term'], how='left' ).join( df_st_ao_4_20, on=['search_term'], how='left' ) window = Window.orderBy(self.df_save_st.st_ao_val.asc()) self.df_save_st = self.df_save_st.withColumn("st_ao_val_rate", F.percent_rank().over(window=window)) self.df_save_asin = self.df_save_asin.drop("asin_ao_val_rank") def handle_st_num(self): df_num = self.df_brand_analytics.drop_duplicates(['search_term', 'date_info']) df_num = df_num.groupby(['search_term']).count() df_num = df_num.withColumnRenamed("count", "st_num") # self.df_save_st = self.df_save_st.withColumn("st_num", F.lit(1)) self.df_save_st = self.df_save_st.join( df_num, on=['search_term'], how='left' ) if __name__ == '__main__': site_name = sys.argv[1] # 参数1:站点 date_type = sys.argv[2] # 参数2:类型:day/week/4_week/month/quarter date_info = sys.argv[3] # 参数3:年-月-日/年-周/年-月/年-季, 比如: 2022-1 handle_obj = DwdStMeasure(site_name=site_name, date_type=date_type, date_info=date_info) handle_obj.run()