diff --git a/CHANGELOG.md b/CHANGELOG.md index 55be303c..19b80e3c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +* Add QuerySession.explain and QuerySessionPool.explain_with_retries methods for getting a query plan without executing the query + ## 3.21.8 ## * Fix: convert gRPC stream termination to YDB errors in async query client diff --git a/docker-compose-tls.yml b/docker-compose-tls.yml index f0a4b328..406d62f7 100644 --- a/docker-compose-tls.yml +++ b/docker-compose-tls.yml @@ -10,5 +10,4 @@ services: volumes: - ./ydb_certs:/ydb_certs environment: - - YDB_USE_IN_MEMORY_PDISKS=true - YDB_ENABLE_COLUMN_TABLES=true diff --git a/docker-compose.yml b/docker-compose.yml index 1a466fab..fd325bb0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -7,5 +7,4 @@ services: - 2136:2136 hostname: localhost environment: - - YDB_USE_IN_MEMORY_PDISKS=true - YDB_ENABLE_COLUMN_TABLES=true diff --git a/tests/aio/query/test_query_session.py b/tests/aio/query/test_query_session.py index 2602770a..cbc233d6 100644 --- a/tests/aio/query/test_query_session.py +++ b/tests/aio/query/test_query_session.py @@ -1,6 +1,9 @@ +import json + import pytest import ydb +from ydb import QueryExplainResultFormat from ydb.aio.query.session import QuerySession @@ -125,3 +128,37 @@ async def test_terminated_stream_raises_ydb_error(self, session: QuerySession): async with await session.execute("select 1") as results: async for _ in results: pass + + @pytest.mark.asyncio + async def test_explain(self, pool: ydb.aio.query.QuerySessionPool): + await pool.execute_with_retries("DROP TABLE IF EXISTS test_explain") + await pool.execute_with_retries("CREATE TABLE test_explain (id Int64, PRIMARY KEY (id))") + try: + plan_fullscan = "" + plan_lookup = "" + + async def callee(session: QuerySession): + nonlocal plan_fullscan, plan_lookup + + plan = await session.explain("SELECT * FROM test_explain", result_format=QueryExplainResultFormat.STR) + assert isinstance(plan, str) + assert "FullScan" in plan + + plan_fullscan = await session.explain( 
+ "SELECT * FROM test_explain", result_format=QueryExplainResultFormat.DICT + ) + plan_lookup = await session.explain( + "SELECT * FROM test_explain WHERE id = $id", + {"$id": 1}, + result_format=QueryExplainResultFormat.DICT, + ) + + await pool.retry_operation_async(callee) + + plan_fullscan_string = json.dumps(plan_fullscan) + assert "FullScan" in plan_fullscan_string + + plan_lookup_string = json.dumps(plan_lookup) + assert "Lookup" in plan_lookup_string + finally: + await pool.execute_with_retries("DROP TABLE test_explain") diff --git a/tests/aio/query/test_query_session_pool.py b/tests/aio/query/test_query_session_pool.py index 2cd0d4b9..34dbb245 100644 --- a/tests/aio/query/test_query_session_pool.py +++ b/tests/aio/query/test_query_session_pool.py @@ -1,9 +1,13 @@ import asyncio +import json + import pytest import ydb from typing import Optional +from tests.conftest import wait_container_ready_async +from ydb import QueryExplainResultFormat from ydb.aio.query.pool import QuerySessionPool from ydb.aio.query.session import QuerySession, QuerySessionStateEnum from ydb.aio.query.transaction import QueryTxContext @@ -162,6 +166,7 @@ async def test_no_session_leak(self, driver, docker_project): docker_project.start() await pool.stop() + await wait_container_ready_async(driver) @pytest.mark.asyncio async def test_acquire_no_race_condition(self, driver): @@ -179,3 +184,30 @@ async def acquire_session(): assert len(ids) == 1 assert pool._current_size == 1 + + @pytest.mark.asyncio + async def test_explain_with_retries(self, pool: QuerySessionPool): + await pool.execute_with_retries("DROP TABLE IF EXISTS test_explain") + await pool.execute_with_retries("CREATE TABLE test_explain (id Int64, PRIMARY KEY (id))") + try: + plan = await pool.explain_with_retries( + "SELECT * FROM test_explain", result_format=QueryExplainResultFormat.STR + ) + assert isinstance(plan, str) + assert "FullScan" in plan + + plan = await pool.explain_with_retries( + "SELECT * FROM test_explain", 
result_format=QueryExplainResultFormat.DICT + ) + plan_string = json.dumps(plan) + assert "FullScan" in plan_string + + plan = await pool.explain_with_retries( + "SELECT * FROM test_explain WHERE id = $id", + {"$id": 1}, + result_format=QueryExplainResultFormat.DICT, + ) + plan_string = json.dumps(plan) + assert "Lookup" in plan_string + finally: + await pool.execute_with_retries("DROP TABLE test_explain") diff --git a/tests/aio/test_connection_pool.py b/tests/aio/test_connection_pool.py index 12882f38..d9e8e7c6 100644 --- a/tests/aio/test_connection_pool.py +++ b/tests/aio/test_connection_pool.py @@ -1,6 +1,7 @@ import asyncio from unittest.mock import MagicMock +from tests.conftest import wait_container_ready_async from ydb.aio.driver import Driver import pytest import ydb @@ -108,6 +109,7 @@ async def restart_docker(): await asyncio.gather(*coros, return_exceptions=False) docker_project.start() + await wait_container_ready_async(driver) await driver.stop() @@ -132,6 +134,7 @@ async def test_disconnect_by_call(endpoint, database, docker_project): await asyncio.sleep(5) assert len(driver._store.connections) == 0 docker_project.start() + await wait_container_ready_async(driver) await driver.stop() diff --git a/tests/aio/test_session_pool.py b/tests/aio/test_session_pool.py index af1ef351..dff604a1 100644 --- a/tests/aio/test_session_pool.py +++ b/tests/aio/test_session_pool.py @@ -2,6 +2,7 @@ import pytest import ydb +from tests.conftest import wait_container_ready_async @pytest.mark.asyncio @@ -90,6 +91,7 @@ async def test_no_cluster_endpoints_no_failure(driver, docker_project): await pool.release(sess) assert pool._active_count == 0 await pool.stop() + await wait_container_ready_async(driver) @pytest.mark.asyncio @@ -144,3 +146,4 @@ async def test_no_session_leak(driver, docker_project): docker_project.start() await pool.stop() + await wait_container_ready_async(driver) diff --git a/tests/conftest.py b/tests/conftest.py index a8177f46..86d3b1de 100644 --- 
a/tests/conftest.py +++ b/tests/conftest.py @@ -1,3 +1,4 @@ +import asyncio import os import pytest @@ -30,6 +31,25 @@ def wait_container_ready(driver): raise RuntimeError("Container is not ready after timeout.") +async def wait_container_ready_async(driver): + await driver.wait(timeout=30) + + async with ydb.aio.SessionPool(driver, 1) as pool: + + started_at = time.time() + while time.time() - started_at < 30: + try: + async with pool.checkout() as session: + await session.execute_scheme("create table `.sys_health/test_table` (A int32, primary key(A));") + + return True + + except ydb.Error: + await asyncio.sleep(1) + + raise RuntimeError("Container is not ready after timeout.") + + @pytest.fixture(scope="module") def endpoint(pytestconfig, module_scoped_container_getter): with ydb.Driver(endpoint="localhost:2136", database="/local") as driver: diff --git a/tests/query/test_query_session.py b/tests/query/test_query_session.py index af861668..f84c2061 100644 --- a/tests/query/test_query_session.py +++ b/tests/query/test_query_session.py @@ -1,10 +1,13 @@ +import json + import pytest import threading import time from concurrent.futures import _base as b from unittest import mock -from ydb.query.base import QueryStatsMode +from ydb import QuerySessionPool +from ydb.query.base import QueryStatsMode, QueryExplainResultFormat from ydb.query.session import QuerySession @@ -174,3 +177,37 @@ def test_stats_mode(self, session: QuerySession, stats_mode: QueryStatsMode): assert len(stats.query_plan) > 0 else: assert stats.query_plan == "" + + def test_explain(self, pool: QuerySessionPool): + pool.execute_with_retries("DROP TABLE IF EXISTS test_explain") + pool.execute_with_retries("CREATE TABLE test_explain (id Int64, PRIMARY KEY (id))") + try: + plan_fullscan = "" + plan_lookup = "" + + def callee(session: QuerySession): + nonlocal plan_fullscan, plan_lookup + + plan = session.explain("SELECT * FROM test_explain", result_format=QueryExplainResultFormat.STR) + 
assert isinstance(plan, str) + assert "FullScan" in plan + + plan_fullscan = session.explain( + "SELECT * FROM test_explain", result_format=QueryExplainResultFormat.DICT + ) + + plan_lookup = session.explain( + "SELECT * FROM test_explain WHERE id = $id", + {"$id": 1}, + result_format=QueryExplainResultFormat.DICT, + ) + + pool.retry_operation_sync(callee) + + plan_fullscan_string = json.dumps(plan_fullscan) + assert "FullScan" in plan_fullscan_string + + plan_lookup_string = json.dumps(plan_lookup) + assert "Lookup" in plan_lookup_string + finally: + pool.execute_with_retries("DROP TABLE test_explain") diff --git a/tests/query/test_query_session_pool.py b/tests/query/test_query_session_pool.py index 5901c8c8..e4347b1a 100644 --- a/tests/query/test_query_session_pool.py +++ b/tests/query/test_query_session_pool.py @@ -1,3 +1,5 @@ +import json + import pytest import ydb import time @@ -5,6 +7,8 @@ from typing import Optional +from tests.conftest import wait_container_ready +from ydb import QueryExplainResultFormat from ydb.query.pool import QuerySessionPool from ydb.query.session import QuerySession, QuerySessionStateEnum from ydb.query.transaction import QueryTxContext @@ -147,6 +151,7 @@ def test_no_session_leak(self, driver_sync, docker_project): assert pool._current_size == 0 docker_project.start() + wait_container_ready(driver_sync) pool.stop() def test_execute_with_retries_async(self, pool: QuerySessionPool): @@ -200,3 +205,28 @@ def test_async_methods_after_stop_raise(self, pool: QuerySessionPool): pool.stop() with pytest.raises(ydb.SessionPoolClosed): pool.execute_with_retries_async("select 1;") + + def test_explain_with_retries(self, pool: QuerySessionPool): + pool.execute_with_retries("DROP TABLE IF EXISTS test_explain") + pool.execute_with_retries("CREATE TABLE test_explain (id Int64, PRIMARY KEY (id))") + try: + + plan = pool.explain_with_retries("SELECT * FROM test_explain", result_format=QueryExplainResultFormat.STR) + assert isinstance(plan, str) + assert 
"FullScan" in plan + + plan = pool.explain_with_retries("SELECT * FROM test_explain", result_format=QueryExplainResultFormat.DICT) + assert isinstance(plan, dict) + + plan_string = json.dumps(plan) + assert "FullScan" in plan_string + + plan = pool.explain_with_retries( + "SELECT * FROM test_explain WHERE id = $id", + {"$id": 1}, + result_format=QueryExplainResultFormat.DICT, + ) + plan_string = json.dumps(plan) + assert "Lookup" in plan_string + finally: + pool.execute_with_retries("DROP TABLE test_explain") diff --git a/ydb/aio/query/pool.py b/ydb/aio/query/pool.py index b691a1b1..a7fdd3b0 100644 --- a/ydb/aio/query/pool.py +++ b/ydb/aio/query/pool.py @@ -4,6 +4,9 @@ Callable, Optional, List, + Dict, + Any, + Union, ) from .session import ( @@ -13,7 +16,7 @@ RetrySettings, retry_operation_async, ) -from ...query.base import BaseQueryTxMode +from ...query.base import BaseQueryTxMode, QueryExplainResultFormat from ...query.base import QueryClientSettings from ... import convert from ..._grpc.grpcwrapper import common_utils @@ -194,6 +197,29 @@ async def wrapped_callee(): return await retry_operation_async(wrapped_callee, retry_settings) + async def explain_with_retries( + self, + query: str, + parameters: Optional[dict] = None, + *, + result_format: QueryExplainResultFormat = QueryExplainResultFormat.STR, + retry_settings: Optional[RetrySettings] = None, + ) -> Union[str, Dict[str, Any]]: + """ + Explain a query in a retriable way. The query itself is not executed. + + :param query: A query, yql or sql text. + :param parameters: dict with parameters and YDB types; + :param result_format: Return format: string or dict. + :param retry_settings: RetrySettings object. + :return: Query plan: a string for STR, the plan parsed from JSON for DICT. 
+ """ + + async def callee(session: QuerySession): + return await session.explain(query, parameters, result_format=result_format) + + return await self.retry_operation_async(callee, retry_settings) + async def stop(self): self._should_stop.set() diff --git a/ydb/aio/query/session.py b/ydb/aio/query/session.py index 83d527b7..13906164 100644 --- a/ydb/aio/query/session.py +++ b/ydb/aio/query/session.py @@ -1,7 +1,11 @@ import asyncio +import json from typing import ( Optional, + Dict, + Any, + Union, ) from .base import AsyncResponseContextIterator @@ -162,3 +166,29 @@ async def execute( ), error_converter=stream_error_converter, ) + + async def explain( + self, + query: str, + parameters: Optional[dict] = None, + result_format: base.QueryExplainResultFormat = base.QueryExplainResultFormat.STR, + ) -> Union[str, Dict[str, Any]]: + """Explain a query (EXPLAIN exec mode) and return its plan. + :param query: YQL or SQL query. + :param parameters: dict with parameters and YDB types; + :param result_format: Return format: string or dict. + :return: Query plan: a string for STR, the plan parsed from JSON for DICT. 
+ """ + + res = await self.execute(query, parameters, exec_mode=base.QueryExecMode.EXPLAIN) + + # Drain the result stream: last_query_stats is set as a side effect of reading it + async for _ in res: + pass + + plan = self.last_query_stats.query_plan + + if result_format == base.QueryExplainResultFormat.DICT: + plan = json.loads(plan) + + return plan diff --git a/ydb/query/__init__.py b/ydb/query/__init__.py index 76436f98..56e87524 100644 --- a/ydb/query/__init__.py +++ b/ydb/query/__init__.py @@ -1,5 +1,6 @@ __all__ = [ "BaseQueryTxMode", + "QueryExplainResultFormat", "QueryOnlineReadOnly", "QuerySerializableReadWrite", "QuerySnapshotReadOnly", @@ -15,6 +16,7 @@ from .base import ( QueryClientSettings, + QueryExplainResultFormat, QueryStatsMode, ) diff --git a/ydb/query/base.py b/ydb/query/base.py index 4007e72d..d69aa81d 100644 --- a/ydb/query/base.py +++ b/ydb/query/base.py @@ -42,6 +42,11 @@ class QueryExecMode(enum.IntEnum): EXECUTE = 50 +class QueryExplainResultFormat(enum.Enum): + STR = 0 + DICT = 10 + + class QueryStatsMode(enum.IntEnum): UNSPECIFIED = 0 NONE = 10 diff --git a/ydb/query/pool.py b/ydb/query/pool.py index fc05950c..dd2ac021 100644 --- a/ydb/query/pool.py +++ b/ydb/query/pool.py @@ -4,12 +4,15 @@ Callable, Optional, List, + Dict, + Any, + Union, ) import time import threading import queue -from .base import BaseQueryTxMode +from .base import BaseQueryTxMode, QueryExplainResultFormat from .base import QueryClientSettings from .session import ( QuerySession, @@ -270,6 +273,29 @@ def execute_with_retries_async( **kwargs, ) + def explain_with_retries( + self, + query: str, + parameters: Optional[dict] = None, + *, + result_format: QueryExplainResultFormat = QueryExplainResultFormat.STR, + retry_settings: Optional[RetrySettings] = None, + ) -> Union[str, Dict[str, Any]]: + """ + Explain a query in a retriable way. The query itself is not executed. + + :param query: A query, yql or sql text. 
+ :param parameters: dict with parameters and YDB types; + :param result_format: Return format: string or dict. + :param retry_settings: RetrySettings object. + :return: Query plan: a string for STR, the plan parsed from JSON for DICT. + """ + + def callee(session: QuerySession): + return session.explain(query, parameters, result_format=result_format) + + return self.retry_operation_sync(callee, retry_settings) + def stop(self, timeout=None): acquire_timeout = timeout if timeout is not None else -1 acquired = self._lock.acquire(timeout=acquire_timeout) diff --git a/ydb/query/session.py b/ydb/query/session.py index 5cfbdc6c..48fa6463 100644 --- a/ydb/query/session.py +++ b/ydb/query/session.py @@ -1,13 +1,18 @@ import abc import enum +import json import logging import threading from typing import ( Iterable, Optional, + Dict, + Any, + Union, ) from . import base +from .base import QueryExplainResultFormat from .. import _apis, issues, _utilities from ..settings import BaseRequestSettings @@ -371,3 +376,29 @@ def execute( settings=self._settings, ), ) + + def explain( + self, + query: str, + parameters: Optional[dict] = None, + *, + result_format: QueryExplainResultFormat = QueryExplainResultFormat.STR, + ) -> Union[str, Dict[str, Any]]: + """Explain a query (EXPLAIN exec mode) and return its plan. + :param query: YQL or SQL query. + :param parameters: dict with parameters and YDB types; + :param result_format: Return format: string or dict. + :return: Query plan: a string for STR, the plan parsed from JSON for DICT. + """ + + res = self.execute(query, parameters, exec_mode=base.QueryExecMode.EXPLAIN) + + # Drain the result stream: last_query_stats is set as a side effect of reading it + for _ in res: + pass + + plan = self.last_query_stats.query_plan + if result_format == QueryExplainResultFormat.DICT: + plan = json.loads(plan) + + return plan