""" 1. 上架日期 2. 分类id """ """ author: 方星钧(ffman) description: 基于ods_asin_detail历史表,计算出asin的历史数据指标(上架时间+bs分类id) table_read_name: ods_asin_detail, , selection_off_line.dwd_bs_category_asin table_save_name: dim_asin_history_info table_save_level: dwd version: 1.0 created_date: 2022-06-20 updated_date: 2022-06-20 """ import os import sys from pyspark.storagelevel import StorageLevel sys.path.append(os.path.dirname(sys.path[0])) # 上级目录 from utils.templates import Templates # from ..utils.templates import Templates # from AmazonSpider.pyspark_job.utils.templates import Templates # 分组排序的udf窗口函数 from pyspark.sql.window import Window from pyspark.sql import functions as F from pyspark.sql.types import StringType, IntegerType class DimAsinHistoryInfo(Templates): def __init__(self, site_name="us", date_type="week", date_info="2022-1"): super().__init__() self.site_name = site_name self.date_type = date_type self.date_info = date_info self.db_save = f"dim_asin_history_info" self.spark = self.create_spark_object(app_name=f"{self.db_save} {self.site_name}, {self.date_info}") self.df_date = self.get_year_week_tuple() self.df_asin_detail = self.spark.sql(f"select 1+1;") self.df_bs_category = self.spark.sql(f"select 1+1;") self.df_bs_category_report = self.spark.sql(f"select 1+1;") self.df_save = self.spark.sql(f"select 1+1;") self.year_month = list(self.df_date.loc[self.df_date.year_week == f'{self.year_week}'].year_month)[0] # self.reset_partitions_by() self.partitions_by = ['site_name'] self.reset_partitions(20) self.partitions_type = "dt" self.u_year_week = self.spark.udf.register('u_year_week', self.udf_year_week, StringType()) @staticmethod def udf_year_week(dt): year, week = dt.split("-")[0], dt.split("-")[1] if int(week) < 10: return f"{year}-0{week}" else: return f"{year}-{week}" def read_data(self): print("1.1 读取ods_asin_detail和ods_keep_date表") sql = f"select asin, launch_time, site_name, dt, 1 as type, rank, price, rating, total_comments from ods_asin_detail " \ f"where site_name='{self.site_name}' " \ f"union " \ f"select asin, launch_time, site_name, dt, 2 as type, null as rank, null as price, null as rating, null as total_comments from ods_keep_date " \ f"where site_name='{self.site_name}';" self.df_asin_detail = self.spark.sql(sqlQuery=sql).cache() self.df_asin_detail.show(10, truncate=False) print("1.2 读取dwd_bs_category_asin表") sql = f"select asin, cate_1_id, cate_current_id, dt from selection_off_line.dwd_bs_category_asin " \ f"where site='{self.site_name}' and dt !='9999-99';" # '9999-99'分区是去重不符合 self.df_bs_category = self.spark.sql(sqlQuery=sql).cache() self.df_bs_category.show(10, truncate=False) print("1.3 读取ods_one_category_report表") sql = f"select cate_1_id, rank, orders from ods_one_category_report " \ f"where site_name='{self.site_name}' and dm='{self.year_month}';" print("sql:", sql) self.df_bs_category_report = self.spark.sql(sqlQuery=sql).cache() self.df_bs_category_report.show(10, truncate=False) def handle_data(self): self.handle_data_duplicated() self.handle_data_join() self.handle_data_renamed() self.df_save.show(10, truncate=False) def handle_data_duplicated(self): print("2.1 根据asin,dt去重") self.df_asin_detail = self.df_asin_detail.withColumn( "dt_sort", self.u_year_week(self.df_asin_detail.dt) ) self.df_bs_category = self.df_bs_category.withColumn( "dt_sort", self.u_year_week(self.df_bs_category.dt) ) window = Window.partitionBy(['asin']).orderBy( self.df_asin_detail.type.asc_nulls_last(), self.df_asin_detail.dt_sort.desc() ) self.df_asin_detail = self.df_asin_detail.withColumn("asin_dt_top", F.row_number().over(window=window)) self.df_asin_detail = self.df_asin_detail.filter("asin_dt_top=1") # 合并keep_date的launch_time数据 # 这种写法会卡住 # asin_list = self.df_asin_detail.rdd.map(lambda x: x[0]).collect() # self.df_keep_date = self.df_keep_date.filter(~self.df_keep_date.asin.isin(asin_list)) # self.df_asin_detail = self.df_asin_detail.unionByName(self.df_keep_date) window = Window.partitionBy(['asin']).orderBy( self.df_bs_category.dt_sort.desc() ) self.df_bs_category = self.df_bs_category.withColumn("asin_dt_top", F.row_number().over(window=window)) self.df_bs_category = self.df_bs_category.filter("asin_dt_top=1") self.df_asin_detail.groupby('dt_sort').count().show(30, truncate=False) self.df_bs_category.groupby('dt_sort').count().show(30, truncate=False) self.df_asin_detail = self.df_asin_detail.drop("asin_dt_top", "type", "dt_sort") self.df_bs_category = self.df_bs_category.drop("asin_dt_top", "dt", "dt_sort") def handle_data_join(self): self.df_save = self.df_asin_detail.join( self.df_bs_category, on=['asin'], how='left' ).join( self.df_bs_category_report, on=['rank', 'cate_1_id'], how='left' ) # self.df_save = self.df_save.withColumn(f"{self.partitions_type}", F.lit(self.date_info)) # quit() def handle_data_renamed(self): self.df_save = self.df_save.\ withColumnRenamed("launch_time", "asin_launch_time").\ withColumnRenamed("cate_1_id", "asin_bs_cate_1_id").\ withColumnRenamed("cate_current_id", "asin_bs_cate_current_id").\ withColumnRenamed("rank", "asin_rank").\ withColumnRenamed("price", "asin_price"). \ withColumnRenamed("rating", "asin_rating"). \ withColumnRenamed("total_comments", "asin_total_comments").\ withColumnRenamed("orders", "asin_bs_orders") if __name__ == '__main__': site_name = sys.argv[1] # 参数1:站点 # date_type = sys.argv[2] # 参数2:类型:week/4_week/month/quarter date_info = sys.argv[2] # 参数3:年-周/年-月/年-季, 比如: 2022-1 # handle_obj = DwdAsinHistoryInfo(site_name=site_name, date_type=date_type, date_info=date_info) handle_obj = DimAsinHistoryInfo(site_name=site_name, date_info=date_info) handle_obj.run()