@@ -64,6 +64,26 @@ async def test_read_and_commit_with_ack(self, driver, topic_with_messages, topic
6464
6565 assert message != batch .messages [0 ]
6666
67+ async def test_commit_offset_works (self , driver , topic_with_messages , topic_consumer ):
68+ for out in ["123" , "456" , "789" , "0" ]:
69+ async with driver .topic_client .reader (topic_with_messages , topic_consumer ) as reader :
70+ message = await reader .receive_message ()
71+ assert message .data .decode () == out
72+
73+ await driver .topic_client .commit_offset (
74+ topic_with_messages , topic_consumer , message .partition_id , message .offset + 1
75+ )
76+
77+ async def test_reader_reconnect_after_commit_offset (self , driver , topic_with_messages , topic_consumer ):
78+ async with driver .topic_client .reader (topic_with_messages , topic_consumer ) as reader :
79+ for out in ["123" , "456" , "789" , "0" ]:
80+ message = await reader .receive_message ()
81+ assert message .data .decode () == out
82+
83+ await driver .topic_client .commit_offset (
84+ topic_with_messages , topic_consumer , message .partition_id , message .offset + 1
85+ )
86+
6787 async def test_read_compressed_messages (self , driver , topic_path , topic_consumer ):
6888 async with driver .topic_client .writer (topic_path , codec = ydb .TopicCodec .GZIP ) as writer :
6989 await writer .write ("123" )
@@ -183,6 +203,26 @@ def test_read_and_commit_with_ack(self, driver_sync, topic_with_messages, topic_
183203
184204 assert message != batch .messages [0 ]
185205
206+ def test_commit_offset_works (self , driver_sync , topic_with_messages , topic_consumer ):
207+ for out in ["123" , "456" , "789" , "0" ]:
208+ with driver_sync .topic_client .reader (topic_with_messages , topic_consumer ) as reader :
209+ message = reader .receive_message ()
210+ assert message .data .decode () == out
211+
212+ driver_sync .topic_client .commit_offset (
213+ topic_with_messages , topic_consumer , message .partition_id , message .offset + 1
214+ )
215+
216+ def test_reader_reconnect_after_commit_offset (self , driver_sync , topic_with_messages , topic_consumer ):
217+ with driver_sync .topic_client .reader (topic_with_messages , topic_consumer ) as reader :
218+ for out in ["123" , "456" , "789" , "0" ]:
219+ message = reader .receive_message ()
220+ assert message .data .decode () == out
221+
222+ driver_sync .topic_client .commit_offset (
223+ topic_with_messages , topic_consumer , message .partition_id , message .offset + 1
224+ )
225+
186226 def test_read_compressed_messages (self , driver_sync , topic_path , topic_consumer ):
187227 with driver_sync .topic_client .writer (topic_path , codec = ydb .TopicCodec .GZIP ) as writer :
188228 writer .write ("123" )
0 commit comments