import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F
from pyspark.sql.types import MapType, StringType, IntegerType

"""
Calculate whether an asin appeared on the NSR ranking within a given time window
"""


class DwdNsrAsinRank(object):

    def __init__(self, site_name, date_info):
        self.site_name = site_name
        self.date_info = date_info
        # the default data window is the last 30 days
        self.date_type = "last30day"
        # 30 days ago
        self.last_30_day = CommonUtil.get_day_offset(date_info, -30)
        # 7 days ago
        self.last_7_day = CommonUtil.get_day_offset(date_info, -7)
        self.current_day = date_info
        app_name = f"{self.__class__.__name__}:{site_name}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)

        # register the local static method as a udf; the returned function can be used in DataFrame expressions
        self.udf_calc_rank_reg = self.spark.udf.register("udf_calc_rank", self.udf_calc_rank,
                                                         MapType(StringType(), IntegerType()))

        self.hive_tb = "dwd_nsr_asin_rank"
        hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dwd/{self.hive_tb}/site_name={self.site_name}/date_type={self.date_type}/date_info={self.date_info}"
        print(f"clearing hdfs dir.....{hdfs_path}")
        HdfsUtils.delete_file_in_folder(hdfs_path)

    @staticmethod
    def udf_calc_rank(dayArr: list, rankArr: list):
        """
        Determine whether the asin is newly on the ranking, how often it ranked, etc.
        :return:
        """
        bsr_count = len(rankArr)
        is_30_day_flag = 1
        is_1_day_flag = 0
        is_7_day_flag = 0
        last_7_day = dayArr[0]
        start_day = dayArr[1]
        end_day = dayArr[2]
        for item in rankArr:
            date_info = item[1] + ""
            if date_info >= last_7_day:
                is_7_day_flag = 1
            if date_info >= start_day:
                is_30_day_flag = 1
            if date_info >= end_day:
                is_1_day_flag = 1

        return {
            "bsr_count": bsr_count,
            "is_30_day_flag": is_30_day_flag,
            "is_1_day_flag": is_1_day_flag,
            "is_7_day_flag": is_7_day_flag,
        }

    def run(self):
        sql = f"""
            select dbarh.asin,
                   dbarh.category_id,
                   dbarh.bsr_rank,
                   dbarh.date_info,
                   tmp.asin_launch_time
            from dim_nsr_asin_rank_history dbarh
                     left join (
                select asin, asin_launch_time
                from dim_cal_asin_history_detail
                where site_name = "{self.site_name}"
            ) tmp on tmp.asin = dbarh.asin
            where dbarh.site_name = "{self.site_name}"
              and dbarh.date_info >= "{self.last_30_day}"
              and dbarh.date_info <= "{self.current_day}"
        """
        print("======================query sql======================")
        print(sql)

        last_90_day = CommonUtil.get_day_offset(self.date_info, -90)
        df_all = self.spark.sql(sql)
        df_all = df_all \
            .groupBy(["category_id", "asin"]) \
            .agg(
                self.udf_calc_rank_reg(
                    F.array(F.lit(self.last_7_day), F.lit(self.last_30_day), F.lit(self.current_day)),
                    F.collect_set(F.array([df_all['bsr_rank'], df_all['date_info']]))
                ).alias("resultMap"),
                # latest bsr_rank
                F.max(F.struct("date_info", "bsr_rank")).alias("last_row"),
                # among products that entered the BSR ranking in the last 30 days, those launched within the last
                # 90 days; a null asin_launch_time means the product is newly launched
                F.when(F.first("asin_launch_time") < F.lit(last_90_day), F.lit(0)).otherwise(1).alias("is_asin_new"),
            )

        df_save = df_all \
            .select(
                F.col("asin"),
                F.col("category_id"),
                F.col("last_row").getField("bsr_rank").alias("bsr_rank"),
                F.col("resultMap").getField("bsr_count").alias("bsr_count"),
                F.col("resultMap").getField("is_1_day_flag").alias("is_1_day_flag"),
                F.col("resultMap").getField("is_7_day_flag").alias("is_7_day_flag"),
                F.col("resultMap").getField("is_30_day_flag").alias("is_30_day_flag"),
                F.col("is_asin_new"),
                # among products that entered the NSR ranking in the last 30 days, those ranked for the first time
                # within the window, i.e. ranked exactly once in the last 30 days
                F.when(
                    F.col("resultMap").getField("bsr_count") == 1, F.lit(1)
                ).otherwise(0).alias("is_asin_bsr_new"),
                # most recent date the asin appeared on the ranking
                F.col("last_row").getField("date_info").alias("last_bsr_day"),
                F.lit(self.site_name).alias("site_name"),
                F.lit(self.date_type).alias("date_type"),
                F.lit(self.date_info).alias("date_info")
            )

        # reduce output to 2 partitions before writing
        df_save = df_save.repartition(2)
        partition_by = ["site_name", "date_type", "date_info"]
        print(f"saving to table: {self.hive_tb}, partitioned by {partition_by}")
        df_save.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=partition_by)
        print("success")


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_info = CommonUtil.get_sys_arg(2, None)
    obj = DwdNsrAsinRank(site_name, date_info)
    obj.run()
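
# Example invocation (illustrative sketch only; the script filename, cluster options, and the
# "us" / "2023-10-01" argument values below are hypothetical and not part of the source):
#
#   spark-submit dwd_nsr_asin_rank.py us 2023-10-01
#
# This would build the last-30-day window ending at 2023-10-01 for site "us" and append the result
# into the dwd_nsr_asin_rank Hive table, partitioned by (site_name, date_type, date_info).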