import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.spark_util import SparkUtil

"""
Archive the historical dimension data of ASINs on the BSR (Best Sellers Rank) top-100 list.
Flow: ods[ods_bs_category_top100_asin] => dim[dim_bsr_asin_rank_history], partitioned by site_name (e.g. us) and date_info.
"""


class DimBsrAsinRankHistory(object):

    def __init__(self, site_name, date_info):
        self.site_name = site_name
        self.date_info = date_info
        app_name = f"{self.__class__.__name__}:{site_name}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.hive_tb = "dim_bsr_asin_rank_history"

    def run(self):
        # Data cleaning: select the columns to archive for the requested partition
        sql = f"""
        select asin,
               cate_current_id as old_category_id,
               bsr_rank,
               rating          as asin_rating,
               total_comments  as asin_total_comments,
               updated_at,
               category_id,
               date_info,
               site_name
        from ods_bs_category_top100_asin
        where 1 = 1
          and date_info = '{self.date_info}'
          and site_name = '{self.site_name}'
        """
        print("====================== query sql ======================")
        print(sql)
        df_save = self.spark.sql(sql)
        if df_save.first() is None:
            print("====================== no data, skipping ======================")
            return

        # Drop duplicate rows keyed by (asin, category_id)
        df_save = df_save.dropDuplicates(['asin', 'category_id'])
        # Coalesce to a single partition before writing
        df_save = df_save.repartition(1)
        # Clear and overwrite the target Hive partition
        partition_dict = {
            "site_name": self.site_name,
            "date_info": self.date_info,
        }
        CommonUtil.save_or_update_table(
            spark_session=self.spark,
            hive_tb_name=self.hive_tb,
            partition_dict=partition_dict,
            df_save=df_save,
            drop_exist_tmp_flag=False
        )


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_info = CommonUtil.get_sys_arg(2, None)
    obj = DimBsrAsinRankHistory(site_name, date_info)
    obj.run()
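
# Example invocation (a sketch, not from the source: the script file name and the
# submit command depend on your deployment; "spark-submit" and the file name here
# are assumptions). Positional arg 1 is site_name, arg 2 is the date_info partition:
#
#   spark-submit dim_bsr_asin_rank_history.py us 2023-10-01
#
# CommonUtil.get_sys_arg(n, None) reads the n-th CLI argument, falling back to None
# when it is absent, so both arguments should always be supplied explicitly.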