From c82f2a3726b7d1c785918bb90c1fc1ab749912a8 Mon Sep 17 00:00:00 2001
From: Jose Luis Franco Arza
Date: Wed, 27 Nov 2024 15:21:40 +0100
Subject: [PATCH] Add cluster/statistics endpoint handling.

This commit adds a new cluster method get_cluster_statistics()
which retrieves valuable statistics from the cluster.
---
 .pre-commit-config.yaml        |   9 ++-
 integration_v3/test_cluster.py |  66 +++++++++++++++++++
 test/cluster/test_cluster.py   | 108 +++++++++++++++++++++++++++++++
 weaviate/cluster/cluster.py    |  32 ++++++++-
 weaviate/cluster/types.py      |  49 +++++++++++++-
 5 files changed, 257 insertions(+), 7 deletions(-)

diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 0d0583be6..9dd7cb0b8 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -13,12 +13,11 @@ repos:
   - id: trailing-whitespace
 
 - repo: https://github.com/myint/autoflake
-  rev: v1.4
+  rev: v2.2.1 # autoflake v2.2.1 is the latest version that supports Python 3.12
   hooks:
   - id: autoflake
     args: [--in-place, --remove-all-unused-imports, --exclude=weaviate/proto/*]
 
-
 - repo: https://github.com/PyCQA/flake8
   rev: 7.1.0
   hooks:
@@ -36,13 +35,13 @@ repos:
       ]
     files: '^weaviate/collections'
 
-- repo: local
-  hooks:
+- repo: local
+  hooks:
   - id: mypy
     name: mypy
     entry: ./run-mypy.sh
     language: python
-    language_version: "3.11"
+    language_version: "3.12"
     # use require_serial so that script
     # is only called once per commit
     require_serial: true
diff --git a/integration_v3/test_cluster.py b/integration_v3/test_cluster.py
index 3852b6130..09e718b9c 100644
--- a/integration_v3/test_cluster.py
+++ b/integration_v3/test_cluster.py
@@ -104,3 +104,69 @@ def test_get_nodes_status_with_data(client: weaviate.Client):
     assert shards[0]["class"] == class_name1
     assert shards[0]["objectCount"] == 0 if server_is_at_least_124 else NUM_OBJECT
     assert resp[0]["stats"]["objectCount"] == 0 if server_is_at_least_124 else NUM_OBJECT
+
+
+def test_get_cluster_statistics(client: weaviate.Client):
+    """Test getting cluster statistics."""
+    if client._connection._weaviate_version.is_lower_than(1, 25, 0):
+        pytest.skip("Cluster statistics are supported in versions higher than 1.25.0")
+
+    stats = client.cluster.get_cluster_statistics()
+
+    # Check top level structure
+    assert "statistics" in stats
+    assert "synchronized" in stats
+    assert isinstance(stats["synchronized"], bool)
+
+    # Check statistics array
+    assert isinstance(stats["statistics"], list)
+    assert len(stats["statistics"]) >= 1  # At least one node
+
+    # Check first node's statistics
+    node = stats["statistics"][0]
+    # bootstrapped is optional
+    if "bootstrapped" in node:
+        assert isinstance(node["bootstrapped"], bool)
+    assert isinstance(node["candidates"], dict)
+    # Check candidates structure if not empty
+    if node["candidates"]:
+        for node_name, address in node["candidates"].items():
+            assert isinstance(node_name, str)
+            assert isinstance(address, str)
+            assert ":" in address  # Address should be in format IP:PORT
+    assert isinstance(node["dbLoaded"], bool)
+    assert isinstance(node["isVoter"], bool)
+    assert isinstance(node["leaderAddress"], str)
+    assert isinstance(node["leaderId"], str)
+    assert isinstance(node["name"], str)
+    assert isinstance(node["open"], bool)
+    assert isinstance(node["ready"], bool)
+    assert isinstance(node["status"], str)
+
+    # Check Raft statistics
+    raft = node["raft"]
+    assert isinstance(raft["appliedIndex"], str)
+    assert isinstance(raft["commitIndex"], str)
+    assert isinstance(raft["fsmPending"], str)
+    assert isinstance(raft["lastContact"], str)
+    assert isinstance(raft["lastLogIndex"], str)
+    assert isinstance(raft["lastLogTerm"], str)
+    assert isinstance(raft["lastSnapshotIndex"], str)
+    assert isinstance(raft["lastSnapshotTerm"], str)
+    assert isinstance(raft["latestConfiguration"], list)
+    assert isinstance(raft["latestConfigurationIndex"], str)
+    assert isinstance(raft["numPeers"], str)
+    assert isinstance(raft["protocolVersion"], str)
+    assert isinstance(raft["protocolVersionMax"], str)
+    assert isinstance(raft["protocolVersionMin"], str)
+    assert isinstance(raft["snapshotVersionMax"], str)
+    assert isinstance(raft["snapshotVersionMin"], str)
+    assert isinstance(raft["state"], str)
+    assert isinstance(raft["term"], str)
+
+    # Check at least one peer in the configuration
+    assert len(raft["latestConfiguration"]) >= 1
+    peer = raft["latestConfiguration"][0]
+    assert isinstance(peer["address"], str)
+    assert isinstance(peer["id"], str)
+    assert isinstance(peer["suffrage"], int)
diff --git a/test/cluster/test_cluster.py b/test/cluster/test_cluster.py
index cda56e9ad..3188628dd 100644
--- a/test/cluster/test_cluster.py
+++ b/test/cluster/test_cluster.py
@@ -51,3 +51,111 @@ def test_get_nodes_status(self):
         result = Cluster(mock_conn).get_nodes_status()
         self.assertListEqual(result, expected_resp.get("nodes"))
         mock_conn.get.assert_called_with(path="/nodes")
+
+    def test_get_cluster_statistics(self):
+        # error messages
+        unexpected_err_msg = "Cluster statistics"
+        empty_response_err_msg = "Cluster statistics response returned empty"
+        connection_err_msg = "Get cluster statistics failed due to connection error"
+
+        # expected failure
+        mock_conn = mock_connection_func("get", status_code=500)
+        with self.assertRaises(UnexpectedStatusCodeException) as error:
+            Cluster(mock_conn).get_cluster_statistics()
+        check_startswith_error_message(self, error, unexpected_err_msg)
+
+        mock_conn = mock_connection_func("get", status_code=200, return_json=None)
+        with self.assertRaises(EmptyResponseException) as error:
+            Cluster(mock_conn).get_cluster_statistics()
+        check_error_message(self, error, empty_response_err_msg)
+
+        mock_conn = mock_connection_func("get", side_effect=RequestsConnectionError)
+        with self.assertRaises(RequestsConnectionError) as error:
+            Cluster(mock_conn).get_cluster_statistics()
+        check_error_message(self, error, connection_err_msg)
+
+        # expected success (payload mirrors the REST API, which uses "open" and "id")
+        expected_resp = {
+            "statistics": [
+                {
+                    "candidates": {
+                        "weaviate-0": "10.244.2.3:8300",
+                        "weaviate-1": "10.244.1.3:8300",
+                    },
+                    "dbLoaded": True,
+                    "isVoter": True,
+                    "leaderAddress": "10.244.3.3:8300",
+                    "leaderId": "weaviate-2",
+                    "name": "weaviate-0",
+                    "open": True,
+                    "raft": {
+                        "appliedIndex": "3",
+                        "commitIndex": "3",
+                        "fsmPending": "0",
+                        "lastContact": "29.130625ms",
+                        "lastLogIndex": "3",
+                        "lastLogTerm": "2",
+                        "lastSnapshotIndex": "0",
+                        "lastSnapshotTerm": "0",
+                        "latestConfiguration": [
+                            {"address": "10.244.1.3:8300", "id": "weaviate-1", "suffrage": 0},
+                            {"address": "10.244.3.3:8300", "id": "weaviate-2", "suffrage": 0},
+                            {"address": "10.244.2.3:8300", "id": "weaviate-0", "suffrage": 0},
+                        ],
+                        "latestConfigurationIndex": "0",
+                        "numPeers": "2",
+                        "protocolVersion": "3",
+                        "protocolVersionMax": "3",
+                        "protocolVersionMin": "0",
+                        "snapshotVersionMax": "1",
+                        "snapshotVersionMin": "0",
+                        "state": "Follower",
+                        "term": "2",
+                    },
+                    "ready": True,
+                    "status": "HEALTHY",
+                },
+                {
+                    "bootstrapped": True,
+                    "candidates": {},
+                    "dbLoaded": True,
+                    "isVoter": True,
+                    "leaderAddress": "10.244.3.3:8300",
+                    "leaderId": "weaviate-2",
+                    "name": "weaviate-1",
+                    "open": True,
+                    "raft": {
+                        "appliedIndex": "3",
+                        "commitIndex": "3",
+                        "fsmPending": "0",
+                        "lastContact": "41.289833ms",
+                        "lastLogIndex": "3",
+                        "lastLogTerm": "2",
+                        "lastSnapshotIndex": "0",
+                        "lastSnapshotTerm": "0",
+                        "latestConfiguration": [
+                            {"address": "10.244.1.3:8300", "id": "weaviate-1", "suffrage": 0},
+                            {"address": "10.244.3.3:8300", "id": "weaviate-2", "suffrage": 0},
+                            {"address": "10.244.2.3:8300", "id": "weaviate-0", "suffrage": 0},
+                        ],
+                        "latestConfigurationIndex": "0",
+                        "numPeers": "2",
+                        "protocolVersion": "3",
+                        "protocolVersionMax": "3",
+                        "protocolVersionMin": "0",
+                        "snapshotVersionMax": "1",
+                        "snapshotVersionMin": "0",
+                        "state": "Follower",
+                        "term": "2",
+                    },
+                    "ready": True,
+                    "status": "HEALTHY",
+                },
+            ],
+            "synchronized": True,
+        }
+        mock_conn = mock_connection_func("get", status_code=200, return_json=expected_resp)
+        result = Cluster(mock_conn).get_cluster_statistics()
+
+        self.assertEqual(result, expected_resp)
+        mock_conn.get.assert_called_with(path="/cluster/statistics")
diff --git a/weaviate/cluster/cluster.py b/weaviate/cluster/cluster.py
index 28bbf1139..76abb2d83 100644
--- a/weaviate/cluster/cluster.py
+++ b/weaviate/cluster/cluster.py
@@ -6,7 +6,7 @@
 
 from requests.exceptions import ConnectionError as RequestsConnectionError
 
-from weaviate.cluster.types import Node
+from weaviate.cluster.types import Node, ClusterStats
 from weaviate.connect import Connection
 from weaviate.exceptions import (
     EmptyResponseException,
@@ -79,3 +79,33 @@ def get_nodes_status(
         if nodes is None or nodes == []:
             raise EmptyResponseException("Nodes status response returned empty")
         return cast(List[Node], nodes)
+
+    def get_cluster_statistics(self) -> ClusterStats:
+        """
+        Get the cluster statistics including Raft consensus information.
+
+        Returns
+        -------
+        ClusterStats
+            Statistics about the cluster including Raft consensus information.
+
+        Raises
+        ------
+        requests.ConnectionError
+            If the network connection to weaviate fails.
+        weaviate.UnexpectedStatusCodeException
+            If weaviate reports a non-OK status.
+        weaviate.EmptyResponseException
+            If the response is empty.
+        """
+        try:
+            response = self._connection.get(path="/cluster/statistics")
+        except RequestsConnectionError as conn_err:
+            raise RequestsConnectionError(
+                "Get cluster statistics failed due to connection error"
+            ) from conn_err
+
+        response_typed = _decode_json_response_dict(response, "Cluster statistics")
+        if response_typed is None:
+            raise EmptyResponseException("Cluster statistics response returned empty")
+        return cast(ClusterStats, response_typed)
diff --git a/weaviate/cluster/types.py b/weaviate/cluster/types.py
index 10c4e21d6..529fdf4a8 100644
--- a/weaviate/cluster/types.py
+++ b/weaviate/cluster/types.py
@@ -1,4 +1,4 @@
-from typing import List, Literal, Optional, TypedDict
+from typing import List, Literal, Optional, TypedDict, Dict
 
 
 class BatchStats(TypedDict):
@@ -34,3 +34,50 @@ class Node(TypedDict):
     stats: Stats
     status: str
     version: str
+
+
+class RaftPeer(TypedDict):
+    address: str
+    id: str  # key name as returned by the API
+    suffrage: int
+
+
+class RaftStats(TypedDict):
+    appliedIndex: str
+    commitIndex: str
+    fsmPending: str
+    lastContact: str
+    lastLogIndex: str
+    lastLogTerm: str
+    lastSnapshotIndex: str
+    lastSnapshotTerm: str
+    latestConfiguration: List[RaftPeer]
+    latestConfigurationIndex: str
+    numPeers: str
+    protocolVersion: str
+    protocolVersionMax: str
+    protocolVersionMin: str
+    snapshotVersionMax: str
+    snapshotVersionMin: str
+    state: str
+    term: str
+
+
+# total=False because some fields (e.g. bootstrapped) may be absent from the response
+class ClusterNodeStats(TypedDict, total=False):
+    bootstrapped: bool
+    candidates: Dict[str, str]
+    dbLoaded: bool
+    isVoter: bool
+    leaderAddress: str
+    leaderId: str
+    name: str
+    open: bool  # key name as returned by the API
+    raft: RaftStats
+    ready: bool
+    status: str
+
+
+class ClusterStats(TypedDict):
+    statistics: List[ClusterNodeStats]
+    synchronized: bool