Skip to content

Commit ea7387f

Browse files
committed
Fix topic stream error handling
1 parent 92b60c2 commit ea7387f

File tree

4 files changed

+37
-5
lines changed

4 files changed

+37
-5
lines changed

ydb/_grpc/grpcwrapper/common_utils.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import contextvars
77
import datetime
88
import functools
9+
import logging
910
import typing
1011
from typing import (
1112
Optional,
@@ -37,6 +38,8 @@
3738
from ...settings import BaseRequestSettings
3839
from ..._constants import DEFAULT_LONG_STREAM_TIMEOUT
3940

41+
logger = logging.getLogger(__name__)
42+
4043

4144
class IFromProto(abc.ABC):
4245
@staticmethod
@@ -193,28 +196,32 @@ def _clean_executor(self, wait: bool):
193196

194197
async def _start_asyncio_driver(self, driver: DriverIO, stub, method):
195198
requests_iterator = QueueToIteratorAsyncIO(self.from_client_grpc)
196-
stream_call = await driver(
199+
stream_call, on_disconnected_lambda = await driver(
197200
requests_iterator,
198201
stub,
199202
method,
200203
settings=self._stream_settings,
204+
include_disconnected_lambda_to_result=True,
201205
)
202206
self._stream_call = stream_call
207+
self._on_disconnected_lambda = on_disconnected_lambda
203208
self.from_server_grpc = stream_call.__aiter__()
204209

205210
async def _start_sync_driver(self, driver: Driver, stub, method):
206211
requests_iterator = AsyncQueueToSyncIteratorAsyncIO(self.from_client_grpc)
207212
self._wait_executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
208213

209-
stream_call = await to_thread(
214+
stream_call, on_disconnected_lambda = await to_thread(
210215
driver,
211216
requests_iterator,
212217
stub,
213218
method,
214219
executor=self._wait_executor,
215220
settings=self._stream_settings,
221+
include_disconnected_lambda_to_result=True,
216222
)
217223
self._stream_call = stream_call
224+
self._on_disconnected_lambda = on_disconnected_lambda
218225
self.from_server_grpc = SyncToAsyncIterator(stream_call.__iter__(), self._wait_executor)
219226

220227
async def receive(self, timeout: Optional[int] = None) -> Any:
@@ -230,6 +237,11 @@ async def get_response():
230237
grpc_message = await asyncio.wait_for(get_response(), timeout)
231238

232239
except (grpc.RpcError, grpc.aio.AioRpcError) as e:
240+
if self._on_disconnected_lambda:
241+
coro = self._on_disconnected_lambda()
242+
if asyncio.iscoroutine(coro):
243+
await coro
244+
233245
raise connection._rpc_error_handler(self._connection_state, e)
234246

235247
issues._process_response(grpc_message)

ydb/_topic_reader/topic_reader_asyncio.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,12 +248,15 @@ async def _connection_loop(self):
248248
self._state_changed.set()
249249
await self._stream_reader.wait_error()
250250
except BaseException as err:
251+
logger.debug("reader %s, attempt %s connection loop error %s", self._id, attempt, err)
251252
retry_info = check_retriable_error(err, self._settings._retry_settings(), attempt)
252253
if not retry_info.is_retriable:
253254
logger.debug("reader %s stop connection loop due to %s", self._id, err)
254255
self._set_first_error(err)
255256
return
256257

258+
logger.debug("sleep before retry for %s seconds", retry_info.sleep_timeout_seconds)
259+
257260
await asyncio.sleep(retry_info.sleep_timeout_seconds)
258261

259262
attempt += 1
@@ -498,6 +501,9 @@ async def _start(self, stream: IGrpcWrapperAsyncIO, init_message: StreamReadMess
498501
) # type: StreamReadMessage.FromServer
499502
except asyncio.TimeoutError:
500503
raise TopicReaderError("Timeout waiting for init response")
504+
except Exception as e:
505+
logger.debug("reader stream %s init request error %s", self._id, e)
506+
raise e
501507

502508
if isinstance(init_response.server_message, StreamReadMessage.InitResponse):
503509
self._session_id = init_response.server_message.session_id

ydb/aio/pool.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,7 @@ async def __call__(
259259
wrap_args=(),
260260
preferred_endpoint=None,
261261
fast_fail=False,
262+
include_disconnected_lambda_to_result=False,
262263
):
263264
if self._stopped:
264265
raise issues.Error("Driver was stopped")
@@ -270,12 +271,18 @@ async def __call__(
270271
self._discovery.notify_disconnected()
271272
raise
272273

273-
return await connection(
274+
on_disconnected = self._on_disconnected(connection)
275+
276+
res = await connection(
274277
request,
275278
stub,
276279
rpc_name,
277280
wrap_result,
278281
settings,
279282
wrap_args,
280-
self._on_disconnected(connection),
283+
on_disconnected,
281284
)
285+
286+
if include_disconnected_lambda_to_result:
287+
return res, on_disconnected
288+
return res

ydb/pool.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -437,6 +437,7 @@ def __call__(
437437
settings=None,
438438
wrap_args=(),
439439
preferred_endpoint=None,
440+
include_disconnected_lambda_to_result=False,
440441
):
441442
"""
442443
Synchronously sends request constructed by client library
@@ -461,16 +462,22 @@ def __call__(
461462
self._discovery_thread.notify_disconnected()
462463
raise
463464

465+
on_disconnected = lambda: self._on_disconnected(connection)
466+
464467
res = connection(
465468
request,
466469
stub,
467470
rpc_name,
468471
wrap_result,
469472
settings,
470473
wrap_args,
471-
lambda: self._on_disconnected(connection),
474+
on_disconnected,
472475
)
476+
473477
tracing.trace(self.tracer, {"response": res}, trace_level=tracing.TraceLevel.DEBUG)
478+
479+
if include_disconnected_lambda_to_result:
480+
return res, on_disconnected
474481
return res
475482

476483
@_utilities.wrap_async_call_exceptions

0 commit comments

Comments
 (0)