Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Add method explain for explain query to QuerySession and QuerySessionPool classes

## 3.21.8 ##
* Fix: convert gRPC stream termination to YDB errors in async query client

Expand Down
1 change: 0 additions & 1 deletion docker-compose-tls.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,4 @@ services:
volumes:
- ./ydb_certs:/ydb_certs
environment:
- YDB_USE_IN_MEMORY_PDISKS=true
- YDB_ENABLE_COLUMN_TABLES=true
1 change: 0 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,4 @@ services:
- 2136:2136
hostname: localhost
environment:
- YDB_USE_IN_MEMORY_PDISKS=true
- YDB_ENABLE_COLUMN_TABLES=true
37 changes: 37 additions & 0 deletions tests/aio/query/test_query_session.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import json

import pytest

import ydb
from ydb import QueryExplainResultFormat
from ydb.aio.query.session import QuerySession


Expand Down Expand Up @@ -125,3 +128,37 @@ async def test_terminated_stream_raises_ydb_error(self, session: QuerySession):
async with await session.execute("select 1") as results:
async for _ in results:
pass

@pytest.mark.asyncio
async def test_explain(self, pool: ydb.aio.query.QuerySessionPool):
await pool.execute_with_retries("DROP TABLE IF EXISTS test_explain")
await pool.execute_with_retries("CREATE TABLE test_explain (id Int64, PRIMARY KEY (id))")
try:
plan_fullscan = ""
plan_lookup = ""

async def callee(session: QuerySession):
nonlocal plan_fullscan, plan_lookup

plan = await session.explain("SELECT * FROM test_explain", result_format=QueryExplainResultFormat.STR)
isinstance(plan, str)
assert "FullScan" in plan

plan_fullscan = await session.explain(
"SELECT * FROM test_explain", result_format=QueryExplainResultFormat.DICT
)
plan_lookup = await session.explain(
"SELECT * FROM test_explain WHERE id = $id",
{"$id": 1},
result_format=QueryExplainResultFormat.DICT,
)

await pool.retry_operation_async(callee)

plan_fulltext_string = json.dumps(plan_fullscan)
assert "FullScan" in plan_fulltext_string

plan_lookup_string = json.dumps(plan_lookup)
assert "Lookup" in plan_lookup_string
finally:
await pool.execute_with_retries("DROP TABLE test_explain")
32 changes: 32 additions & 0 deletions tests/aio/query/test_query_session_pool.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import asyncio
import json

import pytest
import ydb

from typing import Optional

from tests.conftest import wait_container_ready_async
from ydb import QueryExplainResultFormat
from ydb.aio.query.pool import QuerySessionPool
from ydb.aio.query.session import QuerySession, QuerySessionStateEnum
from ydb.aio.query.transaction import QueryTxContext
Expand Down Expand Up @@ -162,6 +166,7 @@ async def test_no_session_leak(self, driver, docker_project):

docker_project.start()
await pool.stop()
await wait_container_ready_async(driver)

@pytest.mark.asyncio
async def test_acquire_no_race_condition(self, driver):
Expand All @@ -179,3 +184,30 @@ async def acquire_session():

assert len(ids) == 1
assert pool._current_size == 1

@pytest.mark.asyncio
async def test_explain_with_retries(self, pool: QuerySessionPool):
await pool.execute_with_retries("DROP TABLE IF EXISTS test_explain")
await pool.execute_with_retries("CREATE TABLE test_explain (id Int64, PRIMARY KEY (id))")
try:
plan = await pool.explain_with_retries(
"SELECT * FROM test_explain", result_format=QueryExplainResultFormat.STR
)
isinstance(plan, str)
assert "FullScan" in plan

plan = await pool.explain_with_retries(
"SELECT * FROM test_explain", result_format=QueryExplainResultFormat.DICT
)
plan_string = json.dumps(plan)
assert "FullScan" in plan_string

plan = await pool.explain_with_retries(
"SELECT * FROM test_explain WHERE id = $id",
{"$id": 1},
result_format=QueryExplainResultFormat.DICT,
)
plan_string = json.dumps(plan)
assert "Lookup" in plan_string
finally:
await pool.execute_with_retries("DROP TABLE test_explain")
3 changes: 3 additions & 0 deletions tests/aio/test_connection_pool.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import asyncio
from unittest.mock import MagicMock

from tests.conftest import wait_container_ready_async
from ydb.aio.driver import Driver
import pytest
import ydb
Expand Down Expand Up @@ -108,6 +109,7 @@ async def restart_docker():
await asyncio.gather(*coros, return_exceptions=False)

docker_project.start()
await wait_container_ready_async(driver)
await driver.stop()


Expand All @@ -132,6 +134,7 @@ async def test_disconnect_by_call(endpoint, database, docker_project):
await asyncio.sleep(5)
assert len(driver._store.connections) == 0
docker_project.start()
await wait_container_ready_async(driver)
await driver.stop()


Expand Down
3 changes: 3 additions & 0 deletions tests/aio/test_session_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import pytest
import ydb
from tests.conftest import wait_container_ready_async


@pytest.mark.asyncio
Expand Down Expand Up @@ -90,6 +91,7 @@ async def test_no_cluster_endpoints_no_failure(driver, docker_project):
await pool.release(sess)
assert pool._active_count == 0
await pool.stop()
await wait_container_ready_async(driver)


@pytest.mark.asyncio
Expand Down Expand Up @@ -144,3 +146,4 @@ async def test_no_session_leak(driver, docker_project):

docker_project.start()
await pool.stop()
await wait_container_ready_async(driver)
20 changes: 20 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import os

import pytest
Expand Down Expand Up @@ -30,6 +31,25 @@ def wait_container_ready(driver):
raise RuntimeError("Container is not ready after timeout.")


async def wait_container_ready_async(driver):
await driver.wait(timeout=30)

async with ydb.aio.SessionPool(driver, 1) as pool:

started_at = time.time()
while time.time() - started_at < 30:
try:
async with pool.checkout() as session:
await session.execute_scheme("create table `.sys_health/test_table` (A int32, primary key(A));")

return True

except ydb.Error:
await asyncio.sleep(1)

raise RuntimeError("Container is not ready after timeout.")


@pytest.fixture(scope="module")
def endpoint(pytestconfig, module_scoped_container_getter):
with ydb.Driver(endpoint="localhost:2136", database="/local") as driver:
Expand Down
39 changes: 38 additions & 1 deletion tests/query/test_query_session.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import json

import pytest
import threading
import time
from concurrent.futures import _base as b
from unittest import mock

from ydb.query.base import QueryStatsMode
from ydb import QuerySessionPool
from ydb.query.base import QueryStatsMode, QueryExplainResultFormat
from ydb.query.session import QuerySession


Expand Down Expand Up @@ -174,3 +177,37 @@ def test_stats_mode(self, session: QuerySession, stats_mode: QueryStatsMode):
assert len(stats.query_plan) > 0
else:
assert stats.query_plan == ""

def test_explain(self, pool: QuerySessionPool):
pool.execute_with_retries("DROP TABLE IF EXISTS test_explain")
pool.execute_with_retries("CREATE TABLE test_explain (id Int64, PRIMARY KEY (id))")
try:
plan_fullscan = ""
plan_lookup = ""

def callee(session: QuerySession):
nonlocal plan_fullscan, plan_lookup

plan = session.explain("SELECT * FROM test_explain", result_format=QueryExplainResultFormat.STR)
isinstance(plan, str)
assert "FullScan" in plan

plan_fullscan = session.explain(
"SELECT * FROM test_explain", result_format=QueryExplainResultFormat.DICT
)

plan_lookup = session.explain(
"SELECT * FROM test_explain WHERE id = $id",
{"$id": 1},
result_format=QueryExplainResultFormat.DICT,
)

pool.retry_operation_sync(callee)

plan_fulltext_string = json.dumps(plan_fullscan)
assert "FullScan" in plan_fulltext_string

plan_lookup_string = json.dumps(plan_lookup)
assert "Lookup" in plan_lookup_string
finally:
pool.execute_with_retries("DROP TABLE test_explain")
30 changes: 30 additions & 0 deletions tests/query/test_query_session_pool.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import json

import pytest
import ydb
import time
from concurrent import futures

from typing import Optional

from tests.conftest import wait_container_ready
from ydb import QueryExplainResultFormat
from ydb.query.pool import QuerySessionPool
from ydb.query.session import QuerySession, QuerySessionStateEnum
from ydb.query.transaction import QueryTxContext
Expand Down Expand Up @@ -147,6 +151,7 @@ def test_no_session_leak(self, driver_sync, docker_project):
assert pool._current_size == 0

docker_project.start()
wait_container_ready(driver_sync)
pool.stop()

def test_execute_with_retries_async(self, pool: QuerySessionPool):
Expand Down Expand Up @@ -200,3 +205,28 @@ def test_async_methods_after_stop_raise(self, pool: QuerySessionPool):
pool.stop()
with pytest.raises(ydb.SessionPoolClosed):
pool.execute_with_retries_async("select 1;")

def test_explain_with_retries(self, pool: QuerySessionPool):
pool.execute_with_retries("DROP TABLE IF EXISTS test_explain")
pool.execute_with_retries("CREATE TABLE test_explain (id Int64, PRIMARY KEY (id))")
try:

plan = pool.explain_with_retries("SELECT * FROM test_explain", result_format=QueryExplainResultFormat.STR)
isinstance(plan, str)
assert "FullScan" in plan

plan = pool.explain_with_retries("SELECT * FROM test_explain", result_format=QueryExplainResultFormat.DICT)
assert isinstance(plan, dict)

plan_string = json.dumps(plan)
assert "FullScan" in plan_string

plan = pool.explain_with_retries(
"SELECT * FROM test_explain WHERE id = $id",
{"$id": 1},
result_format=ydb.QueryExplainResultFormat.DICT,
)
plan_string = json.dumps(plan)
assert "Lookup" in plan_string
finally:
pool.execute_with_retries("DROP TABLE test_explain")
28 changes: 27 additions & 1 deletion ydb/aio/query/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
Callable,
Optional,
List,
Dict,
Any,
Union,
)

from .session import (
Expand All @@ -13,7 +16,7 @@
RetrySettings,
retry_operation_async,
)
from ...query.base import BaseQueryTxMode
from ...query.base import BaseQueryTxMode, QueryExplainResultFormat
from ...query.base import QueryClientSettings
from ... import convert
from ..._grpc.grpcwrapper import common_utils
Expand Down Expand Up @@ -194,6 +197,29 @@ async def wrapped_callee():

return await retry_operation_async(wrapped_callee, retry_settings)

async def explain_with_retries(
self,
query: str,
parameters: Optional[dict] = None,
*,
result_format: QueryExplainResultFormat = QueryExplainResultFormat.STR,
retry_settings: Optional[RetrySettings] = None,
) -> Union[str, Dict[str, Any]]:
"""
Explain a query in retriable way. No real query execution will happen.

:param query: A query, yql or sql text.
:param parameters: dict with parameters and YDB types;
:param result_format: Return format: string or dict.
:param retry_settings: RetrySettings object.
:return: Parsed query plan.
"""

async def callee(session: QuerySession):
return await session.explain(query, parameters, result_format=result_format)

return await self.retry_operation_async(callee, retry_settings)

async def stop(self):
self._should_stop.set()

Expand Down
30 changes: 30 additions & 0 deletions ydb/aio/query/session.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import asyncio
import json

from typing import (
Optional,
Dict,
Any,
Union,
)

from .base import AsyncResponseContextIterator
Expand Down Expand Up @@ -162,3 +166,29 @@ async def execute(
),
error_converter=stream_error_converter,
)

async def explain(
self,
query: str,
parameters: Optional[dict] = None,
result_format: base.QueryExplainResultFormat = base.QueryExplainResultFormat.STR,
) -> Union[str, Dict[str, Any]]:
"""Explains query result
:param query: YQL or SQL query.
:param parameters: dict with parameters and YDB types;
:param result_format: Return format: string or dict.
:return: Parsed query plan.
"""

res = await self.execute(query, parameters, exec_mode=base.QueryExecMode.EXPLAIN)
Copy link
Preview

Copilot AI Aug 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to the sync version, this result consumption pattern could be made more explicit or documented to clarify why the async iterator must be exhausted.

Suggested change
res = await self.execute(query, parameters, exec_mode=base.QueryExecMode.EXPLAIN)
res = await self.execute(query, parameters, exec_mode=base.QueryExecMode.EXPLAIN)
# Exhaust the async iterator to ensure that all results are processed
# and self.last_query_stats is updated with the query plan.

Copilot uses AI. Check for mistakes.


# it needs to read result sets for set last_query_stats as sideeffect
async for _ in res:
pass

plan = self.last_query_stats.query_plan

if result_format == base.QueryExplainResultFormat.DICT:
plan = json.loads(plan)

return plan
Loading
Loading