Skip to content

Commit 4a8f015

Browse files
authored
Merge pull request #102 from FlorianLB/fix-dynamic-ack
fix: ack to the right stream in case of dynamic queue name
2 parents 0ff98e1 + cb635b6 commit 4a8f015

File tree

3 files changed

+13
-13
lines changed

3 files changed

+13
-13
lines changed

taskiq_redis/redis_broker.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -260,11 +260,11 @@ async def kick(self, message: BrokerMessage) -> None:
260260
approximate=self.approximate,
261261
)
262262

263-
def _ack_generator(self, id: str) -> Callable[[], Awaitable[None]]:
263+
def _ack_generator(self, id: str, queue_name: str) -> Callable[[], Awaitable[None]]:
264264
async def _ack() -> None:
265265
async with Redis(connection_pool=self.connection_pool) as redis_conn:
266266
await redis_conn.xack(
267-
self.queue_name,
267+
queue_name,
268268
self.consumer_group_name,
269269
id,
270270
)
@@ -287,12 +287,12 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]:
287287
noack=False,
288288
count=self.count,
289289
)
290-
for _, msg_list in fetched:
290+
for stream, msg_list in fetched:
291291
for msg_id, msg in msg_list:
292292
logger.debug("Received message: %s", msg)
293293
yield AckableMessage(
294294
data=msg[b"data"],
295-
ack=self._ack_generator(msg_id),
295+
ack=self._ack_generator(id=msg_id, queue_name=stream),
296296
)
297297
logger.debug("Starting fetching unacknowledged messages")
298298
for stream in [self.queue_name, *self.additional_streams.keys()]:
@@ -318,5 +318,5 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]:
318318
logger.debug("Received message: %s", msg)
319319
yield AckableMessage(
320320
data=msg[b"data"],
321-
ack=self._ack_generator(msg_id),
321+
ack=self._ack_generator(id=msg_id, queue_name=stream),
322322
)

taskiq_redis/redis_cluster_broker.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -171,10 +171,10 @@ async def kick(self, message: BrokerMessage) -> None:
171171
approximate=self.approximate,
172172
)
173173

174-
def _ack_generator(self, id: str) -> Callable[[], Awaitable[None]]:
174+
def _ack_generator(self, id: str, queue_name: str) -> Callable[[], Awaitable[None]]:
175175
async def _ack() -> None:
176176
await self.redis.xack(
177-
self.queue_name,
177+
queue_name,
178178
self.consumer_group_name,
179179
id,
180180
)
@@ -194,10 +194,10 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]:
194194
block=self.block,
195195
noack=False,
196196
)
197-
for _, msg_list in fetched:
197+
for stream, msg_list in fetched:
198198
for msg_id, msg in msg_list:
199199
logger.debug("Received message: %s", msg)
200200
yield AckableMessage(
201201
data=msg[b"data"],
202-
ack=self._ack_generator(msg_id),
202+
ack=self._ack_generator(id=msg_id, queue_name=stream),
203203
)

taskiq_redis/redis_sentinel_broker.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -239,11 +239,11 @@ async def kick(self, message: BrokerMessage) -> None:
239239
approximate=self.approximate,
240240
)
241241

242-
def _ack_generator(self, id: str) -> Callable[[], Awaitable[None]]:
242+
def _ack_generator(self, id: str, queue_name: str) -> Callable[[], Awaitable[None]]:
243243
async def _ack() -> None:
244244
async with self._acquire_master_conn() as redis_conn:
245245
await redis_conn.xack(
246-
self.queue_name,
246+
queue_name,
247247
self.consumer_group_name,
248248
id,
249249
)
@@ -264,10 +264,10 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]:
264264
block=self.block,
265265
noack=False,
266266
)
267-
for _, msg_list in fetched:
267+
for stream, msg_list in fetched:
268268
for msg_id, msg in msg_list:
269269
logger.debug("Received message: %s", msg)
270270
yield AckableMessage(
271271
data=msg[b"data"],
272-
ack=self._ack_generator(msg_id),
272+
ack=self._ack_generator(id=msg_id, queue_name=stream),
273273
)

0 commit comments

Comments
 (0)