# dwt_st_sv_last365.py
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))

from utils.common_util import CommonUtil, DateTypes
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F, Window


class DwtStSearchVolumeLast365(object):
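    """
    Aggregate each search term's search volume over the last 12 monthly
    partitions (a rolling ~365-day window) and write the results to the
    Hive tables dwt_st_sv_last365_month and dwt_st_sv_last365.
    """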

    def __init__(self, site_name, date_info):
        self.site_name = site_name
        self.date_info = date_info
        app_name = f"{self.__class__.__name__}:{site_name}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)

    def run(self):
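        # Collect the last 12 monthly partitions (rolling ~365-day window).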
        last_12_month = []
        for i in range(0, 12):
            last_12_month.append(CommonUtil.get_month_offset(self.date_info, -i))
        print(f"过去12个月为{last_12_month}")

        use_date_type = DateTypes.month.name if self.date_info > '2023-10' else DateTypes.month_old.name

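        # Sum st_search_num per search term across the last 12 monthly partitions
        # and rank the terms by total search volume.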
        sql1 = f"""
            select 
                search_term, 
                total_search_volume, 
                row_number() over (order by total_search_volume desc) as sv_rank, 
                '{self.site_name}' as site_name, 
                '{self.date_info}' as date_info 
            from (
                select 
                    search_term, 
                    sum(st_search_num) as total_search_volume 
                from (
                    select 
                        search_term, 
                        st_search_num 
                    from dim_st_detail
                    where site_name = '{self.site_name}'
                      and date_type = '{use_date_type}'
                      and date_info in ({CommonUtil.list_to_insql(last_12_month)})
                    )
                group by search_term
                );
        """
        df_month = self.spark.sql(sql1).repartition(40, 'search_term').cache()

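        # Persist the search-term-level monthly aggregate to dwt_st_sv_last365_month.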
        df_save1 = df_month.repartition(1)
        hive_tb = "dwt_st_sv_last365_month"
        partition_by = ["site_name", "date_info"]
        print(f"当前存储的表名为:{hive_tb},分区为{partition_by}", )
        hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dwt/{hive_tb}/site_name={self.site_name}/date_info={self.date_info}"
        print(f"清除hdfs目录中.....{hdfs_path}")
        HdfsUtils.delete_hdfs_file(hdfs_path)
        df_save1.write.saveAsTable(name=hive_tb, format='hive', mode='append', partitionBy=partition_by)

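        # Load the search_term -> st_key mapping so results can be keyed by search_term_id.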
        sql2 = f"""
            select 
                st_key,
                search_term
            from ods_st_key
            where site_name = '{self.site_name}';
        """
        df_st_key = self.spark.sql(sql2).repartition(40, 'search_term').cache()

        # Re-derive the monthly search volume, now keyed by search_term_id.
        df_all = df_month.join(
            df_st_key, "search_term", "inner"
        ).select(
            df_st_key['st_key'].alias("search_term_id"),
            "search_term",
            "total_search_volume",
            F.lit(self.site_name).alias("site_name"),
            F.lit(self.date_info).alias("date_info"),
        ).dropDuplicates(
            ['search_term_id']
        ).withColumn(
            "sv_rank",
            F.row_number().over(Window.partitionBy(['site_name']).orderBy(F.col("total_search_volume").desc()))
        ).repartition(40, 'sv_rank').cache()
        df_month.unpersist()
        df_st_key.unpersist()

        # Read the ods_rank_search_rate_repeat table.
        sql3 = f"""
            select 
                rank, 
                search_num, 
                date_info 
            from ods_rank_search_rate_repeat 
            where site_name = '{self.site_name}' 
              and date_type = 'month';
        """
        df_rank_sv = self.spark.sql(sql3).cache()
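        # Keep only the most recent date_info record for each rank.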
        window = Window.partitionBy(["rank"]).orderBy(df_rank_sv.date_info.desc())
        df_rank_sv = df_rank_sv.withColumn(
            "date_info_rank", F.row_number().over(window=window)
        ).filter(
            "date_info_rank = 1"
        ).drop(
            "date_info_rank", "date_info"
        ).repartition(40, 'rank').cache()

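        # Attach a monthly search volume (sv_month) to each term by joining its sv_rank against the rank table.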
        df_all = df_all.join(
            df_rank_sv, df_all['sv_rank'] == df_rank_sv['rank'], "left"
        ).select(
            "search_term_id",
            "search_term",
            "total_search_volume",
            "sv_rank",
            F.col("search_num").alias("sv_month"),
            "site_name",
            "date_info",
        )
        df_rank_sv.unpersist()

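        # Persist the final result, keyed by search_term_id, to dwt_st_sv_last365.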
        df_save2 = df_all.repartition(2)
        hive_tb = "dwt_st_sv_last365"
        partition_by = ["site_name", "date_info"]
        print(f"当前存储的表名为:{hive_tb},分区为{partition_by}", )
        hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dwt/{hive_tb}/site_name={self.site_name}/date_info={self.date_info}"
        print(f"清除hdfs目录中.....{hdfs_path}")
        HdfsUtils.delete_file_in_folder(hdfs_path)
        df_save2.write.saveAsTable(name=hive_tb, format='hive', mode='append', partitionBy=partition_by)
        print("success")


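# Usage (assumed submission pattern; arguments match get_sys_arg calls below):
#   spark-submit dwt_st_sv_last365.py <site_name> <date_info>
#   e.g. spark-submit dwt_st_sv_last365.py us 2023-11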
if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_info = CommonUtil.get_sys_arg(2, None)
    obj = DwtStSearchVolumeLast365(site_name, date_info)
    obj.run()