image_create_index.py
import os

from autofaiss import build_index
from pyspark.sql import SparkSession


def create_spark_session():
    # The PEX file bundles autofaiss and its dependencies; this path must be
    # available on all worker nodes.
    os.environ['PYSPARK_PYTHON'] = "/opt/module/spark/demo/py_demo/image_search/autofaiss.pex"
    spark = (
        SparkSession.builder
        .config("spark.executorEnv.PEX_ROOT", "./.pex")
        .config("spark.executor.cores", "4")
        .config("spark.executor.memory", "20G")  # increase this if you use more cores per executor
        .config("spark.executor.instances", "10")  # "spark.num.executors" is not a valid Spark property
        .config("spark.yarn.queue", "spark")
        .master("local")  # point this to your master node; keep "local" when using the tunnelling version
        .appName("autofaiss-create-index")
        .getOrCreate()
    )
    return spark
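

# --- Illustrative sketch, not part of the original script ---
# build_index with file_format="parquet" reads one embedding per row from a
# vector column (named "embedding" by default; configurable via the
# embedding_column_name parameter). The helper below writes a small dummy
# parquet file in that layout; the path, row count and dimension are
# assumptions for illustration only.
def write_example_embeddings(path="/tmp/image_search_example/embeddings_0000.parquet"):
    import numpy as np
    import pandas as pd

    os.makedirs(os.path.dirname(path), exist_ok=True)
    # One row per image: an array of float32 values in the "embedding" column.
    frame = pd.DataFrame({"embedding": list(np.random.rand(1000, 512).astype("float32"))})
    frame.to_parquet(path)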


spark = create_spark_session()

index, index_infos = build_index(
    embeddings="hdfs://nameservice1:8020/home/image_search/parquet",
    distributed="pyspark",
    file_format="parquet",
    max_index_memory_usage="80G",  # hard upper bound on the size of the final index
    current_memory_available="120G",  # RAM available during the build; should exceed max_index_memory_usage
    temporary_indices_folder="hdfs://nameservice1:8020/home/image_search/tmp/distributed_autofaiss_indices",
    index_path="hdfs://nameservice1:8020/home/image_search/index/knn.index",
    index_infos_path="hdfs://nameservice1:8020/home/image_search/index/infos.json",
)
print("index, index_infos:", index, index_infos)