# dwt_st_base_report.py
import os
import sys

# add the script's parent directory to sys.path so the utils package resolves when run as a script
sys.path.append(os.path.dirname(sys.path[0]))

from utils.hdfs_utils import HdfsUtils
from utils.common_util import CommonUtil
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F
from pyspark.sql.window import Window


class DwtSTBaseReport(object):
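    """Build the dwt_st_base_report table.

    Joins each ABA search term with its Brand Analytics rank and with the
    rank -> search-volume mapping, writing one monthly partition plus one
    weekly trend partition per week of the requested month.
    """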

    def __init__(self, site_name, date_type, date_info):
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        self.hive_tb = "dwt_st_base_report"
        app_name = f"{self.hive_tb}:{site_name} {date_type} {date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
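        # partition count for the final repartition before each Hive write
        # (reset_partitions presumably derives a site-specific value from the base of 1)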
        self.partitions_num = CommonUtil.reset_partitions(site_name, 1)

    def run(self):
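        """Build and write the monthly partition, then one weekly partition per week of the month."""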
        # Read ods_rank_search_rate_repeat to get the rank -> search-volume mapping
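        # months at or before 2022-08 fall back to the fixed 2022-08 partition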
        if self.date_info <= '2022-08':
            params = "date_info = '2022-08'"
        else:
            params = f"date_info = '{self.date_info}'"
        sql1 = f"""
        select 
            rank as st_rank, 
            search_num as st_volume, 
            search_sum as st_orders 
        from ods_rank_search_rate_repeat 
        where site_name = '{self.site_name}' 
          and date_type = 'month' 
          and {params};
        """
        df_rank_sv = self.spark.sql(sql1).repartition(40, 'st_rank').cache()
        print("排名+搜索量对应关系:")
        df_rank_sv.show(10, False)
        if df_rank_sv.count() == 0:
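            # the requested month has no mapping rows: fall back to the most
            # recent row per rank across all available date_info partitions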
            sql1 = f"""
            select 
                rank as st_rank, 
                search_num as st_volume, 
                search_sum as st_orders, 
                date_info 
            from ods_rank_search_rate_repeat 
            where site_name = '{self.site_name}';
            """
            df_rank_sv = self.spark.sql(sql1)
            window = Window.partitionBy(["st_rank"]).orderBy(df_rank_sv.date_info.desc())
            df_rank_sv = df_rank_sv.withColumn("rk", F.row_number().over(window=window))\
                .filter("rk = 1")\
                .drop("rk", "date_info")\
                .repartition(40, 'st_rank').cache()
            print("排名+搜索量对应关系:")
            df_rank_sv.show(10, False)

        # Search-term master table
        sql2 = f""" 
        select 
            id as st_key, 
            search_term 
        from dwt_aba_st_analytics
        where site_name = '{self.site_name}'
          and date_type = '{self.date_type}'
          and date_info = '{self.date_info}';
        """
        df_st_base = self.spark.sql(sql2).repartition(40, 'search_term').cache()
        print("搜索词主表:")
        df_st_base.show(10, False)

        # Read ods_brand_analytics to get the search terms and their ranks in the report
        sql3 = f"""
        select 
            search_term, 
            rank as st_rank 
        from ods_brand_analytics 
        where site_name = '{self.site_name}' 
          and date_type = '{self.date_type}' 
          and date_info = '{self.date_info}';
        """
        df_st_rank = self.spark.sql(sql3).repartition(40, 'search_term').cache()
        print("搜索词+排名对应关系,月:")
        df_st_rank.show(10, False)

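        # attach each term's monthly rank (inner join), then the estimated
        # search volume/orders for that rank (left join); repartitioning on
        # st_rank aligns the second join's shuffle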
        df_save = df_st_base \
            .join(df_st_rank, 'search_term', 'inner') \
            .repartition(40, 'st_rank') \
            .join(df_rank_sv, 'st_rank', 'left') \
            .withColumn('created_time', F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss')) \
            .withColumn('updated_time', F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss')) \
            .withColumn('years', F.lit(int(self.date_info.split("-")[0]))) \
            .withColumn('site_name', F.lit(self.site_name)) \
            .withColumn('date_type', F.lit(self.date_type)) \
            .withColumn('date_info', F.lit(self.date_info))
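        # delete the target partition directory first so this append
        # effectively overwrites the (site_name, date_type, date_info) partition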
        hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dwt/{self.hive_tb}/site_name={self.site_name}/date_type={self.date_type}/date_info={self.date_info}"
        print(f"清除hdfs目录中数据:{hdfs_path}")
        HdfsUtils.delete_hdfs_file(hdfs_path)
        df_save = df_save.repartition(self.partitions_num)
        partition_by = ["site_name", "date_type", "date_info"]
        print(f"当前存储的表名为:{self.hive_tb},分区为{partition_by}", )
        df_save.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=partition_by)
        print("success")

        # Compute the weekly trend data
        # Read the date dimension table to get the weeks covered by date_info
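        # week_day = 1 presumably marks one canonical day per week (e.g. Monday),
        # so this yields exactly one year_week value per week of the month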
        sql = f""" 
        select year_week from dim_date_20_to_30 where week_day = 1 and year_month = '{self.date_info}';
        """
        df_week = self.spark.sql(sql)
        week_list = sorted([row['year_week'] for row in df_week.collect()])
        for year_week in week_list:
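            # the rank cap (<= 1,500,000) presumably trims the extreme long tail of weekly terms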
            sql = f"""
            select 
                search_term, 
                rank as st_rank 
            from ods_brand_analytics 
            where site_name = '{self.site_name}' 
              and date_type = 'week' 
              and date_info = '{year_week}'
              and rank <= 1500000;
            """
            df_st_rank_week = self.spark.sql(sql).repartition(40, 'search_term').cache()
            print(f"搜索词+排名对应关系,{year_week}周:")
            df_st_rank_week.show(10, False)
            df_save = df_st_base\
                .join(df_st_rank_week, 'search_term', 'inner')\
                .repartition(40, 'st_rank')\
                .join(df_rank_sv, 'st_rank', 'left')\
                .withColumn('created_time', F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss'))\
                .withColumn('updated_time', F.date_format(F.current_timestamp(), 'yyyy-MM-dd HH:mm:ss'))\
                .withColumn('years', F.lit(int(year_week.split("-")[0])))\
                .withColumn('site_name', F.lit(self.site_name))\
                .withColumn('date_type', F.lit('week'))\
                .withColumn('date_info', F.lit(year_week))
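            # same delete-then-append overwrite pattern as the monthly write above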
            hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dwt/{self.hive_tb}/site_name={self.site_name}/date_type=week/date_info={year_week}"
            print(f"清除hdfs目录中数据:{hdfs_path}")
            HdfsUtils.delete_hdfs_file(hdfs_path)
            df_save = df_save.repartition(self.partitions_num)
            partition_by = ["site_name", "date_type", "date_info"]
            print(f"当前存储的表名为:{self.hive_tb},分区为{partition_by}", )
            df_save.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=partition_by)
            print("success")


if __name__ == '__main__':
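    # expected arguments: site_name, date_type, date_info
    # hypothetical invocation: python dwt_st_base_report.py us month 2023-01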
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    obj = DwtSTBaseReport(site_name, date_type, date_info)
    obj.run()