import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))

from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil

"""
Head (top-level) categories mapped to their Amazon category node IDs.
"""


class DimCategoryDescId(object):

    def __init__(self, site_name):
        app_name = f"{self.__class__.__name__}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.site_name = site_name
        self.hive_table = 'dim_category_desc_id'

    def run(self):
        # Collect distinct (category description, node_id) pairs for the
        # current site, skipping null node IDs and the placeholder
        # description '无' ("none").
        sql = f"""
            select asin_category_desc, node_id
            from dim_cal_asin_history_detail
            where site_name = '{self.site_name}'
              and node_id is not null
              and asin_category_desc is not null
              and asin_category_desc != '无'
            group by asin_category_desc, node_id
        """
        df_save = self.spark.sql(sql)

        partition_dict = {
            "site_name": self.site_name
        }
        hdfs_path = CommonUtil.build_hdfs_path(self.hive_table, partition_dict)
        partition_by = list(partition_dict.keys())

        # Clear the target partition on HDFS first so that the append below
        # does not accumulate duplicate rows across reruns.
        HdfsUtils.delete_file_in_folder(hdfs_path)
        df_save.write.saveAsTable(name=self.hive_table,
                                  format='hive',
                                  mode='append',
                                  partitionBy=partition_by)
        print("success")


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    obj = DimCategoryDescId(site_name)
    obj.run()
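
# Usage sketch: the script reads the site code from the first CLI argument
# via CommonUtil.get_sys_arg. The file name and launcher below are
# assumptions (they are not stated in this file); adjust both to the repo's
# actual layout and to however SparkUtil expects the session to be launched,
# e.g.:
#
#   spark-submit dim_category_desc_id.py us
#
# or, if SparkUtil builds a local session itself:
#
#   python dim_category_desc_id.py us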