import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F
from utils.templates import Templates

"""
Compute per-search-term asin title-number statistics (max/min counts and related metrics).
Depends on:
    dwd_asin_title_number
    dwd_st_asin_measure
Output table: dws_st_num_stats
"""


class DwsStNumStats(Templates):

    def __init__(self, site_name, date_type, date_info):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        app_name = f"{self.__class__.__name__}:{site_name}:{date_type}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.reset_partitions(partitions_num=1)
        # Placeholder DataFrames, populated in read_data()/handle_data()
        self.asin_title_num = self.spark.sql("select 1+1;")
        self.st_asin_count = self.spark.sql("select 1+1;")
        self.self_asin = self.spark.sql("select 1+1;")
        self.search_term_id = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")
        self.db_save = "dws_st_num_stats"
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dws/{self.db_save}/site_name={self.site_name}/date_type={self.date_type}/date_info={self.date_info}"
        print(f"Clearing HDFS directory: {hdfs_path}")
        HdfsUtils.delete_hdfs_file(hdfs_path)

    def read_data(self):
        print("Reading dwd_asin_title_number:")
        sql1 = f"""
            select asin, value
            from dwd_asin_title_number
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type}'
              and date_info = '{self.date_info}'
        """
        self.asin_title_num = self.spark.sql(sql1).repartition(40, 'asin').cache()
        self.asin_title_num.show(10, truncate=True)

        print("Reading dwd_st_asin_measure:")
        sql2 = f"""
            select search_term, asin,
                   count(asin) over (partition by search_term) as asin_count
            from dwd_st_asin_measure
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type}'
              and date_info = '{self.date_info}'
            group by search_term, asin
        """
        self.st_asin_count = self.spark.sql(sql2).repartition(40, 'search_term', 'asin').cache()
        self.st_asin_count.show(10, truncate=True)

        print("Reading ods_self_asin:")
        sql3 = f"""
            select asin as max_num_asin, 1 as is_self_max_num_asin
            from ods_self_asin
            where site_name = '{self.site_name}'
        """
        # sql3 aliases asin to max_num_asin, so repartition on the renamed column
        self.self_asin = self.spark.sql(sql3).repartition(40, 'max_num_asin').cache()
        self.self_asin.show(10, truncate=True)

        print("Reading ods_st_key:")
        sql4 = f"""
            select cast(st_key as integer) as search_term_id, search_term
            from ods_st_key
            where site_name = '{self.site_name}'
        """
        self.search_term_id = self.spark.sql(sql4).repartition(40, 'search_term').cache()
        self.search_term_id.show(10, truncate=True)

    def handle_data(self):
        # Join st+asin level data with title numbers; missing title numbers default to 1
        self.df_save = self.st_asin_count.join(
            self.asin_title_num, on='asin', how='left'
        ).fillna(
            {'value': 1}
        )
        # Flag asins whose pack quantity (title number) is greater than 1
        self.df_save = self.df_save.withColumn(
            'num_flag', F.when(F.col('value') > 1, 1).otherwise(0)
        )
        # Aggregate per search term: asin_count, multi-pack count, and the row with the max value
        self.df_save = self.df_save.groupby('search_term').agg(
            F.max('asin_count').alias('asin_count'),
            F.sum('num_flag').alias('num_count'),
            F.max(F.struct('value', 'asin')).alias('max_row')
        ).repartition(30, 'search_term')
        # Attach search_term_id
        self.df_save = self.df_save.join(
            self.search_term_id, on='search_term', how='inner'
        )
        # Compute the proportion of multi-pack asins per search term
        self.df_save = self.df_save.withColumn(
            'max_num_asin', self.df_save.max_row.asin
        ).withColumn(
            'max_num', self.df_save.max_row.value
        ).withColumn(
            'most_proportion', F.round(F.col('num_count') / F.col('asin_count'), 3)
        )
        # Mark whether the max_num_asin is one of our own asins
        self.df_save = self.df_save.join(
            self.self_asin, on='max_num_asin', how='left'
        ).fillna(
            {'is_self_max_num_asin': 0}
        )
        # Select and order the output columns before persisting
        self.df_save = self.df_save.select(
            'search_term',
            'asin_count',
            'num_count',
            'max_num',
            'max_num_asin',
            'most_proportion',
            'search_term_id',
            'is_self_max_num_asin',
            F.lit(self.site_name).alias('site_name'),
            F.lit(self.date_type).alias('date_type'),
            F.lit(self.date_info).alias('date_info')
        )


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    obj = DwsStNumStats(site_name, date_type, date_info)
    obj.run()
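# Usage sketch (the argument values below are hypothetical examples, not taken from the repo):
#   spark-submit dws_st_num_stats.py us week 2023-18
# Assuming CommonUtil.get_sys_arg(i, default) returns the i-th positional CLI argument,
# the three arguments map to site_name, date_type and date_info in that order, and
# Templates.run() is expected to drive read_data()/handle_data() and persist df_save
# into dws_st_num_stats partitioned by site_name/date_type/date_info.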