create_parquet_to_slice.py 3.6 KB
Newer Older
chenyuanjie committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
import os
import sys

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from sqlalchemy import create_engine
sys.path.append(os.path.dirname(sys.path[0]))  # 上级目录
from pyspark.storagelevel import StorageLevel
from utils.templates import Templates
# from ..utils.templates import Templates
from pyspark.sql.window import Window
from pyspark.sql import functions as F


class CreateParquet(Templates):

    def __init__(self):
        super(CreateParquet, self).__init__()
        self.df_features = pd.DataFrame()
        self.db_save = 'create_parquet'
        self.spark = self.create_spark_object(app_name=f"{self.db_save}")

    def read_data(self):
        # sql = f"select id, img_vector as features from us_asin_extract_features;"
        # self.df_features = pd.read_sql(sql, con=self.engine)

        sql = f"select id, img_vector as embedding from ods_asin_extract_features limit 10000;"
        print("sql:", sql)
        self.df_features = self.spark.sql(sql).cache()
        # 添加索引列
        window = Window.orderBy("id")
        self.df_features = self.df_features.withColumn("index", F.row_number().over(window) - 1)  # 从0开始
        self.df_features.show(20, truncate=False)
        # self.df_features = self.df_features.cache()
        # 定义窗口按id排序

    def handle_data(self):
        # 假设你的DataFrame中有一个名为'id'的列,它的值是唯一的并且是从1开始的递增的整数。
        # 'block'列将每200000个'id'值放入一个块。
        self.df_features = self.df_features.withColumn('block', F.floor(self.df_features['index'] / 200))
        self.df_features.show()
        # 用 'block' 列进行分区写入
        self.df_features.write.mode('overwrite').partitionBy('block').parquet('/home/ffman/parquet/files')


        # self.df_features.write.mode('overwrite').parquet("/home/ffman/parquet/image.parquet")
        # df_count = self.df_features.count()
        # df = self.df_features.filter("index < 200000").select("embedding")
        # print("df.count():", df_count, df.count())
        # df = df.toPandas()
        # df.embedding = df.embedding.apply(lambda x: eval(x))
        # table = pa.Table.from_pandas(df)
        # pq.write_table(table, "/root/part1.parquet")

        # df_count = self.df_features.count()
        # df_count = 35000000
        # image_num = df_count
        #
        # # os.makedirs("my_parquet", exist_ok=True)
        # step = 200000
        # index_list = list(range(0, image_num, step))
        # file_id_list = [f"{i:04}" for i in range(len(index_list))]
        # print("index_list:", index_list)
        # print("file_id_list:", file_id_list)
        # for index, flie_id in zip(index_list, file_id_list):
        #     df_part = self.df_features.filter(f"index >= {index} and index < {index+step}").select("embedding")
        #     df_part = df_part.toPandas()
        #     table = pa.Table.from_pandas(df=df_part)
        #     file_name = f"/mnt/ffman/my_parquet/part_{flie_id}.parquet"
        #     pq.write_table(table, file_name)
        #     print("df_part.shape, index, file_name:", df_part.shape, index, file_name)
        pass

    def save_data(self):
        # # 设置分区大小为200000
        # self.spark.conf.set("spark.sql.files.maxRecordsPerFile", 200)
        #
        # # 将数据存储为Parquet格式
        # self.df_features.write.partitionBy("index").parquet("/home/ffman/parquet/files")
        pass

    def run(self):
        self.read_data()
        self.handle_data()
        self.save_data()


if __name__ == '__main__':
    handle_obj = CreateParquet()
    handle_obj.run()