import os

from autofaiss import build_index
from pyspark.sql import SparkSession


def create_spark_session():
    # this must be a path to the autofaiss PEX that is available on all worker nodes
    os.environ['PYSPARK_PYTHON'] = "/opt/module/spark/demo/py_demo/image_search/autofaiss.pex"
    spark = (
        SparkSession.builder
        .config("spark.executorEnv.PEX_ROOT", "./.pex")
        .config("spark.executor.cores", "4")
        .config("spark.executor.memory", "20G")  # increase this if you use more cores per executor
        .config("spark.num.executors", "10")
        .config("spark.yarn.queue", "spark")
        .master("local")  # this should point to your master node; if using the tunnelling version, keep it as localhost
        .appName("autofaiss-create-index")
        .getOrCreate()
    )
    return spark

spark = create_spark_session()

# build the index directly from the embedding parquet files on HDFS;
# the merged index and its metadata are written back to the HDFS paths below
index, index_infos = build_index(
    embeddings="hdfs://nameservice1:8020/home/image_search/parquet",
    distributed="pyspark",
    file_format="parquet",
    max_index_memory_usage="80G",  # maximum size allowed for the final index
    current_memory_available="120G",  # memory available on the machine creating the index
    temporary_indices_folder="hdfs://nameservice1:8020/home/image_search/tmp/distributed_autofaiss_indices",
    index_path="hdfs://nameservice1:8020/home/image_search/index/knn.index",
    index_infos_path="hdfs://nameservice1:8020/home/image_search/index/infos.json",
)
print("index, index_infos:", index, index_infos)