Skip to content

Commit 1034306

Browse files
authored
Merge branch 'taskiq-python:main' into anton/try_except_listen
2 parents ece9df8 + 254fae4 commit 1034306

File tree

8 files changed

+710
-515
lines changed

8 files changed

+710
-515
lines changed

.flake8

Lines changed: 0 additions & 125 deletions
This file was deleted.

.github/workflows/release.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ jobs:
1818
uses: knowsuchagency/poetry-install@v1
1919
env:
2020
POETRY_VIRTUALENVS_CREATE: false
21+
- name: Set version
22+
run: poetry version "${{ github.ref_name }}"
2123
- name: Release package
2224
env:
2325
POETRY_PYPI_TOKEN_PYPI: ${{ secrets.PYPI_TOKEN }}

.github/workflows/test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ jobs:
3737
steps:
3838
- uses: actions/checkout@v4
3939
- name: Set up Redis instance and Redis cluster
40-
run: docker-compose up -d
40+
run: docker compose up -d
4141
- name: Set up Python
4242
uses: actions/setup-python@v2
4343
with:

.pre-commit-config.yaml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ repos:
1818
hooks:
1919
- id: black
2020
name: Format with Black
21-
entry: black
21+
entry: poetry run black
2222
language: system
2323
types: [python]
2424

@@ -29,12 +29,13 @@ repos:
2929
pass_filenames: false
3030
types: [python]
3131
args:
32+
- check
3233
- "--fix"
3334
- "taskiq_redis"
3435
- "tests"
3536

3637
- id: mypy
3738
name: Validate types with MyPy
38-
entry: mypy
39+
entry: poetry run mypy
3940
language: system
4041
types: [ python ]

poetry.lock

Lines changed: 432 additions & 385 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "taskiq-redis"
3-
version = "1.0.0"
3+
version = "0.0.0"
44
description = "Redis integration for taskiq"
55
authors = ["taskiq-team <[email protected]>"]
66
readme = "README.md"
@@ -39,7 +39,7 @@ pytest-env = "^0.6.2"
3939
fakeredis = "^2"
4040
pre-commit = "^2.20.0"
4141
pytest-xdist = { version = "^2.5.0", extras = ["psutil"] }
42-
ruff = "^0.1.0"
42+
ruff = "^0"
4343
types-redis = "^4.6.0.20240425"
4444

4545
[tool.mypy]

taskiq_redis/redis_backend.py

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
from taskiq.abc.result_backend import TaskiqResult
2020
from taskiq.abc.serializer import TaskiqSerializer
2121
from taskiq.compat import model_dump, model_validate
22+
from taskiq.depends.progress_tracker import TaskProgress
2223
from taskiq.serializers import PickleSerializer
2324

2425
from taskiq_redis.exceptions import (
@@ -41,6 +42,8 @@
4142

4243
_ReturnType = TypeVar("_ReturnType")
4344

45+
PROGRESS_KEY_SUFFIX = "__progress"
46+
4447

4548
class RedisAsyncResultBackend(AsyncResultBackend[_ReturnType]):
4649
"""Async result based on redis."""
@@ -174,6 +177,55 @@ async def get_result(
174177

175178
return taskiq_result
176179

180+
async def set_progress(
181+
self,
182+
task_id: str,
183+
progress: TaskProgress[_ReturnType],
184+
) -> None:
185+
"""
186+
Sets task progress in redis.
187+
188+
Dumps TaskProgress instance into the bytes and writes
189+
it to redis with a standard suffix on the task_id as the key
190+
191+
:param task_id: ID of the task.
192+
:param result: task's TaskProgress instance.
193+
"""
194+
redis_set_params: Dict[str, Union[str, int, bytes]] = {
195+
"name": task_id + PROGRESS_KEY_SUFFIX,
196+
"value": self.serializer.dumpb(model_dump(progress)),
197+
}
198+
if self.result_ex_time:
199+
redis_set_params["ex"] = self.result_ex_time
200+
elif self.result_px_time:
201+
redis_set_params["px"] = self.result_px_time
202+
203+
async with Redis(connection_pool=self.redis_pool) as redis:
204+
await redis.set(**redis_set_params) # type: ignore
205+
206+
async def get_progress(
207+
self,
208+
task_id: str,
209+
) -> Union[TaskProgress[_ReturnType], None]:
210+
"""
211+
Gets progress results from the task.
212+
213+
:param task_id: task's id.
214+
:return: task's TaskProgress instance.
215+
"""
216+
async with Redis(connection_pool=self.redis_pool) as redis:
217+
result_value = await redis.get(
218+
name=task_id + PROGRESS_KEY_SUFFIX,
219+
)
220+
221+
if result_value is None:
222+
return None
223+
224+
return model_validate(
225+
TaskProgress[_ReturnType],
226+
self.serializer.loadb(result_value),
227+
)
228+
177229

178230
class RedisAsyncClusterResultBackend(AsyncResultBackend[_ReturnType]):
179231
"""Async result backend based on redis cluster."""
@@ -301,6 +353,53 @@ async def get_result(
301353

302354
return taskiq_result
303355

356+
async def set_progress(
357+
self,
358+
task_id: str,
359+
progress: TaskProgress[_ReturnType],
360+
) -> None:
361+
"""
362+
Sets task progress in redis.
363+
364+
Dumps TaskProgress instance into the bytes and writes
365+
it to redis with a standard suffix on the task_id as the key
366+
367+
:param task_id: ID of the task.
368+
:param result: task's TaskProgress instance.
369+
"""
370+
redis_set_params: Dict[str, Union[str, int, bytes]] = {
371+
"name": task_id + PROGRESS_KEY_SUFFIX,
372+
"value": self.serializer.dumpb(model_dump(progress)),
373+
}
374+
if self.result_ex_time:
375+
redis_set_params["ex"] = self.result_ex_time
376+
elif self.result_px_time:
377+
redis_set_params["px"] = self.result_px_time
378+
379+
await self.redis.set(**redis_set_params) # type: ignore
380+
381+
async def get_progress(
382+
self,
383+
task_id: str,
384+
) -> Union[TaskProgress[_ReturnType], None]:
385+
"""
386+
Gets progress results from the task.
387+
388+
:param task_id: task's id.
389+
:return: task's TaskProgress instance.
390+
"""
391+
result_value = await self.redis.get( # type: ignore[attr-defined]
392+
name=task_id + PROGRESS_KEY_SUFFIX,
393+
)
394+
395+
if result_value is None:
396+
return None
397+
398+
return model_validate(
399+
TaskProgress[_ReturnType],
400+
self.serializer.loadb(result_value),
401+
)
402+
304403

305404
class RedisAsyncSentinelResultBackend(AsyncResultBackend[_ReturnType]):
306405
"""Async result based on redis sentinel."""
@@ -439,6 +538,55 @@ async def get_result(
439538

440539
return taskiq_result
441540

541+
async def set_progress(
542+
self,
543+
task_id: str,
544+
progress: TaskProgress[_ReturnType],
545+
) -> None:
546+
"""
547+
Sets task progress in redis.
548+
549+
Dumps TaskProgress instance into the bytes and writes
550+
it to redis with a standard suffix on the task_id as the key
551+
552+
:param task_id: ID of the task.
553+
:param result: task's TaskProgress instance.
554+
"""
555+
redis_set_params: Dict[str, Union[str, int, bytes]] = {
556+
"name": task_id + PROGRESS_KEY_SUFFIX,
557+
"value": self.serializer.dumpb(model_dump(progress)),
558+
}
559+
if self.result_ex_time:
560+
redis_set_params["ex"] = self.result_ex_time
561+
elif self.result_px_time:
562+
redis_set_params["px"] = self.result_px_time
563+
564+
async with self._acquire_master_conn() as redis:
565+
await redis.set(**redis_set_params) # type: ignore
566+
567+
async def get_progress(
568+
self,
569+
task_id: str,
570+
) -> Union[TaskProgress[_ReturnType], None]:
571+
"""
572+
Gets progress results from the task.
573+
574+
:param task_id: task's id.
575+
:return: task's TaskProgress instance.
576+
"""
577+
async with self._acquire_master_conn() as redis:
578+
result_value = await redis.get(
579+
name=task_id + PROGRESS_KEY_SUFFIX,
580+
)
581+
582+
if result_value is None:
583+
return None
584+
585+
return model_validate(
586+
TaskProgress[_ReturnType],
587+
self.serializer.loadb(result_value),
588+
)
589+
442590
async def shutdown(self) -> None:
443591
"""Shutdown sentinel connections."""
444592
for sentinel in self.sentinel.sentinels:

0 commit comments

Comments
 (0)