import os
import sys

sys.path.append(os.path.dirname(sys.path[0]))
from utils.db_util import DBUtil
from pyspark.sql import functions as F
from utils.common_util import CommonUtil
from utils.hdfs_utils import HdfsUtils
from utils.spark_util import SparkUtil

"""
Archive the local storage paths of ASIN images and apply preprocessing.
"""


class DimAsinImgPath(object):

    def __init__(self, site_name):
        self.site_name = site_name
        app_name = f"{self.__class__.__name__}:{site_name}"
        self.spark = SparkUtil.get_spark_session(app_name)
        self.hive_tb = "dim_asin_img_path"

    def run(self):
        # Pull image path records for the current site, skipping rows with missing URLs or paths.
        sql = f"""
        select id,
               asin,
               asin_img_url,
               asin_img_path,
               created_at,
               bsr_cate_current_id
        from ods_asin_img_path
        where site_name = '{self.site_name}'
          and asin_img_url is not null
          and asin_img_url != 'null'
          and asin_img_path is not null
        """
        print("====================== query sql ======================")
        print(sql)

        df_save = self.spark.sql(sql)
        if df_save.first() is None:
            print("============================ no data, skipping ===================================")
            return

        # Load the BSR category id -> category_id mapping from MySQL.
        path_sql = f"""
        select id as bsr_cate_current_id, category_id from us_bs_category
        """
        conn_info = DBUtil.get_connection_info("mysql", "us")
        id_df = SparkUtil.read_jdbc_query(
            session=self.spark,
            url=conn_info["url"],
            pwd=conn_info["pwd"],
            username=conn_info["username"],
            query=path_sql
        )

        # todo
        # Attach category_id by joining on bsr_cate_current_id and add the site_name partition column.
        df_save = df_save.join(id_df, on='bsr_cate_current_id').select(
            F.col("id"),
            F.col("asin"),
            F.col("asin_img_url"),
            F.col("asin_img_path"),
            F.col("created_at"),
            df_save["bsr_cate_current_id"],
            F.col("category_id"),
            F.lit(self.site_name).alias("site_name"),
        )

        # Drop the existing partition data on HDFS before appending the refreshed result.
        partition_dict = {
            "site_name": self.site_name
        }
        hdfs_path = CommonUtil.build_hdfs_path(self.hive_tb, partition_dict=partition_dict)
        HdfsUtils.delete_hdfs_file(hdfs_path)

        partition_by = list(partition_dict.keys())
        print(f"Target table: {self.hive_tb}, partitioned by {partition_by}")
        df_save.write.saveAsTable(name=self.hive_tb, format='hive', mode='append', partitionBy=partition_by)
        print("success")


if __name__ == '__main__':
    site_name = CommonUtil.get_sys_arg(1, None)
    obj = DimAsinImgPath(site_name)
    obj.run()