Skip to content

Commit 69a062b

Browse files
committed
CommitOffset with readsessionid
1 parent 7de9687 commit 69a062b

File tree

5 files changed

+52
-2
lines changed

5 files changed

+52
-2
lines changed

tests/topics/test_topic_reader.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,20 @@ async def test_commit_offset_works(self, driver, topic_with_messages, topic_cons
7474
topic_with_messages, topic_consumer, message.partition_id, message.offset + 1
7575
)
7676

77+
async def test_commit_offset_with_session_id_works(self, driver, topic_with_messages, topic_consumer):
78+
async with driver.topic_client.reader(topic_with_messages, topic_consumer) as reader:
79+
for out in ["123", "456", "789", "0"]:
80+
message = await reader.receive_message()
81+
assert message.data.decode() == out
82+
83+
await driver.topic_client.commit_offset(
84+
topic_with_messages,
85+
topic_consumer,
86+
message.partition_id,
87+
message.offset + 1,
88+
reader.read_session_id,
89+
)
90+
7791
async def test_reader_reconnect_after_commit_offset(self, driver, topic_with_messages, topic_consumer):
7892
async with driver.topic_client.reader(topic_with_messages, topic_consumer) as reader:
7993
for out in ["123", "456", "789", "0"]:
@@ -213,6 +227,20 @@ def test_commit_offset_works(self, driver_sync, topic_with_messages, topic_consu
213227
topic_with_messages, topic_consumer, message.partition_id, message.offset + 1
214228
)
215229

230+
def test_commit_offset_with_session_id_works(self, driver_sync, topic_with_messages, topic_consumer):
231+
with driver_sync.topic_client.reader(topic_with_messages, topic_consumer) as reader:
232+
for out in ["123", "456", "789", "0"]:
233+
message = reader.receive_message()
234+
assert message.data.decode() == out
235+
236+
driver_sync.topic_client.commit_offset(
237+
topic_with_messages,
238+
topic_consumer,
239+
message.partition_id,
240+
message.offset + 1,
241+
reader.read_session_id,
242+
)
243+
216244
def test_reader_reconnect_after_commit_offset(self, driver_sync, topic_with_messages, topic_consumer):
217245
with driver_sync.topic_client.reader(topic_with_messages, topic_consumer) as reader:
218246
for out in ["123", "456", "789", "0"]:

ydb/_grpc/grpcwrapper/ydb_topic.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,13 +143,15 @@ class CommitOffsetRequest(IToProto):
143143
consumer: str
144144
partition_id: int
145145
offset: int
146+
read_session_id: Optional[str]
146147

147148
def to_proto(self) -> ydb_topic_pb2.CommitOffsetRequest:
148149
return ydb_topic_pb2.CommitOffsetRequest(
149150
path=self.path,
150151
consumer=self.consumer,
151152
partition_id=self.partition_id,
152153
offset=self.offset,
154+
read_session_id=self.read_session_id,
153155
)
154156

155157

ydb/_topic_reader/topic_reader_asyncio.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,10 @@ async def close(self, flush: bool = True):
190190
self._closed = True
191191
await self._reconnector.close(flush)
192192

193+
@property
194+
def read_session_id(self) -> Optional[str]:
195+
return self._reconnector.read_session_id
196+
193197

194198
class ReaderReconnector:
195199
_static_reader_reconnector_counter = AtomicCounter()
@@ -373,6 +377,12 @@ def _set_first_error(self, err: issues.Error):
373377
# skip if already has result
374378
pass
375379

380+
@property
381+
def read_session_id(self) -> Optional[str]:
382+
if not self._stream_reader:
383+
return None
384+
return self._stream_reader._session_id
385+
376386

377387
class ReaderStream:
378388
_static_id_counter = AtomicCounter()

ydb/_topic_reader/topic_reader_sync.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,3 +204,7 @@ def close(self, *, flush: bool = True, timeout: TimeoutType = None):
204204
def _check_closed(self):
205205
if self._closed:
206206
raise TopicReaderClosedError()
207+
208+
@property
209+
def read_session_id(self) -> Optional[str]:
210+
return self._async_reader.read_session_id

ydb/topic.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -340,12 +340,15 @@ def tx_writer(
340340

341341
return TopicTxWriterAsyncIO(tx=tx, driver=self._driver, settings=settings, _client=self)
342342

343-
async def commit_offset(self, path: str, consumer: str, partition_id: int, offset: int) -> None:
343+
async def commit_offset(
344+
self, path: str, consumer: str, partition_id: int, offset: int, read_session_id: Optional[str] = None
345+
) -> None:
344346
req = _ydb_topic.CommitOffsetRequest(
345347
path=path,
346348
consumer=consumer,
347349
partition_id=partition_id,
348350
offset=offset,
351+
read_session_id=read_session_id,
349352
)
350353

351354
await self._driver(
@@ -618,12 +621,15 @@ def tx_writer(
618621

619622
return TopicTxWriter(tx, self._driver, settings, _parent=self)
620623

621-
def commit_offset(self, path: str, consumer: str, partition_id: int, offset: int) -> None:
624+
def commit_offset(
625+
self, path: str, consumer: str, partition_id: int, offset: int, read_session_id: Optional[str] = None
626+
) -> None:
622627
req = _ydb_topic.CommitOffsetRequest(
623628
path=path,
624629
consumer=consumer,
625630
partition_id=partition_id,
626631
offset=offset,
632+
read_session_id=read_session_id,
627633
)
628634

629635
self._driver(

0 commit comments

Comments
 (0)