import os import sys from pyspark.storagelevel import StorageLevel sys.path.append(os.path.dirname(sys.path[0])) # 上级目录 from utils.templates import Templates # from ..utils.templates import Templates # from AmazonSpider.pyspark_job.utils.templates import Templates # 分组排序的udf窗口函数 from pyspark.sql.window import Window from pyspark.sql import functions as F from pyspark.sql.types import StringType, IntegerType class DwTStAsinReverse(Templates): def __init__(self, site_name="us", date_type="week", date_info="2022-1"): super().__init__() self.site_name = site_name self.date_type = date_type self.date_info = date_info self.db_save = f"dwt_st_asin_reverse" self.spark = self.create_spark_object(app_name=f"{self.db_save}, {self.site_name}, {self.date_type}, {self.date_info}") self.get_date_info_tuple() self.get_year_week_tuple() self.get_year_month_days_dict(year=int(self.year)) self.df_st_asin = self.spark.sql(f"select 1+1;") self.df_st_asin_flow = self.spark.sql(f"select 1+1;") self.df_st = self.spark.sql(f"select 1+1;") self.df_st_measure = self.spark.sql(f"select 1+1;") self.df_st_key = self.spark.sql(f"select 1+1;") self.df_st_brand_label = self.spark.sql(f"select 1+1;") self.df_save = self.spark.sql(f"select 1+1;") self.df_save_std = self.spark.sql(f"select * from {self.db_save} limit 0;") self.u_st_type = self.spark.udf.register('u_st_type', self.udf_st_type, StringType()) self.partitions_by = ['site_name', 'date_type', 'date_info'] if self.date_type in ["week"]: self.reset_partitions(400) else: self.reset_partitions(1000) @staticmethod def udf_st_type(st_asin_zr_rate, zr_page1_flag, st_search_num, st_click_share_sum, st_conversion_share_sum): st_type_list = [] if st_asin_zr_rate >= 0.05: st_type_list.append('1') # 主要流量词 if zr_page1_flag == 1: if st_search_num < 10000: st_type_list.append('2') # 精准长尾词 else: st_type_list.append('3') # 精准流量词 if st_click_share_sum > 0: if (st_conversion_share_sum - st_click_share_sum) / st_click_share_sum >= 0.2: st_type_list.append('4') # 转化优质词 else: st_type_list.append('5') # 转化平稳词 if (st_click_share_sum - st_conversion_share_sum) / st_click_share_sum >= 0.2: st_type_list.append('6') # 转化流失词 if st_conversion_share_sum > 0: st_type_list.append('7') # 出单词 if st_click_share_sum > 0 and st_conversion_share_sum == 0: st_type_list.append('8') # 无效曝光词 return ",".join(st_type_list) if st_type_list else '' def read_data(self): print("1 读取st+asin两个维度: dim_st_asin_info表和ods_rank_flow表") print("1.1 读取dim_st_asin_info表") # if (int(self.year) == 2022 and int(self.month) < 10) or int(self.year) <= 2021: # sql = f"select * from dim_st_asin_info where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info='{self.date_info}'" # else: # sql = f"select * from dim_st_asin_info where site_name='{self.site_name}' and date_type='day' and date_info in {self.date_info_tuple}" # 测试: and date_info>='2023-01-19' if date_type in ['month', 'month_week'] and ((self.site_name == 'us' and date_info >= '2023-10') or (self.site_name in ['uk', 'de'] and self.date_info >= '2024-05')): sql = f"select * from dim_st_asin_info where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info = '{self.date_info}'" else: sql = f"select * from dim_st_asin_info where site_name='{self.site_name}' and date_type='week' and date_info in {self.year_week_tuple}" print("sql:", sql) self.df_st_asin = self.spark.sql(sqlQuery=sql) self.df_st_asin.persist(storageLevel=StorageLevel.MEMORY_ONLY) # self.df_st_asin = self.spark.sql(sqlQuery=sql).cache() self.df_st_asin = self.df_st_asin.withColumnRenamed("updated_time", "updated_at") self.df_st_asin.show(10, truncate=False) print("1.2 读取ods_rank_flow表") # sql = f"select rank as page_rank, flow from ods_rank_flow " \ # f"where site_name='{self.site_name}'" sql = f"select rank as st_asin_zr_page_rank, rank as st_asin_sp_page_rank, flow as st_asin_zr_rate, flow as st_asin_sp_rate from ods_rank_flow " \ f"where site_name='{self.site_name}'" self.df_st_asin_flow = self.spark.sql(sql).cache() self.df_st_asin_flow.show(10, truncate=False) print("1.3 读取dim_st_detail表") sql = f"select search_term, st_rank, st_search_num, st_search_rate, st_search_sum, " \ f"st_quantity_being_sold, st_click_share_sum, st_conversion_share_sum from dim_st_detail " \ f"where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info = '{self.date_info}';" print("sql:", sql) self.df_st = self.spark.sql(sql).cache() self.df_st = self.df_st.fillna(0) self.df_st.show(10, truncate=False) print("1.4 读取dwd_st_measure表") sql = f"select search_term, st_adv_counts, st_ao_val, st_zr_page1_title_appear_rate as zr_page1_flag from dwd_st_measure " \ f"where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info = '{self.date_info}';" print("sql:", sql) self.df_st_measure = self.spark.sql(sql).cache() self.df_st_measure.show(10, truncate=False) print("1.5 读取ods_st_key表") sql = f"select st_key, search_term from ods_st_key " \ f"where site_name='{self.site_name}';" print("sql:", sql) self.df_st_key = self.spark.sql(sql).cache() self.df_st_key.show(10, truncate=False) print("1.6 读取dws_st_brand_info表") sql = f"select search_term, st_brand_label from dws_st_brand_info " \ f"where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info = '{self.date_info}';" print("sql:", sql) self.df_st_brand_label = self.spark.sql(sql).cache() self.df_st_brand_label.show(10, truncate=False) def handle_data(self): self.handle_st_duplicated() self.handle_st_asin_pivot() self.handle_st_asin_orders() self.handle_st_type() self.handle_st_dtypes() # self.handle_st_current_page_asin_counts() self.df_save = self.df_save.withColumn("site_name", F.lit(self.site_name)) self.df_save = self.df_save.withColumn("date_type", F.lit(self.date_type)) self.df_save = self.df_save.withColumn("date_info", F.lit(self.date_info)) self.df_save = self.df_save.drop("zr_page1_flag", "st_click_share_sum", "st_conversion_share_sum") self.df_save = self.df_save_std.unionByName(self.df_save, allowMissingColumns=True) # self.df_save.show(20, truncate=False) print("cols:", self.df_save.columns) # quit() def handle_st_duplicated(self): print("2.2 根据search_term,asin,data_type进行去重, page_rank选择最小值") window = Window.partitionBy(['search_term', 'asin', 'data_type']).orderBy( self.df_st_asin.page_rank.asc(), self.df_st_asin.date_info.desc(), ) self.df_st_asin = self.df_st_asin. \ withColumn("page_rank_top", F.row_number().over(window=window)) # print("self.df_st_asin_info, 开窗去重前:", self.df_st_asin_info.count()) self.df_st_asin = self.df_st_asin.filter("page_rank_top=1") # print("self.df_st_asin_info, 开窗去重后:", self.df_st_asin_info.count()) self.df_st_asin = self.df_st_asin.cache() # self.df_st_asin = self.df_st_asin.persist(storageLevel=StorageLevel.MEMORY_AND_DISK) # self.df_st_asin.show(10, truncate=False) def handle_st_asin_pivot(self): print(f"2.3 根据search_term和asin进行透视表") self.df_st_asin = self.df_st_asin. \ withColumn("updated_at_data_type", F.concat(F.lit("st_asin_"), self.df_st_asin.data_type, F.lit("_updated_at"))). \ withColumn("page_data_type", F.concat(F.lit("st_asin_"), self.df_st_asin.data_type, F.lit("_page"))). \ withColumn("page_row_data_type", F.concat(F.lit("st_asin_"), self.df_st_asin.data_type, F.lit("_page_row"))). \ withColumn("page_rank_data_type", F.concat(F.lit("st_asin_"), self.df_st_asin.data_type, F.lit("_page_rank"))) df1 = self.df_st_asin.select("search_term", "asin", "updated_at_data_type", "updated_at"). \ withColumnRenamed("updated_at_data_type", "pivot_key"). \ withColumnRenamed("updated_at", "pivot_value") df2 = self.df_st_asin.select("search_term", "asin", "page_data_type", "page") # page_row和page_rank: 只有zr,sp才有 self.df_st_asin = self.df_st_asin.filter("data_type in ('zr', 'sp')") df3 = self.df_st_asin.select("search_term", "asin", "page_row_data_type", "page_row") df4 = self.df_st_asin.select("search_term", "asin", "page_rank_data_type", "page_rank") self.df_save = df1.union(df2).union(df3).union(df4) df_st_zr_counts = self.df_st_asin.filter("data_type='zr'").groupby(["search_term", "page"]).agg( F.max('page_row').alias("st_zr_current_page_asin_counts")) df_st_sp_counts = self.df_st_asin.filter("data_type='sp'").groupby(["search_term", "page"]).agg( F.max('page_row').alias("st_sp_current_page_asin_counts")) df_st_zr_counts = df_st_zr_counts.withColumnRenamed("page", "st_asin_zr_page") df_st_sp_counts = df_st_sp_counts.withColumnRenamed("page", "st_asin_sp_page") self.df_save = self.df_save.groupby(["search_term", "asin"]). \ pivot(f"pivot_key").agg(F.min(f"pivot_value")). \ join(self.df_st_asin_flow.select("st_asin_zr_page_rank", "st_asin_zr_rate"), on=["st_asin_zr_page_rank"], how="left"). \ join(self.df_st_asin_flow.select("st_asin_sp_page_rank", "st_asin_sp_rate"), on=["st_asin_sp_page_rank"], how="left"). \ join(self.df_st_measure, on=["search_term"], how="left"). \ join(self.df_st_key, on=["search_term"], how="left"). \ join(self.df_st_brand_label, on=["search_term"], how="left"). \ join(self.df_st, on=["search_term"], how="inner").join( df_st_zr_counts, on=["search_term", "st_asin_zr_page"], how='left' ).join(df_st_sp_counts, on=["search_term", "st_asin_sp_page"], how='left') # join(self.df_st_measure, on=["search_term"], how="inner"). \ # join(self.df_st_key, on=["search_term"], how="inner"). \ self.df_save = self.df_save.fillna( { "st_asin_zr_rate": 0, "st_asin_sp_rate": 0 } ) # 释放内存 del self.df_st_asin self.df_save.persist(storageLevel=StorageLevel.MEMORY_ONLY) def handle_st_asin_orders(self): print("2.4 计算zr, sp预估销量") self.df_save = self.df_save.withColumn( "st_asin_zr_orders", F.ceil(self.df_save.st_asin_zr_rate * self.df_save.st_search_sum) ).withColumn( "st_asin_sp_orders", F.ceil(self.df_save.st_asin_sp_rate * self.df_save.st_search_sum) ) self.df_save = self.df_save.withColumn( "asin_st_zr_orders", self.df_save.st_asin_zr_orders ).withColumn( "asin_st_sp_orders", self.df_save.st_asin_sp_orders ) df_asin_st_zr_orders_sum = self.df_save.groupby(['asin']). \ agg({"st_asin_zr_orders": "sum"}) df_asin_st_sp_orders_sum = self.df_save.groupby(['asin']). \ agg({"st_asin_sp_orders": "sum"}) df_asin_st_zr_orders_sum = df_asin_st_zr_orders_sum.withColumnRenamed("sum(st_asin_zr_orders)", "asin_st_zr_orders_sum") df_asin_st_sp_orders_sum = df_asin_st_sp_orders_sum.withColumnRenamed("sum(st_asin_sp_orders)", "asin_st_sp_orders_sum") df_asin_st_zr_orders_sum = df_asin_st_zr_orders_sum.withColumn(f"is_zr_flag", F.lit(1)) df_asin_st_sp_orders_sum = df_asin_st_sp_orders_sum.withColumn(f"is_sp_flag", F.lit(1)) df_st_asin_zr_orders_sum = self.df_save.groupby(['search_term']). \ agg({"st_asin_zr_orders": "sum"}) df_st_asin_zr_orders_sum = df_st_asin_zr_orders_sum.withColumnRenamed("sum(st_asin_zr_orders)", "st_asin_zr_orders_sum") df_st_asin_zr_orders_sum = df_st_asin_zr_orders_sum.withColumn(f"is_zr_flag", F.lit(1)) df_st_asin_sp_orders_sum = self.df_save.groupby(['search_term']). \ agg({"st_asin_sp_orders": "sum"}) df_st_asin_sp_orders_sum = df_st_asin_sp_orders_sum.withColumnRenamed("sum(st_asin_sp_orders)", "st_asin_sp_orders_sum") df_st_asin_sp_orders_sum = df_st_asin_sp_orders_sum.withColumn(f"is_sp_flag", F.lit(1)) self.df_save = self.df_save.withColumn("is_zr_flag", F.when(self.df_save.st_asin_zr_page > 0, 1)) self.df_save = self.df_save.withColumn("is_sp_flag", F.when(self.df_save.st_asin_sp_page > 0, 1)) self.df_save = self.df_save. \ join(df_asin_st_zr_orders_sum, on=['asin', "is_zr_flag"], how='left'). \ join(df_asin_st_sp_orders_sum, on=['asin', "is_sp_flag"], how='left'). \ join(df_st_asin_zr_orders_sum, on=['search_term', "is_zr_flag"], how='left'). \ join(df_st_asin_sp_orders_sum, on=['search_term', "is_sp_flag"], how='left') self.df_save = self.df_save.withColumn( "st_asin_zr_flow", F.round(self.df_save.st_asin_zr_orders / self.df_save.st_asin_zr_orders_sum, 4) ) self.df_save = self.df_save.withColumn( "st_asin_sp_flow", F.round(self.df_save.st_asin_sp_orders / self.df_save.st_asin_sp_orders_sum, 4) ) self.df_save = self.df_save.withColumn( "asin_st_zr_flow", F.round(self.df_save.asin_st_zr_orders / self.df_save.asin_st_zr_orders_sum, 4) ) self.df_save = self.df_save.withColumn( "asin_st_sp_flow", F.round(self.df_save.asin_st_sp_orders / self.df_save.asin_st_sp_orders_sum, 4) ) self.df_save = self.df_save.drop("is_zr_flag", "is_sp_flag") print("self.df_save.columns:", self.df_save.columns) def handle_st_type(self): print("2.5 根据search_term,asin等信息进行计算关键词的分类情况") self.df_save = self.df_save.withColumn( "st_type", self.u_st_type( "st_asin_zr_rate", "zr_page1_flag", "st_search_num", "st_click_share_sum", "st_conversion_share_sum" ) ) def handle_st_dtypes(self): print("2.5 更改pivot之后的列的数据类型, 保持和hive的数据类型一致") for col in self.df_save.columns: if ("_page" in col) or ("_page_row" in col) or ("_page_rank" in col): print("col:", col) self.df_save = self.df_save.withColumn(col, self.df_save[f'{col}'].cast("int")) def handle_st_current_page_asin_counts(self): df_st_zr_counts = self.df_st_asin.filter("data_type='zr'").groupby(["search_term", "page"]).agg(F.max('page_row').alias("st_zr_current_page_asin_counts")) df_st_sp_counts = self.df_st_asin.filter("data_type='sp'").groupby(["search_term", "page"]).agg(F.max('page_row').alias("st_sp_current_page_asin_counts")) df_st_zr_counts = df_st_zr_counts.withColumnRenamed("page", "st_asin_zr_page") df_st_sp_counts = df_st_sp_counts.withColumnRenamed("page", "st_asin_sp_page") self.df_save = self.df_save.join( df_st_zr_counts, on=["search_term", "st_asin_zr_page"], how='left' ).join( df_st_sp_counts, on=["search_term", "st_asin_sp_page"], how='left' ) if __name__ == '__main__': site_name = sys.argv[1] # 参数1:站点 date_type = sys.argv[2] # 参数2:类型:week/4_week/month/quarter date_info = sys.argv[3] # 参数3:年-周/年-月/年-季, 比如: 2022-1 handle_obj = DwTStAsinReverse(site_name=site_name, date_type=date_type, date_info=date_info) handle_obj.run()