@@ -168,6 +168,7 @@ def __init__(
168168 maxlen : Optional [int ] = None ,
169169 idle_timeout : int = 600000 , # 10 minutes
170170 unacknowledged_batch_size : int = 100 ,
171+ xread_count : Optional [int ] = 100 ,
171172 additional_streams : Optional [Dict [str , str ]] = None ,
172173 ** connection_kwargs : Any ,
173174 ) -> None :
@@ -189,6 +190,7 @@ def __init__(
189190 Better to set it to a bigger value, to avoid unnecessary calls.
190191 :param maxlen: sets the maximum length of the stream
191192 trims (the old values of) the stream each time a new element is added
193+ :param xread_count: number of messages to fetch from the stream at once.
192194 :param additional_streams: additional streams to read from.
193195 Each key is a stream name, value is a consumer id.
194196 :param redeliver_timeout: time in ms to wait before redelivering a message.
@@ -211,6 +213,7 @@ def __init__(
211213 self .additional_streams = additional_streams or {}
212214 self .idle_timeout = idle_timeout
213215 self .unacknowledged_batch_size = unacknowledged_batch_size
216+ self .count = xread_count
214217
215218 async def _declare_consumer_group (self ) -> None :
216219 """
@@ -276,6 +279,7 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]:
276279 },
277280 block = self .block ,
278281 noack = False ,
282+ count = self .count ,
279283 )
280284 for _ , msg_list in fetched :
281285 for msg_id , msg in msg_list :
0 commit comments