Skip to content

Commit ff191ca

Browse files
authored
Merge pull request #44 from taskiq-python/feature/direct-routing
2 parents ddd8f73 + 212ff01 commit ff191ca

File tree

3 files changed

+59
-5
lines changed

3 files changed

+59
-5
lines changed

README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,4 +157,3 @@ broker = AioPikaBroker(
157157
```
158158

159159
This will ensure that the queue is created with your custom arguments, in addition to the broker's defaults.
160-

taskiq_aio_pika/broker.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,14 @@ async def kick(self, message: BrokerMessage) -> None:
266266
self._exchange_name,
267267
ensure=False,
268268
)
269-
await exchange.publish(rmq_message, routing_key=message.task_name)
269+
270+
routing_key = message.task_name
271+
272+
# Because direct exchange uses exact routing key for routing
273+
if self._exchange_type == ExchangeType.DIRECT:
274+
routing_key = self._routing_key
275+
276+
await exchange.publish(rmq_message, routing_key=routing_key)
270277
elif self._delayed_message_exchange_plugin:
271278
rmq_message.headers["x-delay"] = int(delay * 1000)
272279
exchange = await self.write_channel.get_exchange(

tests/test_broker.py

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,24 @@
22
import uuid
33

44
import pytest
5-
from aio_pika import Channel, Message
5+
from aio_pika import Channel, ExchangeType, Message
66
from aio_pika.exceptions import QueueEmpty
77
from taskiq import AckableMessage, BrokerMessage
88
from taskiq.utils import maybe_awaitable
99

1010
from taskiq_aio_pika.broker import AioPikaBroker
1111

1212

13-
async def get_first_task(broker: AioPikaBroker) -> AckableMessage: # type: ignore
13+
async def get_first_task(broker: AioPikaBroker) -> AckableMessage:
1414
"""
1515
Get first message from the queue.
1616
1717
:param broker: async message broker.
1818
:return: first message from listen method
1919
"""
20-
async for message in broker.listen(): # noqa: RET503
20+
async for message in broker.listen():
2121
return message
22+
return None # type: ignore
2223

2324

2425
@pytest.mark.anyio
@@ -219,3 +220,50 @@ async def test_delayed_message_with_plugin(
219220
await asyncio.sleep(2)
220221

221222
assert await main_queue.get()
223+
224+
225+
@pytest.mark.anyio
226+
async def test_direct_kick(
227+
broker: AioPikaBroker,
228+
test_channel: Channel,
229+
queue_name: str,
230+
exchange_name: str,
231+
) -> None:
232+
"""
233+
Test that messages are published and read correctly.
234+
235+
We kick the message and then try to listen to the queue,
236+
and check that message we got is the same as we sent.
237+
"""
238+
queue = await test_channel.get_queue(queue_name)
239+
exchange = await test_channel.get_exchange(exchange_name)
240+
await queue.delete()
241+
await exchange.delete()
242+
243+
broker._declare_exchange = True
244+
broker._exchange_type = ExchangeType.DIRECT
245+
broker._routing_key = "direct_routing_key"
246+
247+
await broker.startup()
248+
249+
await test_channel.get_queue(queue_name, ensure=True)
250+
await test_channel.get_exchange(exchange_name, ensure=True)
251+
252+
task_id = uuid.uuid4().hex
253+
task_name = uuid.uuid4().hex
254+
255+
sent = BrokerMessage(
256+
task_id=task_id,
257+
task_name=task_name,
258+
message=b"my_msg",
259+
labels={
260+
"label1": "val1",
261+
},
262+
)
263+
264+
await broker.kick(sent)
265+
266+
message = await asyncio.wait_for(get_first_task(broker), timeout=0.4)
267+
268+
assert message.data == sent.message
269+
await maybe_awaitable(message.ack())

0 commit comments

Comments
 (0)