dwt_st_sv_last365.py
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))

from utils.common_util import CommonUtil, DateTypes
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F, Window


class DwtStSearchVolumeLast365(object):

    def __init__(self, site_name, date_info):
        self.site_name = site_name
        self.date_info = date_info
        app_name = f"{self.__class__.__name__}:{site_name}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)

    def run(self):
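        """
        Aggregate the last 12 months of search volume per search term and write
        two Hive tables: dwt_st_sv_last365_month (aggregated totals with rank)
        and dwt_st_sv_last365 (totals joined with st_key ids plus an estimated
        monthly search volume from ods_rank_search_rate_repeat).
        """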
        last_12_month = []
        for i in range(0, 12):
            last_12_month.append(CommonUtil.get_month_offset(self.date_info, -i))
        print(f"过去12个月为{last_12_month}")

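        # Runs after 2023-10 read dim_st_detail under the 'month' date_type;
        # earlier runs use the legacy 'month_old' partitions.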
        use_date_type = DateTypes.month.name if self.date_info > '2023-10' else DateTypes.month_old.name

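        # Sum st_search_num per search term across the last 12 monthly partitions
        # and rank terms by the aggregated volume.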
        sql1 = f"""
            select 
                search_term, 
                total_search_volume, 
                row_number() over (order by total_search_volume desc) as sv_rank, 
                '{self.site_name}' as site_name, 
                '{self.date_info}' as date_info 
            from (
                select
                    search_term,
                    sum(st_search_num) as total_search_volume
                from (
                    select
                        search_term,
                        st_search_num
                    from dim_st_detail
                    where site_name = '{self.site_name}'
                      and date_type = '{use_date_type}'
                      and date_info in ({CommonUtil.list_to_insql(last_12_month)})
                ) last_year
                group by search_term
            ) agg;
        """
        df_month = self.spark.sql(sql1).repartition(40, 'search_term').cache()

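        # Write the aggregated result into the monthly snapshot table
        # (single output file via repartition(1)).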
        df_save1 = df_month.repartition(1)
        hive_tb = "dwt_st_sv_last365_month"
        partition_by = ["site_name", "date_info"]
        print(f"当前存储的表名为:{hive_tb},分区为{partition_by}", )
        hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dwt/{hive_tb}/site_name={self.site_name}/date_info={self.date_info}"
        print(f"清除hdfs目录中.....{hdfs_path}")
        HdfsUtils.delete_hdfs_file(hdfs_path)
        df_save1.write.saveAsTable(name=hive_tb, format='hive', mode='append', partitionBy=partition_by)

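        # Look up st_key for each search term so the final table carries search_term_id.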
        sql2 = f"""
            select 
                st_key,
                search_term
            from ods_st_key
            where site_name = '{self.site_name}';
        """
        df_st_key = self.spark.sql(sql2).repartition(40, 'search_term').cache()

        # Re-derive the monthly ranking: attach search_term_id and re-rank by total search volume
        df_all = df_month.join(
            df_st_key, "search_term", "inner"
        ).select(
            df_st_key['st_key'].alias("search_term_id"),
            "search_term",
            "total_search_volume",
            F.lit(self.site_name).alias("site_name"),
            F.lit(self.date_info).alias("date_info"),
        ).dropDuplicates(
            ['search_term_id']
        ).withColumn(
            "sv_rank",
            F.row_number().over(Window.partitionBy(['site_name']).orderBy(F.col("total_search_volume").desc()))
        ).repartition(40, 'sv_rank').cache()
        df_month.unpersist()
        df_st_key.unpersist()

        # Read the rank -> search volume mapping from ods_rank_search_rate_repeat
        sql3 = f"""
            select 
                rank, 
                search_num, 
                date_info 
            from ods_rank_search_rate_repeat 
            where site_name = '{self.site_name}' 
              and date_type = 'month';
        """
        df_rank_sv = self.spark.sql(sql3).cache()
        window = Window.partitionBy(["rank"]).orderBy(df_rank_sv.date_info.desc())
        df_rank_sv = df_rank_sv.withColumn(
            "date_info_rank", F.row_number().over(window=window)
        ).filter(
            "date_info_rank = 1"
        ).drop(
            "date_info_rank", "date_info"
        ).repartition(40, 'rank').cache()

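        # Map each term's sv_rank to the latest rank -> search_num curve to
        # estimate its monthly search volume (sv_month).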
        df_all = df_all.join(
            df_rank_sv, df_all['sv_rank'] == df_rank_sv['rank'], "left"
        ).select(
            "search_term_id",
            "search_term",
            "total_search_volume",
            "sv_rank",
            F.col("search_num").alias("sv_month"),
            "site_name",
            "date_info",
        )
        df_rank_sv.unpersist()

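        # Write the final table, clearing the existing HDFS partition data first.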
        df_save2 = df_all.repartition(2)
        hive_tb = "dwt_st_sv_last365"
        partition_by = ["site_name", "date_info"]
        print(f"当前存储的表名为:{hive_tb},分区为{partition_by}", )
        hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dwt/{hive_tb}/site_name={self.site_name}/date_info={self.date_info}"
        print(f"清除hdfs目录中.....{hdfs_path}")
        HdfsUtils.delete_file_in_folder(hdfs_path)
        df_save2.write.saveAsTable(name=hive_tb, format='hive', mode='append', partitionBy=partition_by)
        print("success")


if __name__ == '__main__':
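    # CLI arguments: site_name and a month-level date_info ('YYYY-MM')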
    site_name = CommonUtil.get_sys_arg(1, None)
    date_info = CommonUtil.get_sys_arg(2, None)
    obj = DwtStSearchVolumeLast365(site_name, date_info)
    obj.run()