From 1d91095fda000c0545f5c1ce28b3bf3265453284 Mon Sep 17 00:00:00 2001 From: taoying <2474671424@qq.com> Date: Fri, 28 Mar 2025 14:41:35 +0800 Subject: [PATCH 1/2] adapt openGauss --- .../algorithms/openGauss/Dockerfile | 39 ++ .../algorithms/openGauss/config.yml | 42 ++ ann_benchmarks/algorithms/openGauss/module.py | 435 ++++++++++++++++++ ann_benchmarks/results.py | 6 +- ann_benchmarks/runner.py | 1 + go_opgs.sh | 12 + requirements.txt | 3 + 7 files changed, 536 insertions(+), 2 deletions(-) create mode 100644 ann_benchmarks/algorithms/openGauss/Dockerfile create mode 100644 ann_benchmarks/algorithms/openGauss/config.yml create mode 100644 ann_benchmarks/algorithms/openGauss/module.py create mode 100644 go_opgs.sh diff --git a/ann_benchmarks/algorithms/openGauss/Dockerfile b/ann_benchmarks/algorithms/openGauss/Dockerfile new file mode 100644 index 000000000..32cadfe4d --- /dev/null +++ b/ann_benchmarks/algorithms/openGauss/Dockerfile @@ -0,0 +1,39 @@ +FROM ann-benchmarks + +# https://github.com/pgvector/pgvector/blob/master/Dockerfile + +RUN git clone https://github.com/pgvector/pgvector /tmp/pgvector + +RUN DEBIAN_FRONTEND=noninteractive apt-get -y install tzdata +RUN apt-get update && apt-get install -y --no-install-recommends build-essential postgresql-common +RUN /usr/share/postgresql-common/pgdg/apt.postgresql.org.sh -y +RUN apt-get install -y --no-install-recommends postgresql-16 postgresql-server-dev-16 +RUN sh -c 'echo "local all all trust" > /etc/postgresql/16/main/pg_hba.conf' + +# Dynamically set OPTFLAGS based on the architecture +RUN ARCH=$(uname -m) && \ + if [ "$ARCH" = "aarch64" ]; then \ + OPTFLAGS="-march=native -msve-vector-bits=512"; \ + elif [ "$ARCH" = "x86_64" ]; then \ + OPTFLAGS="-march=native -mprefer-vector-width=512"; \ + else \ + OPTFLAGS="-march=native"; \ + fi && \ + cd /tmp/pgvector && \ + make clean && \ + make OPTFLAGS="$OPTFLAGS" && \ + make install + +USER postgres +RUN service postgresql start && \ + psql -c "CREATE USER ann 
WITH ENCRYPTED PASSWORD 'ann'" && \ + psql -c "CREATE DATABASE ann" && \ + psql -c "GRANT ALL PRIVILEGES ON DATABASE ann TO ann" && \ + psql -d ann -c "GRANT ALL ON SCHEMA public TO ann" && \ + psql -d ann -c "CREATE EXTENSION vector" && \ + psql -c "ALTER USER ann SET maintenance_work_mem = '4GB'" && \ + psql -c "ALTER USER ann SET max_parallel_maintenance_workers = 0" && \ + psql -c "ALTER SYSTEM SET shared_buffers = '4GB'" +USER root + +RUN pip install psycopg[binary] pgvector \ No newline at end of file diff --git a/ann_benchmarks/algorithms/openGauss/config.yml b/ann_benchmarks/algorithms/openGauss/config.yml new file mode 100644 index 000000000..e7edacb8a --- /dev/null +++ b/ann_benchmarks/algorithms/openGauss/config.yml @@ -0,0 +1,42 @@ +float: + any: + - base_args: ['@metric'] + constructor: openGaussHNSWPQ + disabled: false + docker_tag: ann-benchmarks-openGauss + module: ann_benchmarks.algorithms.openGauss + name: openGauss-hnswpq + run_groups: + M-16: + arg_groups: [{M: 16, efConstruction: 200, hnswEarlystopThreshold: 2147483647, pqM: 96, concurrents: 80}] + args: {} + query_args: [[10, 20, 40, 80, 120, 200, 400, 800]] + M-24: + arg_groups: [{M: 24, efConstruction: 200, hnswEarlystopThreshold: 2147483647, pqM: 96, concurrents: 80}] + args: {} + query_args: [[10, 20, 40, 80, 120, 200, 400, 800]] + - base_args: ['@metric'] + constructor: openGaussHNSW + disabled: false + docker_tag: ann-benchmarks-openGauss + module: ann_benchmarks.algorithms.openGauss + name: openGauss-hnsw + run_groups: + M-16: + arg_groups: [{M: 16, efConstruction: 200, concurrents: 80}] + args: {} + query_args: [[10, 20, 40, 80, 120, 200, 400, 800]] + M-24: + arg_groups: [{M: 24, efConstruction: 200, concurrents: 80}] + args: {} + query_args: [[10, 20, 40, 80, 120, 200, 400, 800]] + - base_args: ['@metric'] + constructor: openGaussIVF + disabled: false + docker_tag: ann-benchmarks-openGauss + module: ann_benchmarks.algorithms.openGauss + name: openGauss-ivfflat + run_groups: + 
pgvector: + args: [[100, 200, 400, 1000, 2000, 4000]] + query_args: [[1, 2, 4, 10, 20, 40, 100]] \ No newline at end of file diff --git a/ann_benchmarks/algorithms/openGauss/module.py b/ann_benchmarks/algorithms/openGauss/module.py new file mode 100644 index 000000000..7fa1a4f4e --- /dev/null +++ b/ann_benchmarks/algorithms/openGauss/module.py @@ -0,0 +1,435 @@ +""" +This module supports connecting to an openGauss instance and performing vector +indexing and search using the pgvector extension. The default behavior uses +the "ann" value of openGauss user name, password, and database name, as well +as the default host and port values of the psycopg driver. + +If openGauss is managed externally, e.g. in a cloud DBaaS environment, the +environment variable overrides listed below are available for setting openGauss +connection parameters: + +ANN_BENCHMARKS_OG_USER +ANN_BENCHMARKS_OG_PASSWORD +ANN_BENCHMARKS_OG_DBNAME +ANN_BENCHMARKS_OG_HOST +ANN_BENCHMARKS_OG_PORT + +This module starts the openGauss service automatically using the "service" +command. The environment variable ANN_BENCHMARKS_OG_START_SERVICE could be set +to "false" (or e.g. "0" or "no") in order to disable this behavior. + +This module will also attempt to create the pgvector extension inside the +target database, if it has not already been created. 
+""" +import subprocess +import sys +import os + +import pgvector.psycopg +import psycopg + +from typing import Dict, Any, Optional + +from ..base.module import BaseANN +from ...util import get_bool_env_var + +import multiprocessing +from multiprocessing import Pool +import numpy + +def get_pg_param_env_var_name(pg_param_name: str) -> str: + return f'ANN_BENCHMARKS_OG_{pg_param_name.upper()}' + + +def get_pg_conn_param( + pg_param_name: str, + default_value: Optional[str] = None) -> Optional[str]: + env_var_name = get_pg_param_env_var_name(pg_param_name) + env_var_value = os.getenv(env_var_name, default_value) + if env_var_value is None or len(env_var_value.strip()) == 0: + return default_value + return env_var_value + + +class openGaussHNSW(BaseANN): + def __init__(self, metric, method_param): + self._metric = metric + self._m = method_param['M'] + self._ef_construction = method_param['efConstruction'] + self._cur = None + self.curs = [] + self.connections = [] + self.init_conn_flags = False + self.search_pool = None + self.con = method_param['concurrents'] + + # default value + self.user = 'ann' + self.port = 5432 + self.dbname = 'ann' + self.host = 'localhost' + self.password = 'ann' + + if metric == "angular": + self._query = "SELECT id FROM items ORDER BY embedding <=> %s LIMIT %s" + elif metric == "euclidean": + self._query = "SELECT id FROM items ORDER BY embedding <-> %s LIMIT %s" + else: + raise RuntimeError(f"unknown metric {metric}") + + def fit(self, X): + psycopg_connect_kwargs: Dict[str, Any] = dict( + autocommit=True, + ) + for arg_name in ['user', 'password', 'dbname']: + # The default value is "ann" for all of these parameters. 
+ psycopg_connect_kwargs[arg_name] = get_pg_conn_param( + arg_name, 'ann') + if arg_name == 'user': + self.user = psycopg_connect_kwargs['user'] + if arg_name == 'password': + self.password = psycopg_connect_kwargs['password'] + if arg_name == 'dbname': + self.dbname = psycopg_connect_kwargs['dbname'] + + # If host/port are not specified, leave the default choice to the + # psycopg driver. + og_host: Optional[str] = get_pg_conn_param('host') + if og_host is not None: + psycopg_connect_kwargs['host'] = og_host + self.host = og_host + + og_port_str: Optional[str] = get_pg_conn_param('port') + if og_port_str is not None: + psycopg_connect_kwargs['port'] = int(og_port_str) + self.port = int(og_port_str) + + conn = psycopg.connect(**psycopg_connect_kwargs) + + pgvector.psycopg.register_vector(conn) + cur = conn.cursor() + cur.execute("DROP TABLE IF EXISTS items") + cur.execute("CREATE TABLE items (id int, embedding vector(%d))" % X.shape[1]) + cur.execute("ALTER TABLE items ALTER COLUMN embedding SET STORAGE PLAIN") + cur.execute("ALTER TABLE items SET (parallel_workers = 32);") + print("copying data...") + with cur.copy("COPY items (id, embedding) FROM STDIN WITH (FORMAT BINARY)") as copy: + copy.set_types(["int4", "vector"]) + for i, embedding in enumerate(X): + copy.write_row((i, embedding)) + print("creating index...") + if self._metric == "angular": + cur.execute( + "CREATE INDEX ON items USING hnsw (embedding vector_cosine_ops) WITH (m = %d, ef_construction = %d)" % (self._m, self._ef_construction) + ) + elif self._metric == "euclidean": + cur.execute("CREATE INDEX ON items USING hnsw (embedding vector_l2_ops) WITH (m = %d, ef_construction = %d)" % (self._m, self._ef_construction)) + else: + raise RuntimeError(f"unknown metric {self._metric}") + print("done!") + self._cur = cur + + + def set_query_arguments(self, ef_search): + self._ef_search = ef_search + self._cur.execute("SET hnsw.ef_search = %d" % ef_search) + self._cur.execute("set enable_seqscan = off") + 
conc = self.con + self.search_pool = Pool(conc, initializer=self.init_connection) + + def query(self, v, n): + self._cur.execute(self._query, (v, n), binary=True, prepare=True) + return [id for id, in self._cur.fetchall()] + + def get_memory_usage(self): + if self._cur is None: + return 0 + self._cur.execute("SELECT pg_relation_size('items_embedding_idx')") + return self._cur.fetchone()[0] / 1024 + + # batch search + def init_connection(self): + conn = psycopg.connect(user=self.user, password=self.password, dbname=self.dbname, host=self.host, autocommit=True, port=self.port) + pgvector.psycopg.register_vector(conn) + global cur + cur = conn.cursor() + cur.execute("SET hnsw_ef_search = %d" % self._ef_search) + cur.execute("set enable_seqscan = off") + global base_query + base_query = self._query + + def close_connections(self): + for cur in self.curs: + if cur is not None: + cur.close() + cur = None + for conn in self.connections: + if conn is not None: + conn.close() + conn = None + self.curs = [] + self.connections = [] + self.init_conn_flags = False + + @staticmethod + def sub_query(chunk, n): + ids = [] + for item in chunk: + cur.execute(base_query, (item, n), binary=True, prepare=True) + res = cur.fetchall() + ids.append([i[0] for i in res]) + return ids + + def batch_query(self, X: numpy.array, n: int): + conc = self.con + chunk_size = len(X) // conc + 1 + chunks = [X[i:i + chunk_size] for i in range(0, len(X), chunk_size)] + res = self.search_pool.starmap(self.sub_query, [(chunk, n) for chunk in chunks]) + self.res = [] + for item in res: + for ids in item: + self.res.append(ids) + + def __str__(self): + return f"openGaussHNSW(m={self._m}, ef_construction={self._ef_construction}, ef_search={self._ef_search})" + + def __del__(self): + self.close_connections() + +class openGaussHNSWPQ(BaseANN): + def __init__(self, metric, method_param): + self._metric = metric + self._m = method_param['M'] + self._ef_construction = method_param['efConstruction'] + self._cur = 
None + self.curs = [] + self.connections = [] + self.init_conn_flags = False + self.search_pool = None + self.con = method_param['concurrents'] + self.hnsw_earlystop_threshold = method_param['hnswEarlystopThreshold'] + self.pq_m = method_param['pqM'] + + # default value + self.user = 'ann' + self.port = 5432 + self.dbname = 'ann' + self.host = 'localhost' + self.password = 'ann' + + if metric == "angular": + self._query = "SELECT id FROM items ORDER BY embedding <=> %s LIMIT %s" + elif metric == "euclidean": + self._query = "SELECT id FROM items ORDER BY embedding <-> %s LIMIT %s" + else: + raise RuntimeError(f"unknown metric {metric}") + + def fit(self, X): + psycopg_connect_kwargs: Dict[str, Any] = dict( + autocommit=True, + ) + for arg_name in ['user', 'password', 'dbname']: + # The default value is "ann" for all of these parameters. + psycopg_connect_kwargs[arg_name] = get_pg_conn_param( + arg_name, 'ann') + if arg_name == 'user': + self.user = psycopg_connect_kwargs['user'] + if arg_name == 'password': + self.password = psycopg_connect_kwargs['password'] + if arg_name == 'dbname': + self.dbname = psycopg_connect_kwargs['dbname'] + + # If host/port are not specified, leave the default choice to the + # psycopg driver. 
+ og_host: Optional[str] = get_pg_conn_param('host') + if og_host is not None: + psycopg_connect_kwargs['host'] = og_host + self.host = og_host + + og_port_str: Optional[str] = get_pg_conn_param('port') + if og_port_str is not None: + psycopg_connect_kwargs['port'] = int(og_port_str) + self.port = int(og_port_str) + + conn = psycopg.connect(**psycopg_connect_kwargs) + + pgvector.psycopg.register_vector(conn) + cur = conn.cursor() + cur.execute("DROP TABLE IF EXISTS items") + cur.execute("CREATE TABLE items (id int, embedding vector(%d))" % X.shape[1]) + cur.execute("ALTER TABLE items ALTER COLUMN embedding SET STORAGE PLAIN") + cur.execute("ALTER TABLE items SET (parallel_workers = 32);") + cur.execute("set hnsw_earlystop_threshold = %d" % self.hnsw_earlystop_threshold) + print("copying data...") + with cur.copy("COPY items (id, embedding) FROM STDIN WITH (FORMAT BINARY)") as copy: + copy.set_types(["int4", "vector"]) + for i, embedding in enumerate(X): + copy.write_row((i, embedding)) + print(f"creating index [hnsw_earlystop_threshold={self.hnsw_earlystop_threshold}]...") + if self._metric == "angular": + cur.execute( + "CREATE INDEX ON items USING hnsw (embedding vector_cosine_ops) WITH (m = %d, ef_construction = %d, enable_pq = on, pq_m = %d)" % (self._m, self._ef_construction, self.pq_m)) + elif self._metric == "euclidean": + cur.execute("CREATE INDEX ON items USING hnsw (embedding vector_l2_ops) WITH (m = %d, ef_construction = %d, enable_pq = on, pq_m = %d)" % (self._m, self._ef_construction, self.pq_m)) + else: + raise RuntimeError(f"unknown metric {self._metric}") + print("done!") + self._cur = cur + + def set_query_arguments(self, ef_search): + self._ef_search = ef_search + self._cur.execute("SET hnsw_ef_search = %d" % ef_search) + self._cur.execute("set enable_seqscan = off") + self._cur.execute("set hnsw_earlystop_threshold = %d" % self.hnsw_earlystop_threshold) + conc = self.con + self.search_pool = Pool(conc, initializer=self.init_connection) + + def 
query(self, v, n): + self._cur.execute(self._query, (v, n), binary=True, prepare=True) + return [id for id, in self._cur.fetchall()] + + def get_memory_usage(self): + if self._cur is None: + return 0 + self._cur.execute("SELECT pg_relation_size('items_embedding_idx')") + return self._cur.fetchone()[0] / 1024 + + def init_connection(self): + conn = psycopg.connect(user=self.user, password=self.password, dbname=self.dbname, host=self.host, autocommit=True, port=self.port) + pgvector.psycopg.register_vector(conn) + global cur + cur = conn.cursor() + cur.execute("SET hnsw_ef_search = %d" % self._ef_search) + cur.execute("set enable_seqscan = off") + cur.execute("set hnsw_earlystop_threshold = %d" % self.hnsw_earlystop_threshold) + global base_query + base_query = self._query + + def close_connections(self): + for cur in self.curs: + if cur is not None: + cur.close() + cur = None + for conn in self.connections: + if conn is not None: + conn.close() + conn = None + self.curs = [] + self.connections = [] + self.init_conn_flags = False + + @staticmethod + def sub_query(chunk, n): + ids = [] + for item in chunk: + cur.execute(base_query, (item, n), binary=True, prepare=True) + res = cur.fetchall() + ids.append([i[0] for i in res]) + return ids + + def batch_query(self, X: numpy.array, n: int): + conc = self.con + chunk_size = len(X) // conc + 1 + chunks = [X[i:i + chunk_size] for i in range(0, len(X), chunk_size)] + res = self.search_pool.starmap(self.sub_query, [(chunk, n) for chunk in chunks]) + print(f"openGauss concuiiency_query process num: {conc}") + self.res = [] + for item in res: + for ids in item: + self.res.append(ids) + + def __str__(self): + return f"openGaussHNSWPQ(m={self._m}, ef_construction={self._ef_construction}, ef_search={self._ef_search}, pq_m={self.pq_m}, hnsw_earlystop_threshold={self.hnsw_earlystop_threshold})" + + def __del__(self): + self.close_connections() + + +class openGaussIVF(BaseANN): + def __init__(self, metric, lists): + self._metric = 
metric + self._lists = lists + self._cur = None + + # default value + self.user = 'ann' + self.port = 5432 + self.dbname = 'ann' + self.host = 'localhost' + self.password = 'ann' + + if metric == "angular": + self._query = "SELECT id FROM items ORDER BY embedding <=> %s LIMIT %s" + elif metric == "euclidean": + self._query = "SELECT id FROM items ORDER BY embedding <-> %s LIMIT %s" + else: + raise RuntimeError(f"unknown metric {metric}") + + def fit(self, X): + psycopg_connect_kwargs: Dict[str, Any] = dict( + autocommit=True, + ) + for arg_name in ['user', 'password', 'dbname']: + # The default value is "ann" for all of these parameters. + psycopg_connect_kwargs[arg_name] = get_pg_conn_param( + arg_name, 'ann') + if arg_name == 'user': + self.user = psycopg_connect_kwargs['user'] + if arg_name == 'password': + self.password = psycopg_connect_kwargs['password'] + if arg_name == 'dbname': + self.dbname = psycopg_connect_kwargs['dbname'] + + # If host/port are not specified, leave the default choice to the + # psycopg driver. 
+ og_host: Optional[str] = get_pg_conn_param('host') + if og_host is not None: + psycopg_connect_kwargs['host'] = og_host + self.host = og_host + + og_port_str: Optional[str] = get_pg_conn_param('port') + if og_port_str is not None: + psycopg_connect_kwargs['port'] = int(og_port_str) + self.port = int(og_port_str) + + conn = psycopg.connect(**psycopg_connect_kwargs) + + pgvector.psycopg.register_vector(conn) + + cur = conn.cursor() + cur.execute("DROP TABLE IF EXISTS items") + cur.execute("CREATE TABLE items (id int, embedding vector(%d))" % X.shape[1]) + cur.execute("ALTER TABLE items ALTER COLUMN embedding SET STORAGE PLAIN") + cur.execute("ALTER TABLE items SET (parallel_workers = 32);") + print("copying data...") + with cur.copy("COPY items (id, embedding) FROM STDIN") as copy: + for i, embedding in enumerate(X): + copy.write_row((i, embedding)) + print(f"creating index ...") + if self._metric == "angular": + cur.execute( + "CREATE INDEX ON items USING ivfflat (embedding vector_cosine_ops) WITH (lists = %d)" % (self._lists)) + elif self._metric == "euclidean": + cur.execute("CREATE INDEX ON items USING ivfflat (embedding vector_l2_ops) WITH (lists= %d)" % (self._lists)) + else: + raise RuntimeError(f"unknown metric {self._metric}") + print("done!") + self._cur = cur + + def set_query_arguments(self, probes): + self._probes = probes + self._cur.execute("SET ivfflat_probes = %d" % probes) + self._cur.execute("set enable_seqscan = off") + + def query(self, v, n): + self._cur.execute(self._query, (v, n), binary=True, prepare=True) + return [id for id, in self._cur.fetchall()] + + def get_memory_usage(self): + if self._cur is None: + return 0 + self._cur.execute("SELECT pg_relation_size('items_embedding_idx')") + return self._cur.fetchone()[0] / 1024 + + def __str__(self): + return f"openGaussIVF(lists={self._lists}, probes={self._probes})" \ No newline at end of file diff --git a/ann_benchmarks/results.py b/ann_benchmarks/results.py index e7e5b2e7f..73dcf81cd 
100644 --- a/ann_benchmarks/results.py +++ b/ann_benchmarks/results.py @@ -91,8 +91,10 @@ def load_all_results(dataset: Optional[str] = None, try: with h5py.File(os.path.join(root, filename), "r+") as f: properties = dict(f.attrs) - if batch_mode != properties["batch_mode"]: - continue + #if batch_mode != properties["batch_mode"]: + #continue + last_folder = os.path.basename(os.path.normpath(root)) + properties["algo"] = last_folder yield properties, f except Exception: print(f"Was unable to read {filename}") diff --git a/ann_benchmarks/runner.py b/ann_benchmarks/runner.py index 81428114c..033f4b4ac 100644 --- a/ann_benchmarks/runner.py +++ b/ann_benchmarks/runner.py @@ -131,6 +131,7 @@ def batch_query(X: numpy.array) -> List[Tuple[float, List[Tuple[int, float]]]]: avg_candidates = total_candidates / len(X_test) best_search_time = min(best_search_time, search_time) + algo.search_pool.close() verbose = hasattr(algo, "query_verbose") attrs = { "batch_mode": batch, diff --git a/go_opgs.sh b/go_opgs.sh new file mode 100644 index 000000000..99800a01a --- /dev/null +++ b/go_opgs.sh @@ -0,0 +1,12 @@ +export ANN_BENCHMARKS_OG_USER='YourUserName' +export ANN_BENCHMARKS_OG_PASSWORD='YourPassword' +export ANN_BENCHMARKS_OG_DBNAME='YourDBName' +export ANN_BENCHMARKS_OG_HOST='YourHost' +export ANN_BENCHMARKS_OG_PORT=YourPort + + +python3 run.py --algorithm openGauss-hnsw --dataset fashion-mnist-784-euclidean --local --runs 2 -k 10 --batch +python3 run.py --algorithm openGauss-hnsw --dataset gist-960-euclidean --local --runs 2 -k 10 --batch +python3 run.py --algorithm openGauss-hnsw --dataset glove-100-angular --local --runs 2 -k 10 --batch +python3 run.py --algorithm openGauss-hnsw --dataset sift-128-euclidean --local --runs 2 -k 10 --batch +python3 run.py --algorithm openGauss-hnsw --dataset deep-image-96-angular --local --runs 2 -k 10 --batch \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 4682d5b6d..01c6aa5b4 100644 --- a/requirements.txt 
+++ b/requirements.txt @@ -10,3 +10,6 @@ jinja2==3.1.2 pytest==7.2.2 datasets==2.12.0 requests==2.31.0 +psycopg +psycopg-binary +pgvector From fda339e9a9c3e7379cb9d68d906e62f73fda9c11 Mon Sep 17 00:00:00 2001 From: taoying <2474671424@qq.com> Date: Tue, 13 May 2025 17:19:51 +0800 Subject: [PATCH 2/2] add batch query for ivfflat --- .github/workflows/benchmarks.yml | 1 + .../algorithms/openGauss/Dockerfile | 33 ++++---- .../algorithms/openGauss/config.yml | 2 +- ann_benchmarks/algorithms/openGauss/module.py | 76 ++++++++++++++++++- requirements.txt | 3 +- 5 files changed, 90 insertions(+), 25 deletions(-) diff --git a/.github/workflows/benchmarks.yml b/.github/workflows/benchmarks.yml index 22203be52..a73240ae1 100644 --- a/.github/workflows/benchmarks.yml +++ b/.github/workflows/benchmarks.yml @@ -72,6 +72,7 @@ jobs: - voyager - vsag - weaviate + - openGauss include: - library: pynndescent dataset: random-xs-16-hamming diff --git a/ann_benchmarks/algorithms/openGauss/Dockerfile b/ann_benchmarks/algorithms/openGauss/Dockerfile index 32cadfe4d..53578d4fb 100644 --- a/ann_benchmarks/algorithms/openGauss/Dockerfile +++ b/ann_benchmarks/algorithms/openGauss/Dockerfile @@ -1,14 +1,12 @@ FROM ann-benchmarks -# https://github.com/pgvector/pgvector/blob/master/Dockerfile - -RUN git clone https://github.com/pgvector/pgvector /tmp/pgvector +RUN add-apt-repository ppa:opengauss/opengauss RUN DEBIAN_FRONTEND=noninteractive apt-get -y install tzdata -RUN apt-get update && apt-get install -y --no-install-recommends build-essential postgresql-common -RUN /usr/share/postgresql-common/pgdg/apt.postgresql.org.sh -y -RUN apt-get install -y --no-install-recommends postgresql-16 postgresql-server-dev-16 -RUN sh -c 'echo "local all all trust" > /etc/postgresql/16/main/pg_hba.conf' +RUN sed -i 's/bookworm/jammy/g' /etc/apt/sources.list.d/opengauss-ubuntu-opengauss-bookworm.list +RUN apt-get update && apt-get install -y --no-install-recommends build-essential +RUN apt install opengauss 
+RUN sh -c 'echo "local all all trust" > /var/lib/opengauss/data/pg_hba.conf' # Dynamically set OPTFLAGS based on the architecture RUN ARCH=$(uname -m) && \ @@ -24,16 +22,15 @@ RUN ARCH=$(uname -m) && \ make OPTFLAGS="$OPTFLAGS" && \ make install -USER postgres -RUN service postgresql start && \ - psql -c "CREATE USER ann WITH ENCRYPTED PASSWORD 'ann'" && \ - psql -c "CREATE DATABASE ann" && \ - psql -c "GRANT ALL PRIVILEGES ON DATABASE ann TO ann" && \ - psql -d ann -c "GRANT ALL ON SCHEMA public TO ann" && \ - psql -d ann -c "CREATE EXTENSION vector" && \ - psql -c "ALTER USER ann SET maintenance_work_mem = '4GB'" && \ - psql -c "ALTER USER ann SET max_parallel_maintenance_workers = 0" && \ - psql -c "ALTER SYSTEM SET shared_buffers = '4GB'" +USER opengauss +RUN service opengauss start && \ + gsql -c "CREATE USER ann WITH ENCRYPTED PASSWORD 'ann'" && \ + gsql -c "CREATE DATABASE ann" && \ + gsql -c "GRANT ALL PRIVILEGES ON DATABASE ann TO ann" && \ + gsql -d ann -c "GRANT ALL ON SCHEMA public TO ann" && \ + gsql -c "ALTER USER ann SET maintenance_work_mem = '4GB'" && \ + gsql -c "ALTER USER ann SET max_parallel_maintenance_workers = 0" && \ + gsql -c "ALTER SYSTEM SET shared_buffers = '4GB'" USER root -RUN pip install psycopg[binary] pgvector \ No newline at end of file +RUN pip install psycopg[binary] \ No newline at end of file diff --git a/ann_benchmarks/algorithms/openGauss/config.yml b/ann_benchmarks/algorithms/openGauss/config.yml index e7edacb8a..039a32186 100644 --- a/ann_benchmarks/algorithms/openGauss/config.yml +++ b/ann_benchmarks/algorithms/openGauss/config.yml @@ -38,5 +38,5 @@ float: name: openGauss-ivfflat run_groups: pgvector: - args: [[100, 200, 400, 1000, 2000, 4000]] + args: [[[enable_npu: "off", concurrents: 80]], [100, 200, 400, 1000, 2000, 4000]] query_args: [[1, 2, 4, 10, 20, 40, 100]] \ No newline at end of file diff --git a/ann_benchmarks/algorithms/openGauss/module.py b/ann_benchmarks/algorithms/openGauss/module.py index 
7fa1a4f4e..ffd580a0e 100644 --- a/ann_benchmarks/algorithms/openGauss/module.py +++ b/ann_benchmarks/algorithms/openGauss/module.py @@ -36,6 +36,7 @@ import multiprocessing from multiprocessing import Pool import numpy +import time def get_pg_param_env_var_name(pg_param_name: str) -> str: return f'ANN_BENCHMARKS_OG_{pg_param_name.upper()}' @@ -118,6 +119,7 @@ def fit(self, X): for i, embedding in enumerate(X): copy.write_row((i, embedding)) print("creating index...") + if self._metric == "angular": cur.execute( "CREATE INDEX ON items USING hnsw (embedding vector_cosine_ops) WITH (m = %d, ef_construction = %d)" % (self._m, self._ef_construction) @@ -132,7 +134,7 @@ def fit(self, X): def set_query_arguments(self, ef_search): self._ef_search = ef_search - self._cur.execute("SET hnsw.ef_search = %d" % ef_search) + self._cur.execute("SET hnsw_ef_search = %d" % ef_search) self._cur.execute("set enable_seqscan = off") conc = self.con self.search_pool = Pool(conc, initializer=self.init_connection) @@ -157,6 +159,8 @@ def init_connection(self): cur.execute("set enable_seqscan = off") global base_query base_query = self._query + self.connections.append(conn) + self.curs.append(cur) def close_connections(self): for cur in self.curs: @@ -304,6 +308,8 @@ def init_connection(self): cur.execute("set hnsw_earlystop_threshold = %d" % self.hnsw_earlystop_threshold) global base_query base_query = self._query + self.connections.append(conn) + self.curs.append(cur) def close_connections(self): for cur in self.curs: @@ -346,10 +352,17 @@ def __del__(self): class openGaussIVF(BaseANN): - def __init__(self, metric, lists): + def __init__(self, metric, params, lists): self._metric = metric self._lists = lists self._cur = None + self.curs = [] + self.connections = [] + self.init_conn_flags = False + self.search_pool = None + + self.enable_npu = params[0]["enable_npu"] + self.conc = params[1]["concurrents"] # default value self.user = 'ann' @@ -406,6 +419,7 @@ def fit(self, X): for i, 
embedding in enumerate(X): copy.write_row((i, embedding)) print(f"creating index ...") + start_time = time.perf_counter() if self._metric == "angular": cur.execute( "CREATE INDEX ON items USING ivfflat (embedding vector_cosine_ops) WITH (lists = %d)" % (self._lists)) @@ -413,13 +427,17 @@ def fit(self, X): cur.execute("CREATE INDEX ON items USING ivfflat (embedding vector_l2_ops) WITH (lists= %d)" % (self._lists)) else: raise RuntimeError(f"unknown metric {self._metric}") - print("done!") + end_time = time.perf_counter() - start_time + print(f"done! time = {end_time}.") self._cur = cur def set_query_arguments(self, probes): self._probes = probes self._cur.execute("SET ivfflat_probes = %d" % probes) self._cur.execute("set enable_seqscan = off") + self._cur.execute("set enable_npu = %s" % self.enable_npu) + conc = self.conc + self.search_pool = Pool(conc, initializer=self.init_connection) def query(self, v, n): self._cur.execute(self._query, (v, n), binary=True, prepare=True) @@ -430,6 +448,56 @@ def get_memory_usage(self): return 0 self._cur.execute("SELECT pg_relation_size('items_embedding_idx')") return self._cur.fetchone()[0] / 1024 + + def init_connection(self): + conn = psycopg.connect(user=self.user, password=self.password, dbname=self.dbname, host=self.host, autocommit=True, port=self.port) + pgvector.psycopg.register_vector(conn) + global cur + cur = conn.cursor() + cur.execute("SET ivfflat_probes = %d" % self._probes) + cur.execute("set enable_seqscan = off") + cur.execute("set enable_npu = %s" % self.enable_npu) + global base_query + base_query = self._query + self.connections.append(conn) + self.curs.append(cur) + + def close_connections(self): + for cur in self.curs: + if cur is not None: + cur.close() + cur = None + for conn in self.connections: + if conn is not None: + conn.close() + conn = None + self.curs = [] + self.connections = [] + self.init_conn_flags = False + + @staticmethod + def sub_query(chunk, n): + ids = [] + for item in chunk: + 
cur.execute(base_query, (item, n), binary=True, prepare=True) + res = cur.fetchall() + ids.append([i[0] for i in res]) + return ids + + def batch_query(self, X: numpy.array, n: int): + conc = self.conc + chunk_size = len(X) // conc + 1 + chunks = [X[i:i + chunk_size] for i in range(0, len(X), chunk_size)] + res = self.search_pool.starmap(self.sub_query, [(chunk, n) for chunk in chunks]) + print(f"openGauss concuiiency_query process num: {conc}") + self.res = [] + for item in res: + for ids in item: + self.res.append(ids) + def __str__(self): - return f"openGaussIVF(lists={self._lists}, probes={self._probes})" \ No newline at end of file + return f"openGaussIVF(lists={self._lists}, probes={self._probes})" + + def __del__(self): + self.close_connections() \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 01c6aa5b4..00943c8e2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -10,6 +10,5 @@ jinja2==3.1.2 pytest==7.2.2 datasets==2.12.0 requests==2.31.0 -psycopg -psycopg-binary pgvector +psycopg[binary] \ No newline at end of file