dws_top100_asin_info.py 5.14 KB
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F, Window
from utils.templates import Templates

"""
获取搜索词 top100 相关指标
依赖 dwd_asin_measure 表 dwd_st_asin_measure 表 dim_asin_detail 表
输出为 Dws_top100_asin_info
"""


class DwsTop100AsinInfo(Templates):

    def __init__(self, site_name, date_type, date_info):
        super().__init__()
        self.site_name = site_name
        self.date_type = date_type
        self.date_info = date_info
        app_name = f"{self.__class__.__name__}:{site_name}:{date_type}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.db_save = "dws_top100_asin_info"
        self.df_search_term_asin = self.spark.sql("select 1+1;")
        self.df_search_term_id = self.spark.sql("select 1+1;")
        self.df_asin_bsr_orders = self.spark.sql("select 1+1;")
        self.df_asin_detail = self.spark.sql("select 1+1;")
        self.df_save = self.spark.sql("select 1+1;")
        self.partitions_by = ['site_name', 'date_type', 'date_info']
        self.reset_partitions(partitions_num=25)
        partition_dict = {
            "site_name": self.site_name,
            "date_type": self.date_type,
            "date_info": self.date_info,
        }
        hdfs_path = CommonUtil.build_hdfs_path(self.db_save, partition_dict=partition_dict)
        print(f"清除hdfs目录中.....{hdfs_path}")
        HdfsUtils.delete_file_in_folder(hdfs_path)

    def read_data(self):
        sql1 = f"""
            select 
                search_term,
                asin 
            from dwd_st_asin_measure
            where site_name = '{self.site_name}'
            and date_type = '{self.date_type}'
            and date_info = '{self.date_info}'
        """
        self.df_search_term_asin = self.spark.sql(sql1).repartition(40, 'search_term', 'asin').cache()
        self.df_search_term_asin.show(10, truncate=True)

        sql2 = f"""
            select 
                cast(st_key as integer) as search_term_id,
                search_term
            from ods_st_key
            where site_name = '{self.site_name}'
        """
        self.df_search_term_id = self.spark.sql(sql2).repartition(40, 'search_term').cache()
        self.df_search_term_id.show(10, truncate=True)

        sql3 = f"""
            select 
                asin,
                asin_bsr_orders as orders
            from dwd_asin_measure
            where site_name = '{self.site_name}'
            and date_type = '{self.date_type}'
            and date_info = '{self.date_info}'
        """
        self.df_asin_bsr_orders = self.spark.sql(sql3).repartition(40, 'asin').cache()
        self.df_asin_bsr_orders.show(10, truncate=True)

        sql4 = f"""
            select 
                asin,
                asin_launch_time, 
                asin_is_new
            from dim_asin_detail
            where site_name = '{self.site_name}'
            and date_type = '{self.date_type}'
            and date_info = '{self.date_info}'
        """
        self.df_asin_detail = self.spark.sql(sql4).repartition(40, 'asin').cache()
        self.df_asin_detail.show(10, truncate=True)

    def handle_data(self):
        self.df_save = self.df_search_term_asin.join(
            self.df_search_term_id, on='search_term', how='left'
        ).join(
            self.df_asin_bsr_orders, on='asin', how='left'
        ).join(
            self.df_asin_detail, on='asin', how='left'
        )
        # 取前一百
        self.df_save = self.df_save.withColumn(
            "row_number",
            F.row_number().over(
                Window.partitionBy(F.col('search_term_id')).orderBy(F.col("orders").desc_nulls_last())
            )
        )
        self.df_save = self.df_save.filter("row_number <= 100")
        self.df_save = self.df_save.withColumn(
            "group_sum",
            F.sum(F.col('orders')).over(
                Window.partitionBy(F.col('search_term_id'))
            )
        )
        self.df_save = self.df_save.groupby(F.col("search_term_id")).agg(
            F.first("search_term").alias("search_term"),
            F.concat_ws(',', F.collect_list("asin")).alias("top100_asin"),
            F.concat_ws(',', F.collect_list(F.coalesce('orders', F.lit(0)))).alias("top100_orders"),
            #  市场比例
            F.concat_ws(',', F.collect_list(F.coalesce(F.round(F.expr('orders / group_sum'), 4), F.lit(0))))
                .alias("top100_market_share"),
            F.concat_ws(',', F.collect_list(F.coalesce(F.col("asin_is_new"), F.lit('-')))).alias("top100_is_new"),
            F.concat_ws(',', F.collect_list("row_number")).alias("top100_rank"),

            F.lit(self.site_name).alias("site_name"),
            F.lit(self.date_type).alias("date_type"),
            F.lit(self.date_info).alias("date_info"),
        )


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_type = CommonUtil.get_sys_arg(2, None)
    date_info = CommonUtil.get_sys_arg(3, None)
    obj = DwsTop100AsinInfo(site_name, date_type, date_info)
    obj.run()