import os
import sys

import pandas as pd

os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
sys.path.append(os.path.dirname(sys.path[0]))  # parent directory
from utils.templates import Templates
# from ..utils.templates import Templates
from utils.db_util import DbTypes, DBUtil


class ImgHdfsIndex(Templates):

    def __init__(self, site_name='us', img_type=1):
        super(ImgHdfsIndex, self).__init__()
        self.site_name = site_name
        self.img_type = img_type
        self.engine_srs = DBUtil.get_db_engine(db_type=DbTypes.srs.name, site_name=self.site_name)
        self.db_save = 'img_hdfs_index'
        self.spark = self.create_spark_object(app_name=f"{self.db_save}: {self.site_name}")
        self.df_features = self.spark.sql("select 1+1;")
        self.df_save = pd.DataFrame()
        # HDFS prefix prepended to the absolute paths returned by `hdfs dfs -ls`
        # self.hdfs_file_path = f'hdfs://nameservice1:8020/home/big_data_selection/dim/image_dim_features_slice/site_name={self.site_name}/'
        # self.hdfs_file_path = f'hdfs://192.168.200.210:8020/home/big_data_selection/dim/image_dim_features_slice/site_name={self.site_name}/'
        self.hdfs_file_path = 'hdfs://nameservice1:8020/'
        self.hdfs_file_list = self.get_hdfs_file_list()
        self.index_count = 0

    def get_hdfs_file_list(self):
        # List the feature-slice files with `hdfs dfs -ls` via os.popen
        command = f"hdfs dfs -ls /home/big_data_selection/dim/img_dim_features_slice/site_name={self.site_name}/img_type={self.img_type}"
        result = os.popen(command).read()
        # Parse the command output: keep the last column (the file path) of each entry line
        file_list = []
        for line in result.split('\n'):
            if line:
                parts = line.split()
                if len(parts) > 7:
                    file_path = parts[-1]
                    file_list.append(file_path)
        print(f"file_list: {len(file_list)}", file_list)
        return file_list

    def read_data(self, hdfs_path):
        # Read one slice file as plain text and count its rows
        df = self.spark.read.text(hdfs_path)
        index_count = df.count()
        return df, index_count

    def handle_data(self):
        pass

    def save_data(self):
        self.df_save.to_sql(self.db_save, con=self.engine_srs, if_exists="append", index=False)

    def run(self):
        data_list = []
        for index, hdfs_file in enumerate(self.hdfs_file_list):
            hdfs_path = self.hdfs_file_path + hdfs_file
            df, index_count = self.read_data(hdfs_path)
            # current_counts = rows in this file; all_counts = running total before this file
            data_list.append([index, hdfs_path, index_count, self.index_count])
            print([index, hdfs_path, index_count, self.index_count])
            self.index_count += index_count
        self.df_save = pd.DataFrame(data=data_list, columns=['index', 'hdfs_path', 'current_counts', 'all_counts'])
        self.df_save["site_name"] = self.site_name
        self.df_save["img_type"] = self.img_type
        # self.df_save.to_csv("/root/hdfs_parquet_block_info.csl")
        self.save_data()


if __name__ == '__main__':
    # site_name = sys.argv[1]  # arg 1: site
    # img_type = int(sys.argv[2])  # arg 2: image type source
    site_name = 'us'
    img_type = 'amazon_inv'
    handle_obj = ImgHdfsIndex(site_name=site_name, img_type=img_type)
    handle_obj.run()