
Conversation

@bjsowa

@bjsowa bjsowa commented Jun 7, 2025

Description

Fixes #1467
Fixes #1405
Fixes #1462

Is this a user-facing behavior change?

  • The _pending() method in Future has been renamed to pending()
  • Users no longer need any workarounds to wake up the executor when a coroutine gets stuck awaiting a future that has already completed.

Did you use Generative AI?

Some of the comments and docstrings might be partially written by GitHub Copilot.

Additional Information

For #1467, I propagate a guard condition from a task to the awaited future and add this guard condition to the wait set in the executor. When a future is completed by an entity not controlled by the executor (e.g. a separate thread), the guard condition is triggered and wakes up the executor so it can resume the task.
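
For illustration, here is a minimal, self-contained sketch of that wake-up mechanism (the class names mimic rclpy, but everything below is simplified and illustrative, not the actual patch):

import threading

class GuardCondition:
    """Stand-in for rclpy's GuardCondition: something a wait set can block on."""

    def __init__(self):
        self._event = threading.Event()

    def trigger(self):
        self._event.set()

    def wait(self, timeout=None):
        # The executor's wait set blocks here until triggered.
        triggered = self._event.wait(timeout)
        self._event.clear()
        return triggered

class Future:
    def __init__(self):
        self.done = False
        self._guard_condition = None  # propagated from the awaiting task

    def set_result(self, result):
        self.done = True
        self.result = result
        # Completing the future from a thread the executor does not
        # control triggers the guard condition and wakes the executor.
        if self._guard_condition is not None:
            self._guard_condition.trigger()

# The executor adds the guard condition to its wait set, so a future
# completed from a separate thread interrupts the wait immediately.
gc = GuardCondition()
future = Future()
future._guard_condition = gc
threading.Thread(target=future.set_result, args=(42,)).start()
assert gc.wait(timeout=1.0)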

For #1405, I just had to make sure the task is not yielded from _wait_for_ready_callbacks when it is not ready to be resumed, hence the Task.ready() method.
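
A toy illustration of that gate (illustrative only, not the actual rclpy code):

def wait_for_ready_callbacks(tasks):
    # Yield only tasks that are ready to be resumed; a task still
    # blocked awaiting an unfinished future is skipped, so the
    # executor never calls it prematurely.
    for task in tasks:
        if task.ready():
            yield task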

EDIT: Check this comment

@bjsowa bjsowa force-pushed the fix/async-task-resume branch from 47a7eac to eeea7b0 on June 7, 2025 01:17
@bjsowa bjsowa changed the title from "Fix/async task resume" to "Fix issues with resuming async tasks awaiting a future" on Jun 7, 2025
@bjsowa bjsowa force-pushed the fix/async-task-resume branch from eeea7b0 to cb2e8bb on June 7, 2025 13:34
@sloretz sloretz self-requested a review June 12, 2025 17:30
@bjsowa bjsowa force-pushed the fix/async-task-resume branch from e5486c8 to c417824 on July 9, 2025 11:11
@bjsowa bjsowa force-pushed the fix/async-task-resume branch from c417824 to 458a73c on July 9, 2025 11:13
Co-authored-by: Shane Loretz <[email protected]>
Signed-off-by: Błażej Sowa <[email protected]>
@bjsowa
Author

bjsowa commented Jul 9, 2025

After looking at #971, I revised my solution to not modify the API of Task or Future.

Changes:

  • Added a call_soon method that schedules a task to run in the next spin and wakes up the executor.
  • Tasks schedule themselves to run in the next spin, via the new call_soon method, when they are ready to be resumed.
  • The executor._tasks list now contains only tasks that are ready to be run or resumed, not all tasks (see the sketch after this list).
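
A rough sketch of how these pieces could fit together (method names follow the description above; the rest is simplified and illustrative):

import threading
from collections import deque

class MiniExecutor:
    def __init__(self):
        self._tasks = deque()                 # only tasks ready to run
        self._tasks_lock = threading.Lock()
        self._work_ready = threading.Event()  # stands in for the guard condition

    def call_soon(self, task):
        # Schedule the task for the next spin and wake up the executor.
        with self._tasks_lock:
            self._tasks.append(task)
        self._work_ready.set()

    def spin_once(self, timeout=None):
        self._work_ready.wait(timeout)
        with self._tasks_lock:
            self._work_ready.clear()
            ready, self._tasks = self._tasks, deque()
        for task in ready:
            # A task that suspends again calls call_soon itself once
            # it becomes ready to be resumed.
            task()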

Improvements:

Regressions:

  • When a coroutine task awaits another task, it needs 2 spins to be resumed after the awaited task finishes, as the first spin only executes a callback that schedules the coroutine task to run in the next spin. This is reflected in the test_create_task_dependent_coroutines test.
  • There are now 2 tests that result in a segmentation fault when run with the EventsExecutor. If this solution is accepted I might look into fixing it; any help would be appreciated.

@bjsowa bjsowa requested a review from nadavelkabets July 9, 2025 11:56
@bjsowa
Author

bjsowa commented Jul 15, 2025

@nadavelkabets Could you take a look?

@nadavelkabets
Contributor

nadavelkabets commented Jul 15, 2025

@nadavelkabets Could you take a look?

I'll try to get to it this weekend

@bjsowa bjsowa force-pushed the fix/async-task-resume branch from fbda79c to dfa0904 on September 13, 2025 22:40
@bjsowa
Author

bjsowa commented Sep 13, 2025

I fixed the issue you were experiencing and tests are passing for me. You should merge the changes to your branch bjsowa#1.

I'm really happy with this PR. I'll look at the code once again after you merge the fixes for final approval. To merge this, we also need the approval of a project maintainer. I think it's best to present the changes in the working group meeting this Friday. Meeting link is https://calendar.app.google/2LNP3u8tdffvyCPXA. Do you want to present the changes or should I?

I merged your changes. I'll try to attend the meeting if I have nothing important to do. I would prefer you present the changes, though.

Btw, the Google Calendar link does not seem to work.

@mjcarroll
Member

Thank you @nadavelkabets for doing a huge amount of work here with the review, and thanks to @bjsowa for iterating. I'm catching up on the context of the conversation, but it appears to be in good shape. Has this run through CI yet?

@nadavelkabets
Contributor

I'm catching up on the context of the conversation, but it appears to be in good shape. Has this run through CI yet?

Hey @mjcarroll, thanks for the kind words; it’s been a real pleasure to work with @bjsowa on this.
Would be great to run CI.

@mjcarroll
Member

Pulls: #1469
Gist: https://gist.githubusercontent.com/mjcarroll/a8ebf9a9e1304615ce9ac8681376c7fb/raw/ac292e0fb5a520b80c95e61757fff89b782ebfc0/ros2.repos
BUILD args:
TEST args:
ROS Distro: rolling
Job: ci_launcher
ci_launcher ran: https://ci.ros2.org/job/ci_launcher/16957

  • Linux Build Status
  • Linux-aarch64 Build Status
  • Linux-rhel Build Status
  • Windows Build Status

:param task: A task to be run in the executor.
"""
with self._tasks_lock:
    self._tasks.append((task, None, None))
Contributor

I now realize that we still have some topics to discuss.
We are changing behavior here - previously, the executor always yielded node and entity, even for blocked tasks. Now, any task that is resumed using _call_task_in_next_spin will yield None for entity and node from the second time onwards.
SingleThreadedExecutor and MultiThreadedExecutor ignore these yielded values, but the node argument is used to filter tasks. I believe that, unlike before, the new code will keep running tasks created by a removed node, for example.

            for task_trio in tasks:
                task, entity, node = task_trio
                if node is None or node in nodes_to_use:

We should discuss this behavior change further before proceeding.

Author

Do you see any way this can be fixed?

Contributor

@nadavelkabets nadavelkabets Sep 24, 2025

Yes, but that depends on the intended behavior.
If we are okay with the "set and forget" approach, no change is required.
Otherwise, for each node in the executor we can hold a set of running tasks.
When a node is removed from the executor, all tasks related to it should be cancelled.
It's also possible to initialize a task with the node that it originated from as a property instead of this task_trio.

Author

@bjsowa bjsowa Sep 24, 2025

I just realized that holding tuples of (task, entity, node) does not really make sense now as the entity and node information is never added after I removed it from _make_handler. Maybe we should find some other way to track which node created the task?

Author

@nadavelkabets any ideas?

Contributor

I just realized that holding tuples of (task, entity, node) does not really make sense now as the entity and node information is never added after I removed it from _make_handler.

Exactly. I would like to discuss this topic with core maintainers to get their opinion.
I'm not sure if we should avoid changing this behavior or if this is a welcome change.

else:
    # Asked not to execute these tasks, so don't do them yet
    with self._tasks_lock:
        self._tasks.append(task_trio)
Contributor

Should we maintain this behavior if the node is not in nodes_to_use?
We get an infinite loop in the following scenario:

  • A task created by an entity callback is awaiting something
  • The node of the task is removed from the executor

In this case we will keep removing and adding the task forever.

Author

@bjsowa bjsowa Sep 24, 2025

I could change it to something like this:

# Yield tasks in-progress before waiting for new work
with self._tasks_lock:
    # Retrieve a list of tasks that can be executed now
    tasks = [
        (task, entity, node) for (task, entity, node) in self._tasks
        if node is None or node in nodes_to_use
    ]
    # Remove tasks that are going to be executed now from the executor
    # Tasks that need to be executed again will add themselves back to the executor
    self._tasks = [
        (task, entity, node) for (task, entity, node) in self._tasks
        if node is not None and node not in nodes_to_use
    ]

yield from tasks

Not much of an optimization, but IMO it looks cleaner and we don't remove and re-add the same tasks.

@nadavelkabets
Contributor

@mjcarroll @fujitatomoya @sloretz
These aren't dealbreakers but nonetheless I'd appreciate your feedback.

@nadavelkabets
Contributor

nadavelkabets commented Sep 23, 2025

@mjcarroll @fujitatomoya @sloretz @wjwwood
Hey, would appreciate your feedback.
My intuition is that once an async callback is fired it should be executed to completion even if the node is removed from the executor.
Could that cause unexpected issues? For example, a service server callback might send a response and I'm not sure that's wanted if the node is removed.
Perhaps a node should cancel its own tasks if it's removed.

@shaur-k

shaur-k commented Sep 30, 2025

Is there a timeline for when this gets merged? Can I help pick up some work here if needed to get this across the finish line?

@nadavelkabets
Contributor

Is there a timeline for when this gets merged? Can I help pick up some work here if needed to get this across the finish line?

Hey, we have completed most of the work and only have some minor details to iron out.
I'm awaiting feedback from other maintainers; we should discuss this at the working group meeting.

@nadavelkabets
Contributor

nadavelkabets commented Oct 13, 2025

@bjsowa
I came up with a solution and opened a PR in your fork, bjsowa#2.
Currently, we store the source entity and source node in the same tuple that holds the task itself.
Our refactor changed that, causing 2 problems:

  1. The executor no longer held a reference to the task (which could lead to unexpected garbage collection)
  2. We no longer stored the entity and node data for tasks

My thought process:

  • The easiest solution is to add the entity and node as properties of the Task class. I chose to avoid this solution to keep our Task class as close as possible to asyncio, and because I believe a task should have no interest in what entity or node it originated from.
  • We could add node and entity arguments to the Task's __call__ function, passing them on each iteration of the task, so that the task could reschedule itself with them in the tasks tuple. This seemed irrelevant because it changes both the wait_for_ready_callbacks API and its behavior, and also differs from the asyncio task implementation.
  • I then thought that the best course of action is to store the source entity and node information in a dictionary for each pending task. I initially added a done_callback to each task to remove itself from the dictionary after finishing execution, only to discover that tests were failing: they called spin_once exactly the expected number of times, but the newly added done callback introduced an extra, unexpected event.

The only way to solve this was to keep the same behavior as now, filtering the tasks list on each iteration of the executor.
Currently this entity and node tracking is not covered by unit tests. Perhaps we should add some tests for that.
I have to admit I'm not that fond of my solution, as it retains much of the inefficiencies of the current executor, but it fixes our issue nonetheless.
What do you think?
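
For reference, a rough sketch of the dictionary approach described above (TaskData and its field names follow the commit message in this PR; everything else is illustrative, not the code in bjsowa#2):

from typing import NamedTuple, Optional

class TaskData(NamedTuple):
    source_entity: Optional[object]  # e.g. the subscription whose callback spawned the task
    source_node: Optional[object]

class MiniExecutor:
    def __init__(self):
        # Maps each pending task to where it came from, and keeps a
        # strong reference so tasks are not garbage collected early.
        self._pending_tasks = {}

    def create_task(self, task, entity=None, node=None):
        self._pending_tasks[task] = TaskData(entity, node)

    def runnable_tasks(self, nodes_to_use):
        # Filtered on each iteration of the executor: drop finished
        # tasks, return only tasks whose source node is still spinning.
        self._pending_tasks = {
            task: data for task, data in self._pending_tasks.items()
            if not task.done()
        }
        return [
            task for task, data in self._pending_tasks.items()
            if data.source_node is None or data.source_node in nodes_to_use
        ]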

@nadavelkabets
Contributor

nadavelkabets commented Oct 13, 2025

Should we maintain this behavior if the node is not in nodes_to_use? We get an infinite loop in the following scenario:

  • A task created by an entity callback is awaiting something
  • The node of the task is removed from the executor

In this case we will keep removing and adding the task forever.

I also tried to solve this issue that annoyed me by cancelling all tasks related to a node when it is removed from the executor. I think this should be the default behavior, but currently rclpy.spin_once adds and removes the node from the executor on each call, which means my fix would change its behavior because async tasks would get cancelled. For now I left it out of my PR and I'm thinking of another solution, but that should not block this PR from proceeding. For now we maintain the current behavior.
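
As a hedged sketch of that cancel-on-removal idea (not merged behavior; it assumes the _pending_tasks-to-TaskData mapping from the sketch above):

def remove_node(executor, node):
    # Cancel and drop every pending task that originated from `node`.
    for task, data in list(executor._pending_tasks.items()):
        if data.source_node is node:
            if not task.done():
                task.cancel()
            del executor._pending_tasks[task]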

@bjsowa
Author

bjsowa commented Oct 13, 2025

Thanks @nadavelkabets. I'll try to take a look later today. Just a quick thought: couldn't we check if the task is done after calling its handler, and remove it from _pending_tasks then, instead of waiting for the next spin?

@nadavelkabets
Contributor

Thanks @nadavelkabets. I'll try to take a look later today. Just a quick thought: couldn't we check if the task is done after calling its handler, and remove it from _pending_tasks then, instead of waiting for the next spin?

Good thought, I think this might work. I'll try to implement that.

@nadavelkabets
Contributor

nadavelkabets commented Oct 13, 2025

Follow up:
To implement that, we have to edit Task's _complete_task method and add:

        executor = self._executor()
        del executor._pending_tasks[self]

Unfortunately, _pending_tasks is currently an internal implementation detail and not API, and it does not exist in EventsExecutor. We could add an empty function def there, but I don't like that.
The other alternative means wrapping each callback in the SingleThreadedExecutor before initializing the Task, which I also don't like.

@bjsowa
Author

bjsowa commented Oct 13, 2025

Follow up: To implement that, we have to edit Task's _complete_task method and add:

        executor = self._executor()
        del executor._pending_tasks[self]

Unfortunately, _pending_tasks is currently an internal implementation detail and not API, and it does not exist in EventsExecutor. We could add an empty function def there, but I don't like that. The other alternative means wrapping each callback in the SingleThreadedExecutor before initializing the Task, which I also don't like.

I was thinking just adding a check after calling the task in SingleThreadedExecutor:

handler()
if handler.done():
    del self._pending_tasks[handler]

Of course with added locks for thread safety.

@nadavelkabets
Contributor

I was thinking just adding a check after calling the task in SingleThreadedExecutor:

handler()
if handler.done():
    del self._pending_tasks[handler]

Of course with added locks for thread safety.

I had this thought in mind. The issue I faced: the handler is yielded to, and called from, the inherited SingleThreadedExecutor and MultiThreadedExecutor classes, each implementing its own _spin_once_impl. This change would break behavior for wait_for_ready_callbacks users. While it's probably an uncommon use case, it's public API.

@bjsowa
Author

bjsowa commented Oct 13, 2025

Understood. I think I'm fine with checking the _pending_tasks on each spin. As long as we can finally fix the issues with async tasks.

@nadavelkabets
Contributor

Understood. I think I'm fine with checking the _pending_tasks on each spin. As long as we can finally fix the issues with async tasks.

If you're fine with my solution, let's merge it to this branch and I'll go over this PR again.

nadavelkabets and others added 2 commits October 14, 2025 00:11
* Add TaskData to store source_entity and source_node

* Fix failing test

Signed-off-by: Błażej Sowa <[email protected]>
Signed-off-by: Błażej Sowa <[email protected]>
@bjsowa bjsowa force-pushed the fix/async-task-resume branch from 036b728 to 5ce6fbd on October 13, 2025 22:11
@bjsowa
Author

bjsowa commented Oct 13, 2025

Understood. I think I'm fine with checking the _pending_tasks on each spin. As long as we can finally fix the issues with async tasks.

If you're fine with my solution, let's merge it to this branch and I'll go over this PR again.

done

@bjsowa bjsowa requested a review from nadavelkabets October 18, 2025 17:23
@bjsowa
Author

bjsowa commented Oct 27, 2025

@nadavelkabets Any way I can help to push this further?

@mjcarroll Could we run the CI again?

