import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F, Window
from utils.templates import Templates

"""
Compute top-100 ASIN metrics for each search term.
Depends on the following tables:
    dwd_asin_measure
    dwd_st_asin_measure
    dim_asin_detail
Output table: dws_top100_asin_info
"""


class DwsTop100AsinInfo(Templates):

    def __init__(self, site_name, date_type, date_info):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        app_name = f"{self.__class__.__name__}:{site_name}:{date_type}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.db_save = "dws_top100_asin_info"
        # Placeholder DataFrames; they are populated in read_data().
        self.df_search_term_asin = self.spark.sql("select 1+1;")
        self.df_search_term_id = self.spark.sql("select 1+1;")
        self.df_asin_bsr_orders = self.spark.sql("select 1+1;")
        self.df_asin_detail = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.reset_partitions(partitions_num=25)
        partition_dict = {
            "site_name": self.site_name,
            "date_type": self.date_type,
            "date_info": self.date_info,
        }
        hdfs_path = CommonUtil.build_hdfs_path(self.db_save, partition_dict=partition_dict)
        print(f"Clearing HDFS directory: {hdfs_path}")
        HdfsUtils.delete_file_in_folder(hdfs_path)

    def read_data(self):
        # search_term -> asin pairs for the current partition
        sql1 = f"""
            select search_term, asin
            from dwd_st_asin_measure
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type}'
              and date_info = '{self.date_info}'
        """
        self.df_search_term_asin = self.spark.sql(sql1).repartition(40, 'search_term', 'asin').cache()
        self.df_search_term_asin.show(10, truncate=True)

        # search_term -> search_term_id mapping
        sql2 = f"""
            select cast(st_key as integer) as search_term_id, search_term
            from ods_st_key
            where site_name = '{self.site_name}'
        """
        self.df_search_term_id = self.spark.sql(sql2).repartition(40, 'search_term').cache()
        self.df_search_term_id.show(10, truncate=True)

        # BSR orders per ASIN
        sql3 = f"""
            select asin, asin_bsr_orders as orders
            from dwd_asin_measure
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type}'
              and date_info = '{self.date_info}'
        """
        self.df_asin_bsr_orders = self.spark.sql(sql3).repartition(40, 'asin').cache()
        self.df_asin_bsr_orders.show(10, truncate=True)

        # ASIN detail attributes (launch time, new-product flag)
        sql4 = f"""
            select asin, asin_launch_time, asin_is_new
            from dim_asin_detail
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type}'
              and date_info = '{self.date_info}'
        """
        self.df_asin_detail = self.spark.sql(sql4).repartition(40, 'asin').cache()
        self.df_asin_detail.show(10, truncate=True)

    def handle_data(self):
        self.df_save = self.df_search_term_asin.join(
            self.df_search_term_id, on='search_term', how='left'
        ).join(
            self.df_asin_bsr_orders, on='asin', how='left'
        ).join(
            self.df_asin_detail, on='asin', how='left'
        )
        # Rank ASINs within each search term by BSR orders (descending, nulls last)
        # and keep only the top 100.
        self.df_save = self.df_save.withColumn(
            "row_number",
            F.row_number().over(
                Window.partitionBy(F.col('search_term_id')).orderBy(F.col("orders").desc_nulls_last())
            )
        )
        self.df_save = self.df_save.filter("row_number <= 100")
        # Total orders of the top-100 group, used as the market-share denominator.
        self.df_save = self.df_save.withColumn(
            "group_sum",
            F.sum(F.col('orders')).over(
                Window.partitionBy(F.col('search_term_id'))
            )
        )
        self.df_save = self.df_save.groupby(F.col("search_term_id")).agg(
            F.first("search_term").alias("search_term"),
            F.concat_ws(',', F.collect_list("asin")).alias("top100_asin"),
            F.concat_ws(',', F.collect_list(F.coalesce('orders', F.lit(0)))).alias("top100_orders"),
            # market share = each ASIN's orders / total orders of the top-100 group
            F.concat_ws(',', F.collect_list(F.coalesce(F.round(F.expr('orders / group_sum'), 4), F.lit(0)))).alias("top100_market_share"),
            F.concat_ws(',', F.collect_list(F.coalesce(F.col("asin_is_new"), F.lit('-')))).alias("top100_is_new"),
            F.concat_ws(',', F.collect_list("row_number")).alias("top100_rank"),
            F.lit(self.site_name).alias("site_name"),
            F.lit(self.date_type).alias("date_type"),
            F.lit(self.date_info).alias("date_info"),
        )


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    obj = DwsTop100AsinInfo(site_name, date_type, date_info)
    obj.run()