dws_top100_asin_info.py
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F, Window
from utils.templates import Templates

"""
获取搜索词 top100 相关指标
依赖 dwd_asin_measure 表 dwd_st_asin_measure 表 dim_asin_detail 表
输出为 Dws_top100_asin_info
"""


class DwsTop100AsinInfo(Templates):

    def __init__(self, site_name, date_type, date_info):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        app_name = f"{self.__class__.__name__}:{site_name}:{date_type}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.db_save = "dws_top100_asin_info"
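        # Placeholder DataFrames; the real data is loaded in read_data()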
        self.df_search_term_asin = self.spark.sql("select 1+1;")
        self.df_search_term_id = self.spark.sql("select 1+1;")
        self.df_asin_bsr_orders = self.spark.sql("select 1+1;")
        self.df_asin_detail = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.reset_partitions(partitions_num=25)
        partition_dict = {
            "site_name": self.site_name,
            "date_type": self.date_type,
            "date_info": self.date_info,
        }
        hdfs_path = CommonUtil.build_hdfs_path(self.db_save, partition_dict=partition_dict)
        print(f"清除hdfs目录中.....{hdfs_path}")
        HdfsUtils.delete_file_in_folder(hdfs_path)

    def read_data(self):
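        # Search term / ASIN pairs for the requested partition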
        sql1 = f"""
            select 
                search_term,
                asin 
            from dwd_st_asin_measure
            where site_name = '{self.site_name}'
            and date_type = '{self.date_type}'
            and date_info = '{self.date_info}'
        """
        self.df_search_term_asin = self.spark.sql(sql1).repartition(40, 'search_term', 'asin').cache()
        self.df_search_term_asin.show(10, truncate=True)

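        # Mapping from search_term to its numeric id (ods_st_key is filtered by site only, not by date)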
        sql2 = f"""
            select 
                cast(st_key as integer) as search_term_id,
                search_term
            from ods_st_key
            where site_name = '{self.site_name}'
        """
        self.df_search_term_id = self.spark.sql(sql2).repartition(40, 'search_term').cache()
        self.df_search_term_id.show(10, truncate=True)

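        # BSR-derived order estimate per ASIN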
        sql3 = f"""
            select 
                asin,
                asin_bsr_orders as orders
            from dwd_asin_measure
            where site_name = '{self.site_name}'
            and date_type = '{self.date_type}'
            and date_info = '{self.date_info}'
        """
        self.df_asin_bsr_orders = self.spark.sql(sql3).repartition(40, 'asin').cache()
        self.df_asin_bsr_orders.show(10, truncate=True)

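        # ASIN launch time and new-product flag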
        sql4 = f"""
            select 
                asin,
                asin_launch_time, 
                asin_is_new
            from dim_asin_detail
            where site_name = '{self.site_name}'
            and date_type = '{self.date_type}'
            and date_info = '{self.date_info}'
        """
        self.df_asin_detail = self.spark.sql(sql4).repartition(40, 'asin').cache()
        self.df_asin_detail.show(10, truncate=True)

    def handle_data(self):
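        # Attach search_term_id, BSR orders and detail attributes to each (search_term, asin) pair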
        self.df_save = self.df_search_term_asin.join(
            self.df_search_term_id, on='search_term', how='left'
        ).join(
            self.df_asin_bsr_orders, on='asin', how='left'
        ).join(
            self.df_asin_detail, on='asin', how='left'
        )
        # Keep only the top 100 ASINs per search term, ranked by orders (descending, nulls last)
        self.df_save = self.df_save.withColumn(
            "row_number",
            F.row_number().over(
                Window.partitionBy(F.col('search_term_id')).orderBy(F.col("orders").desc_nulls_last())
            )
        )
        self.df_save = self.df_save.filter("row_number <= 100")
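        # Total orders of the kept top-100 ASINs per search term, used as the market-share denominator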
        self.df_save = self.df_save.withColumn(
            "group_sum",
            F.sum(F.col('orders')).over(
                Window.partitionBy(F.col('search_term_id'))
            )
        )
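        # Collapse each search term into comma-separated top-100 lists (asin, orders, market share, is-new flag, rank)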
        self.df_save = self.df_save.groupby(F.col("search_term_id")).agg(
            F.first("search_term").alias("search_term"),
            F.concat_ws(',', F.collect_list("asin")).alias("top100_asin"),
            F.concat_ws(',', F.collect_list(F.coalesce('orders', F.lit(0)))).alias("top100_orders"),
            # Market share: each ASIN's orders as a fraction of the group's total orders
            F.concat_ws(',', F.collect_list(F.coalesce(F.round(F.expr('orders / group_sum'), 4), F.lit(0))))
                .alias("top100_market_share"),
            F.concat_ws(',', F.collect_list(F.coalesce(F.col("asin_is_new"), F.lit('-')))).alias("top100_is_new"),
            F.concat_ws(',', F.collect_list("row_number")).alias("top100_rank"),

            F.lit(self.site_name).alias("site_name"),
            F.lit(self.date_type).alias("date_type"),
            F.lit(self.date_info).alias("date_info"),
        )
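        # Note: writing df_save to db_save (partitioned by partitions_by) is presumably handled
        # by the Templates base class in run(); this is an assumption based on the attribute
        # names set in __init__, not verified here.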


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    obj = DwsTop100AsinInfo(site_name, date_type, date_info)
    obj.run()