import os
import re
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # put the parent directory on sys.path for `utils` imports
from pyspark.storagelevel import StorageLevel
from utils.templates import Templates
# from ..utils.templates import Templates
# from AmazonSpider.pyspark_job.utils.templates_test import Templates
from pyspark.sql.types import StringType, IntegerType
# Window is used for grouped ranking (row_number / percent_rank) below
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from textblob import Word


class DwdStMeasure(Templates):
    """DWD-layer job computing search-term (st) and asin measures.

    Reads dim/ods tables for one site + date partition, derives counts,
    estimated orders, AO values, flow proportions and attribute baselines,
    and writes three result tables:

    - ``dwd_st_asin_measure`` (st + asin grain)
    - ``dwd_st_measure``      (st grain)
    - ``dwd_asin_measure``    (asin grain)

    Orchestration (``read_data`` -> ``handle_data`` -> ``save_data`` via
    ``run()``) is inherited from ``Templates``; that base class also provides
    ``self.year`` / ``self.month`` and the date-tuple helpers used here.
    """

    def __init__(self, site_name='us', date_type="month", date_info='2022-01'):
        super().__init__()
        self.site_name = site_name          # site code, e.g. 'us', 'uk', 'de'
        self.date_type = date_type          # day / week / month / month_week / month_old ...
        self.date_info = date_info          # partition value matching date_type, e.g. '2022-01'
        self.db_save_st_asin = 'dwd_st_asin_measure'
        self.db_save_st = 'dwd_st_measure'
        self.db_save_asin = 'dwd_asin_measure'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save_st_asin}, {self.db_save_st}, {self.db_save_asin}: {self.site_name}, {self.date_type}, {self.date_info}")
        self.get_date_info_tuple()
        self.get_year_week_tuple()
        self.get_year_month_days_dict(year=int(self.year))
        # factor converting monthly orders to daily/weekly orders
        self.orders_transform_rate = self.get_orders_transform_rate()
        # placeholder DataFrames, re-assigned in read_data()/handle_data()
        self.df_st_asin = self.spark.sql(f"select 1+1;")
        self.df_st_asin_flow = self.spark.sql(f"select 1+1;")
        self.df_st = self.spark.sql(f"select 1+1;")
        self.df_brand_analytics = self.spark.sql(f"select 1+1;")
        self.df_st_rate = self.spark.sql(f"select 1+1;")
        self.df_st_quantity = self.spark.sql(f"select 1+1;")
        self.df_asin_bs = self.spark.sql(f"select 1+1;")
        self.df_asin_detail = self.spark.sql(f"select 1+1;")
        self.df_bs_report = self.spark.sql(f"select 1+1;")
        self.df_st_asin_duplicated = self.spark.sql(f"select 1+1;")
        self.df_save_st_asin = self.spark.sql(f"select 1+1;")
        self.df_save_asin = self.spark.sql(f"select 1+1;")
        self.df_save_st = self.spark.sql(f"select 1+1;")
        self.df_asin_stable = self.spark.sql(f"select 1+1;")
        self.df_asin_price_weight = self.spark.sql(f"select 1+1;")
        self.df_asin_amazon_orders = self.spark.sql(f"select 1+1;")
        self.df_asin_self = self.spark.sql(f"select 1+1;")
        # empty (limit 0) templates guarantee the pivoted counts have a stable schema
        self.df_st_templates = self.spark.sql("select st_zr_counts, st_sp_counts, st_sb1_counts,st_sb2_counts,st_sb3_counts,st_ac_counts,st_bs_counts,st_er_counts,st_tr_counts from dwd_st_measure limit 0")
        self.df_asin_templates = self.spark.sql("select asin_zr_counts, asin_sp_counts, asin_sb1_counts,asin_sb2_counts,asin_sb3_counts,asin_ac_counts,asin_bs_counts,asin_er_counts,asin_tr_counts from dwd_asin_measure limit 0")
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.u_is_title_appear = self.spark.udf.register("u_is_title_appear", self.udf_is_title_appear, IntegerType())

    def get_orders_transform_rate(self):
        """Return the factor that scales monthly orders to this job's date_type.

        day -> 1/days_in_month, week -> 7/days_in_month, anything else -> 1.
        """
        month_days = self.year_month_days_dict[int(self.month)]
        if self.date_type == 'day':
            return 1 / month_days
        if self.date_type == 'week':
            return 7 / month_days
        return 1

    @staticmethod
    def udf_is_title_appear(search_term, title):
        """UDF: 1 if every non-preposition word of search_term appears in title, else 0.

        Both strings are lowercased, common separators are turned into spaces,
        and punctuation symbols are stripped before matching.  Each remaining
        word is looked up space-padded in the padded title; if the literal word
        is absent, its TextBlob ``Word`` form is tried as well (intended to
        cover singular/plural variants).
        """
        english_prepositions = ["aboard", "about", "above", "across", "after", "against", "along", "amid", "among",
                                "around", "as", "at", "before", "behind", "below", "beneath", "beside", "between",
                                "beyond", "but", "by", "concerning", "considering", "despite", "down", "during",
                                "except", "for", "from", "in", "inside", "into", "like", "near", "of", "off", "on",
                                "onto", "out", "outside", "over", "past", "regarding", "round", "since", "through",
                                "to", "toward", "under", "underneath", "until", "unto", "up", "upon", "with", "within",
                                "without"]
        # NOTE(review): two literals in this list were reconstructed from a garbled
        # source: a lone backslash (now '\\') and '￣' (was space + macron, which
        # would have stripped spaces from the title) — confirm against the original.
        symbol_list = [',', '。', '?', '!', ':', '?', '!', '-', '%', '|', ';', '·', '…', '~', '&', '@', '#',
                       '、', '…', '~', '&', '@', '#', '“', '”', '‘', '’', '〝', '〞', '"', "'", '"', '’', '´', '’',
                       '(', ')', '【', '】', '《', '》', '<', '>', '﹝', '﹞', '<', '>', '«', '»', '‹', '›',
                       '〔', '〕', '〈', '〉', '{', '}', '[', ']', '「', '」', '{', '}', '〖', '〗', '『', '』',
                       '︵', '︷', '︹', '︿', '︽', '﹁', '﹃', '︻', '︗', '/', '\\', '︶', '︸', '︺', '﹀',
                       '︾', '﹂', '﹄', '︼', '︘', '/', '|', '\\', '_', '_', '﹏', '﹍', '﹎', '``', '¦', '¡',
                       '^', '\xad', '¨', 'ˊ', '¯', '￣', '﹋', '﹉', '﹊', 'ˋ', '︴', '¿', 'ˇ']
        # lowercase; also treat comma/colon/semicolon as word separators
        search_term = str(search_term).lower().replace(",", " ").replace(":", " ").replace(";", " ")
        title = f" {str(title).lower().replace(',', ' ').replace(';', ' ').replace(':', ' ')} "
        # 1. strip special symbols with one regex character class.
        # BUGFIX: the symbols must be re.escape()d — the raw list contains
        # ']', '[', '-', '\', '^', '(' and ')', which made the original
        # '[' + symbols + ']' pattern malformed (re.error at runtime).
        pattern = '[' + re.escape("".join(symbol_list)) + ']'
        search_term = re.sub(pattern, '', search_term)
        title = re.sub(pattern, '', title)
        # 2. drop prepositions from the search term, pad words for whole-word matching
        st_list = [f" {st} " for st in search_term.split(" ") if st not in english_prepositions]
        # 3. require every word (or its Word() form) to appear in the title
        for st in st_list:
            if st not in title:
                if Word(st) not in title:
                    return 0
        return 1

    def read_data(self):
        """Load every input DataFrame for the current site/date partition."""
        print("self.year, self.month:", self.year, self.month)
        print("1 读取st+asin两个维度: dim_st_asin_info表和ods_rank_flow表")
        print("1.1 读取dim_st_asin_info表")
        if self.date_type == 'month_old':
            # old monthly data: month partitions exist only up to 2022-09
            if int(self.month) <= 9 and int(self.year) <= 2022:
                sql = f"select * from dim_st_asin_info where site_name='{self.site_name}' and date_type='month' and date_info ='{self.date_info}'"
            else:
                sql = f"select * from dim_st_asin_info where site_name='{self.site_name}' and date_type='week' and date_info in {self.year_week_tuple}"
        else:
            sql = f"select * from dim_st_asin_info where site_name='{self.site_name}' and date_type='week' and date_info in {self.year_week_tuple}"
            # BUGFIX: the original referenced the bare names date_type/date_info here,
            # which are undefined unless the module runs as __main__ — use the
            # instance attributes instead.
            # us switched to native month partitions from 2023-10; uk/de from 2024-05.
            if self.date_type in ['month', 'month_week'] and (
                    (self.site_name == 'us' and self.date_info >= '2023-10')
                    or (self.site_name in ['uk', 'de'] and self.date_info >= '2024-05')):
                sql = f"select * from dim_st_asin_info where site_name='{self.site_name}' and date_type='month' and date_info ='{self.date_info}'"
        print("sql:", sql)
        self.df_st_asin = self.spark.sql(sqlQuery=sql).cache()
        self.df_st_asin.show(10, truncate=False)
        print("1.2 读取ods_rank_flow表")
        sql = f"select rank as page_rank, flow from ods_rank_flow " \
              f"where site_name='{self.site_name}'"
        self.df_st_asin_flow = self.spark.sql(sql).cache()
        self.df_st_asin_flow.show(10, truncate=False)
        print("2 读取st维度: dim_st_detail表和ods_brand_analytics表")
        print("self.year, self.month:", self.year, self.month)
        print("2.1 读取dim_st_detail和ods_brand_analytics表")
        sql = f"select search_term, st_rank, st_search_sum from dim_st_detail where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info ='{self.date_info}';"
        print("sql:", sql)
        self.df_st = self.spark.sql(sqlQuery=sql)
        self.df_st.persist(StorageLevel.MEMORY_ONLY)
        self.df_st.show(10, truncate=False)
        # word frequency source (how many weeks each term appeared)
        print("2.2 读取ods_brand_analytics表")
        sql = f"select search_term, date_info from ods_brand_analytics where site_name='{self.site_name}' and date_type='week' and date_info in {self.year_week_tuple}"
        print("sql:", sql)
        self.df_brand_analytics = self.spark.sql(sqlQuery=sql)
        self.df_brand_analytics.persist(StorageLevel.MEMORY_ONLY)
        self.df_brand_analytics.show(10, truncate=False)
        print("3 读取asin维度: dim_asin_bs_info+dim_asin_detail表")
        print("3.1 读取dim_asin_bs_info表")
        # '_old' suffix is only a local variant marker; the dims use the plain date_type
        sql = f"select asin, asin_bs_cate_1_rank, asin_bs_cate_1_id " \
              f"from dim_asin_bs_info where site_name='{self.site_name}' and date_type='{self.date_type.replace('_old', '')}' and date_info='{self.date_info}';"
        print("sql:", sql)
        self.df_asin_bs = self.spark.sql(sql).cache()
        self.df_asin_bs.show(10)
        sql = f"select asin, asin_title, asin_price, parent_asin " \
              f"from dim_asin_detail where site_name='{self.site_name}' and date_type='{self.date_type.replace('_old', '')}' and date_info='{self.date_info}';"
        print("sql:", sql)
        self.df_asin_detail = self.spark.sql(sql).cache()
        self.df_asin_detail.show(10)
        print("4 读取bsr维度: ods_one_category_report表")
        print("4.1 读取ods_one_category_report表")
        if int(self.year) == 2022 and int(self.month) < 3:
            # no report before 2022-03; fall back to the fixed 2022-12 partition
            sql = f"select category_id as asin_bs_cate_1_id, rank as asin_bs_cate_1_rank, ceil(orders*{self.orders_transform_rate}) as asin_bsr_orders from ods_one_category_report " \
                  f"where site_name='{self.site_name}' and date_type='month' and date_info='2022-12';"
        else:
            month = f"0{str(self.month)}" if len(str(self.month)) == 1 else str(self.month)
            sql = f"select category_id as asin_bs_cate_1_id, rank as asin_bs_cate_1_rank, ceil(orders*{self.orders_transform_rate}) as asin_bsr_orders from ods_one_category_report " \
                  f"where site_name='{self.site_name}' and date_type='month' and date_info='{self.year}-{month}';"
        print("sql:", sql)
        self.df_bs_report = self.spark.sql(sqlQuery=sql)
        self.df_bs_report.persist(StorageLevel.MEMORY_ONLY)
        self.df_bs_report.show(10, truncate=False)
        print("5 读取asin维度-体积信息: dim_asin_stable_info表")
        sql = f"select asin, asin_length * asin_width * asin_height as asin_volume, asin_weight from dim_asin_stable_info where site_name='{self.site_name}'"
        print("sql:", sql)
        self.df_asin_stable = self.spark.sql(sqlQuery=sql).cache()
        self.df_asin_stable.show(10, truncate=False)
        print("6 读取asin维度-月销数据: dim_asin_amorders_info表")
        sql = f"select asin, asin_amazon_orders from dim_asin_amorders_info where site_name='{self.site_name}' and date_type='{self.date_type.replace('_old', '')}' and date_info='{self.date_info}'"
        print("sql:", sql)
        self.df_asin_amazon_orders = self.spark.sql(sqlQuery=sql).cache()
        self.df_asin_amazon_orders.show(10, truncate=False)
        print("7 读取asin维度-内部asin: ods_self_asin")
        sql = f"select asin, 1 as is_self_asin from ods_self_asin where site_name='{self.site_name}' group by asin"
        print("sql:", sql)
        self.df_asin_self = self.spark.sql(sqlQuery=sql)
        # small lookup table — broadcast it for the joins below
        self.df_asin_self = F.broadcast(self.df_asin_self)
        self.df_asin_self.show(10, truncate=False)

    def save_data(self):
        """Write the three result DataFrames, repartitioned per target table size."""
        self.reset_partitions(partitions_num=50)
        self.save_data_common(
            df_save=self.df_save_st_asin,
            db_save=self.db_save_st_asin,
            partitions_num=self.partitions_num,
            partitions_by=self.partitions_by
        )
        self.reset_partitions(partitions_num=5)
        self.save_data_common(
            df_save=self.df_save_st,
            db_save=self.db_save_st,
            partitions_num=self.partitions_num,
            partitions_by=self.partitions_by
        )
        self.reset_partitions(partitions_num=10)
        self.save_data_common(
            df_save=self.df_save_asin,
            db_save=self.db_save_asin,
            partitions_num=self.partitions_num,
            partitions_by=self.partitions_by
        )

    def handle_data(self):
        """Run all transformation steps in dependency order."""
        self.handle_join()
        # counts must run asin-first: both calls filter/mutate df_st_asin_duplicated
        self.df_save_asin = self.handle_st_asin_counts(cal_type="asin", df_templates=self.df_asin_templates, page=3)
        self.df_save_st = self.handle_st_asin_counts(cal_type="st", df_templates=self.df_st_templates)
        self.handle_st_zr_sp_page123_title_rate(data_type='zr', page_type='page1')
        self.handle_st_zr_sp_page123_title_rate(data_type='zr', page_type='page123')
        self.handle_st_zr_sp_page123_title_rate(data_type='sp', page_type='page123')
        self.handle_st_asin_orders()  # estimated orders + bsr orders
        self.handle_asin_ao_and_zr_flow_proportion()
        self.handle_st_ao_and_zr_flow_proportion()
        self.handle_st_num()
        self.handle_st_weight_price_volume()
        # release the two largest cached frames before writing
        del self.df_st_asin_duplicated
        del self.df_st_asin

    def handle_st_attributes_avg(self, df_st_asin, attributes_type, st_type):
        """Average of ``attributes_type`` per search_term over rows within the baseline.

        Keeps only rows with a positive attribute value that does not exceed the
        per-term baseline column ``st_type`` (e.g. st_volume_std), then averages.
        """
        df_st_asin = df_st_asin.select("search_term", f"{attributes_type}").filter(f'{attributes_type} > 0')
        # keep rows at or below the baseline value
        df_st_asin = df_st_asin.filter(F.col(f"{attributes_type}") <= F.col(st_type))
        df_st_avg = df_st_asin.groupby(['search_term']).agg(
            F.round(F.avg(f"{attributes_type}"), 4).alias(f'{st_type.replace("_std", "_avg")}')
        )
        return df_st_avg

    def handle_st_attributes_std(self, df, attributes_type='asin_volume'):
        """Per search_term, find the attribute value at the 25% percent-rank cut.

        Values are ranked descending, so the row whose percent_rank is the
        largest value still <= 0.25 is (approximately) the 75th-percentile
        value; it is returned as ``st_<attr>_25_percent``.
        """
        window = Window.partitionBy(['search_term']).orderBy(F.desc(f"{attributes_type}"))
        # percent-rank positive values and keep the top quartile of the ordering
        df = df.select("search_term", f"{attributes_type}").filter(f'{attributes_type} > 0') \
            .withColumn(f"{attributes_type}_percent_rank", F.percent_rank().over(window)) \
            .filter(f'{attributes_type}_percent_rank <= 0.25')
        # keep the single row with the maximal percent-rank per search_term
        window = Window.partitionBy(['search_term']).orderBy(F.desc(f"{attributes_type}_percent_rank"))
        df = df.withColumn(f"{attributes_type}_row_number", F.row_number().over(window)) \
            .filter(f'{attributes_type}_row_number = 1')
        df = df.drop(f"{attributes_type}_percent_rank", f"{attributes_type}_row_number")
        df = df.withColumnRenamed(f"{attributes_type}", f"{attributes_type.replace('asin', 'st')}_25_percent")
        df.show(10, truncate=False)
        return df

    def handle_st_weight_price_volume(self):
        """Derive per-search_term volume/price/weight baselines, minima and averages."""
        df_st_asin = self.df_st_asin_duplicated.select('search_term', 'asin').drop_duplicates(['search_term', 'asin']).cache()
        df_asin_label = self.df_asin_detail.select("asin", "asin_price", "asin_weight", "asin_volume").cache()
        df_st_asin = df_st_asin.join(
            df_asin_label, on='asin', how='inner'
        )
        # quartile cut values
        df_st_volume = self.handle_st_attributes_std(df=df_st_asin, attributes_type='asin_volume')
        df_st_price = self.handle_st_attributes_std(df=df_st_asin, attributes_type='asin_price')
        df_st_weight = self.handle_st_attributes_std(df=df_st_asin, attributes_type='asin_weight')
        # minima per search_term
        df_st_min = df_st_asin.groupby(['search_term']).agg(
            F.round(F.min("asin_volume"), 4).alias('st_volume_min'),
            F.round(F.min("asin_price"), 4).alias('st_price_min'),
            F.round(F.min("asin_weight"), 4).alias('st_weight_min')
        )
        df_st_min = df_st_min.join(
            df_st_volume, on='search_term', how='left'
        ).join(
            df_st_price, on='search_term', how='left'
        ).join(
            df_st_weight, on='search_term', how='left'
        )
        # baseline = min + 1.5 * (quartile_cut - min)
        df_st_std = df_st_min.withColumn(
            "st_volume_std",
            F.round(1.5 * (df_st_min.st_volume_25_percent - df_st_min.st_volume_min) + df_st_min.st_volume_min, 4)
        ).withColumn(
            "st_price_std",
            F.round(1.5 * (df_st_min.st_price_25_percent - df_st_min.st_price_min) + df_st_min.st_price_min, 4)
        ).withColumn(
            "st_weight_std",
            F.round(1.5 * (df_st_min.st_weight_25_percent - df_st_min.st_weight_min) + df_st_min.st_weight_min, 4)
        )
        # averages over the rows under each baseline
        df_st_asin = df_st_asin.join(
            df_st_std, on="search_term", how="left"
        )
        df_st_volume_avg = self.handle_st_attributes_avg(df_st_asin=df_st_asin, attributes_type='asin_volume', st_type="st_volume_std")
        df_st_price_avg = self.handle_st_attributes_avg(df_st_asin=df_st_asin, attributes_type='asin_price', st_type="st_price_std")
        df_st_weight_avg = self.handle_st_attributes_avg(df_st_asin=df_st_asin, attributes_type='asin_weight', st_type="st_weight_std")
        df_st_avg = df_st_std.join(
            df_st_volume_avg, on='search_term', how='left'
        ).join(
            df_st_price_avg, on='search_term', how='left'
        ).join(
            df_st_weight_avg, on='search_term', how='left'
        )
        self.df_save_st = self.df_save_st.join(
            df_st_avg, on='search_term', how='left'
        )

    def handle_join(self):
        """Join the raw inputs into the wide st+asin frame and its deduped variant."""
        # st+asin: attach flow by page rank
        self.df_st_asin = self.df_st_asin.join(
            self.df_st_asin_flow, on=['page_rank'], how='left'
        )
        # st level -- already covered by dim_st_detail
        # asin level: bsr orders by (rank, category), then detail + stable attrs
        self.df_asin_bs = self.df_asin_bs.join(
            self.df_bs_report, on=['asin_bs_cate_1_rank', 'asin_bs_cate_1_id'], how='left'
        )
        self.df_asin_detail = self.df_asin_detail.join(
            self.df_asin_bs, on='asin', how='left'
        ).join(
            self.df_asin_stable, on='asin', how='left'
        )
        # merge everything onto the st+asin grain
        self.df_st_asin = self.df_st_asin.join(
            self.df_st, on=['search_term'], how='left'
        ).join(
            self.df_asin_detail, on=['asin'], how='left'
        )
        # dedupe per (search_term, asin, data_type), keeping the lowest page
        window = Window.partitionBy(['search_term', 'asin', 'data_type']).orderBy('page')
        self.df_st_asin = self.df_st_asin.withColumn("rk", F.row_number().over(window=window))
        self.df_st_asin_duplicated = self.df_st_asin.filter("rk=1").drop("rk").cache()

    def handle_st_asin_counts(self, cal_type="asin", df_templates=None, page=3):
        """Pivot data_type occurrence counts at asin or st grain (pages <= ``page``).

        Note: re-assigns ``self.df_st_asin_duplicated`` (filter + extra column),
        so the "asin" call must precede the "st" call — see handle_data().
        """
        print(f"计算{cal_type}_counts")
        cal_type_complete = "search_term" if cal_type == "st" else cal_type
        self.df_st_asin_duplicated = self.df_st_asin_duplicated.filter(f"page <= {page}").withColumn(
            f"{cal_type}_data_type",
            F.concat(F.lit(f"{cal_type}_"), self.df_st_asin_duplicated.data_type, F.lit(f"_counts"))
        )
        df = self.df_st_asin_duplicated.groupby([f'{cal_type_complete}']). \
            pivot(f"{cal_type}_data_type").count()
        # union with the empty template so missing crawl columns still exist
        df = df_templates.unionByName(df, allowMissingColumns=True)
        df = df.fillna(0)
        df = df.withColumn(
            f"{cal_type}_sb_counts",
            df[f"{cal_type}_sb1_counts"] + df[f"{cal_type}_sb2_counts"] + df[f"{cal_type}_sb3_counts"]
        )
        df = df.withColumn(
            f"{cal_type}_adv_counts",
            df[f"{cal_type}_sb_counts"] + df[f"{cal_type}_sp_counts"]
        )
        if cal_type == "asin":
            # how many distinct search terms hit each asin
            df_st_asin_duplicated = self.df_st_asin_duplicated.drop_duplicates(['search_term', 'asin'])
            df_st_asin_agg = df_st_asin_duplicated.groupby(['asin']).agg(
                F.count('search_term').alias("asin_st_counts")
            )
            df = df.join(
                df_st_asin_agg, on=['asin'], how='left'
            )
        elif cal_type == "st":
            # share of internal (self) asins among the asins of each search term
            df_st_asin_agg = self.df_st_asin_duplicated.select("search_term", "asin").join(
                self.df_asin_self, on='asin', how='left'
            ).withColumn(
                "is_self_asin",
                F.when(F.col("is_self_asin").isNotNull(), F.col("is_self_asin")).otherwise(F.lit(0))
            ).groupby(['search_term']).agg(
                F.sum('is_self_asin').alias("st_self_asin_counts"),
                F.count('asin').alias("st_total_asin_counts")
            ).withColumn(
                'st_self_asin_proportion',
                F.round(F.col('st_self_asin_counts') / F.col('st_total_asin_counts'), 4)
            )
            df = df.join(
                df_st_asin_agg, on=['search_term'], how='left'
            )
        df = df.withColumn("site_name", F.lit(self.site_name))
        df = df.withColumn("date_type", F.lit(self.date_type))
        df = df.withColumn("date_info", F.lit(self.date_info))
        return df

    def handle_st_zr_sp_page123_title_rate(self, data_type='zr', page_type="page1"):
        """Per search_term: fraction of (deduped) asins whose title contains the term.

        ``data_type`` selects zr/sp rows; ``page_type`` 'page1' restricts to page 1,
        anything else uses all pages. Result columns are joined onto df_save_st.
        """
        params = ' and page=1' if page_type == 'page1' else ""
        df = self.df_st_asin.filter(f"data_type='{data_type}' {params}")
        df = df.select("search_term", "asin", "asin_title").drop_duplicates(["search_term", "asin"])
        df = df.withColumn(
            "st_asin_in_title_flag", self.u_is_title_appear(df.search_term, df.asin_title)
        )
        df = df.groupby(['search_term']).agg(
            {
                "search_term": "count",
                "st_asin_in_title_flag": "sum",
            }
        )
        df = df.withColumnRenamed(
            "sum(st_asin_in_title_flag)", f"st_{data_type}_{page_type}_title_appear_counts"
        ).withColumnRenamed(
            "count(search_term)", f"st_{data_type}_{page_type}_title_counts"
        )
        df = df.withColumn(
            f"st_{data_type}_{page_type}_title_appear_rate",
            F.round(F.col(f"st_{data_type}_{page_type}_title_appear_counts") / F.col(f"st_{data_type}_{page_type}_title_counts"), 4)
        )
        self.df_save_st = self.df_save_st.join(
            df, on=['search_term'], how='left'
        )

    def handle_st_zr_page1_title_rate(self):
        """Legacy: zr/page1-only title density (superseded by
        handle_st_zr_sp_page123_title_rate; not called from handle_data)."""
        print("计算关键词的zr类型page=1的去重asin的标题密度")
        df_zr_page1 = self.df_st_asin.filter(
            "data_type='zr' and page=1"
        )
        df_zr_page1 = df_zr_page1.select("search_term", "asin", "asin_title").drop_duplicates(["search_term", "asin"])
        df_zr_page1 = df_zr_page1.withColumn(
            "st_asin_in_title_flag", self.u_is_title_appear(df_zr_page1.search_term, df_zr_page1.asin_title)
        )
        df_zr_page1 = df_zr_page1.groupby(['search_term']).agg(
            {
                "search_term": "count",
                "st_asin_in_title_flag": "sum",
            }
        )
        df_zr_page1 = df_zr_page1.withColumnRenamed(
            "sum(st_asin_in_title_flag)", "st_zr_page1_title_appear_counts"
        ).withColumnRenamed(
            "count(search_term)", "st_zr_page1_title_counts"
        )
        df_zr_page1 = df_zr_page1.withColumn(
            "st_zr_page1_title_appear_rate",
            df_zr_page1.st_zr_page1_title_appear_counts / df_zr_page1.st_zr_page1_title_counts
        )
        self.df_save_st = self.df_save_st.join(
            df_zr_page1, on=['search_term'], how='left'
        )
        del df_zr_page1

    def handle_st_asin_orders(self):
        """Estimated orders (flow * searches * rate) and BSR orders at all grains."""
        print("1. 预估销量:zr, sp的销量")
        # 1.1 st+asin grain
        self.df_st_asin = self.df_st_asin.withColumn(
            "st_asin_orders",
            F.ceil(self.df_st_asin.flow * self.df_st_asin.st_search_sum * self.orders_transform_rate)
        )
        self.df_save_st_asin = self.df_st_asin.withColumn(
            "st_asin_orders_data_type",
            F.concat(F.lit("st_asin_"), self.df_st_asin.data_type, F.lit("_orders"))
        )
        self.df_save_st_asin = self.df_save_st_asin.groupby(["search_term", "asin"]). \
            pivot("st_asin_orders_data_type").agg(F.mean(f"st_asin_orders"))
        self.df_save_st_asin = self.df_save_st_asin.select(
            "search_term", "asin", "st_asin_zr_orders", "st_asin_sp_orders"
        )
        self.df_save_st_asin.persist(StorageLevel.MEMORY_ONLY)
        self.df_save_st_asin = self.df_save_st_asin.withColumn("site_name", F.lit(self.site_name))
        self.df_save_st_asin = self.df_save_st_asin.withColumn("date_type", F.lit(self.date_type))
        self.df_save_st_asin = self.df_save_st_asin.withColumn("date_info", F.lit(self.date_info))
        # 1.2 st grain: sum of zr/sp estimated orders
        df_st_orders = self.df_save_st_asin.groupby(['search_term']).agg(
            F.sum('st_asin_zr_orders').alias("st_zr_orders"),
            F.sum('st_asin_sp_orders').alias("st_sp_orders"),
        )
        self.df_save_st = self.df_save_st.join(
            df_st_orders, on=['search_term'], how='left'
        )
        # 1.3 asin grain: mean and sum of zr/sp estimated orders
        df_asin_orders = self.df_save_st_asin.groupby(['asin']).agg(
            F.mean('st_asin_zr_orders').alias("asin_zr_orders"),
            F.mean('st_asin_sp_orders').alias("asin_sp_orders"),
            F.sum('st_asin_zr_orders').alias("asin_zr_orders_sum"),
            F.sum('st_asin_sp_orders').alias("asin_sp_orders_sum"),
        )
        self.df_save_asin = self.df_save_asin.join(
            df_asin_orders, on=['asin'], how='left'
        )
        # round up to whole orders
        self.df_save_asin = self.df_save_asin.withColumn(
            "asin_zr_orders", F.ceil(self.df_save_asin.asin_zr_orders)
        ).withColumn(
            "asin_sp_orders", F.ceil(self.df_save_asin.asin_sp_orders)
        ).withColumn(
            "asin_zr_orders_sum", F.ceil(self.df_save_asin.asin_zr_orders_sum)
        ).withColumn(
            "asin_sp_orders_sum", F.ceil(self.df_save_asin.asin_sp_orders_sum)
        )
        print("2. bsr销量")
        # 2.1 st_bsr_orders: sum of per-asin bsr orders per search_term
        df_st_bsr_orders = self.df_st_asin.select("search_term", "asin", "asin_bsr_orders").drop_duplicates(["search_term", "asin"])
        df_st_bsr_orders = df_st_bsr_orders.groupby(['search_term']).agg({"asin_bsr_orders": "sum"})
        df_st_bsr_orders = df_st_bsr_orders.withColumnRenamed(
            "sum(asin_bsr_orders)", "st_bsr_orders"
        )
        # 2.2 asin_bsr_orders + amazon monthly orders
        df_asin_bsr_orders = self.df_st_asin.select("asin", "asin_bsr_orders").drop_duplicates(['asin'])
        self.df_save_st = self.df_save_st.join(
            df_st_bsr_orders, on='search_term', how='left'
        )
        self.df_save_asin = self.df_save_asin.join(
            df_asin_bsr_orders, on='asin', how='left'
        ).join(
            self.df_asin_amazon_orders, on='asin', how='left'
        )

    def handle_asin_ao_and_zr_flow_proportion(self):
        """asin grain: AO value, AO rank rate, zr flow proportion, parent-asin matrix."""
        print("计算asin维度的ao+zr流量占比")
        # 1. AO value = adv counts / zr counts
        self.df_save_asin = self.df_save_asin.withColumn(
            "asin_ao_val", F.round(self.df_save_asin.asin_adv_counts / self.df_save_asin.asin_zr_counts, 3)
        )
        # 2. AO competition percentile (global, nulls last)
        window = Window.orderBy(self.df_save_asin.asin_ao_val.asc_nulls_last())
        self.df_save_asin = self.df_save_asin.withColumn(
            "asin_ao_val_rate", F.round(F.percent_rank().over(window=window), 4)
        )
        # 3. natural (zr) flow proportion
        self.df_save_asin = self.df_save_asin.withColumn(
            "asin_zr_flow_proportion",
            F.when(F.col("asin_st_counts").isNotNull(), F.round(F.col("asin_zr_counts") / F.col("asin_st_counts"), 3))
        )
        self.df_save_asin = self.df_save_asin.join(
            self.df_asin_detail.select("asin", "parent_asin"), on='asin', how='left'
        )
        # 4. parent-asin ("matrix") AO value and zr flow proportion
        df_asin_variation = self.df_save_asin.filter("parent_asin is not null").select("parent_asin", "asin_zr_counts", "asin_st_counts", "asin_adv_counts")
        df_asin_variation_agg = df_asin_variation.groupby(['parent_asin']).agg(
            F.sum("asin_zr_counts").alias("sum_asin_zr_counts"),
            F.sum("asin_st_counts").alias("sum_asin_st_counts"),
            F.sum("asin_adv_counts").alias("sum_asin_adv_counts")
        ).withColumn(
            "asin_flow_proportion_matrix",
            F.when(
                F.col("sum_asin_st_counts").isNotNull(),
                F.round(F.col("sum_asin_zr_counts") / F.col("sum_asin_st_counts"), 3)
            )
        ).withColumn(
            "asin_ao_val_matrix",
            F.when(
                F.col("sum_asin_zr_counts").isNotNull(),
                F.round(F.col("sum_asin_adv_counts") / F.col("sum_asin_zr_counts"), 3)
            )
        ).drop("sum_asin_zr_counts", "sum_asin_st_counts", "sum_asin_adv_counts")
        self.df_save_asin = self.df_save_asin.join(
            df_asin_variation_agg, on=['parent_asin'], how='left'
        )
        # 5. fall back to the asin-level values when the parent aggregate is null
        self.df_save_asin = self.df_save_asin.withColumn(
            "asin_flow_proportion_matrix",
            F.coalesce(F.col("asin_flow_proportion_matrix"), F.col("asin_zr_flow_proportion"))
        ).withColumn(
            "asin_ao_val_matrix",
            F.coalesce(F.col("asin_ao_val_matrix"), F.col("asin_ao_val"))
        )
        self.df_save_asin.show(10, truncate=False)
        self.df_save_asin = self.df_save_asin.drop("parent_asin")

    def handle_st_ao_and_zr_flow_proportion(self):
        """st grain: AO value / flow proportion derived from page-1 zr asins."""
        print("计算st维度的ao+zr流量占比")
        # 1. bring the asin-level AO / flow columns onto zr rows
        df_asin_ao_and_zr_flow_proportion = self.df_save_asin.select("asin", "asin_ao_val", "asin_zr_flow_proportion", "asin_ao_val_matrix", "asin_flow_proportion_matrix")
        df_st_ao_and_zr_flow_proportion = self.df_st_asin_duplicated.filter("data_type='zr'").select("search_term", "asin", "page").join(
            df_asin_ao_and_zr_flow_proportion, on=['asin'], how='left'
        )
        # 2. mean AO over ranks 4..20 of the ascending AO ordering
        window = Window.partitionBy(['search_term']).orderBy(df_st_ao_and_zr_flow_proportion.asin_ao_val.asc_nulls_last())
        df_st_ao_4_20 = df_st_ao_and_zr_flow_proportion.withColumn(
            "asin_ao_val_rank", F.row_number().over(window=window)
        ).filter(
            "asin_ao_val_rank between 4 and 20"
        ).select(
            "search_term", "asin_ao_val"
        )
        df_st_ao_4_20 = df_st_ao_4_20 \
            .groupby(["search_term"]) \
            .agg(F.round(F.mean(df_st_ao_4_20.asin_ao_val), 3).alias("st_4_20_ao_avg"))
        # 3. st AO / zr flow proportion = mean over page-1 zr asins
        df_st_ao = df_st_ao_and_zr_flow_proportion \
            .filter("page=1 and asin_ao_val is not null") \
            .groupby(["search_term"]) \
            .agg(F.round(F.mean("asin_ao_val"), 3).alias("st_ao_val"))
        df_st_zr_flow_proportion = df_st_ao_and_zr_flow_proportion \
            .filter("page=1 and asin_zr_flow_proportion is not null") \
            .groupby(["search_term"]) \
            .agg(F.round(F.mean("asin_zr_flow_proportion"), 3).alias("st_zr_flow_proportion"))
        self.df_save_st = self.df_save_st.join(
            df_st_ao, on=['search_term'], how='left'
        ).join(
            df_st_zr_flow_proportion, on=['search_term'], how='left'
        ).join(
            df_st_ao_4_20, on=['search_term'], how='left'
        )
        # 4. st AO competition percentile
        window = Window.orderBy(self.df_save_st.st_ao_val.asc())
        self.df_save_st = self.df_save_st.withColumn(
            "st_ao_val_rate", F.round(F.percent_rank().over(window=window), 4)
        )
        # 5. parent-asin ("matrix") means over page-1 zr asins
        df_st_ao_val_matrix = df_st_ao_and_zr_flow_proportion \
            .filter("page=1 and asin_ao_val_matrix is not null") \
            .groupby(["search_term"]) \
            .agg(F.round(F.mean("asin_ao_val_matrix"), 3).alias("st_ao_val_matrix"))
        df_st_flow_proportion_matrix = df_st_ao_and_zr_flow_proportion \
            .filter("page=1 and asin_flow_proportion_matrix is not null") \
            .groupby(["search_term"]) \
            .agg(F.round(F.mean("asin_flow_proportion_matrix"), 3).alias("st_flow_proportion_matrix"))
        self.df_save_st = self.df_save_st.join(
            df_st_ao_val_matrix, on=['search_term'], how='left'
        ).join(
            df_st_flow_proportion_matrix, on=['search_term'], how='left'
        )
        self.df_save_st.show(10, truncate=False)

    def handle_st_num(self):
        """st_num: number of distinct date_info partitions each term appeared in."""
        df_num = self.df_brand_analytics.drop_duplicates(['search_term', 'date_info'])
        df_num = df_num.groupby(['search_term']).count()
        df_num = df_num.withColumnRenamed("count", "st_num")
        self.df_save_st = self.df_save_st.join(
            df_num, on=['search_term'], how='left'
        )


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site code
    date_type = sys.argv[2]  # arg 2: day/week/4_week/month/quarter
    date_info = sys.argv[3]  # arg 3: e.g. 2022-1 / 2022-01 / 2022-01-01
    handle_obj = DwdStMeasure(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()