"""
Aggregates search-term search volume over the last 12 monthly partitions of dim_st_detail,
ranks terms by total volume, maps the rank to a monthly search-volume estimate via
ods_rank_search_rate_repeat, and writes the results to two Hive tables
(dwt_st_sv_last365_month and dwt_st_sv_last365).
"""
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil, DateTypes
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F, Window


class DwtStSearchVolumeLast365(object):

    def __init__(self, site_name, date_info):
        self.site_name = site_name
        self.date_info = date_info
        app_name = f"{self.__class__.__name__}:{site_name}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)

    def run(self):
        # Build the list of the last 12 month partitions ending at date_info.
        last_12_month = []
        for i in range(0, 12):
            last_12_month.append(CommonUtil.get_month_offset(self.date_info, -i))
        print(f"Last 12 months: {last_12_month}")

        # dim_st_detail switched from the 'month_old' to the 'month' date_type after 2023-10.
        use_date_type = DateTypes.month.name if self.date_info > '2023-10' else DateTypes.month_old.name

        # Sum st_search_num per search term over the last 12 months and rank by total volume.
        sql1 = f"""
            select search_term,
                   total_search_volume,
                   row_number() over (order by total_search_volume desc) as sv_rank,
                   '{self.site_name}' as site_name,
                   '{self.date_info}' as date_info
            from (
                select search_term, sum(st_search_num) as total_search_volume
                from (
                    select search_term, st_search_num
                    from dim_st_detail
                    where site_name = '{self.site_name}'
                      and date_type = '{use_date_type}'
                      and date_info in ({CommonUtil.list_to_insql(last_12_month)})
                )
                group by search_term
            )
        """
        df_month = self.spark.sql(sql1).repartition(40, 'search_term').cache()

        # Save the monthly aggregate table.
        df_save1 = df_month.repartition(1)
        hive_tb = "dwt_st_sv_last365_month"
        partition_by = ["site_name", "date_info"]
        print(f"Saving to table: {hive_tb}, partitioned by {partition_by}")
        hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dwt/{hive_tb}/site_name={self.site_name}/date_info={self.date_info}"
        print(f"Clearing HDFS path: {hdfs_path}")
        HdfsUtils.delete_hdfs_file(hdfs_path)
        df_save1.write.saveAsTable(name=hive_tb, format='hive', mode='append', partitionBy=partition_by)

        # Load the search-term key mapping.
        sql2 = f"""
            select st_key, search_term
            from ods_st_key
            where site_name = '{self.site_name}'
        """
        df_st_key = self.spark.sql(sql2).repartition(40, 'search_term').cache()

        # Attach search_term_id, deduplicate, and recompute sv_rank per site.
        df_all = df_month.join(
            df_st_key, "search_term", "inner"
        ).select(
            df_st_key['st_key'].alias("search_term_id"),
            "search_term",
            "total_search_volume",
            F.lit(self.site_name).alias("site_name"),
            F.lit(self.date_info).alias("date_info"),
        ).dropDuplicates(
            ['search_term_id']
        ).withColumn(
            "sv_rank",
            F.row_number().over(Window.partitionBy(['site_name']).orderBy(F.col("total_search_volume").desc()))
        ).repartition(40, 'sv_rank').cache()

        df_month.unpersist()
        df_st_key.unpersist()

        # Read the rank-to-search-volume table (ods_rank_search_rate_repeat).
        sql3 = f"""
            select rank, search_num, date_info
            from ods_rank_search_rate_repeat
            where site_name = '{self.site_name}'
              and date_type = 'month'
        """
        df_rank_sv = self.spark.sql(sql3).cache()

        # Keep only the latest date_info row per rank.
        window = Window.partitionBy(["rank"]).orderBy(df_rank_sv.date_info.desc())
        df_rank_sv = df_rank_sv.withColumn(
            "date_info_rank", F.row_number().over(window=window)
        ).filter(
            "date_info_rank = 1"
        ).drop(
            "date_info_rank", "date_info"
        ).repartition(40, 'rank').cache()

        # Map each term's sv_rank to its estimated monthly search volume.
        df_all = df_all.join(
            df_rank_sv, df_all['sv_rank'] == df_rank_sv['rank'], "left"
        ).select(
            "search_term_id",
            "search_term",
            "total_search_volume",
            "sv_rank",
            F.col("search_num").alias("sv_month"),
            "site_name",
            "date_info",
        )
        df_rank_sv.unpersist()

        # Save the final table.
        df_save2 = df_all.repartition(2)
        hive_tb = "dwt_st_sv_last365"
        partition_by = ["site_name", "date_info"]
        print(f"Saving to table: {hive_tb}, partitioned by {partition_by}")
        hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dwt/{hive_tb}/site_name={self.site_name}/date_info={self.date_info}"
        print(f"Clearing HDFS path: {hdfs_path}")
        HdfsUtils.delete_file_in_folder(hdfs_path)
        df_save2.write.saveAsTable(name=hive_tb, format='hive', mode='append', partitionBy=partition_by)
        print("success")


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_info = CommonUtil.get_sys_arg(2, None)
    obj = DwtStSearchVolumeLast365(site_name, date_info)
    obj.run()
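# Usage sketch (assumptions: CommonUtil.get_sys_arg reads positional CLI arguments, with
# argv[1] = site name and argv[2] = month partition; the script filename and the example
# argument values below are hypothetical):
#   spark-submit dwt_st_search_volume_last365.py us 2024-06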