"""Compute search-term level metrics for the ``dwt_st_info`` table.

Metrics covered (translated from the original header):

1. Hot search terms, rising terms, newly appeared terms, number of products
   on sale, etc.
2. Estimated sales
3. BS sales and the BS category_id
4. st_ao_val

author: 方星钧 (ffman)
description: based on DWD-layer and related tables, compute the base
    information table for the search_term and asin dimensions (including
    estimated sales)
table_read_name: dwd_st_counts series, dwd_st_info series,
    dwd_st_asin_info series, dwd_asin_bs_info
table_save_name: dwt_st_info series
table_save_level: dwt
version: 2.0
created_date: 2022-06-20
updated_date: 2022-12-25

NOTE(review): the tables actually queried by ``DwtStInfo.read_data`` are
dim_cal_asin_history_detail, ods_one_category_report, dim_st_detail,
dwd_st_counts, dwd_st_asin_info, ods_search_term_zr and ods_st_key; the
``table_read_name`` entry above looks out of date.
"""

import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from utils.templates import Templates
# Window functions for grouped sorting.
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, IntegerType
from sqlalchemy import create_engine
import pandas as pd


class DwtStInfo(Templates):
    """Spark job that assembles one row per search term with derived metrics.

    ``read_data`` loads the source tables, ``handle_data`` joins and
    aggregates them, and the result is left in ``self.df_save``.  Helpers such
    as ``create_spark_object``, ``get_year_week_tuple``,
    ``get_date_info_tuple``, ``reset_partitions`` and ``run`` are not defined
    in this file; presumably they are inherited from ``Templates``.
    """

    def __init__(self, site_name="us", date_type="week", date_info="2022-01"):
        """Create the Spark session, register the UDF and set job parameters.

        Args:
            site_name: Site code used to filter every source table.
            date_type: Period type (the CLI comment lists
                week/4_week/month/quarter).
            date_info: Period identifier, e.g. ``"2022-01"``.
        """
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.db_save = "dwt_st_info"
        self.spark = self.create_spark_object(app_name=f"{self.db_save} {self.site_name}, {self.date_info}")
        self.df_date = self.get_year_week_tuple()
        self.get_date_info_tuple()

        # Placeholder DataFrame shared by every df_* attribute until
        # read_data()/handle_data() replace them (presumably so that the
        # attributes always hold a DataFrame).  DataFrames are immutable, so
        # one shared instance is equivalent to building eight identical ones.
        placeholder = self.spark.sql("select 1+1;")
        self.df_save = placeholder
        self.df_asin_detail = placeholder
        self.df_bs_report = placeholder
        self.df_st_detail = placeholder
        self.df_st_counts = placeholder
        self.df_st_asin_info = placeholder
        self.df_st_asin_zr = placeholder
        self.df_st_key = placeholder

        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.reset_partitions(1)
        self.u_is_title_appear = self.spark.udf.register(
            "u_is_title_appear", self.udf_is_title_appear, IntegerType()
        )
        print("self.date_info_tuple:", self.date_info_tuple)
        self.current_date = self.date_info_tuple[0]
        print("self.current_date:", self.current_date)

    @staticmethod
    def udf_is_title_appear(search_term, title):
        """Return 1 if ``search_term`` occurs in ``title`` (case-insensitive), else 0.

        A missing value on either side yields 0.  Without this guard
        ``str(None)`` becomes the text ``"None"``, so a NULL title would be
        reported as containing search terms such as "on", "one" or "none".
        NULL titles do occur because titles are attached with a left join.
        """
        if search_term is None or title is None:
            return 0
        return 1 if str(search_term).lower() in str(title).lower() else 0

    def read_data(self):
        """Load and cache every source table needed by ``handle_data``.

        ``self.year``, ``self.month`` and ``self.year_week_tuple`` are not set
        in this file; presumably the base-class date helpers called in
        ``__init__`` provide them.
        """
        print("1.1 读取asin维度: dim_cal_asin_history_detail表")
        sql = (
            "select asin, bsr_cate_1_id, bsr_cate_current_id as st_asin_bs_cate_current_id, "
            "asin_rank, asin_title, asin_launch_time, asin_price as asin1_price, asin_rating as asin1_rating, "
            "asin_total_comments as asin1_total_comments "
            "from dim_cal_asin_history_detail "
            f"where site_name='{self.site_name}';"
        )
        print("sql:", sql)
        self.df_asin_detail = self.spark.sql(sql).cache()
        self.df_asin_detail.show(10)

        print("1.2 读取bsr维度: ods_one_category_report表")
        # Always read at month granularity, whatever self.date_type is.
        sql = (
            "select cate_1_id as bsr_cate_1_id, rank as asin_rank, orders as asin_bs_orders from ods_one_category_report "
            f"where site_name='{self.site_name}' and date_type='month' and date_info='{self.year}-{self.month}';"
        )
        print("sql:", sql)
        self.df_bs_report = self.spark.sql(sqlQuery=sql).cache()
        self.df_bs_report.show(10)

        print("1.3 读取dim_st_detail系列表")
        sql = (
            "select * from dim_st_detail "
            f"where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info = '{self.date_info}';"
        )
        print("sql:", sql)
        self.df_st_detail = self.spark.sql(sql).cache()
        self.df_st_detail.show(10, truncate=False)

        print("1.4 读取dwd_st_counts系列表")
        sql = (
            "select search_term, st_ao_val, st_ao_val_rank, st_ao_val_rate, st_zr_counts, st_sp_counts, "
            "st_sb_counts, st_sb1_counts, st_sb2_counts, st_sb3_counts, st_adv_counts, "
            "st_ac_counts, st_bs_counts, st_er_counts, st_tr_counts "
            " from dwd_st_counts "
            f"where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info = '{self.date_info}';"
        )
        print("sql:", sql)
        self.df_st_counts = self.spark.sql(sql).cache()
        self.df_st_counts.show(10, truncate=False)

        print("1.5 读取dwd_st_asin_info系列表")
        sql = (
            "select search_term, asin, st_asin_zr_orders as st_asin_orders, st_asin_zr_orders_sum as st_asin_orders_sum from dwd_st_asin_info "
            f"where site_name='{self.site_name}' and date_type='{self.date_type}' and date_info = '{self.date_info}';"
        )
        print("sql:", sql)
        self.df_st_asin_info = self.spark.sql(sql).cache()
        self.df_st_asin_info.show(10, truncate=False)

        # Page-1 (search_term, asin) pairs, read at week granularity and
        # de-duplicated across the selected weeks.
        sql = (
            "select search_term, asin from ods_search_term_zr "
            f"where site_name ='{self.site_name}' and date_type='week' and date_info in {self.year_week_tuple} and page=1"
        )
        self.df_st_asin_zr = self.spark.sql(sql).cache()
        self.df_st_asin_zr = self.df_st_asin_zr.drop_duplicates(['search_term', 'asin'])
        self.df_st_asin_zr.show(10)

        print("1.6 读取ods_st_key表")
        sql = (
            "select st_key, search_term from ods_st_key "
            f"where site_name='{self.site_name}';"
        )
        print("sql:", sql)
        self.df_st_key = self.spark.sql(sql).cache()
        self.df_st_key.show(10, truncate=False)

    def handle_data(self):
        """Run every transformation step and store the result in ``self.df_save``."""
        # Order matters: later steps read columns that earlier steps add
        # (e.g. asin_bs_orders from handle_join, asin_new_flag from
        # handle_data_asin_new).
        self.handle_join()
        self.handle_data_asin_new()
        self.handle_data_st_orders()
        self.handle_data_asin_detail()
        self.handle_st_asin_zr_title()
        self.df_save = self.df_st_detail
        print(self.df_save.columns)
        self.df_save.show(10)
        self.df_save = self.df_save.drop(
            "st_updated_time", "st_bsr_cate_1_id_new", "st_bsr_cate_current_id_new"
        )

    def handle_join(self):
        """Enrich the three working DataFrames with their lookup tables."""
        # Search-term dimension: attach the counts and the st_key.
        self.df_st_detail = self.df_st_detail.join(
            self.df_st_counts, on="search_term", how="left"
        ).join(
            self.df_st_key, on="search_term", how="left"
        )
        # ASIN dimension: attach asin_bs_orders via (rank, first-level category).
        self.df_asin_detail = self.df_asin_detail.join(
            self.df_bs_report, on=['asin_rank', 'bsr_cate_1_id'], how='left'
        )
        # Search-term + ASIN dimension: attach the title of each page-1 ASIN.
        # Left join, so asin_title is NULL for ASINs missing from df_asin_detail.
        self.df_st_asin_zr = self.df_st_asin_zr.join(
            self.df_asin_detail.select("asin", "asin_title"), on='asin', how='left'
        )

    def handle_data_asin_new(self):
        """Add ``asin_new_flag`` to ``self.df_asin_detail``.

        ``days_diff`` is the number of days from ``asin_launch_time`` to the
        last entry of ``self.date_info_tuple``.  The flag is:

        * 0 when ``days_diff`` is greater than 180,
        * 1 when ``days_diff`` is greater than 0 (and at most 180),
        * 2 otherwise.
        """
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "current_date", F.lit(self.date_info_tuple[-1])
        )
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "days_diff", F.datediff("current_date", "asin_launch_time")
        )
        self.df_asin_detail = self.df_asin_detail.withColumn(
            "asin_new_flag",
            F.when(
                self.df_asin_detail.days_diff > 180, 0
            ).when(
                self.df_asin_detail.days_diff > 0, 1
            ).otherwise(2)
        )
        self.df_asin_detail.show(10, truncate=False)

    def handle_data_st_orders(self):
        """Aggregate per-search-term order metrics and join them onto ``df_st_detail``.

        Adds st_asin_bs_orders_sum, st_asin_orders_sum, st_asin_counts,
        st_asin_new_orders_sum, st_asin_new_counts and the two "new" rates
        (new / total for orders and for counts).
        """
        self.df_st_asin_info = self.df_st_asin_info.join(
            self.df_asin_detail.select("asin", "asin_bs_orders", "asin_new_flag"), on="asin", how="left"
        )
        self.df_st_asin_info = self.df_st_asin_info.withColumnRenamed("asin_bs_orders", "st_asin_bs_orders")

        # Sum of asin_bs_orders over all ASINs of the search term.
        df_st_asin_bs_orders_sum = (
            self.df_st_asin_info.groupby(['search_term'])
            .agg({"st_asin_bs_orders": "sum"})
        )
        df_st_asin_bs_orders_sum = df_st_asin_bs_orders_sum.withColumnRenamed(
            "sum(st_asin_bs_orders)", "st_asin_bs_orders_sum"
        )

        # st_asin_orders_sum is read as an already-summed column, so max()
        # just picks the per-term value; count(asin) is the number of ASINs.
        df_st_asin_orders_sum = (
            self.df_st_asin_info.groupby(['search_term'])
            .agg({"st_asin_orders_sum": "max", "asin": "count"})
        )
        df_st_asin_orders_sum = df_st_asin_orders_sum.withColumnRenamed(
            "max(st_asin_orders_sum)", "st_asin_orders_sum"
        )
        df_st_asin_orders_sum = df_st_asin_orders_sum.withColumnRenamed("count(asin)", "st_asin_counts")

        # Same aggregation restricted to ASINs flagged as new.
        df_st_asin_new_orders_sum = (
            self.df_st_asin_info.filter("asin_new_flag = 1")
            .groupby(['search_term'])
            .agg({"st_asin_orders": "sum", "asin": "count"})
        )
        df_st_asin_new_orders_sum = df_st_asin_new_orders_sum.withColumnRenamed(
            "sum(st_asin_orders)", "st_asin_new_orders_sum"
        )
        df_st_asin_new_orders_sum = df_st_asin_new_orders_sum.withColumnRenamed(
            "count(asin)", "st_asin_new_counts"
        )

        self.df_st_detail = self.df_st_detail.join(
            df_st_asin_bs_orders_sum, on="search_term", how="left"
        ).join(
            df_st_asin_orders_sum, on="search_term", how="left"
        ).join(
            df_st_asin_new_orders_sum, on="search_term", how="left"
        )
        self.df_st_detail = self.df_st_detail.withColumn(
            "st_asin_new_orders_rate",
            self.df_st_detail.st_asin_new_orders_sum / self.df_st_detail.st_asin_orders_sum
        )
        self.df_st_detail = self.df_st_detail.withColumn(
            "st_asin_new_counts_rate",
            self.df_st_detail.st_asin_new_counts / self.df_st_detail.st_asin_counts
        )

    def handle_data_asin_detail(self):
        """Attach price, rating, comment count and BS orders of ``st_top_asin1``."""
        # Rename asin -> st_top_asin1 so the join key matches df_st_detail.
        top_asin_info = self.df_asin_detail.select(
            "asin", "asin_bs_orders", "asin1_price", "asin1_rating", "asin1_total_comments"
        ).withColumnRenamed("asin", "st_top_asin1")
        self.df_st_detail = self.df_st_detail.join(
            top_asin_info, on="st_top_asin1", how="left"
        ).withColumnRenamed("asin_bs_orders", "st_asin1_bs_orders")

    def handle_st_asin_zr_title(self):
        """Compute how many page-1 ASINs carry the search term in their title.

        Adds st_zr_page1_counts, st_zr_page1_in_title_counts and
        st_zr_page1_in_title_rate to ``self.df_st_detail``.
        """
        self.df_st_asin_zr = self.df_st_asin_zr.withColumn(
            "st_asin_in_title_flag",
            self.u_is_title_appear(self.df_st_asin_zr.search_term, self.df_st_asin_zr.asin_title)
        )
        df_st_zr_page1_counts = self.df_st_asin_zr.groupby("search_term").count()
        df_st_zr_page1_counts = df_st_zr_page1_counts.withColumnRenamed("count", "st_zr_page1_counts")

        df_st_zr_page1_in_title_counts = self.df_st_asin_zr.filter(
            "st_asin_in_title_flag=1"
        ).groupby("search_term").count()
        df_st_zr_page1_in_title_counts = df_st_zr_page1_in_title_counts.withColumnRenamed(
            "count", "st_zr_page1_in_title_counts"
        )

        df_st_zr_page1_counts = df_st_zr_page1_counts.join(
            df_st_zr_page1_in_title_counts, on='search_term', how='left'
        )
        # Terms with no in-title match are absent from the filtered
        # aggregate; after the left join their count is NULL, i.e. zero.
        df_st_zr_page1_counts = df_st_zr_page1_counts.fillna(0)
        df_st_zr_page1_counts = df_st_zr_page1_counts.withColumn(
            "st_zr_page1_in_title_rate",
            df_st_zr_page1_counts.st_zr_page1_in_title_counts / df_st_zr_page1_counts.st_zr_page1_counts
        )
        df_st_zr_page1_counts.show(10, truncate=False)
        self.df_st_detail = self.df_st_detail.join(
            df_st_zr_page1_counts, on='search_term', how='left'
        )


if __name__ == '__main__':
    site_name = sys.argv[1]  # arg 1: site
    date_type = sys.argv[2]  # arg 2: period type: week/4_week/month/quarter
    date_info = sys.argv[3]  # arg 3: year-week / year-month / year-quarter, e.g. 2022-1
    handle_obj = DwtStInfo(site_name=site_name, date_type=date_type, date_info=date_info)
    handle_obj.run()