# script_template.py
import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from utils.db_util import DBUtil
from utils.hdfs_utils import HdfsUtils

from utils.spark_util import SparkUtil
from pyspark.sql import functions as F
from yswg_utils.common_df import get_bsr_category_name_df

"""
头部分类对应亚马逊分类ID
"""


class DimCategoryDescId(object):
    """Build the `dim_category_desc_id` dimension table.

    Extracts distinct (asin_category_desc, node_id) pairs from
    `dim_cal_asin_history_detail` and appends them to the Hive table
    `dim_category_desc_id`, partitioned by `site_name`.
    """

    def __init__(self, site_name):
        """
        :param site_name: target site partition value (e.g. 'us'), taken from CLI arg 1.
        """
        # Use the class name as the Spark application name for traceability.
        app_name = f"{self.__class__.__name__}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.site_name = site_name
        self.hive_table = 'dim_category_desc_id'

    def run(self):
        """Query the source table and persist the mapping into Hive."""
        # NOTE(review): the query hard-codes site_name = 'us' even though the
        # constructor accepts an arbitrary site — confirm whether this should
        # be parameterized on self.site_name. Left unchanged to preserve behavior.
        sql = f"""
            select asin_category_desc,
                   node_id
            from dim_cal_asin_history_detail
            where site_name = 'us'
              and node_id is not null
              and asin_category_desc is not null
              and asin_category_desc != '无'
            group by asin_category_desc, node_id
"""
        df_save = self.spark.sql(sql)

        # Bug fix: previously read the module-level global `site_name`, which
        # raised NameError whenever the class was used outside this script.
        partition_dict = {
            "site_name": self.site_name
        }
        hdfs_path = CommonUtil.build_hdfs_path(self.hive_table, partition_dict)
        partition_by = list(partition_dict.keys())
        # Clear any stale partition data before the append write.
        HdfsUtils.delete_file_in_folder(hdfs_path)

        # Bug fix: the SELECT list does not include `site_name`, so partitioning
        # by it would fail at write time; add it as a literal column.
        df_save = df_save.withColumn("site_name", F.lit(self.site_name))
        df_save.write.saveAsTable(name=self.hive_table, format='hive', mode='append', partitionBy=partition_by)
        print("success")


if __name__ == '__main__':
    # First CLI argument selects the site; falls back to None when absent.
    # Keep the module-level name `site_name` — run() may resolve it globally.
    site_name = CommonUtil.get_sys_arg(1, None)
    DimCategoryDescId(site_name).run()