"""
Search-term flags for the dwd_st_info job.

1. Compute ascending (rising) terms, hot-search terms and newly appeared terms.
2. quantity_being_sold: number of products currently on sale.
"""
import os
import sys

import pandas as pd

sys.path.append(os.path.dirname(sys.path[0]))  # parent directory, so `utils` is importable
from pyspark.storagelevel import StorageLevel
from utils.templates import Templates
# from ..utils.templates import Templates
# from AmazonSpider.pyspark_job.utils.templates_test import Templates
from pyspark.sql.types import IntegerType
# Window functions for grouped ordering
from pyspark.sql.window import Window
from pyspark.sql import functions as F


class DwdStInfo(Templates):
    """Flag search terms of one site and period as new / ascending / hot-search.

    The flags are derived from ``dim_st_info``:

    * ``st_is_first_text``     - 1 when the term occurs in no other loaded period.
    * ``st_is_ascending_text`` - 1 when ``(last_rank - rank) / last_rank >= 0.5``.
    * ``st_is_search_text``    - 1 when the term occurs in at least 80% of the
      distinct loaded periods.

    In addition, page-1 ``zr`` results are counted per search term together
    with how many of their titles contain the search term.

    Args:
        site_name: Site code used to filter every source table.
        date_type: One of ``week``, ``4_week``, ``month`` or ``quarter``.
        date_info: Period identifier (year-week / year-month / year-quarter).

    Raises:
        ValueError: If ``date_type`` is not one of the supported values.
    """

    def __init__(self, site_name='us', date_type="month", date_info='2022-1'):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.date_info_last = str()
        self.date_info_last2 = str()
        self.db_save = 'dwd_st_info'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save}: {self.site_name}, {self.date_type}, {self.date_info}")
        self.df_date = self.get_year_week_tuple()
        # Every DataFrame attribute starts as the same trivial placeholder;
        # DataFrames are immutable, so sharing one object is safe.
        placeholder = self.spark.sql("select 1+1;")
        self.df_save = placeholder
        self.df_st_info = placeholder
        self.df_repeat = placeholder
        self.df_st_asin_title = placeholder
        self.df_st_info_current = placeholder
        self.df_st_info_last = placeholder  # previous week/month/quarter
        self.df_st_info_last2 = placeholder  # the period before the previous one
        self.df_st_info_duplicated = placeholder  # every period except the current one
        self.df_st_zr_page1_counts = placeholder
        self.date_info_tuple = tuple()
        self.u_is_first = self.spark.udf.register("u_is_first", self.udf_is_first, IntegerType())
        self.u_is_ascending = self.spark.udf.register("u_is_ascending", self.udf_is_ascending, IntegerType())
        self.u_is_search = self.spark.udf.register("u_is_search", self.udf_is_search, IntegerType())
        self.u_is_title_appear = self.spark.udf.register(
            "u_is_title_appear", self.udf_is_title_appear, IntegerType())
        self.reset_partitions(partitions_num=3)
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.get_date_info_tuple()

    @staticmethod
    def udf_is_first(x):
        """Return 0 when the flag ``x`` is truthy, otherwise 1 (term is new in the current period)."""
        return 0 if x else 1

    @staticmethod
    def udf_is_ascending(x):
        """Return 1 when the rank-improvement rate ``x`` is at least 0.5, else 0 (also for NULL)."""
        if x is None:
            return 0
        return 1 if x >= 0.5 else 0

    @staticmethod
    def udf_is_search(x):
        """Return 1 when the appearance rate ``x`` is at least 0.8, else 0 (also for NULL)."""
        if x is None:
            return 0
        return 1 if x >= 0.8 else 0

    @staticmethod
    def udf_is_title_appear(search_term, title):
        """Return 1 when ``search_term`` occurs in ``title`` ignoring case, else 0.

        A NULL search term or title counts as "not contained" instead of
        raising inside the UDF.
        """
        if search_term is None or title is None:
            return 0
        return 1 if search_term.lower() in title.lower() else 0

    @staticmethod
    def _sql_in_list(values):
        """Render ``values`` as a SQL ``IN`` list such as ``('a', 'b')``.

        Interpolating a Python tuple directly yields ``('a',)`` for a single
        element, and the trailing comma is not valid SQL.
        """
        return "(" + ", ".join(repr(value) for value in values) + ")"

    def _period_column(self):
        """Return the ``df_date`` column that holds periods of ``self.date_type``."""
        columns = {
            'week': 'year_week',
            '4_week': 'year_week',
            'month': 'year_month',
            'quarter': 'year_quarter',
        }
        return columns.get(self.date_type, 'year_week')

    def _previous_period(self, period_column, period):
        """Return the period directly before ``period``, or ``''`` when unknown.

        The row whose id is one less than the smallest id of ``period`` is
        looked up in ``df_date`` and its ``period_column`` value is returned.
        """
        if not period:
            return ''
        period_ids = list(self.df_date.loc[self.df_date[period_column] == period].id)
        if not period_ids:
            return ''
        previous_rows = self.df_date.loc[self.df_date.id == min(period_ids) - 1]
        previous_values = list(previous_rows[period_column])
        return previous_values[0] if previous_values else ''

    def read_data(self):
        """Load the source tables into cached DataFrames and print a sample of each."""
        print("1.1 读取dim_st_info表")
        date_info_in = self._sql_in_list(self.date_info_tuple)
        if self.date_type == '4_week':
            # Historical month rows plus the current 4_week row itself.
            sql = f"select * from dim_st_info where (site_name='{self.site_name}' and date_type='month' and date_info in {date_info_in}) or " \
                  f"(site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}');"
        else:
            sql = f"select * from dim_st_info where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info in {date_info_in};"
        print("sql:", sql)
        self.df_st_info = self.spark.sql(sql).cache()
        self.df_st_info.show(10)

        print("1.2 读取ods_rank_search_rate_repeat表")
        # NOTE(review): both search_num and search_sum are aliased to
        # st_search_sum, which produces two columns with the same name -
        # confirm the intended alias for search_num.
        sql = f"select rank as st_rank_avg, search_num as st_search_sum, rate as st_search_rate, search_sum as st_search_sum " \
              f"from ods_rank_search_rate_repeat where site_name='{self.site_name}';"
        self.df_repeat = self.spark.sql(sql).cache()
        self.df_repeat.show(10)

        print("1.3 读取ods_search_term_rank_zr和ods_asin_detail表")
        # sql = f"select * from dim_st_asin_base_info left join" \
        #       f"where site_name='{self.site_name}' and dt in {self.year_week_tuple} and data_type='zr' and page=1 ;"
        # self.df_st_asin_base_info = self.spark.sql(sql).cache()
        # self.df_st_asin_base_info.show(10)
        # NOTE(review): the filters on b.* in the WHERE clause drop unmatched
        # rows, so this left join effectively behaves like an inner join.
        sql = f"""
        SELECT a.search_term, a.asin, b.title FROM ods_search_term_rank_zr a
        left join ods_asin_detail b on a.asin=b.asin and a.dt=b.dt
        where a.site_name ='{self.site_name}' and b.site_name ='{self.site_name}' and a.page=1
        and a.dt in {self.year_week_tuple} and b.dt in {self.year_week_tuple}"""
        self.df_st_asin_title = self.spark.sql(sql).cache()
        self.df_st_asin_title = self.df_st_asin_title.drop_duplicates(['search_term', 'asin'])
        self.df_st_asin_title.show(10)

    def handle_data(self):
        """Run every flag computation and stamp the partition columns on ``df_save``."""
        self.handle_st_first()
        self.handle_st_ascending()
        self.handle_st_search()
        self.handle_st_asin_title()
        self.df_save = self.df_save.withColumn("site_name", F.lit(self.site_name))
        self.df_save = self.df_save.withColumn("date_type", F.lit(self.date_type))
        self.df_save = self.df_save.withColumn("date_info", F.lit(self.date_info))
        self.df_save.show(10)

    def get_date_info_tuple(self):
        """Fill ``self.date_info_tuple`` with every period from week 2020-44 up to the current one.

        Raises:
            ValueError: If ``self.date_type`` is not a supported period type.
        """
        if self.date_type not in ('week', '4_week', 'month', 'quarter'):
            print("date_type输入错误, 退出")
            # Previously this path failed later with an unrelated AttributeError.
            raise ValueError(
                f"unsupported date_type {self.date_type!r}; expected week, 4_week, month or quarter")

        # '2020-44' is the hard-coded first week of the history window.
        start_ids = list(self.df_date.loc[self.df_date.year_week == '2020-44'].id)
        id_start = start_ids[0] if start_ids else 0

        current_ids = list(self.df_date.loc[self.df_date[self._period_column()] == self.date_info].id)
        id_current_max = max(current_ids) if current_ids else 0

        df_week_all = self.df_date.loc[(self.df_date.id >= id_start) & (self.df_date.id <= id_current_max)]
        if self.date_type == 'week':
            self.date_info_tuple = tuple(df_week_all.year_week)
        elif self.date_type == '4_week':
            # Stop 5 ids before the current week; the history is read as month rows.
            df_week_all = self.df_date.loc[
                (self.df_date.id >= id_start) & (self.df_date.id <= id_current_max - 5)]
            self.date_info_tuple = tuple(set(df_week_all.year_month))
        elif self.date_type == 'month':
            self.date_info_tuple = tuple(set(df_week_all.year_month))
        else:  # quarter
            self.date_info_tuple = tuple(set(df_week_all.year_quarter))

    def handle_st_first(self):
        """Flag terms that appear for the first time in the current period.

        Also resolves the previous period and the one before it, and prepares
        ``df_st_info_last`` (previous-period ranks) for ``handle_st_ascending``.
        """
        print("新出词(当前周/4周/月/季度,第1次出现)")
        # Resolve the previous week/month/quarter with the column matching
        # date_type; looking only at year_week never matched month or quarter
        # periods and left the previous-period frame empty.
        period_column = self._period_column()
        self.date_info_last = self._previous_period(period_column, self.date_info)
        self.date_info_last2 = self._previous_period(period_column, self.date_info_last)

        self.df_st_info_current = self.df_st_info.filter(f"date_info='{self.date_info}'")
        self.df_st_info_last = self.df_st_info.filter(f"date_info='{self.date_info_last}'") \
            .select("search_term", "st_rank_avg") \
            .withColumnRenamed("st_rank_avg", "st_rank_avg_last")
        self.df_st_info_last2 = self.df_st_info.filter(f"date_info='{self.date_info_last2}'")

        # Terms seen in any other period get 0; terms left unmatched by the
        # join are new and get 1.
        self.df_st_info_duplicated = self.df_st_info.filter(f"date_info!='{self.date_info}'")
        self.df_st_info_duplicated = self.df_st_info_duplicated.select('search_term') \
            .dropDuplicates(['search_term']) \
            .withColumn("st_is_first_text", F.lit(0))
        self.df_st_info_current = self.df_st_info_current.join(
            self.df_st_info_duplicated, on='search_term', how='left'
        )
        self.df_st_info_current = self.df_st_info_current.fillna({"st_is_first_text": 1})
        self.df_st_info_current.show(10, truncate=False)

    def handle_st_ascending(self):
        """Flag terms whose rank improved by at least 50% versus the previous period."""
        print("上升词(相邻2周/月/季度,上升超过50%的排名)")
        df = self.df_st_info_current.join(self.df_st_info_last, on='search_term', how='left')
        # Terms without a previous rank get 0 here; the resulting rate is then
        # expected to be NULL and is replaced by -1 below, i.e. "not ascending".
        df = df.na.fill({'st_rank_avg_last': 0})
        df = df.withColumn(
            "st_is_ascending_text_rate",
            (df.st_rank_avg_last - df.st_rank_avg) / df.st_rank_avg_last)
        df = df.na.fill({'st_is_ascending_text_rate': -1})
        df = df.withColumn("st_is_ascending_text", self.u_is_ascending(df.st_is_ascending_text_rate))
        # df.select("search_term", "st_rank", "st_rank_last", "st_is_ascending_text_rate", "st_is_ascending_text").show(10, truncate=False)
        self.df_st_info_current = df.drop("st_rank_avg_last")
        self.df_st_info_current.show(10, truncate=False)

    def handle_st_search(self):
        """Flag terms that appear in at least 80% of the loaded periods."""
        print("热搜词(历史出现占比>=80%)")
        df_counts = self.df_st_info.groupby(['search_term']).agg(
            F.count_distinct("date_info").alias("st_week_appear_counts"))
        # One row per distinct period; its row count is the number of periods.
        df_distinct = self.df_st_info.drop_duplicates(["date_info"])
        df_distinct.select("search_term", "date_info").show(20, truncate=False)
        period_count = df_distinct.count()

        df = self.df_st_info_current.join(df_counts, on='search_term', how='left')
        df = df.withColumn("st_week_counts", F.lit(period_count))
        self.df_st_info_current = df
        self.df_st_info_current.show(20, truncate=False)

        df = df.withColumn("st_is_search_text_rate", df["st_week_appear_counts"] / df["st_week_counts"])
        df = df.withColumn("st_is_search_text", self.u_is_search(df.st_is_search_text_rate))
        self.df_st_info_current = df
        self.df_st_info_current.show(10, truncate=False)

    def handle_st_quantity(self):
        """Placeholder; intentionally does nothing."""
        pass

    def handle_st_asin_title(self):
        """Count page-1 results per term and how many titles contain the term, then build ``df_save``."""
        # Only page=1 zr rows are loaded into df_st_asin_title by read_data.
        self.df_st_asin_title = self.df_st_asin_title.withColumn(
            "st_asin_in_tile_flag",
            self.u_is_title_appear(self.df_st_asin_title.search_term, self.df_st_asin_title.title)
        )
        df_page1_counts = self.df_st_asin_title.groupby("search_term").count() \
            .withColumnRenamed("count", "st_zr_page1_counts")
        df_in_title_counts = self.df_st_asin_title.filter("st_asin_in_tile_flag=1") \
            .groupby("search_term").count() \
            .withColumnRenamed("count", "st_zr_page1_in_title_counts")

        # Terms with no matching title are missing from the second frame; treat them as 0.
        df_counts = df_page1_counts.join(df_in_title_counts, on='search_term', how='left').fillna(0)
        df_counts = df_counts.withColumn(
            "st_zr_page1_in_title_rate",
            df_counts.st_zr_page1_in_title_counts / df_counts.st_zr_page1_counts
        )
        self.df_st_zr_page1_counts = df_counts
        self.df_st_zr_page1_counts.show(10, truncate=False)

        self.df_save = self.df_st_info_current.join(
            self.df_st_zr_page1_counts, on='search_term', how='left'
        )

    def handle_data_st_sold(self):
        """Attach the rounded-up mean ``quantity_being_sold`` per search term.

        NOTE(review): this method reads ``self.year``, ``self.df_brand_week``
        and ``self.df_brand_current``, none of which are assigned in this
        class, and ``handle_data`` does not call it - confirm whether they
        come from ``Templates`` or whether this is leftover code.
        """
        print("在售商品数")
        if self.year >= 2022:
            df_quantity = self.df_brand_week.filter("quantity_being_sold>0").select(
                "search_term", "quantity_being_sold")
            df_quantity = df_quantity.groupby(['search_term']).agg({"quantity_being_sold": "mean"})
            df_quantity = df_quantity.withColumnRenamed("avg(quantity_being_sold)", "st_quantity_being_sold")
            # Round the mean up to the next integer.
            df_quantity = df_quantity.withColumn(
                "st_quantity_being_sold", F.ceil(df_quantity.st_quantity_being_sold))
            self.df_brand_current = self.df_brand_current.join(df_quantity, on='search_term', how='left')
            self.df_brand_current = self.df_brand_current.fillna({"st_quantity_being_sold": 0})
        else:
            self.df_brand_current = self.df_brand_current.withColumn("st_quantity_being_sold", F.lit(0))


if __name__ == '__main__':
    site_name = sys.argv[1]  # argument 1: site
    date_type = sys.argv[2]  # argument 2: period type: week/4_week/month/quarter
    date_info = sys.argv[3]  # argument 3: year-week / year-month / year-quarter, e.g. 2022-1
    handle_obj = DwdStInfo(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()