""" author: 方星钧(ffman) description: 基于dim_st_asin_base_info等表,计算出search_term和asin维度的基础信息表(包括预估销量) table_read_name: dim_st_asin_info, ods_rank_flow table_save_name: dwd_st_asin_info table_save_level: dwd version: 3.0 created_date: 2022-05-12 updated_date: 2022-12-15 """ import os import sys from pyspark.storagelevel import StorageLevel sys.path.append(os.path.dirname(sys.path[0])) # 上级目录 from utils.templates import Templates # from ..utils.templates import Templates #from AmazonSpider.pyspark_job.utils.templates import Templates # 分组排序的udf窗口函数 from pyspark.sql.window import Window from pyspark.sql import functions as F from pyspark.sql.types import StringType, IntegerType class DwdStAsinInfo(Templates): def __init__(self, site_name="us", date_type="week", date_info="2022-1"): super().__init__() self.site_name = site_name self.date_type = date_type self.date_info = date_info self.db_save = f"dwd_st_asin_info" self.spark = self.create_spark_object(app_name=f"{self.db_save} {self.site_name}, {self.date_info}") self.df_date = self.get_year_week_tuple() self.df_save = self.spark.sql(f"select 1+1;") # self.df_save_std = self.spark.sql(f"select * from {self.db_save} limit 0;") self.df_st_asin = self.spark.sql(f"select 1+1;") self.df_rank_flow = self.spark.sql(f"select 1+1;") self.df_st = self.spark.sql(f"select 1+1;") self.df_zr_page1_counts = self.spark.sql(f"select 1+1;") self.week_counts = 1 if self.date_type == 'week' else len(self.year_week_tuple) self.partitions_by = ['site_name', 'date_type', 'date_info'] if self.date_type in ["week"]: self.reset_partitions(200) elif self.date_type in ["month", "4_week"]: self.reset_partitions(400) elif self.date_type in ["quarter"]: self.reset_partitions(600) self.u_week_counts_flag = self.spark.udf.register('u_week_counts_flag', self.udf_week_counts_flag, IntegerType()) self.u_year_week = self.spark.udf.register('u_year_week', self.udf_year_week, StringType()) self.u_st_type = self.spark.udf.register('u_st_type', self.udf_st_type, StringType()) @staticmethod def udf_week_counts_flag(zr_page1_counts, week_counts): if zr_page1_counts == week_counts: return 1 else: return 0 @staticmethod def udf_year_week(dt): year, week = dt.split("-")[0], dt.split("-")[1] if int(week) < 10: return f"{year}-0{week}" else: return f"{year}-{week}" @staticmethod def udf_st_type(st_asin_zr_rate, zr_page1_flag, st_search_num, st_click_share1, st_conversion_share1, st_click_share2, st_conversion_share2, st_click_share3, st_conversion_share3): st_click_share_sum = st_click_share1 + st_click_share2 + st_click_share3 st_conversion_share_sum = st_conversion_share1 + st_conversion_share2 + st_conversion_share3 st_type_list = [] if st_asin_zr_rate >= 0.05: st_type_list.append('1') # 主要流量词 if zr_page1_flag: if st_search_num < 10000: st_type_list.append('2') # 精准长尾词 else: st_type_list.append('3') # 精准流量词 if (st_conversion_share_sum - st_click_share_sum) / st_click_share_sum >= 0.2: st_type_list.append('4') # 转化优质词 else: st_type_list.append('5') # 转化平稳词 if (st_click_share_sum - st_conversion_share_sum) / st_click_share_sum >= 0.2: st_type_list.append('6') # 转化流失词 if st_conversion_share_sum > 0: st_type_list.append('7') # 出单词 if st_click_share_sum > 0 and st_conversion_share_sum == 0: st_type_list.append('8') # 无效曝光词 return ",".join(st_type_list) if st_type_list else '' def read_data(self): print("1.1 读取dim_st_asin_info表") sql = f"select * from dim_st_asin_info where site_name='{self.site_name}' and date_type='week' and date_info in {self.year_week_tuple}" self.df_st_asin = 
    def read_data(self):
        print("1.1 read the dim_st_asin_info table")
        sql = f"select * from dim_st_asin_info where site_name='{self.site_name}' " \
              f"and date_type='week' and date_info in {self.year_week_tuple}"
        self.df_st_asin = self.spark.sql(sqlQuery=sql).cache()
        self.df_st_asin = self.df_st_asin.withColumnRenamed("updated_time", "updated_at")
        self.df_st_asin.show(10, truncate=False)
        print("1.2 read the ods_rank_flow table")
        sql = f"select rank as st_asin_zr_page_rank, rank as st_asin_sp_page_rank, " \
              f"flow as st_asin_zr_rate, flow as st_asin_sp_rate from ods_rank_flow " \
              f"where site_name='{self.site_name}'"
        self.df_rank_flow = self.spark.sql(sql).cache()
        self.df_rank_flow.show(10, truncate=False)
        print("1.3 read the dim_st_detail table")
        # sql = f"select search_term, st_rank, st_rank_avg, st_search_num, st_search_rate, st_search_sum from dim_st_info " \
        #       f"where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info in {self.date_info_tuple};"
        sql = f"select search_term, st_rank, st_rank as st_rank_avg, st_search_num, st_search_rate, st_search_sum, " \
              f"st_click_share1, st_conversion_share1, st_click_share2, st_conversion_share2, " \
              f"st_click_share3, st_conversion_share3 from dim_st_detail " \
              f"where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}';"
        print("sql:", sql)
        self.df_st = self.spark.sql(sql).cache()
        self.df_st = self.df_st.fillna(0)
        self.df_st.show(10, truncate=False)

    def handle_data(self):
        self.handle_st_zr_page1_counts()
        self.handle_st_duplicated()
        self.handle_st_asin_pivot()
        self.handle_st_asin_orders()
        self.handle_st_type()
        self.handle_st_dtypes()
        self.df_save = self.df_save.withColumn("site_name", F.lit(self.site_name))
        self.df_save = self.df_save.withColumn("date_type", F.lit(self.date_type))
        self.df_save = self.df_save.withColumn("date_info", F.lit(self.date_info))
        # self.df_save.show(20, truncate=False)

    def handle_st_zr_page1_counts(self):
        print("2.1 count how many of the selected weeks each zr (search_term, asin) appeared on page=1; "
              "flag 1 if it appeared in all of them, else 0")
        self.df_zr_page1_counts = self.df_st_asin.filter("data_type='zr' and page=1"). \
            groupby(['search_term', 'asin']). \
            agg(F.count_distinct("date_info").alias("zr_page1_counts"))
        self.df_zr_page1_counts = self.df_zr_page1_counts.withColumn("week_counts", F.lit(self.week_counts))
        # compare the page-1 week count against the total number of selected weeks
        self.df_zr_page1_counts = self.df_zr_page1_counts.withColumn(
            "zr_page1_flag", self.u_week_counts_flag("zr_page1_counts", "week_counts")
        )
        # self.df_zr_page1_counts.show(10, truncate=False)

    def handle_st_duplicated(self):
        print("2.2 deduplicate by search_term, asin and data_type, keeping the smallest page_rank")
        window = Window.partitionBy(['search_term', 'asin', 'data_type']).orderBy(
            self.df_st_asin.page_rank.asc(),
            self.df_st_asin.date_info.desc(),
        )
        self.df_st_asin = self.df_st_asin. \
            withColumn("page_rank_top", F.row_number().over(window=window))
        # print("self.df_st_asin, before window dedup:", self.df_st_asin.count())
        self.df_st_asin = self.df_st_asin.filter("page_rank_top=1")
        # print("self.df_st_asin, after window dedup:", self.df_st_asin.count())
        self.df_st_asin = self.df_st_asin.cache()
        # self.df_st_asin.show(10, truncate=False)
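    # Sketch of the long-to-wide step below (illustrative values): each metric is
    # first stacked into (search_term, asin, pivot_key, pivot_value) rows, e.g. a
    # 'zr' row with page=1 and page_rank=5 yields pivot keys 'st_asin_zr_page'
    # and 'st_asin_zr_page_rank'; pivot("pivot_key").agg(F.min("pivot_value"))
    # then turns each key into its own column: st_asin_zr_page=1,
    # st_asin_zr_page_rank=5.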
    def handle_st_asin_pivot(self):
        print("2.3 pivot by search_term and asin")
        self.df_st_asin = self.df_st_asin. \
            withColumn("updated_at_data_type", F.concat(F.lit("st_asin_"), self.df_st_asin.data_type, F.lit("_updated_at"))). \
            withColumn("page_data_type", F.concat(F.lit("st_asin_"), self.df_st_asin.data_type, F.lit("_page"))). \
            withColumn("page_row_data_type", F.concat(F.lit("st_asin_"), self.df_st_asin.data_type, F.lit("_page_row"))). \
            withColumn("page_rank_data_type", F.concat(F.lit("st_asin_"), self.df_st_asin.data_type, F.lit("_page_rank")))
        df1 = self.df_st_asin.select("search_term", "asin", "updated_at_data_type", "updated_at"). \
            withColumnRenamed("updated_at_data_type", "pivot_key"). \
            withColumnRenamed("updated_at", "pivot_value")
        # union is positional, so df2-df4 inherit the pivot_key/pivot_value names from df1
        df2 = self.df_st_asin.select("search_term", "asin", "page_data_type", "page")
        # page_row and page_rank exist only for the zr and sp types
        self.df_st_asin = self.df_st_asin.filter("data_type in ('zr', 'sp')")
        df3 = self.df_st_asin.select("search_term", "asin", "page_row_data_type", "page_row")
        df4 = self.df_st_asin.select("search_term", "asin", "page_rank_data_type", "page_rank")
        self.df_save = df1.union(df2).union(df3).union(df4)
        self.df_save = self.df_save.groupby(["search_term", "asin"]). \
            pivot("pivot_key").agg(F.min("pivot_value")). \
            join(self.df_zr_page1_counts, on=["search_term", "asin"], how="left"). \
            join(self.df_rank_flow.select("st_asin_zr_page_rank", "st_asin_zr_rate"), on=["st_asin_zr_page_rank"], how="left"). \
            join(self.df_rank_flow.select("st_asin_sp_page_rank", "st_asin_sp_rate"), on=["st_asin_sp_page_rank"], how="left"). \
            join(self.df_st, on=["search_term"], how="inner")  # ["search_term", "dt"]
        self.df_save = self.df_save.fillna(
            {
                "st_asin_zr_rate": 0,
                "st_asin_sp_rate": 0
            }
        )

    def handle_st_asin_orders(self):
        print("2.4 estimate zr and sp sales volume")
        self.df_save = self.df_save.withColumn(
            "st_asin_zr_orders", F.ceil(self.df_save.st_asin_zr_rate * self.df_save.st_search_sum)
        ).withColumn(
            "st_asin_sp_orders", F.ceil(self.df_save.st_asin_sp_rate * self.df_save.st_search_sum)
        )
        self.df_save = self.df_save.withColumn(
            "asin_st_zr_orders", self.df_save.st_asin_zr_orders
        ).withColumn(
            "asin_st_sp_orders", self.df_save.st_asin_sp_orders
        )
        df_asin_st_zr_orders_sum = self.df_save.groupby(['asin']). \
            agg({"st_asin_zr_orders": "sum"})
        df_asin_st_sp_orders_sum = self.df_save.groupby(['asin']). \
            agg({"st_asin_sp_orders": "sum"})
        df_asin_st_zr_orders_sum = df_asin_st_zr_orders_sum.withColumnRenamed("sum(st_asin_zr_orders)", "asin_st_zr_orders_sum")
        df_asin_st_sp_orders_sum = df_asin_st_sp_orders_sum.withColumnRenamed("sum(st_asin_sp_orders)", "asin_st_sp_orders_sum")
        df_asin_st_zr_orders_sum = df_asin_st_zr_orders_sum.withColumn("is_zr_flag", F.lit(1))
        df_asin_st_sp_orders_sum = df_asin_st_sp_orders_sum.withColumn("is_sp_flag", F.lit(1))
        df_st_asin_zr_orders_sum = self.df_save.groupby(['search_term']). \
            agg({"st_asin_zr_orders": "sum"})
        df_st_asin_zr_orders_sum = df_st_asin_zr_orders_sum.withColumnRenamed("sum(st_asin_zr_orders)", "st_asin_zr_orders_sum")
        df_st_asin_zr_orders_sum = df_st_asin_zr_orders_sum.withColumn("is_zr_flag", F.lit(1))
        df_st_asin_sp_orders_sum = self.df_save.groupby(['search_term']). \
            agg({"st_asin_sp_orders": "sum"})
        df_st_asin_sp_orders_sum = df_st_asin_sp_orders_sum.withColumnRenamed("sum(st_asin_sp_orders)", "st_asin_sp_orders_sum")
        df_st_asin_sp_orders_sum = df_st_asin_sp_orders_sum.withColumn("is_sp_flag", F.lit(1))
        # flag rows that actually appeared in zr/sp results, so the left joins
        # below only attach the sums to those rows
        self.df_save = self.df_save.withColumn("is_zr_flag", F.when(self.df_save.st_asin_zr_page > 0, 1))
        self.df_save = self.df_save.withColumn("is_sp_flag", F.when(self.df_save.st_asin_sp_page > 0, 1))
        self.df_save = self.df_save. \
            join(df_asin_st_zr_orders_sum, on=['asin', "is_zr_flag"], how='left'). \
            join(df_asin_st_sp_orders_sum, on=['asin', "is_sp_flag"], how='left'). \
            join(df_st_asin_zr_orders_sum, on=['search_term', "is_zr_flag"], how='left'). \
            join(df_st_asin_sp_orders_sum, on=['search_term', "is_sp_flag"], how='left')
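        # Hypothetical worked example: if this asin accounts for 30 of the 120
        # estimated zr orders of its search term, st_asin_zr_flow below is
        # 30/120 = 0.25, i.e. the asin's share of that term's zr traffic;
        # the asin_st_* ratios mirror this from the asin side.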
        self.df_save = self.df_save.withColumn(
            "st_asin_zr_flow", self.df_save.st_asin_zr_orders / self.df_save.st_asin_zr_orders_sum
        )
        self.df_save = self.df_save.withColumn(
            "st_asin_sp_flow", self.df_save.st_asin_sp_orders / self.df_save.st_asin_sp_orders_sum
        )
        self.df_save = self.df_save.withColumn(
            "asin_st_zr_flow", self.df_save.asin_st_zr_orders / self.df_save.asin_st_zr_orders_sum
        )
        self.df_save = self.df_save.withColumn(
            "asin_st_sp_flow", self.df_save.asin_st_sp_orders / self.df_save.asin_st_sp_orders_sum
        )
        self.df_save = self.df_save.drop("is_zr_flag", "is_sp_flag")
        print("self.df_save.columns:", self.df_save.columns)
        # self.df_save.show(10, truncate=False)

    def handle_st_type(self):
        print("2.5 classify each keyword from the search_term/asin metrics")
        self.df_save = self.df_save.withColumn(
            "st_type", self.u_st_type(
                "st_asin_zr_rate", "zr_page1_flag", "st_search_num",
                "st_click_share1", "st_conversion_share1",
                "st_click_share2", "st_conversion_share2",
                "st_click_share3", "st_conversion_share3"
            )
        )

    def handle_st_dtypes(self):
        print("2.6 cast the pivoted columns so their types match the hive schema")
        for col in self.df_save.columns:
            if ("_page" in col) or ("_page_row" in col) or ("_page_rank" in col):
                print("col:", col)
                self.df_save = self.df_save.withColumn(col, self.df_save[col].cast("int"))


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site
    date_type = sys.argv[2]  # arg 2: date type: week/4_week/month/quarter
    date_info = sys.argv[3]  # arg 3: year-week/year-month/year-quarter, e.g. 2022-01
    handle_obj = DwdStAsinInfo(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()
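# Example invocation (assumed script name; adjust to the repo layout):
#   spark-submit dwd_st_asin_info.py us week 2022-50
# builds the weekly dwd_st_asin_info partition for the us site, week 50 of 2022.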