import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.hdfs_utils import HdfsUtils
from utils.common_util import CommonUtil
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F
from pyspark.sql.window import Window


class DwtSTBaseReport(object):
    """Build dwt_st_base_report: attach estimated search volume/orders to search-term ranks."""

    def __init__(self, site_name, date_type, date_info):
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.hive_tb = "dwt_st_base_report"
        app_name = f"{self.hive_tb}:{site_name} {date_type} {date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.partitions_num = CommonUtil.reset_partitions(site_name, 1)

    def run(self):
        # Read ods_rank_search_rate_repeat to get the rank -> search volume mapping.
        # Months up to and including 2022-08 reuse the 2022-08 mapping.
        if self.date_info <= '2022-08':
            params = "date_info = '2022-08'"
        else:
            params = f"date_info = '{self.date_info}'"
        sql1 = f"""
            select rank as st_rank, search_num as st_volume, search_sum as st_orders
            from ods_rank_search_rate_repeat
            where site_name = '{self.site_name}'
              and date_type = 'month'
              and {params};
        """
        df_rank_sv = self.spark.sql(sql1).repartition(40, 'st_rank').cache()
        print("Rank -> search volume mapping:")
        df_rank_sv.show(10, False)

        # Fallback: if the requested month has no mapping, keep the latest row per rank
        # across all available months for this site.
        if df_rank_sv.count() == 0:
            sql1 = f"""
                select rank as st_rank, search_num as st_volume, search_sum as st_orders, date_info
                from ods_rank_search_rate_repeat
                where site_name = '{self.site_name}';
            """
            df_rank_sv = self.spark.sql(sql1)
            window = Window.partitionBy(["st_rank"]).orderBy(df_rank_sv.date_info.desc())
            df_rank_sv = df_rank_sv.withColumn("rk", F.row_number().over(window)) \
                .filter("rk = 1") \
                .drop("rk", "date_info") \
                .repartition(40, 'st_rank').cache()
            print("Rank -> search volume mapping:")
            df_rank_sv.show(10, False)

        # Search term base table
        sql2 = f"""
            select id as st_key, search_term
            from dwt_aba_st_analytics
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type}'
              and date_info = '{self.date_info}';
        """
        df_st_base = self.spark.sql(sql2).repartition(40, 'search_term').cache()
        print("Search term base table:")
        df_st_base.show(10, False)

        # Read ods_brand_analytics to get search term + rank from the report
        sql3 = f"""
            select search_term, rank as st_rank
            from ods_brand_analytics
            where site_name = '{self.site_name}'
              and date_type = '{self.date_type}'
              and date_info = '{self.date_info}';
        """
        df_st_rank = self.spark.sql(sql3).repartition(40, 'search_term').cache()
        print("Search term + rank mapping (month):")
        df_st_rank.show(10, False)

        df_save = df_st_base \
            .join(df_st_rank, 'search_term', 'inner') \
            .repartition(40, 'st_rank') \
            .join(df_rank_sv, 'st_rank', 'left') \
            .withColumn('created_time', F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss')) \
            .withColumn('updated_time', F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss')) \
            .withColumn('years', F.lit(int(self.date_info.split("-")[0]))) \
            .withColumn('site_name', F.lit(self.site_name)) \
            .withColumn('date_type', F.lit(self.date_type)) \
            .withColumn('date_info', F.lit(self.date_info))

        hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dwt/{self.hive_tb}/site_name={self.site_name}/date_type={self.date_type}/date_info={self.date_info}"
        print(f"Clearing data under HDFS path: {hdfs_path}")
        HdfsUtils.delete_hdfs_file(hdfs_path)

        df_save = df_save.repartition(self.partitions_num)
        partition_by = ["site_name", "date_type", "date_info"]
        print(f"Saving to table {self.hive_tb}, partitioned by {partition_by}")
        df_save.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=partition_by)
        print("success")

        # Compute the week-level trend data.
        # Read the date dimension table to get the week_list covered by date_info.
        sql = f"""
            select year_week
            from dim_date_20_to_30
            where week_day = 1
              and year_month = '{self.date_info}';
        """
        df_week = self.spark.sql(sql)
        week_list = sorted([row['year_week'] for row in df_week.collect()])

        for year_week in week_list:
            sql = f"""
                select search_term, rank as st_rank
                from ods_brand_analytics
                where site_name = '{self.site_name}'
                  and date_type = 'week'
                  and date_info = '{year_week}'
                  and rank <= 1500000;
            """
            df_st_rank_week = self.spark.sql(sql).repartition(40, 'search_term').cache()
            print(f"Search term + rank mapping (week {year_week}):")
            df_st_rank_week.show(10, False)

            df_save = df_st_base \
                .join(df_st_rank_week, 'search_term', 'inner') \
                .repartition(40, 'st_rank') \
                .join(df_rank_sv, 'st_rank', 'left') \
                .withColumn('created_time', F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss')) \
                .withColumn('updated_time', F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss')) \
                .withColumn('years', F.lit(int(year_week.split("-")[0]))) \
                .withColumn('site_name', F.lit(self.site_name)) \
                .withColumn('date_type', F.lit('week')) \
                .withColumn('date_info', F.lit(year_week))

            hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dwt/{self.hive_tb}/site_name={self.site_name}/date_type=week/date_info={year_week}"
            print(f"Clearing data under HDFS path: {hdfs_path}")
            HdfsUtils.delete_hdfs_file(hdfs_path)

            df_save = df_save.repartition(self.partitions_num)
            partition_by = ["site_name", "date_type", "date_info"]
            print(f"Saving to table {self.hive_tb}, partitioned by {partition_by}")
            df_save.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=partition_by)
            print("success")


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    obj = DwtSTBaseReport(site_name, date_type, date_info)
    obj.run()
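
# Example invocation (a sketch, not confirmed by this repo: the script filename,
# spark-submit options, and the sample argument values below are assumptions).
# The script reads three positional arguments: site_name, date_type, date_info.
#
#   spark-submit dwt_st_base_report.py us month 2023-01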