# faiss_search.py
# Earlier class-based draft (commented out):
#
# import os
# import re
# import sys
# import faiss
#
# import pandas as pd
# import numpy as np
# sys.path.append(os.path.dirname(sys.path[0]))  # add the parent directory to the import path
# from pyspark.storagelevel import StorageLevel
# from utils.templates import Templates
# # from ..utils.templates import Templates
#
# class FaissSearch(Templates):
#
#     def __init__(self, site_name='us'):
#         super(FaissSearch, self).__init__()
#         self.site_name = site_name
#         self.db_save = 'faiss_search'
#         self.spark = self.create_spark_object(app_name=f"{self.db_save}: {self.site_name}")
#         self.df_features = self.spark.sql("select 1+1;")
#         self.df_save = self.spark.sql("select 1+1;")
#         self.query_vectors = np.random.rand(512)
#
#     def read_data(self):
#         # Build a DataFrame with two columns
#         df_features = pd.DataFrame({
#             'id': range(1, 1001),  # assume 1000 images, with IDs from 1 to 1000
#             'features': [np.random.rand(512) for _ in range(1000)]  # each feature vector is a random vector of length 512
#         })
#         self.df_features = self.spark.createDataFrame(df_features)
#
#     def handle_data(self):
#         # 2. Partition the DataFrame
#         # Add a new column holding the partition number
#         partition_size = 1000000  # rows per partition
#         self.df_features = self.df_features.withColumn('partition_id', (self.df_features['id'] / partition_size).cast('integer'))
#
#         # Repartition the DataFrame by that column
#         self.df_features = self.df_features.repartition('partition_id')
#
#         # 3. Build a FAISS index on each partition
#         def create_index(partition):
#             # One index per partition; materialise the iterator so it can be reused
#             d = 512  # feature vector dimension
#             index = faiss.IndexFlatL2(d)
#
#             # Add the partition's vectors to the index
#             rows = list(partition)
#             for row in rows:
#                 index.add(np.array(row['features'], dtype=np.float32).reshape(-1, d))
#
#             return [(index, row['id']) for row in rows]
#
#         rdd = self.df_features.rdd.mapPartitions(create_index)
#
#         # 4. Broadcast the query vectors, then run the search on each partition
#         # query_vectors = ...  # a numpy array holding the query vectors
#         query_vectors_broadcast = self.spark.sparkContext.broadcast(self.query_vectors)
#
#         def search(partition):
#             # Run the query against each partition's index
#             k = 4  # number of nearest neighbours
#             for index, _ in partition:
#                 D, I = index.search(query_vectors_broadcast.value, k)
#                 yield (D, I)
#
#         results = rdd.mapPartitions(search).collect()
#
#         # 5. Merge the results
#         D, I = zip(*results)
#         D = np.concatenate(D)
#         I = np.concatenate(I)
#
#         # Sort the results and keep the top k
#         k = 4
#         indices = np.argsort(D, axis=1)[:, :k]
#         D = np.take_along_axis(D, indices, axis=1)
#         I = np.take_along_axis(I, indices, axis=1)

import pandas as pd
import numpy as np
import faiss
from sqlalchemy import create_engine
from pyspark.sql import SparkSession
from pyspark import pandas as pdf
from pyspark.sql import Row

# Build a DataFrame with two columns
# df_features = pd.DataFrame({
#     'id': range(1, 101),  # assume 100 images, with IDs from 1 to 100
#     'features': [np.random.rand(512).tolist() for _ in range(100)]  # each feature vector is a random vector of length 512
# })
# data = []
# for i in range(1, 101):
#     data.append({
#         'id': i,
#         'features': np.random.rand(512).tolist()
#     })
#
# df_features = pdf.DataFrame(data)
import sys
import os
sys.path.append(os.path.dirname(sys.path[0]))  # add the parent directory to the import path
from utils.templates import Templates
# from ..utils.templates import Templates

# Create the Spark session through the shared Templates helper and smoke-test it with a small query
spark = Templates().create_spark_object(app_name="faiss")
sql = "select * from ods_brand_analytics limit 100"
df = spark.sql(sql)
df.show(10)

Mysql_arguments = {
    'user': 'adv_yswg',
    'password': 'HCL1zcUgQesaaXNLbL37O5KhpSAy0c',
    'host': 'rm-wz9yg9bsb2zf01ea4yo.mysql.rds.aliyuncs.com',
    'port': 3306,
    'database': 'selection',
    'charset': 'utf8mb4',
}


def get_country_engine(site_name="us"):
    # Work on a copy so non-us calls don't mutate the module-level settings
    arguments = dict(Mysql_arguments)
    if site_name != 'us':
        arguments["database"] = f"selection_{site_name}"
    db_ = 'mysql+pymysql://{}:{}@{}:{}/{}?charset={}'.format(*arguments.values())
    engine = create_engine(db_)  # , pool_recycle=3600
    return engine


engine = get_country_engine()
sql = f"select id, img_vector as features from us_asin_extract_features limit 100;"
df_features = pd.read_sql(sql, con=engine)
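
# Depending on how us_asin_extract_features stores img_vector, the features column
# may come back from MySQL as text rather than as a list of floats. A minimal sketch
# of the conversion, assuming a comma-separated string (adjust to the real storage
# format before enabling it):
# df_features['features'] = df_features['features'].apply(
#     lambda s: [float(x) for x in s.strip('[]').split(',')] if isinstance(s, str) else s)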


# 1. Create a SparkSession (the Templates helper above already provides one)
# spark = SparkSession.builder.getOrCreate()

# Convert the Pandas DataFrame to a PySpark DataFrame
df_spark = spark.createDataFrame(df_features)


# 2. Partition the DataFrame
# Add a new column holding the partition number
partition_size = 100  # rows per logical partition group
df_spark = df_spark.withColumn('partition_id', (df_spark['id'] / partition_size).cast('integer'))

# Repartition the DataFrame by that column
df_spark = df_spark.repartition('partition_id')
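# Note: with partition_size = 100, ids 1-99 map to partition_id 0 and id 100 to
# partition_id 1, so this 100-row sample forms two logical groups; repartition()
# then co-locates each group inside a single Spark partition.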


# 3. Build a FAISS index on each partition
def create_index(partition):
    # Materialise the partition once; the iterator can only be consumed a single time
    rows = list(partition)
    if not rows:
        return []

    d = 512                      # feature vector dimension
    nlist = min(100, len(rows))  # IVF cells, capped so tiny partitions can still be trained
    quantizer = faiss.IndexFlatL2(d)
    index = faiss.IndexIVFFlat(quantizer, d, nlist)

    # Add the partition's vectors to the index under their original row ids
    data = np.array([np.array(row.features, dtype=np.float32) for row in rows])
    ids = np.array([row.id for row in rows], dtype=np.int64)
    index.train(data)
    index.add_with_ids(data, ids)

    # One (index, ids) pair per partition is enough for the search step
    return [(index, ids)]


rdd = df_spark.rdd.mapPartitions(create_index)
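
# Note: returning raw faiss index objects works here only because create_index and
# search run back-to-back in the same stage (two pipelined mapPartitions calls), so
# the index is never pickled. If the index ever has to cross a shuffle or be
# collected on the driver, serialize it first. A minimal sketch, assuming this
# faiss build exposes serialize_index/deserialize_index:
def index_to_buffer(index):
    # Returns a numpy uint8 array, which pickles cleanly
    return faiss.serialize_index(index)


def index_from_buffer(buf):
    # Rebuilds an equivalent index on the consuming side
    return faiss.deserialize_index(buf)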

# 4. Broadcast the query vector, then run the search on each partition
query_vectors = np.random.rand(1, 512).astype('float32')  # a numpy array holding the query vector(s)
query_vectors_broadcast = spark.sparkContext.broadcast(query_vectors)


def search(partition):
    # Run the query against this partition's index
    for index, _ in partition:
        D, I = index.search(query_vectors_broadcast.value, 4)
        yield (D, I)


results = rdd.mapPartitions(search).collect()

# 5. Merge the per-partition results into a global top-k
D, I = zip(*results)
D = np.concatenate(D, axis=1)  # shape: (n_queries, k * n_partitions)
I = np.concatenate(I, axis=1)

# Sort all candidates by distance and keep the global top k per query
k = 4
indices = np.argsort(D, axis=1)[:, :k]
D = np.take_along_axis(D, indices, axis=1)
I = np.take_along_axis(I, indices, axis=1)

print("Nearest-neighbour ids for the query vector:", I)
print("Nearest-neighbour distances for the query vector:", D)