dim_bsr_asin_rank_history.py 2.14 KB
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))

from utils.common_util import CommonUtil
from utils.spark_util import SparkUtil

"""
对bsr榜单ASIN历史维度数据进行存档：从 ods[ods_bs_category_top100_asin] => dim[dim_bsr_asin_rank_history]，按 site_name（如 us）和 date_info 分区
"""


class DimBsrAsinRankHistory(object):
    """Archive one site/date slice of BSR top-100 ASIN rows into a Hive dim table.

    Reads ``ods_bs_category_top100_asin`` filtered by ``site_name`` and
    ``date_info``, de-duplicates on (asin, category_id) and writes the result to
    ``dim_bsr_asin_rank_history`` through ``CommonUtil.save_or_update_table``,
    partitioned by ``site_name`` / ``date_info``.
    """

    def __init__(self, site_name, date_info):
        """
        :param site_name: site partition value; embedded in the query as a SQL literal.
        :param date_info: date partition value; embedded in the query as a SQL literal.
        :raises ValueError: if either value is missing/empty or cannot be safely
            embedded in a single-quoted SQL literal.
        """
        # Validate before starting Spark so bad arguments fail fast and cheaply,
        # instead of silently querying e.g. date_info = 'None' and "finding no data".
        self._to_sql_literal("site_name", site_name)
        self._to_sql_literal("date_info", date_info)
        self.site_name = site_name
        self.date_info = date_info
        app_name = f"{self.__class__.__name__}:{site_name}:{date_info}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.hive_tb = "dim_bsr_asin_rank_history"

    @staticmethod
    def _to_sql_literal(arg_name, value):
        """Return *value* rendered as a single-quoted Spark SQL string literal.

        The partition values are interpolated directly into the query text (and may
        come from the command line), so anything that could terminate or escape the
        literal is rejected rather than quoted.

        :param arg_name: name of the argument, used only in error messages.
        :param value: value to render.
        :return: ``"'<value>'"``.
        :raises ValueError: if *value* is None/empty or contains ``'`` or ``\\``.
        """
        if value is None or str(value) == "":
            raise ValueError(f"{arg_name} is required, got {value!r}")
        text = str(value)
        if "'" in text or "\\" in text:
            raise ValueError(
                f"{arg_name} contains characters not allowed in a SQL literal: {text!r}"
            )
        return f"'{text}'"

    def run(self):
        """Query the source slice, de-duplicate it and save it into ``self.hive_tb``.

        Prints the generated SQL. Returns early (after printing a notice) when the
        query yields no rows.

        :raises ValueError: if ``site_name`` / ``date_info`` cannot be safely
            embedded in the query.
        """
        # Build (and validate) literals first, before touching Spark.
        date_literal = self._to_sql_literal("date_info", self.date_info)
        site_literal = self._to_sql_literal("site_name", self.site_name)

        # Data cleansing: select the archived columns, aliasing cate_current_id,
        # rating and total_comments to the names written to the dim table.
        sql = f"""
        select asin,
               cate_current_id as old_category_id,
               bsr_rank,
               rating          as asin_rating,
               total_comments  as asin_total_comments,
               updated_at,
               category_id,
               date_info,
               site_name
        from ods_bs_category_top100_asin
            where 1 = 1
            and date_info = {date_literal}
            and site_name={site_literal}
"""

        print("======================查询sql如下======================")
        print(sql)
        df_save = self.spark.sql(sql)

        # DataFrame.first() returns None when the query produced no rows.
        if df_save.first() is None:
            print("============================无数据跳过===================================")
            return

        # Remove duplicate rows per (asin, category_id).
        # NOTE(review): dropDuplicates keeps an arbitrary row per key; if duplicates
        # differ (e.g. bsr_rank / updated_at) the archived values may vary between
        # reruns — confirm that is acceptable.
        df_save = df_save.dropDuplicates(['asin', 'category_id'])
        # Collapse to a single partition before writing (presumably to limit the
        # number of output files per Hive partition).
        df_save = df_save.repartition(1)

        # Target Hive partition. Presumably save_or_update_table clears/replaces the
        # existing data of this partition first — TODO confirm in CommonUtil.
        partition_dict = {
            "site_name": self.site_name,
            "date_info": self.date_info,
        }

        CommonUtil.save_or_update_table(
            spark_session=self.spark,
            hive_tb_name=self.hive_tb,
            partition_dict=partition_dict,
            df_save=df_save,
            drop_exist_tmp_flag=False
        )


if __name__ == '__main__':
    # Positional CLI arguments 1 and 2 (read via CommonUtil.get_sys_arg with a
    # default of None): site_name, then date_info.
    site_name, date_info = (CommonUtil.get_sys_arg(position, None) for position in (1, 2))
    job = DimBsrAsinRankHistory(site_name, date_info)
    job.run()