From c9b0f72c94c00163c8b49e767b09cd9ac3cfb730 Mon Sep 17 00:00:00 2001
From: AnnaWheel
Date: Wed, 1 Jan 2025 01:04:44 +0800
Subject: [PATCH 1/2] add SPTAG

---
 bigvectorbench/algorithms/sptag/Dockerfile    |  34 ++++
 bigvectorbench/algorithms/sptag/config.yml    |  14 ++
 .../algorithms/sptag/docker-compose.yml       |  23 +++
 bigvectorbench/algorithms/sptag/module.py     | 178 ++++++++++++++++++
 4 files changed, 249 insertions(+)
 create mode 100644 bigvectorbench/algorithms/sptag/Dockerfile
 create mode 100644 bigvectorbench/algorithms/sptag/config.yml
 create mode 100644 bigvectorbench/algorithms/sptag/docker-compose.yml
 create mode 100644 bigvectorbench/algorithms/sptag/module.py

diff --git a/bigvectorbench/algorithms/sptag/Dockerfile b/bigvectorbench/algorithms/sptag/Dockerfile
new file mode 100644
index 0000000..238c284
--- /dev/null
+++ b/bigvectorbench/algorithms/sptag/Dockerfile
@@ -0,0 +1,34 @@
+FROM bigvectorbench-base
+
+WORKDIR /home/app
+
+ENV GIT_LFS_SKIP_SMUDGE=1
+RUN git clone --recurse-submodules https://github.com/microsoft/SPTAG
+
+WORKDIR /home/app/SPTAG/ThirdParty
+RUN rm -rf zstd
+RUN git clone https://github.com/facebook/zstd.git
+
+WORKDIR /home/app
+
+RUN apt-get update && apt-get -y install wget build-essential cmake libboost-all-dev libtbb-dev software-properties-common swig
+
+# Patch https://github.com/microsoft/SPTAG/issues/243
+# RUN cd SPTAG && \
+#     wget -qO- https://github.com/pabs3/SPTAG/commit/bd9c25d1409325ac45ebeb7f1e8fc87d03ec478c.patch | git apply && \
+#     cd ..
+
+# SPTAG defaults to Python 2 if it's found on the system, so as a hack, we remove it. See https://github.com/microsoft/SPTAG/blob/master/Wrappers/CMakeLists.txt
+RUN apt-get -y remove libpython2.7
+
+# Compile
+RUN cd SPTAG && mkdir build && cd build && cmake .. && make && cd ..
+
+# so python can find the SPTAG module
+ENV PYTHONPATH=/home/app/SPTAG/Release
+# RUN python3 -c 'import SPTAG'
+
+# COPY bigvectorbench/algorithms/sptag/docker-compose.yml ./SPTAG
+# COPY docker-compose.yml ./SPTAG
+# COPY bigvectorbench/algorithms/sptag/Dockerfile.builder ./SPTAG
+
diff --git a/bigvectorbench/algorithms/sptag/config.yml b/bigvectorbench/algorithms/sptag/config.yml
new file mode 100644
index 0000000..0f57f22
--- /dev/null
+++ b/bigvectorbench/algorithms/sptag/config.yml
@@ -0,0 +1,14 @@
+float:
+  euclidean:
+  - base_args: ["@metric", "@dimension"]
+    constructor: SPTAGBKT
+    disabled: False
+    docker_tag: bigvectorbench-sptag
+    module: bigvectorbench.algorithms.sptag
+    name: sptag-bkt
+    run_groups:
+      BKT:
+        args:
+          nlist: [64]
+        query_args: [[100]]
+# , 128, 256, 512, 1024, 2048, 4096, 8192
\ No newline at end of file
diff --git a/bigvectorbench/algorithms/sptag/docker-compose.yml b/bigvectorbench/algorithms/sptag/docker-compose.yml
new file mode 100644
index 0000000..1096a1e
--- /dev/null
+++ b/bigvectorbench/algorithms/sptag/docker-compose.yml
@@ -0,0 +1,23 @@
+version: '3.8'
+
+services:
+  builder:
+    build:
+      context: .
+      dockerfile: Dockerfile
+    volumes:
+      - .:/app
+    command: /bin/bash -c "apt-get update && apt-get -y install wget build-essential swig cmake git libnuma-dev python3.8-dev python3-distutils gcc-8 g++-8 libboost-filesystem-dev libboost-test-dev libboost-serialization-dev libboost-regex-dev libboost-serialization-dev libboost-regex-dev libboost-thread-dev libboost-system-dev && wget https://bootstrap.pypa.io/get-pip.py && python3.8 get-pip.py && python3.8 -m pip install numpy && export CC=/usr/bin/gcc-8 && export CXX=/usr/bin/g++-8 && mkdir build && cd build && cmake .. && make -j && cd .."
+
+  app:
+    build:
+      context: .
+      dockerfile: Dockerfile
+    volumes:
+      - .:/app
+    environment:
+      - DEBIAN_FRONTEND=noninteractive
+      - PYTHONPATH=/app/Release
+    command: /bin/bash
+    depends_on:
+      - builder
\ No newline at end of file
diff --git a/bigvectorbench/algorithms/sptag/module.py b/bigvectorbench/algorithms/sptag/module.py
new file mode 100644
index 0000000..14a2cf1
--- /dev/null
+++ b/bigvectorbench/algorithms/sptag/module.py
@@ -0,0 +1,178 @@
+""" SPTAG module for BigVectorBench framework. """
+
+import subprocess
+from time import sleep
+import numpy as np
+import SPTAG
+from bigvectorbench.algorithms.base.module import BaseANN
+import csv
+import shutil
+
+def metric_mapping(_metric: str):
+    """
+    Mapping metric type to SPTAG distance metric
+
+    Args:
+        _metric (str): metric type
+
+    Returns:
+        str: SPTAG distance metric type
+    """
+    _metric = _metric.lower()
+    _metric_type = {
+        "angular": "Cosine",
+        "euclidean": "L2",
+    }.get(_metric, None)
+    if _metric_type is None:
+        raise ValueError(f"[SPTAG] Not support metric type: {_metric}!!!")
+    return _metric_type
+
+class SPTAGBase(BaseANN):
+    """SPTAG implementation"""
+
+    def __init__(self, metric: str, dim: int):
+        self._metric = metric
+        self._dim = dim
+        self._metric_type = metric_mapping(metric)
+        self._database_name = "SPTAG_test"
+
+        self.index = None
+        self.index_name = "bvb_index"
+        self.index_type = None
+        self.num_entities = 0
+        self.num_labels = 0
+        self.label_names = []
+        self.load_batch_size = 1000
+        self.query_batch_size = 100
+
+        self.name = f"SPTAG Base metric:{self._metric}"
+        self.search_params = None
+
+        self.query_vector = None
+        self.query_topk = 0
+        self.query_filter = None
+        self.prepare_query_results = None
+        self.batch_search_queries = []
+        self.batch_results = []
+        self.batch_latencies = []
+
+    def get_index_param(self) -> str:
+        """
+        Get the SPTAG index algorithm name passed to SPTAG.AnnIndex
+
+        Note: This is a placeholder method to be implemented by subclasses.
+        """
+        raise NotImplementedError()
+
+    @staticmethod
+    def _as_float32_bytes(vectors: np.ndarray) -> bytes:
+        """Serialize vectors as contiguous float32, matching the 'Float' value type of the index"""
+        return np.ascontiguousarray(vectors, dtype=np.float32).tobytes()
+
+    def _get_index(self):
+        """Return the in-memory index, loading the saved copy only when none is held yet"""
+        if self.index is None:
+            self.index = SPTAG.AnnIndex.Load(self.index_name)
+        return self.index
+
+    def load_data(
+        self,
+        embeddings: np.array,
+        labels: np.ndarray | None = None,
+        label_names: list[str] | None = None,
+        label_types: list[str] | None = None,
+    ) -> None:
+        """
+        Build a SPTAG index over the embeddings and save it under self.index_name
+
+        Labels are only counted and recorded; they are not stored in the index.
+
+        Raises:
+            RuntimeError: if SPTAG reports that the build failed
+        """
+        num_labels = len(label_names) if label_names is not None else 0
+        self.num_labels = num_labels
+        self.label_names = label_names
+        print(f"[SPTAG] load data with {num_labels} labels!!!")
+        self.num_entities = len(embeddings)
+        self.index_type = self.get_index_param()
+        self.index = SPTAG.AnnIndex(self.index_type, 'Float', embeddings.shape[1])
+        self.index.SetBuildParam("NumberOfThreads", '4', "Index")
+        self.index.SetBuildParam("DistCalcMethod", self._metric_type, "Index")
+        print("[SPTAG] create index successfully!!!")
+        print(f"[SPTAG] Start uploading {len(embeddings)} vectors...")
+        # Never print the serialized vectors here: that dumps the whole dataset to stdout.
+        if not self.index.Build(self._as_float32_bytes(embeddings), embeddings.shape[0], False):
+            raise RuntimeError("[SPTAG] failed to build index!!!")
+        self.index.Save(self.index_name)
+        print(f"[SPTAG] Uploaded {len(embeddings)} vectors successfully!!!")
+
+    def create_index(self):
+        """SPTAG has already started indexing when inserting data"""
+        pass
+
+    def set_query_arguments(self):
+        """
+        Set query arguments for SPTAG query; to be implemented by subclasses
+        """
+        raise NotImplementedError()
+
+    def query(self, v, n, filter_expr=None):
+        """
+        Return the ids of the n nearest neighbours of v
+
+        The in-memory index is searched directly; reloading it from disk for
+        every query would dominate the measured query latency.
+        """
+        if filter_expr is not None:
+            raise ValueError("[SPTAG] have not supported filter-query!!!")
+        return self._get_index().Search(self._as_float32_bytes(v), n)[0]
+
+    def done(self):
+        # shutil.rmtree(self.index_name)
+        print("[SPTAG] index deleted successfully!!!")
+
+    def insert(self, embeddings: np.ndarray, labels: np.ndarray | None = None) -> None:
+        """
+        Single insert data
+
+        Args:
+            embeddings (np.ndarray): embeddings
+            labels (np.ndarray): labels
+
+        Returns:
+            None
+        """
+        vectors = np.atleast_2d(embeddings)  # a single 1-D vector counts as one row
+        self._get_index().Add(self._as_float32_bytes(vectors), vectors.shape[0], False)
+        self.num_entities += vectors.shape[0]
+
+    # update() and delete() are not implemented: SPTAG only supports
+    # deleting data by vector, not by index.
+
+class SPTAGBKT(SPTAGBase):
+    """SPTAG BKT implementation"""
+
+    def __init__(self, metric: str, dim: int, index_param: dict):
+        super().__init__(metric, dim)
+        self._nlinks = index_param.get("nlinks", 32)
+        self._efConstruction = index_param.get("efConstruction", 40)
+
+    def get_index_param(self):
+        return 'BKT'
+
+    def set_query_arguments(self, efSearch: int = 40):
+        """
+        Set query arguments for SPTAG query with BKT index
+
+        Args:
+            efSearch (int): search budget, applied as the SPTAG "MaxCheck" search parameter
+        """
+        self.search_params = {
+            "metric_type": self._metric_type,
+            "efSearch": efSearch,
+        }
+        self.name = f"SPTAG BKT metric:{self._metric}, nlinks:{self._nlinks}, efConstruction:{self._efConstruction}, efSearch:{efSearch}"
+        # Without this the query_args from config.yml have no effect on the search.
+        if self.index is not None:
+            self.index.SetSearchParam("MaxCheck", str(efSearch), "Index")
From 73d15b04986f389919ef7f390cc2ceae77691e00 Mon Sep 17 00:00:00 2001
From: AnnaWheel
Date: Wed, 1 Jan 2025
01:35:54 +0800
Subject: [PATCH 2/2] add PGVector

---
 bigvectorbench/algorithms/pgvector/Dockerfile |  43 ++++
 bigvectorbench/algorithms/pgvector/config.yml |  48 ++++
 bigvectorbench/algorithms/pgvector/module.py  | 241 ++++++++++++++++++
 3 files changed, 332 insertions(+)
 create mode 100644 bigvectorbench/algorithms/pgvector/Dockerfile
 create mode 100644 bigvectorbench/algorithms/pgvector/config.yml
 create mode 100644 bigvectorbench/algorithms/pgvector/module.py

diff --git a/bigvectorbench/algorithms/pgvector/Dockerfile b/bigvectorbench/algorithms/pgvector/Dockerfile
new file mode 100644
index 0000000..4fc8c1b
--- /dev/null
+++ b/bigvectorbench/algorithms/pgvector/Dockerfile
@@ -0,0 +1,43 @@
+FROM bigvectorbench-base
+
+ENV TZ=Asia/Shanghai
+RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo "$TZ" > /etc/timezone
+
+RUN git clone https://github.com/pgvector/pgvector /tmp/pgvector
+RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get -y install tzdata
+
+RUN apt-get update && apt-get install -y --no-install-recommends build-essential postgresql-common
+RUN /usr/share/postgresql-common/pgdg/apt.postgresql.org.sh -y
+RUN apt-get install -y --no-install-recommends postgresql-16 postgresql-server-dev-16
+RUN pip install psutil
+RUN sh -c 'echo "local all all trust" > /etc/postgresql/16/main/pg_hba.conf'
+
+# Dynamically set OPTFLAGS based on the architecture
+RUN ARCH=$(uname -m) && \
+    if [ "$ARCH" = "aarch64" ]; then \
+        OPTFLAGS="-march=native -msve-vector-bits=512"; \
+    elif [ "$ARCH" = "x86_64" ]; then \
+        OPTFLAGS="-march=native -mprefer-vector-width=512"; \
+    else \
+        OPTFLAGS="-march=native"; \
+    fi && \
+    cd /tmp/pgvector && \
+    make clean && \
+    make OPTFLAGS="$OPTFLAGS" && \
+    make install
+
+USER postgres
+RUN service postgresql start && \
+    psql -c "CREATE USER bvb WITH ENCRYPTED PASSWORD 'bvb'" && \
+    psql -c "CREATE DATABASE bvb" && \
+    psql -c "GRANT ALL PRIVILEGES ON DATABASE bvb TO bvb" && \
+    psql -d bvb -c "GRANT ALL ON SCHEMA public TO bvb" && \
+    psql -d bvb -c "CREATE EXTENSION vector" && \
+    psql -c "ALTER USER bvb SET maintenance_work_mem = '4GB'" && \
+    psql -c "ALTER USER bvb SET max_parallel_maintenance_workers = 0" && \
+    psql -c "ALTER SYSTEM SET shared_buffers = '4GB'"
+USER root
+
+RUN pip install psycopg[binary] pgvector
+
+# ENTRYPOINT ["bash"]
diff --git a/bigvectorbench/algorithms/pgvector/config.yml b/bigvectorbench/algorithms/pgvector/config.yml
new file mode 100644
index 0000000..3439f18
--- /dev/null
+++ b/bigvectorbench/algorithms/pgvector/config.yml
@@ -0,0 +1,48 @@
+float:
+  any:
+  - base_args: ["@metric"]
+    constructor: PGVectorHNSW
+    disabled: false
+    docker_tag: bigvectorbench-pgvector
+    module: bigvectorbench.algorithms.pgvector
+    name: pgvector-hnsw
+    run_groups:
+      M-16:
+        arg_groups: [{M: 16, efConstruction: 200}]
+        # args: {}
+        query_args: [[10, 20, 40, 80, 120, 200, 400, 800]]
+      M-24:
+        arg_groups: [{M: 24, efConstruction: 200}]
+        # args: {}
+        query_args: [[10, 20, 40, 80, 120, 200, 400, 800]]
+  - base_args: ["@metric"]
+    constructor: PGVectorIVFFLAT
+    disabled: false
+    docker_tag: bigvectorbench-pgvector
+    module: bigvectorbench.algorithms.pgvector
+    name: pgvector-ivfflat
+    run_groups:
+      IVFFLAT_32:
+        arg_groups: [{ nlist: 32 }]
+        query_args: [[1, 4, 8, 16, 32]]
+      # IVFFLAT_64:
+      #   arg_groups: [{ nlist: 64 }]
+      #   query_args: [[4, 16, 32, 48, 64]]
+      # IVFFLAT_128:
+      #   arg_groups: [{ nlist: 128 }]
+      #   query_args: [[8, 32, 64, 96, 128]]
+      # IVFFLAT_512:
+      #   arg_groups: [{ nlist: 512 }]
+      #   query_args: [[32, 128, 256, 384, 512]]
+      # IVFFLAT_1024:
+      #   arg_groups: [{ nlist: 1024 }]
+      #   query_args: [[64, 256, 512, 768, 1024]]
+      # IVFFLAT_2048:
+      #   arg_groups: [{ nlist: 2048 }]
+      #   query_args: [[128, 512, 1024, 1536, 2048]]
+      # IVFFLAT_4096:
+      #   arg_groups: [{ nlist: 4096 }]
+      #   query_args: [[256, 1024, 2048, 3072, 4096]]
+      # IVFFLAT_8192:
+      #   arg_groups: [{ nlist: 8192 }]
+      #   query_args: [[512, 2048, 4096, 6144, 8192]]
\ No newline at end of file
diff --git a/bigvectorbench/algorithms/pgvector/module.py b/bigvectorbench/algorithms/pgvector/module.py
new file mode 100644
index 0000000..cd66587
--- /dev/null
+++ b/bigvectorbench/algorithms/pgvector/module.py
@@ -0,0 +1,241 @@
+""" Pgvector module for BigVectorBench framework. """
+
+import subprocess
+import sys
+import numpy as np
+import pgvector.psycopg
+import psycopg
+import os
+
+from bigvectorbench.algorithms.base.module import BaseANN
+
+class PGVector(BaseANN):
+    """pgvector base implementation; subclasses choose the index type and its options"""
+
+    # metric -> (pgvector distance operator, index operator class)
+    _METRIC_OPS = {
+        "angular": ("<=>", "vector_cosine_ops"),
+        "euclidean": ("<->", "vector_l2_ops"),
+    }
+
+    def __init__(self, metric, method_param):
+        if metric not in self._METRIC_OPS:
+            raise RuntimeError(f"Unknown metric {metric}")
+        self._metric = metric
+        self._distance_op, self._ops_class = self._METRIC_OPS[metric]
+        # .get() with pgvector's defaults: the IVFFlat config only carries nlist,
+        # so requiring the HNSW keys here would raise KeyError for it.
+        self._m = method_param.get("M", 16)
+        self._ef_construction = method_param.get("efConstruction", 64)
+        self._ef_search = None
+        self._cur = None
+        self.num_entities = 0
+        self.labels = None
+        self.label_names = None
+        self.label_types = None
+        self.index = self.get_vector_index()
+        self._query = f"SELECT id FROM items ORDER BY embedding {self._distance_op} %s LIMIT %s"
+
+    def get_vector_index(self):
+        """Get vector index"""
+        raise NotImplementedError()
+
+    def get_index_options(self) -> str:
+        """Get the WITH (...) options used by CREATE INDEX; HNSW options unless overridden"""
+        return f"m = {int(self._m)}, ef_construction = {int(self._ef_construction)}"
+
+    def load_data(
+        self,
+        embeddings: np.array,
+        labels: np.ndarray | None = None,
+        label_names: list[str] | None = None,
+        label_types: list[str] | None = None,
+    ) -> None:
+        """
+        Start PostgreSQL, recreate the items table, bulk-load the data and build the index
+
+        Args:
+            embeddings (np.array): vectors to load; row i is stored with id i
+            labels (np.ndarray): optional labels, one row per vector, stored as integers
+            label_names (list[str]): column names for the labels
+            label_types (list[str]): label types (label columns are always created as integer)
+        """
+        subprocess.run(["service", "postgresql", "start"], check=True, stdout=sys.stdout, stderr=sys.stderr)
+        conn = psycopg.connect(user="bvb", password="bvb", dbname="bvb", autocommit=True)
+        pgvector.psycopg.register_vector(conn)
+        cur = conn.cursor()
+        cur.execute("DROP TABLE IF EXISTS items")
+        # One condition decides both the table columns and the COPY columns, so they cannot disagree.
+        label_columns = list(label_names) if labels is not None and label_names is not None else []
+        self.label_names = label_columns or None
+        column_defs = ["id integer", f"embedding vector({embeddings.shape[1]})"]
+        column_defs += [f"{name} integer" for name in label_columns]
+        cur.execute(f"CREATE TABLE items ({', '.join(column_defs)})")
+        cur.execute("ALTER TABLE items ALTER COLUMN embedding SET STORAGE PLAIN")
+
+        copy_columns = ", ".join(["id", "embedding"] + label_columns)
+        with cur.copy(f"COPY items ({copy_columns}) FROM STDIN WITH (FORMAT BINARY)") as copy:
+            copy.set_types(["int4", "vector"] + ["int4"] * len(label_columns))
+            for i, embedding in enumerate(embeddings):
+                row_labels = tuple(int(x) for x in labels[i]) if label_columns else ()
+                copy.write_row((i, embedding) + row_labels)
+
+        print("Creating index...")
+        cur.execute(
+            f"CREATE INDEX ON items USING {self.index} (embedding {self._ops_class}) "
+            f"WITH ({self.get_index_options()})"
+        )
+        print("Done!")
+        self.num_entities = len(embeddings)
+        self._cur = cur
+
+    def parse_filter_expr(self, filter_expr: str) -> str:
+        """Parse filter expression and return SQL WHERE clause"""
+
+        print(f"Received filter expression: {filter_expr}")
+        return filter_expr
+
+    def set_query_arguments(self, ef_search):
+        """Set <index>.ef_search for the current session"""
+        self._ef_search = ef_search
+        self._cur.execute("SET %s.ef_search = %d" % (self.index, ef_search))
+
+    def query(self, v: np.array, n: int, filter_expr: str | None = None) -> list[int]:
+        """
+        Return the ids of the n nearest neighbours of v, optionally restricted by filter_expr
+        """
+        if filter_expr:
+            # NOTE(review): filter_expr is spliced into the SQL text as-is, so it
+            # must only ever come from trusted benchmark workloads.
+            where_clause = filter_expr.replace("==", "=")
+            # Order by the operator of the configured metric; "<->" would rank angular data by L2.
+            query = f"SELECT id FROM items WHERE {where_clause} ORDER BY embedding {self._distance_op} %s LIMIT %s"
+        else:
+            query = self._query
+        self._cur.execute(query, (v, n), binary=True, prepare=True)
+        return [row[0] for row in self._cur.fetchall()]
+
+    def get_memory_usage(self):
+        """Return the size of the index relation divided by 1024, or 0 before data is loaded"""
+        if self._cur is None:
+            return 0
+        self._cur.execute("SELECT pg_relation_size('items_embedding_idx')")
+        return self._cur.fetchone()[0] / 1024
+
+    def __str__(self):
+        return f"PGVector(m={self._m}, ef_construction={self._ef_construction}, ef_search={self._ef_search})"
+
+    def insert(self, embeddings: np.ndarray, labels: np.ndarray | None = None) -> None:
+        """
+        Single insert data
+
+        Args:
+            embeddings (np.ndarray): embeddings
+            labels (np.ndarray): labels
+
+        Returns:
+            None
+        """
+        columns = ["id", "embedding"]
+        values = [self.num_entities + 1, embeddings]
+        if labels is not None and self.label_names is not None:
+            columns += self.label_names
+            values += [int(x) for x in labels]
+        placeholders = ", ".join(["%s"] * len(columns))
+        # Values are bound as parameters: a numpy array formatted into the SQL text is not valid SQL.
+        self._cur.execute(f"INSERT INTO items ({', '.join(columns)}) VALUES ({placeholders})", values)
+        self.num_entities += 1
+
+    def update(
+        self, index: int, embeddings: np.ndarray, labels: np.ndarray | None = None
+    ) -> None:
+        """
+        Single update data
+
+        Args:
+            index (int): index to update
+            embeddings (np.ndarray): embeddings
+            labels (np.ndarray): labels
+
+        Returns:
+            None
+        """
+        assignments = ["embedding = %s"]
+        values = [embeddings]
+        if labels is not None and self.label_names is not None:
+            for name, value in zip(self.label_names, labels):
+                assignments.append(f"{name} = %s")
+                values.append(int(value))
+        values.append(int(index))
+        self._cur.execute(f"UPDATE items SET {', '.join(assignments)} WHERE id = %s", values)
+
+    def delete(
+        self,
+        index: int,
+    ) -> None:
+        """
+        Single delete data
+
+        Args:
+            index (int): index to delete
+
+        Returns:
+            None
+        """
+        self._cur.execute("DELETE FROM items WHERE id = %s", (int(index),))
+
+
+class PGVectorHNSW(PGVector):
+    """pgvector HNSW implementation"""
+
+    def __init__(self, metric: str, index_param: dict):
+        super().__init__(metric, index_param)
+        # Report the values the index is actually built with (config keys M / efConstruction).
+        self._nlinks = self._m
+        self._efConstruction = self._ef_construction
+
+    def get_vector_index(self):
+        """Get HNSW vector index"""
+        return "hnsw"
+
+    def set_query_arguments(self, efSearch: int = 40):
+        """
+        Set query arguments for pgvector query with hnsw index
+        """
+        self._ef_search = efSearch
+        self.search_params = {
+            "metric_type": self._metric,
+            "efSearch": efSearch,
+        }
+        self.name = f"pgvector HNSW metric:{self._metric}, nlinks:{self._nlinks}, efConstruction:{self._efConstruction}, efSearch:{efSearch}"
+        self._cur.execute(f"SET hnsw.ef_search = {int(efSearch)}")
+
+class PGVectorIVFFLAT(PGVector):
+    """pgvector IVFFlat implementation"""
+
+    def __init__(self, metric: str, index_param: dict):
+        super().__init__(metric, index_param)
+        self._nlist = index_param.get("nlist", 100)
+
+    def get_vector_index(self):
+        """Get IVFFLAT vector index"""
+        return "ivfflat"
+
+    def get_index_options(self) -> str:
+        """Get IVFFLAT index options (number of lists)"""
+        return f"lists = {int(self._nlist)}"
+
+    def set_query_arguments(self, efSearch: int = 40):
+        """
+        Set query arguments for pgvector query with ivfflat index
+
+        Args:
+            efSearch (int): number of lists to probe, applied as ivfflat.probes
+        """
+        self._ef_search = efSearch
+        self.search_params = {
+            "metric_type": self._metric,
+            "efSearch": efSearch,
+        }
+        self.name = f"pgvector ivfflat metric:{self._metric}, nlist:{self._nlist}, nprobe:{efSearch}"
+        self._cur.execute(f"SET ivfflat.probes = {int(efSearch)}")
\ No newline at end of file