Skip to content

Commit 58f03c6

Browse files
pedro-psbmdellweg
andcommitted
Refactor dispatch and execute_task and add async version
We should be able to directly dispatch from async context using async-ready functions. Refactoring was done to make it easier to maintain both sync/async version of the respective functions. Co-authored-by: Matthias Dellweg <[email protected]>
1 parent 42d039d commit 58f03c6

File tree

7 files changed

+345
-124
lines changed

7 files changed

+345
-124
lines changed

pulpcore/app/models/repository.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,21 @@ def pull_through_add_content(self, content_artifact):
381381
body = {"repository_pk": self.pk, "add_content_units": [cpk], "remove_content_units": []}
382382
return dispatch(add_and_remove, kwargs=body, exclusive_resources=[self], immediate=True)
383383

384+
async def async_pull_through_add_content(self, content_artifact):
385+
cpk = content_artifact.content_id
386+
already_present = RepositoryContent.objects.filter(
387+
content__pk=cpk, repository=self, version_removed__isnull=True
388+
)
389+
if not cpk or await already_present.aexists():
390+
return None
391+
392+
from pulpcore.plugin.tasking import adispatch, aadd_and_remove
393+
394+
body = {"repository_pk": self.pk, "add_content_units": [cpk], "remove_content_units": []}
395+
return await adispatch(
396+
aadd_and_remove, kwargs=body, exclusive_resources=[self], immediate=True
397+
)
398+
384399
@hook(AFTER_UPDATE, when="retain_repo_versions", has_changed=True)
385400
def _cleanup_old_versions_hook(self):
386401
# Do not attempt to clean up anything, while there is a transaction involving repo versions

pulpcore/app/models/task.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -341,8 +341,20 @@ def unblock(self):
341341
self.unblocked_at = unblocked_at
342342
else:
343343
self.refresh_from_db()
344-
raise RuntimeError(
345-
_("Falied to set task {} unblocked in state '{}'.").format(self.pk, self.state)
344+
raise RuntimeError("Failed to set task {} unblocked in state '{}'.").format(
345+
self.pk, self.state
346+
)
347+
348+
async def aunblock(self):
349+
# This should be safe to be called without holding the lock.
350+
unblocked_at = timezone.now()
351+
rows = await Task.objects.filter(pk=self.pk).aupdate(unblocked_at=unblocked_at)
352+
if rows == 1:
353+
self.unblocked_at = unblocked_at
354+
else:
355+
await self.arefresh_from_db()
356+
raise RuntimeError("Failed to set task {} unblocked in state '{}'.").format(
357+
self.pk, self.state
346358
)
347359

348360
class Meta:

pulpcore/app/tasks/repository.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,3 +236,30 @@ def add_and_remove(repository_pk, add_content_units, remove_content_units, base_
236236
with repository.new_version(base_version=base_version) as new_version:
237237
new_version.remove_content(models.Content.objects.filter(pk__in=remove_content_units))
238238
new_version.add_content(models.Content.objects.filter(pk__in=add_content_units))
239+
240+
241+
async def aadd_and_remove(
242+
repository_pk, add_content_units, remove_content_units, base_version_pk=None
243+
):
244+
"""Aynsc version of add_and_remove."""
245+
repository = await models.Repository.objects.aget(pk=repository_pk)
246+
repository = await repository.acast()
247+
248+
if base_version_pk:
249+
base_version = await models.RepositoryVersion.objects.aget(pk=base_version_pk)
250+
else:
251+
base_version = None
252+
253+
if "*" in remove_content_units:
254+
latest = await repository.alatest_version()
255+
if latest:
256+
remove_content_units = latest.content.values_list("pk", flat=True)
257+
else:
258+
remove_content_units = []
259+
260+
def add_to_repository():
261+
with repository.new_version(base_version=base_version) as new_version:
262+
new_version.remove_content(models.Content.objects.filter(pk__in=remove_content_units))
263+
new_version.add_content(models.Content.objects.filter(pk__in=add_content_units))
264+
265+
await sync_to_async(add_to_repository)()

pulpcore/plugin/tasking.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# Support plugins dispatching tasks
2-
from pulpcore.tasking.tasks import dispatch
2+
from pulpcore.tasking.tasks import dispatch, adispatch
33

44
from pulpcore.app.tasks import (
55
ageneral_update,
@@ -13,13 +13,14 @@
1313
reclaim_space,
1414
)
1515
from pulpcore.app.tasks.vulnerability_report import check_content
16-
from pulpcore.app.tasks.repository import add_and_remove
16+
from pulpcore.app.tasks.repository import add_and_remove, aadd_and_remove
1717

1818

1919
__all__ = [
2020
"ageneral_update",
2121
"check_content",
2222
"dispatch",
23+
"adispatch",
2324
"fs_publication_export",
2425
"fs_repo_version_export",
2526
"general_create",
@@ -29,4 +30,5 @@
2930
"orphan_cleanup",
3031
"reclaim_space",
3132
"add_and_remove",
33+
"aadd_and_remove",
3234
]

0 commit comments

Comments
 (0)