import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.common_util import CommonUtil
from yswg_utils.common_udf import udf_title_number_parse_reg
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil
from pyspark.sql import functions as F

"""
Parse asin titles and extract the information contained in them.
Performs a full parse of all current asins.
"""


class DimAsinTitleParse(object):

    def __init__(self, site_name):
        self.site_name = site_name
        app_name = f"{self.__class__.__name__}:{site_name}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.udf_title_number_parse_reg = udf_title_number_parse_reg()
        self.hive_tb = "dim_asin_title_parse"
        # Clear the target partition directory on HDFS before rewriting it.
        hdfs_path = f"/home/{SparkUtil.DEF_USE_DB}/dim/{self.hive_tb}/site_name={self.site_name}"
        print(f"Clearing HDFS directory: {hdfs_path}")
        HdfsUtils.delete_hdfs_file(hdfs_path)

    def run(self):
        sql = f"""
select asin, asin_title
from dim_cal_asin_history_detail
where site_name = '{self.site_name}'
  and asin_title is not null
  and asin_title != ''
"""
        print("====================== query sql ======================")
        print(sql)
        df_save = self.spark.sql(sql)
        # Parse each title with the number-extraction UDF and drop rows
        # where nothing could be parsed out of the title.
        df_save = df_save.select(
            "asin",
            self.udf_title_number_parse_reg("asin_title").alias("title_num_json"),
            F.lit(self.site_name).alias("site_name"),
        ).where("title_num_json is not null")
        # Repartition to 6 partitions before writing.
        df_save = df_save.repartition(6)
        partition_by = ["site_name"]
        print(f"Saving to table {self.hive_tb}, partitioned by {partition_by}")
        df_save.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=partition_by)
        print("success")


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    obj = DimAsinTitleParse(site_name)
    obj.run()
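# Usage sketch (hedged): CommonUtil.get_sys_arg(1, None) reads the site name
# from the first CLI argument, so an invocation looks roughly like the line
# below. The script filename is hypothetical, and the actual submit command
# (spark-submit flags, cluster config) is not shown in this file.
#
#   spark-submit dim_asin_title_parse.py us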