import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # add the parent directory to the import path
from pyspark.storagelevel import StorageLevel
from utils.templates import Templates
# from ..utils.templates import Templates
# from AmazonSpider.pyspark_job.utils.templates_test import Templates
from pyspark.sql.types import StringType, IntegerType
# Window is used for the percent_rank ranking over st_ao_val
from pyspark.sql.window import Window
from pyspark.sql import functions as F


class DwdStMeasure(Templates):
    """DWD-layer Spark job that derives search-term (st) and asin measures.

    Reads dim/ods tables for one site + date partition, computes per
    (search_term, asin) page ranks and estimated orders, per-asin and
    per-search-term counts, AO values and BSR orders, and writes three
    output tables: dwd_st_asin_measure, dwd_st_measure, dwd_asin_measure.

    The heavy lifting (Spark session creation, date-tuple expansion,
    partition handling, saving) is inherited from Templates.
    """

    def __init__(self, site_name='us', date_type="month", date_info='2022-1'):
        """Set up job parameters, the Spark session and placeholder frames.

        :param site_name: Amazon site code, e.g. 'us'.
        :param date_type: partition granularity: day/week/4_week/month/quarter.
        :param date_info: partition value matching date_type, e.g. '2022-1'.
        """
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        # Target table names for the three result DataFrames.
        self.db_save_st_asin = f'dwd_st_asin_measure'
        self.db_save_st = f'dwd_st_measure'
        self.db_save_asin = f'dwd_asin_measure'
        self.spark = self.create_spark_object(
            app_name=f"{self.db_save_st_asin}, {self.db_save_st}, {self.db_save_asin}: "
                     f"{self.site_name}, {self.date_type}, {self.date_info}")
        # self.df_date = self.get_year_week_tuple()  # pandas DataFrame object
        # Builds self.date_info_tuple used in the partition filters below.
        self.get_date_info_tuple()
        # Placeholder DataFrames ("select 1+1") — real content is assigned in
        # read_data() / handle_data(); this just keeps the attributes declared.
        self.df_st_detail = self.spark.sql(f"select 1+1;")
        self.df_st_asin = self.spark.sql(f"select 1+1;")
        self.df_st_asin_flow = self.spark.sql(f"select 1+1;")
        self.df_st_rate = self.spark.sql(f"select 1+1;")
        self.df_asin_history = self.spark.sql(f"select 1+1;")
        self.df_bs_report = self.spark.sql(f"select 1+1;")
        self.df_st_quantity = self.spark.sql(f"select 1+1;")
        self.df_st_asin_duplicated = self.spark.sql(f"select 1+1;")
        self.df_save_st_asin = self.spark.sql(f"select 1+1;")
        self.df_save_asin = self.spark.sql(f"select 1+1;")
        self.df_save_st = self.spark.sql(f"select 1+1;")
        self.partitions_num = 3
        self.reset_partitions(partitions_num=self.partitions_num)
        # Hive partition columns shared by all three output tables.
        self.partitions_by = ['site_name', 'date_type', 'date_info']

    def read_data(self):
        """Load all source tables for the current site/date partition and cache them."""
        print("1.1 读取dim_st_detail表")
        sql = f"select search_term, st_rank from dim_st_detail where site_name='{self.site_name}' " \
              f"and date_type='{self.date_type}' and date_info in {self.date_info_tuple}"
        print("sql:", sql)
        self.df_st_detail = self.spark.sql(sqlQuery=sql).cache()
        # self.df_st_detail.show(10, truncate=False)
        print("1.1 读取dim_st_asin_info表")
        sql = f"select * from dim_st_asin_info where site_name='{self.site_name}' " \
              f"and date_type='day' and date_info in {self.date_info_tuple}"
        print("sql:", sql)
        self.df_st_asin = self.spark.sql(sqlQuery=sql).cache()
        # One row per (search_term, asin, data_type, date_info); the *_duplicated
        # frame collapses further to one row per (search_term, asin).
        self.df_st_asin = self.df_st_asin.drop_duplicates(["search_term", "asin", "data_type", "date_info"]).cache()
        self.df_st_asin_duplicated = self.df_st_asin.drop_duplicates(['search_term', 'asin']).cache()
        print("self.df_st_asin:", self.df_st_asin.count())
        print("self.df_st_asin_duplicated:", self.df_st_asin_duplicated.count())
        # self.df_st_asin.show(10, truncate=False)
        # self.df_asin = self.df_st_asin.select("asin").drop_duplicates(["asin"])
        # self.df_st = self.df_st_asin.select("search_term").drop_duplicates(["search_term"])
        print("1.2 读取ods_rank_flow表")
        # The same rank/flow columns are aliased twice so they can be joined
        # once against zr page ranks and once against sp page ranks.
        sql = f"select rank as st_asin_zr_page_rank, rank as st_asin_sp_page_rank, " \
              f"flow as st_asin_zr_rate, flow as st_asin_sp_rate from ods_rank_flow " \
              f"where site_name='{self.site_name}'"
        self.df_st_asin_flow = self.spark.sql(sql).cache()
        # self.df_st_asin_flow.show(10, truncate=False)
        print("1.3 读取ods_rank_search_rate_repeat表")
        sql = f"select rank as st_rank, search_num as st_search_num, rate as st_search_rate, search_sum as st_search_sum " \
              f"from ods_rank_search_rate_repeat where site_name='{self.site_name}';"
        self.df_st_rate = self.spark.sql(sql).cache()
        # self.df_st_rate.show(10)
        # 1.4 fetch the asin's bs_id, seller, store, etc.
        print("1.4 读取dim_cal_asin_history_detail表")
        sql = f"select asin, asin_rank, bsr_cate_1_id, asin_title " \
              f"from dim_cal_asin_history_detail where site_name='{self.site_name}';"
        self.df_asin_history = self.spark.sql(sql).cache()
        # self.df_asin_history.show(10)
        # 1.5 ods_one_category_report
        print("1.5 读取ods_one_category_report表")
        # NOTE(review): the report month is hard-coded to dm='2022-11' instead of
        # being derived from date_info — confirm whether this is intentional.
        sql = f"select cate_1_id as bsr_cate_1_id, rank as asin_rank, orders as asin_bsr_orders " \
              f"from ods_one_category_report " \
              f"where site_name='{self.site_name}' and dm='2022-11';"
        self.df_bs_report = self.spark.sql(sqlQuery=sql).cache()
        # self.df_bs_report.show(10, truncate=False)
        # 1.6 ods_st_quantity_being_sold
        print("1.6 读取ods_st_quantity_being_sold表")
        sql = f"select search_term, quantity_being_sold as st_quantity_being_sold from ods_st_quantity_being_sold " \
              f"where site_name='{self.site_name}' and date_type='{self.date_type}' " \
              f"and date_info in {self.date_info_tuple};"
        self.df_st_quantity = self.spark.sql(sqlQuery=sql).cache()

    def save_data(self):
        """Persist the three result DataFrames into their Hive tables.

        NOTE(review): reset_partitions(50/10) is called before each save, but
        save_data_common is passed partitions_num=self.partitions_num — this
        only has the intended effect if reset_partitions mutates
        self.partitions_num (defined in Templates); verify.
        """
        self.reset_partitions(partitions_num=50)
        self.save_data_common(
            df_save=self.df_save_st_asin,
            db_save=self.db_save_st_asin,
            partitions_num=self.partitions_num,
            partitions_by=self.partitions_by
        )
        self.reset_partitions(partitions_num=10)
        self.save_data_common(
            df_save=self.df_save_st,
            db_save=self.db_save_st,
            partitions_num=self.partitions_num,
            partitions_by=self.partitions_by
        )
        self.reset_partitions(partitions_num=10)
        self.save_data_common(
            df_save=self.df_save_asin,
            db_save=self.db_save_asin,
            partitions_num=self.partitions_num,
            partitions_by=self.partitions_by
        )

    def handle_st_zr_page1_title_rate(self):
        """Work-in-progress helper for zr page-1 title rate.

        NOTE(review): this method is never called and is incomplete — the
        final join has no `on`/`how` arguments and the result is discarded.
        Left as-is pending the original author's intent.
        """
        df_zr_page1 = self.df_st_asin.filter(
            "data_type='zr' and page=1"
        )
        df_zr_page1 = df_zr_page1.drop_duplicates(["search_term", "asin"])
        df_zr_page1 = df_zr_page1.join(
            self.df_asin_history
        )

    def handle_data(self):
        """Orchestrate the measure calculations for st, asin and st+asin levels."""
        self.handle_st_info()
        self.handle_st_asin_info()
        self.df_save_asin = self.handle_st_asin_counts(cal_type="asin")
        self.df_save_st = self.handle_st_asin_counts(cal_type="st")
        self.df_save_st = self.df_save_st.join(
            self.df_st_detail, on=['search_term'], how='left'
        )
        self.handle_st_asin_ao()
        self.handle_st_asin_orders()
        self.handle_st_asin_bsr_orders()
        self.df_save_st_asin.show(10, truncate=False)
        self.df_save_st.show(10, truncate=False)
        self.df_save_asin.show(10, truncate=False)
        # NOTE(review): quit() aborts the process here, so save_data() never
        # runs — this looks like a debugging guard left in place; remove it
        # once the outputs above have been validated.
        quit()

    def handle_st_info(self):
        """Enrich df_st_detail with average sold quantity and search-rate columns."""
        # Average the quantity being sold per search term (positive values only).
        self.df_st_quantity = self.df_st_quantity.filter("st_quantity_being_sold > 0").groupby(['search_term']).agg(
            {"st_quantity_being_sold": "mean"}
        )
        self.df_st_quantity = self.df_st_quantity.withColumnRenamed(
            "avg(st_quantity_being_sold)", "st_quantity_being_sold"
        )
        self.df_st_detail = self.df_st_detail.join(
            self.df_st_quantity, on=["search_term"], how="left"
        ).join(
            self.df_st_rate, on=["st_rank"], how="left"
        )

    def handle_st_asin_info(self):
        """Pivot per-(search_term, asin) minimum page ranks and join flow rates."""
        # self.df_save_st_asin = self.df_st_asin.filter("data_type in ('zr', 'sp')").withColumn(
        # Build a column name like "st_asin_zr_page_rank" from each data_type.
        self.df_save_st_asin = self.df_st_asin.withColumn(
            "page_rank_data_type",
            F.concat(F.lit("st_asin_"), self.df_st_asin.data_type, F.lit("_page_rank"))
        )
        self.df_save_st_asin = self.df_save_st_asin.groupby(["search_term", "asin"]). \
            pivot("page_rank_data_type").agg(F.min(f"page_rank"))
        # Attach the zr/sp flow rates keyed on page rank, then keep only search
        # terms present in df_st_detail (inner join).
        self.df_save_st_asin = self.df_save_st_asin. \
            join(self.df_st_asin_flow.select("st_asin_zr_page_rank", "st_asin_zr_rate"),
                 on=["st_asin_zr_page_rank"], how="left"). \
            join(self.df_st_asin_flow.select("st_asin_sp_page_rank", "st_asin_sp_rate"),
                 on=["st_asin_sp_page_rank"], how="left"). \
            join(self.df_st_detail, on=["search_term"], how="inner")
        # self.df_save_st_asin.show(10, truncate=False)

    def handle_st_asin_counts(self, cal_type="asin"):
        """Count data_type occurrences per asin or per search term.

        :param cal_type: "asin" or "st" — the grouping dimension.
        :return: DataFrame with one row per group and <cal_type>_<data_type>_counts
                 columns, plus derived sb/adv totals and partition columns.
        """
        print(f"计算{cal_type}_counts")
        # "st" groups by the actual column name "search_term".
        cal_type_complete = "search_term" if cal_type == "st" else cal_type
        self.df_st_asin = self.df_st_asin.withColumn(
            f"{cal_type}_data_type",
            F.concat(F.lit(f"{cal_type}_"), self.df_st_asin.data_type, F.lit(f"_counts"))
        )
        df = self.df_st_asin.groupby([f'{cal_type_complete}']). \
            pivot(f"{cal_type}_data_type").count()
        df = df.fillna(0)
        # df.show(10, truncate=False)
        # sb total = sum of the three sb slots; adv total = sb + sp.
        df = df.withColumn(
            f"{cal_type}_sb_counts",
            df[f"{cal_type}_sb1_counts"] + df[f"{cal_type}_sb2_counts"] + df[f"{cal_type}_sb3_counts"]
        )
        df = df.withColumn(
            f"{cal_type}_adv_counts",
            df[f"{cal_type}_sb_counts"] + df[f"{cal_type}_sp_counts"]
        )
        df = df.withColumn(f"site_name", F.lit(self.site_name))
        df = df.withColumn(f"date_type", F.lit(self.date_type))
        df = df.withColumn(f"date_info", F.lit(self.date_info))
        # df.show(10, truncate=False)
        return df

    def handle_st_asin_ao(self):
        """Compute AO (adv/zr ratio) per asin, average it per search term, and rank it."""
        print("计算st和asin各自维度的ao")
        # asin_ao_val
        self.df_save_asin = self.df_save_asin.withColumn(
            "asin_ao_val",
            self.df_save_asin.asin_adv_counts / self.df_save_asin.asin_zr_counts
        )
        self.df_save_asin = self.df_save_asin.fillna({"asin_ao_val": 0})
        # st_ao_val and st_ao_val_rate
        df_asin_ao = self.df_save_asin.select("asin", "asin_ao_val")
        df_st_ao = self.df_save_st_asin.select("search_term", "asin").join(
            df_asin_ao, on=['asin'], how='left'
        )
        df_st_ao = df_st_ao.groupby(["search_term"]).agg({"asin_ao_val": "mean"})
        df_st_ao = df_st_ao.withColumnRenamed("avg(asin_ao_val)", "st_ao_val")
        self.df_save_st = self.df_save_st.join(
            df_st_ao, on=['search_term'], how='left'
        )
        # Percentile rank of st_ao_val across all search terms (ascending).
        window = Window.orderBy(self.df_save_st.st_ao_val.asc())
        self.df_save_st = self.df_save_st.withColumn("st_ao_val_rate", F.percent_rank().over(window=window))

    def handle_st_asin_bsr_orders(self):
        """Attach BSR orders via (asin_rank, bsr_cate_1_id) and aggregate per st/asin."""
        self.df_save_st_asin = self.df_save_st_asin.join(
            self.df_asin_history, on=['asin'], how='left'
        )
        self.df_save_st_asin = self.df_save_st_asin.join(
            self.df_bs_report, on=['asin_rank', 'bsr_cate_1_id'], how='left'
        )
        # self.df_st_asin_duplicated.show(10, truncate=False)
        df_st_bsr_orders = self.df_save_st_asin.groupby(['search_term']).agg({"asin_bsr_orders": "sum"})
        df_st_bsr_orders = df_st_bsr_orders.withColumnRenamed(
            "sum(asin_bsr_orders)", "st_bsr_orders"
        )
        df_asin_bsr_orders = self.df_save_st_asin.select("asin", "asin_bsr_orders").drop_duplicates(['asin'])
        # df_st_bsr_orders.show(10, truncate=False)
        # df_asin_bsr_orders.show(10, truncate=False)
        self.df_save_st = self.df_save_st.join(
            df_st_bsr_orders, on='search_term', how='left'
        )
        self.df_save_asin = self.df_save_asin.join(
            df_asin_bsr_orders, on='asin', how='left'
        )

    def handle_st_asin_orders(self):
        """Estimate zr/sp orders (rate × search volume) at st+asin, st and asin levels."""
        print("计算zr, sp预估销量")
        # 1. estimated zr/sp orders at the (search_term, asin) level
        self.df_save_st_asin = self.df_save_st_asin.withColumn(
            "st_asin_zr_orders",
            F.ceil(self.df_save_st_asin.st_asin_zr_rate * self.df_save_st_asin.st_search_sum)
        ).withColumn(
            "st_asin_sp_orders",
            F.ceil(self.df_save_st_asin.st_asin_sp_rate * self.df_save_st_asin.st_search_sum)
        )
        # self.df_save_st_asin.show(10, truncate=False)
        self.df_save_st_asin = self.df_save_st_asin.select(
            "search_term", "asin", "st_asin_zr_orders", "st_asin_sp_orders",
        )
        self.df_save_st_asin = self.df_save_st_asin.groupby(["search_term", "asin"]).agg(
            {
                "st_asin_zr_orders": "mean",
                "st_asin_sp_orders": "mean",
            }
        )
        self.df_save_st_asin = self.df_save_st_asin.withColumnRenamed(
            "avg(st_asin_zr_orders)", "st_asin_zr_orders"
        ).withColumnRenamed(
            "avg(st_asin_sp_orders)", "st_asin_sp_orders"
        )
        self.df_save_st_asin = self.df_save_st_asin.withColumn(f"site_name", F.lit(self.site_name))
        self.df_save_st_asin = self.df_save_st_asin.withColumn(f"date_type", F.lit(self.date_type))
        self.df_save_st_asin = self.df_save_st_asin.withColumn(f"date_info", F.lit(self.date_info))
        print("self.df_save_st_asin:", self.df_save_st_asin.count())
        # self.df_save_st_asin.show(10, truncate=False)
        # 2. estimated zr/sp orders at the search-term level (sum over asins)
        df_st_orders = self.df_save_st_asin.groupby(['search_term']).agg(
            {
                "st_asin_zr_orders": "sum",
                "st_asin_sp_orders": "sum",
            }
        )
        df_st_orders = df_st_orders.withColumnRenamed(
            "sum(st_asin_zr_orders)", "st_zr_orders"
        ).withColumnRenamed(
            "sum(st_asin_sp_orders)", "st_sp_orders"
        )
        self.df_save_st = self.df_save_st.join(
            df_st_orders, on=['search_term'], how='left'
        )
        # 3. estimated zr/sp orders at the asin level (mean over search terms)
        df_asin_orders = self.df_save_st_asin.groupby(['asin']).agg(
            {
                "st_asin_zr_orders": "mean",
                "st_asin_sp_orders": "mean",
            }
        )
        df_asin_orders = df_asin_orders.withColumnRenamed(
            "avg(st_asin_zr_orders)", "asin_zr_orders"
        ).withColumnRenamed(
            "avg(st_asin_sp_orders)", "asin_sp_orders"
        )
        self.df_save_asin = self.df_save_asin.join(
            df_asin_orders, on=['asin'], how='left'
        )


if __name__ == '__main__':
    site_name = sys.argv[1]    # arg 1: site code
    date_type = sys.argv[2]    # arg 2: date type: day/week/4_week/month/quarter
    date_info = sys.argv[3]    # arg 3: year-month-day / year-week / year-month / year-quarter, e.g. 2022-1
    handle_obj = DwdStMeasure(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()