dim_nsr_asin_rank_history.py 2.09 KB
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))

from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil

"""
对nsr榜单(新品销售排行榜)ASIN历史纬度数据进行存档 
"""


class DimNsrAsinRankHistory(object):

    def __init__(self, site_name, date_info):
        self.site_name = site_name
        self.date_info = date_info
        app_name = f"{self.__class__.__name__}:{site_name}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.hive_tb = "dim_nsr_asin_rank_history"

    def run(self):
        sql = f"""
        select asin,
               cate_current_id as old_category_id,
               category_id     as category_id,
               bsr_rank        ,
               rating          as asin_rating,
               total_comments  as asin_total_comments,
               updated_at,
               date_info,
               site_name
        from ods_new_releases_top100_asin
            where 1 = 1
            and date_info = '{self.date_info}'
            and site_name='{self.site_name}'
"""
        print("======================查询sql如下======================")
        print(sql)
        df_save = self.spark.sql(sql)

        if df_save.first() == None:
            print("============================无数据跳过===================================")
            return

        # 清除重复数据
        df_save = df_save.dropDuplicates(['asin', 'category_id'])
        # 分区重置
        df_save = df_save.repartition(1)

        partition_dict = {
            "site_name": self.site_name,
            "date_info": self.date_info,
        }

        CommonUtil.save_or_update_table(
            spark_session=self.spark,
            hive_tb_name=self.hive_tb,
            partition_dict=partition_dict,
            df_save=df_save,
            drop_exist_tmp_flag=False
        )


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    date_info = CommonUtil.get_sys_arg(2, None)
    obj = DimNsrAsinRankHistory(site_name, date_info)
    obj.run()