import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))

from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F
from pyspark.sql.types import MapType, StringType, IntegerType

"""
计算asin某个时间纬度内是否上过nsr榜单
"""


class DwdNsrAsinRank(object):

    def __init__(self, site_name, date_info):
        self.site_name = site_name
        self.date_info = date_info
        # default data window is the last 30 days
        self.date_type = "last30day"

        # 30 days before date_info
        self.last_30_day = CommonUtil.get_day_offset(date_info, -30)
        # 7 days before date_info
        self.last_7_day = CommonUtil.get_day_offset(date_info, -7)
        self.current_day = date_info
        app_name = f"{self.__class__.__name__}:{site_name}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        # register the local static method as a Spark UDF; returns the callable used in DataFrame expressions
        self.udf_calc_rank_reg = self.spark.udf.register("udf_calc_rank", self.udf_calc_rank, MapType(StringType(), IntegerType()))

        self.hive_tb = "dwd_nsr_asin_rank"
        hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dwd/{self.hive_tb}/site_name={self.site_name}/date_type={self.date_type}/date_info={self.date_info}"
        print(f"清除hdfs目录中.....{hdfs_path}")
        HdfsUtils.delete_file_in_folder(hdfs_path)

    @staticmethod
    def udf_calc_rank(dayArr: list, rankArr: list):
        """
        判断 是否新上榜等
        :return:
        """
        bsr_count = len(rankArr)
        is_30_day_flag = 1
        is_1_day_flag = 0
        is_7_day_flag = 0
        last_7_day = dayArr[0]
        start_day = dayArr[1]
        end_day = dayArr[2]
        for item in rankArr:
            date_info = item[1] + ""

            if date_info >= last_7_day:
                is_7_day_flag = 1

            if date_info >= start_day:
                is_30_day_flag = 1

            if date_info >= end_day:
                is_1_day_flag = 1

        return {
            "bsr_count": bsr_count,
            "is_30_day_flag": is_30_day_flag,
            "is_1_day_flag": is_1_day_flag,
            "is_7_day_flag": is_7_day_flag,
        }
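
    # Illustrative only (not executed by the job): how udf_calc_rank maps one ASIN's
    # ranking history to flags, using hypothetical "YYYY-MM-DD" values:
    #   dayArr  = ["2023-09-24", "2023-09-01", "2023-10-01"]    # [last_7_day, last_30_day, current_day]
    #   rankArr = [["12", "2023-09-25"], ["40", "2023-09-02"]]  # [bsr_rank, date_info] pairs
    #   udf_calc_rank(dayArr, rankArr)
    #   # -> {"bsr_count": 2, "is_30_day_flag": 1, "is_1_day_flag": 0, "is_7_day_flag": 1}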

    def run(self):
        sql = f"""
            select dbarh.asin,
                   dbarh.category_id,
                   dbarh.bsr_rank,
                   dbarh.date_info,
                   tmp.asin_launch_time
            from dim_nsr_asin_rank_history dbarh
                     left join (
                select asin, asin_launch_time
                from dim_cal_asin_history_detail
                where site_name = "{self.site_name}"
            ) tmp on tmp.asin = dbarh.asin
            where dbarh.site_name = "{self.site_name}"
              and dbarh.date_info >= "{self.last_30_day}"
              and dbarh.date_info <= "{self.current_day}"
"""
        print("======================查询sql如下======================")
        print(sql)

        last_90_day = CommonUtil.get_day_offset(self.date_info, -90)

        df_all = self.spark.sql(sql)

        df_all = df_all \
            .groupBy(["category_id", "asin"]) \
            .agg(
            self.udf_calc_rank_reg(
                F.array(F.lit(self.last_7_day), F.lit(self.last_30_day), F.lit(self.current_day)),
                F.collect_set(F.array([df_all['bsr_rank'], df_all['date_info']]))
            ).alias("resultMap"),

            # latest bsr_rank: max over struct(date_info, bsr_rank) orders by date_info first, keeping the most recent row
            F.max(F.struct("date_info", "bsr_rank")).alias("last_row"),

            # among products that entered the ranking in the last 30 days, those launched within the last 90 days; a null asin_launch_time is treated as newly launched
            F.when(F.first("asin_launch_time") < F.lit(last_90_day), F.lit(0)).otherwise(1).alias("is_asin_new"),

        )
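
        # Shape of the aggregated columns at this point (descriptive note, not new logic):
        #   resultMap   -> map<string, int> with keys bsr_count / is_1_day_flag / is_7_day_flag / is_30_day_flag
        #   last_row    -> struct<date_info, bsr_rank>; max over the struct orders by date_info first,
        #                  so it carries the most recent ranking row
        #   is_asin_new -> 1 if the launch date is within the last 90 days (or unknown), else 0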

        df_save = df_all \
            .select(
            F.col("asin"),
            F.col("category_id"),
            F.col("last_row").getField("bsr_rank").alias("bsr_rank"),
            F.col("resultMap").getField("bsr_count").alias("bsr_count"),
            F.col("resultMap").getField("is_1_day_flag").alias("is_1_day_flag"),
            F.col("resultMap").getField("is_7_day_flag").alias("is_7_day_flag"),
            F.col("resultMap").getField("is_30_day_flag").alias("is_30_day_flag"),
            F.col("is_asin_new"),
            # among products that entered the NSR ranking in the last 30 days, first-time entries within the window, i.e. bsr_count == 1
            F.when(
                F.col("resultMap").getField("bsr_count") == 1, F.lit(1)
            ).otherwise(0).alias("is_asin_bsr_new"),
            # date of the most recent ranking appearance
            F.col("last_row").getField("date_info").alias("last_bsr_day"),
            F.lit(self.site_name).alias("site_name"),
            F.lit(self.date_type).alias("date_type"),
            F.lit(self.date_info).alias("date_info")
        )
        # reduce output to 2 partitions
        df_save = df_save.repartition(2)
        partition_by = ["site_name", "date_type", "date_info"]
        print(f"当前存储的表名为:{self.hive_tb},分区为{partition_by}", )
        df_save.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=partition_by)
        print("success")


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_info = CommonUtil.get_sys_arg(2, None)
    obj = DwdNsrAsinRank(site_name, date_info)
    obj.run()