import os

from autofaiss import build_index
from pyspark.sql import SparkSession


def create_spark_session():
    # this must be a path that is available on all worker nodes
    os.environ["PYSPARK_PYTHON"] = "/opt/module/spark/demo/py_demo/image_search/autofaiss.pex"
    spark = (
        SparkSession.builder
        .config("spark.executorEnv.PEX_ROOT", "./.pex")
        .config("spark.executor.cores", "4")
        .config("spark.executor.memory", "20G")  # make sure to increase this if you're using more cores per executor
        .config("spark.num.executors", "10")
        .config("spark.yarn.queue", "spark")
        .master("local")  # this should point to your master node; if using the tunnelling version, keep this as localhost
        .appName("autofaiss-create-index")
        .getOrCreate()
    )
    return spark


spark = create_spark_session()

index, index_infos = build_index(
    embeddings="hdfs://nameservice1:8020/home/image_search/parquet",
    distributed="pyspark",
    file_format="parquet",
    max_index_memory_usage="80G",  # upper bound on the size of the final index
    current_memory_available="120G",  # memory available on the machine for building
    temporary_indices_folder="hdfs://nameservice1:8020/home/image_search/tmp/distributed_autofaiss_indices",
    index_path="hdfs://nameservice1:8020/home/image_search/index/knn.index",
    index_infos_path="hdfs://nameservice1:8020/home/image_search/index/infos.json",
)
print("index, index_infos:", index, index_infos)
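
# --- Optional follow-up: querying the finished index (a minimal sketch) ---
# This is not part of the build itself. It assumes knn.index has first been
# copied from HDFS to the local filesystem, e.g.:
#   hdfs dfs -get hdfs://nameservice1:8020/home/image_search/index/knn.index .
# The local filename and the random query batch below are illustrative only.
import faiss
import numpy as np

if os.path.exists("knn.index"):
    local_index = faiss.read_index("knn.index")
    # query with a batch of float32 vectors whose dimension matches the embeddings
    queries = np.random.rand(5, local_index.d).astype("float32")
    distances, ids = local_index.search(queries, 10)  # top-10 neighbours per query
    print("distances:", distances.shape, "ids:", ids.shape)  # (5, 10) (5, 10)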