import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))

from utils.common_util import CommonUtil
from utils.spark_util import SparkUtil

"""
Archive the history-dimension data of ASINs on the NSR list (New Releases sales ranking).
"""


class DimNsrAsinRankHistory(object):

    def __init__(self, site_name, date_info):
        self.site_name = site_name
        self.date_info = date_info
        app_name = f"{self.__class__.__name__}:{site_name}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.hive_tb = "dim_nsr_asin_rank_history"

    def run(self):
        sql = f"""
select asin,
       cate_current_id as old_category_id,
       category_id,
       bsr_rank,
       rating          as asin_rating,
       total_comments  as asin_total_comments,
       updated_at,
       date_info,
       site_name
from ods_new_releases_top100_asin
where 1 = 1
  and date_info = '{self.date_info}'
  and site_name = '{self.site_name}'
"""
        print("====================== query sql ======================")
        print(sql)
        df_save = self.spark.sql(sql)
        if df_save.first() is None:
            print("====================== no data, skipping ======================")
            return

        # drop duplicate (asin, category_id) rows
        df_save = df_save.dropDuplicates(['asin', 'category_id'])
        # collapse to a single partition so each Hive partition is written as one file
        df_save = df_save.repartition(1)
        partition_dict = {
            "site_name": self.site_name,
            "date_info": self.date_info,
        }
        CommonUtil.save_or_update_table(
            spark_session=self.spark,
            hive_tb_name=self.hive_tb,
            partition_dict=partition_dict,
            df_save=df_save,
            drop_exist_tmp_flag=False
        )


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_info = CommonUtil.get_sys_arg(2, None)
    obj = DimNsrAsinRankHistory(site_name, date_info)
    obj.run()
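
# Usage sketch: the script reads a site code and a date partition from the command
# line via CommonUtil.get_sys_arg above. The file name, site code, and date format
# below are illustrative assumptions, not taken from the source; the actual
# scheduler may pass a week- or month-granularity date_info.
#
#   python dim_nsr_asin_rank_history.py us 2023-05-18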