Skip to content

Commit 2d2b87e

Browse files
committed
Add explain method to query session and pool
1 parent a2536ff commit 2d2b87e

File tree

8 files changed

+157
-0
lines changed

8 files changed

+157
-0
lines changed

tests/aio/query/test_query_session.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import json
2+
13
import pytest
24

35
import ydb
@@ -125,3 +127,30 @@ async def test_terminated_stream_raises_ydb_error(self, session: QuerySession):
125127
async with await session.execute("select 1") as results:
126128
async for _ in results:
127129
pass
130+
131+
@pytest.mark.asyncio
132+
async def test_explain(self, pool: ydb.aio.query.QuerySessionPool):
133+
await pool.execute_with_retries("DROP TABLE IF EXISTS test_explain")
134+
await pool.execute_with_retries("CREATE TABLE test_explain (id Int64, PRIMARY KEY (id))")
135+
try:
136+
plan_fullscan = ""
137+
plan_lookup = ""
138+
139+
async def callee(session: QuerySession):
140+
nonlocal plan_fullscan, plan_lookup
141+
142+
plan_fullscan = await session.explain("SELECT * FROM test_explain")
143+
plan_lookup = await session.explain(
144+
"SELECT * FROM test_explain WHERE id = $id",
145+
{"$id": 1},
146+
)
147+
148+
await pool.retry_operation_async(callee)
149+
150+
plan_fulltext_string = json.dumps(plan_fullscan)
151+
assert "FullScan" in plan_fulltext_string
152+
153+
plan_lookup_string = json.dumps(plan_lookup)
154+
assert "Lookup" in plan_lookup_string
155+
finally:
156+
await pool.execute_with_retries("DROP TABLE test_explain")

tests/aio/query/test_query_session_pool.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
import asyncio
2+
import json
3+
24
import pytest
35
import ydb
46

@@ -179,3 +181,21 @@ async def acquire_session():
179181

180182
assert len(ids) == 1
181183
assert pool._current_size == 1
184+
185+
@pytest.mark.asyncio
186+
async def test_explain_with_retries(self, pool: QuerySessionPool):
187+
await pool.execute_with_retries("DROP TABLE IF EXISTS test_explain")
188+
await pool.execute_with_retries("CREATE TABLE test_explain (id Int64, PRIMARY KEY (id))")
189+
try:
190+
plan = await pool.explain_with_retries("SELECT * FROM test_explain")
191+
plan_string = json.dumps(plan)
192+
assert "FullScan" in plan_string
193+
194+
plan = await pool.explain_with_retries(
195+
"SELECT * FROM test_explain WHERE id = $id",
196+
{"$id": 1},
197+
)
198+
plan_string = json.dumps(plan)
199+
assert "Lookup" in plan_string
200+
finally:
201+
await pool.execute_with_retries("DROP TABLE test_explain")

tests/query/test_query_session.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1+
import json
2+
13
import pytest
24
import threading
35
import time
46
from concurrent.futures import _base as b
57
from unittest import mock
68

9+
from ydb import QuerySessionPool
710
from ydb.query.base import QueryStatsMode
811
from ydb.query.session import QuerySession
912

@@ -174,3 +177,28 @@ def test_stats_mode(self, session: QuerySession, stats_mode: QueryStatsMode):
174177
assert len(stats.query_plan) > 0
175178
else:
176179
assert stats.query_plan == ""
180+
181+
def test_explain(self, pool: QuerySessionPool):
182+
pool.execute_with_retries("DROP TABLE IF EXISTS test_explain")
183+
pool.execute_with_retries("CREATE TABLE test_explain (id Int64, PRIMARY KEY (id))")
184+
try:
185+
plan_fullscan = ""
186+
plan_lookup = ""
187+
188+
def callee(session: QuerySession):
189+
nonlocal plan_fullscan, plan_lookup
190+
plan_fullscan = session.explain("SELECT * FROM test_explain")
191+
plan_lookup = session.explain(
192+
"SELECT * FROM test_explain WHERE id = $id",
193+
{"$id": 1},
194+
)
195+
196+
pool.retry_operation_sync(callee)
197+
198+
plan_fulltext_string = json.dumps(plan_fullscan)
199+
assert "FullScan" in plan_fulltext_string
200+
201+
plan_lookup_string = json.dumps(plan_lookup)
202+
assert "Lookup" in plan_lookup_string
203+
finally:
204+
pool.execute_with_retries("DROP TABLE test_explain")

tests/query/test_query_session_pool.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import json
2+
13
import pytest
24
import ydb
35
import time
@@ -200,3 +202,20 @@ def test_async_methods_after_stop_raise(self, pool: QuerySessionPool):
200202
pool.stop()
201203
with pytest.raises(ydb.SessionPoolClosed):
202204
pool.execute_with_retries_async("select 1;")
205+
206+
def test_explain_with_retries(self, pool: QuerySessionPool):
207+
pool.execute_with_retries("DROP TABLE IF EXISTS test_explain")
208+
pool.execute_with_retries("CREATE TABLE test_explain (id Int64, PRIMARY KEY (id))")
209+
try:
210+
plan = pool.explain_with_retries("SELECT * FROM test_explain")
211+
plan_string = json.dumps(plan)
212+
assert "FullScan" in plan_string
213+
214+
plan = pool.explain_with_retries(
215+
"SELECT * FROM test_explain WHERE id = $id",
216+
{"$id": 1},
217+
)
218+
plan_string = json.dumps(plan)
219+
assert "Lookup" in plan_string
220+
finally:
221+
pool.execute_with_retries("DROP TABLE test_explain")

ydb/aio/query/pool.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
Callable,
55
Optional,
66
List,
7+
Dict,
8+
Any,
79
)
810

911
from .session import (
@@ -194,6 +196,18 @@ async def wrapped_callee():
194196

195197
return await retry_operation_async(wrapped_callee, retry_settings)
196198

199+
async def explain_with_retries(
200+
self,
201+
query: str,
202+
parameters: Optional[dict] = None,
203+
*,
204+
retry_settings: Optional[RetrySettings] = None,
205+
) -> Dict[str, Any]:
206+
async def callee(session: QuerySession):
207+
return await session.explain(query, parameters)
208+
209+
return await self.retry_operation_async(callee, retry_settings)
210+
197211
async def stop(self):
198212
self._should_stop.set()
199213

ydb/aio/query/session.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
import asyncio
2+
import json
23

34
from typing import (
45
Optional,
6+
Dict,
7+
Any,
58
)
69

710
from .base import AsyncResponseContextIterator
@@ -162,3 +165,16 @@ async def execute(
162165
),
163166
error_converter=stream_error_converter,
164167
)
168+
169+
async def explain(self, query: str, parameters: Optional[dict] = None) -> Dict[str, Any]:
170+
"""Explains query result
171+
:param query: YQL or SQL query.
172+
"""
173+
174+
res = await self.execute(query, parameters, exec_mode=base.QueryExecMode.EXPLAIN)
175+
async for _ in res:
176+
pass
177+
178+
plan_text = self.last_query_stats.query_plan
179+
plan = json.loads(plan_text)
180+
return plan

ydb/query/pool.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
Callable,
55
Optional,
66
List,
7+
Dict,
8+
Any,
79
)
810
import time
911
import threading
@@ -270,6 +272,18 @@ def execute_with_retries_async(
270272
**kwargs,
271273
)
272274

275+
def explain_with_retries(
276+
self,
277+
query: str,
278+
parameters: Optional[dict] = None,
279+
*,
280+
retry_settings: Optional[RetrySettings] = None,
281+
) -> Dict[str, Any]:
282+
def callee(session: QuerySession):
283+
return session.explain(query, parameters)
284+
285+
return self.retry_operation_sync(callee, retry_settings)
286+
273287
def stop(self, timeout=None):
274288
acquire_timeout = timeout if timeout is not None else -1
275289
acquired = self._lock.acquire(timeout=acquire_timeout)

ydb/query/session.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
import abc
22
import enum
3+
import json
34
import logging
45
import threading
56
from typing import (
67
Iterable,
78
Optional,
9+
Dict,
10+
Any,
811
)
912

1013
from . import base
@@ -371,3 +374,17 @@ def execute(
371374
settings=self._settings,
372375
),
373376
)
377+
378+
def explain(self, query: str, parameters: dict = None) -> Dict[str, Any]:
379+
"""Explains query result
380+
:param query: YQL or SQL query.
381+
:param parameters: dict with parameters and YDB types;
382+
"""
383+
384+
res = self.execute(query, parameters, exec_mode=base.QueryExecMode.EXPLAIN)
385+
for _ in res:
386+
pass
387+
388+
plan_text = self.last_query_stats.query_plan
389+
plan = json.loads(plan_text)
390+
return plan

0 commit comments

Comments
 (0)