import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F
from utils.templates import Templates

"""
获取搜索词对应asin标题最大最小数量等其他指标
依赖 dwd_asin_title_number 表 dwd_st_asin_measure 表
输出为 Dws_st_num_stats
"""


class DwsStNumStats(Templates):

    def __init__(self, site_name, date_type, date_info):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        app_name = f"{self.__class__.__name__}:{site_name}:{date_type}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.reset_partitions(partitions_num=1)
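        # Placeholder DataFrames ("select 1+1" appears to be this codebase's
        # convention for an empty frame); all are overwritten in read_data().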
        self.asin_title_num = self.spark.sql("select 1+1;")
        self.st_asin_count = self.spark.sql("select 1+1;")
        self.self_asin = self.spark.sql("select 1+1;")
        self.search_term_id = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")
        self.db_save = "dws_st_num_stats"
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dws/{self.db_save}/site_name={self.site_name}/date_type={self.date_type}/date_info={self.date_info}"
        print(f"清除hdfs目录中.....{hdfs_path}")
        HdfsUtils.delete_hdfs_file(hdfs_path)

    def read_data(self):
        print("读取dwd_asin_title_number:")
        sql1 = f"""
            select 
                asin,
                value
            from dwd_asin_title_number
            where site_name = '{self.site_name}'
            and date_type = '{self.date_type}'
            and date_info = '{self.date_info}'
        """
        self.asin_title_num = self.spark.sql(sql1).repartition(40, 'asin').cache()
        self.asin_title_num.show(10, truncate=True)

        print("读取dwd_st_asin_measure:")
        sql2 = f"""
            select 
                search_term,
                asin,
                count(asin) over (partition by search_term) as asin_count
            from dwd_st_asin_measure
            where site_name = '{self.site_name}'
            and date_type = '{self.date_type}'
            and date_info = '{self.date_info}'
            group by search_term, asin
        """
        self.st_asin_count = self.spark.sql(sql2).repartition(40, 'search_term', 'asin').cache()
        self.st_asin_count.show(10, truncate=True)

        print("读取ods_self_asin:")
        sql3 = f"""
            select 
                asin as max_num_asin,
                1 as is_self_max_num_asin
            from ods_self_asin 
            where site_name = '{self.site_name}'
        """
        # repartition on the aliased column (sql3 renames asin to max_num_asin)
        self.self_asin = self.spark.sql(sql3).repartition(40, 'max_num_asin').cache()
        self.self_asin.show(10, truncate=True)

        print("读取ods_st_key:")
        sql4 = f"""
            select 
                cast(st_key as integer) as search_term_id,
                search_term
            from ods_st_key
            where site_name = '{self.site_name}'
        """
        self.search_term_id = self.spark.sql(sql4).repartition(40, 'search_term').cache()
        self.search_term_id.show(10, truncate=True)

    def handle_data(self):
        # Join the st+asin level data with the title quantity; asins without a
        # parsed title quantity default to value=1 (single item)
        self.df_save = self.st_asin_count.join(
            self.asin_title_num, on='asin', how='left'
        ).fillna(
            {'value': 1}
        )
        # Flag asins whose pack quantity is greater than 1
        self.df_save = self.df_save.withColumn(
            'num_flag',
            F.when(F.col('value') > 1, 1).otherwise(0)
        )
        # Aggregate per search term: asin_count, num_count and the max-quantity row
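        # (max over a struct compares fields left to right, so max_row keeps the
        # row whose 'value', i.e. title quantity, is largest)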
        self.df_save = self.df_save.groupby('search_term').agg(
            F.max('asin_count').alias('asin_count'),
            F.sum('num_flag').alias('num_count'),
            F.max(F.struct('value', 'asin')).alias('max_row')
        ).repartition(30, 'search_term')
        # Join in the search term id
        self.df_save = self.df_save.join(
            self.search_term_id, on='search_term', how='inner'
        )
        # Unpack the max-quantity row and compute the multi-quantity proportion
        self.df_save = self.df_save.withColumn(
            'max_num_asin',
            F.col('max_row.asin')
        ).withColumn(
            'max_num',
            F.col('max_row.value')
        ).withColumn(
            'most_proportion',
            F.round(F.col('num_count')/F.col('asin_count'), 3)
        )
        # Join with in-house (self) asins to flag whether max_num_asin is ours
        self.df_save = self.df_save.join(
            self.self_asin, on='max_num_asin', how='left'
        ).fillna(
            {'is_self_max_num_asin': 0}
        )
        # Final column selection before writing
        self.df_save = self.df_save.select(
            'search_term',
            'asin_count',
            'num_count',
            'max_num',
            'max_num_asin',
            'most_proportion',
            'search_term_id',
            'is_self_max_num_asin',
            F.lit(self.site_name).alias('site_name'),
            F.lit(self.date_type).alias('date_type'),
            F.lit(self.date_info).alias('date_info')
        )
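
        # Persisting df_save is presumably handled by the Templates base class:
        # run() is expected to chain read_data() -> handle_data() -> a save step
        # driven by the db_save and partitions_by attributes set in __init__.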


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    obj = DwsStNumStats(site_name, date_type, date_info)
    obj.run()