Fix issues with resuming async tasks awaiting a future #1469
base: rolling
Conversation
Force-pushed from 47a7eac to eeea7b0
Force-pushed from eeea7b0 to cb2e8bb
Force-pushed from e5486c8 to c417824
Force-pushed from c417824 to 458a73c
Co-authored-by: Shane Loretz <[email protected]> Signed-off-by: Błażej Sowa <[email protected]>
After looking at #971, I revised my solution to not modify the API of Task or Future. Changes:
Improvements:
Regressions:
@nadavelkabets Could you take a look?
I'll try to get to it this weekend.
Force-pushed from fbda79c to dfa0904
I merged your changes. I'll try to attend the meeting if I have nothing important to do, though I would prefer that you present the changes. By the way, the Google Calendar link does not seem to work.
Thank you @nadavelkabets for doing a huge amount of work here with the review, and thanks to @bjsowa for iterating. I'm still catching up on the context of the conversation, but it appears to be in good shape. Has this run through CI yet?
Hey @mjcarroll, thanks for the kind words, it’s been a real pleasure to work with @bjsowa on this.
rclpy/rclpy/executors.py
Outdated
:param task: A task to be run in the executor.
"""
with self._tasks_lock:
    self._tasks.append((task, None, None))
I now realize that we still have some topics to discuss.
We are changing behavior here - previously, the executor always yielded node and entity, even for blocked tasks. Now, any task that is resumed using _call_task_in_next_spin will yield None for entity and node from the second time onwards.
SingleThreadedExecutor and MultiThreadedExecutor ignore these yielded values, but the node value is used to filter tasks. I believe that, unlike before, the new code will keep running tasks created by a removed node, for example.
for task_trio in tasks:
    task, entity, node = task_trio
    if node is None or node in nodes_to_use:
We should discuss this behavior change further before proceeding.
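As a toy illustration of the concern (plain Python, not rclpy code; the names are made up): an entry stored as (task, None, None) always passes the node filter, so it keeps being scheduled even after its originating node is removed.
# Toy example, not rclpy code.
tasks = [('resumed_task', None, None)]   # re-queued via _call_task_in_next_spin
nodes_to_use = []                        # the originating node has been removed
runnable = [task for (task, entity, node) in tasks
            if node is None or node in nodes_to_use]
print(runnable)  # ['resumed_task'] -- the orphaned task is still scheduled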
Do you see any way this can be fixed?
Yes, but that depends on the intended behavior.
If we are okay with the "set and forget" approach, no change is required.
Otherwise, for each node in the executor we can hold a set of running tasks.
When a node is removed from the executor, all tasks related to it should be cancelled.
It's also possible to initialize a task with the node that it originated from as a property instead of this task_trio.
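A rough sketch of the second option (the NodeTaskTracker name and remove_node hook are illustrative, not existing rclpy API): hold a per-node set of running tasks and cancel them when the node is removed from the executor.
# Hypothetical sketch only; not the actual rclpy implementation.
from collections import defaultdict

class NodeTaskTracker:
    def __init__(self):
        self._tasks_by_node = defaultdict(set)

    def add_task(self, task, node):
        # Remember which node spawned the task so it can be cleaned up later.
        if node is not None:
            self._tasks_by_node[node].add(task)

    def remove_node(self, node):
        # Cancel everything the removed node spawned so it cannot keep spinning.
        for task in self._tasks_by_node.pop(node, set()):
            task.cancel()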
I just realized that holding tuples of (task, entity, node) does not really make sense now as the entity and node information is never added after I removed it from _make_handler. Maybe we should find some other way to track which node created the task?
@nadavelkabets any ideas?
I just realized that holding tuples of (task, entity, node) does not really make sense now as the entity and node information is never added after I removed it from _make_handler.
Exactly. I would like to discuss this topic with core maintainers to get their opinion.
I'm not sure if we should avoid changing this behavior or if this is a welcome change.
rclpy/rclpy/executors.py
Outdated
else:
    # Asked not to execute these tasks, so don't do them yet
    with self._tasks_lock:
        self._tasks.append(task_trio)
Should we maintain this behavior if the node is not in nodes_to_use?
We get an infinite loop in the following scenario:
- A task created by an entity callback is awaiting something
- The node of the task is removed from the executor
In this case we will keep removing and adding the task forever.
I could change it to something like this:
# Yield tasks in-progress before waiting for new work
with self._tasks_lock:
    # Retrieve a list of tasks that can be executed now
    tasks = [
        (task, entity, node) for (task, entity, node) in self._tasks
        if node is None or node in nodes_to_use
    ]
    # Remove tasks that are going to be executed now from the executor
    # Tasks that need to be executed again will add themselves back to the executor
    self._tasks = [
        (task, entity, node) for (task, entity, node) in self._tasks
        if node is not None and node not in nodes_to_use
    ]
yield from tasks
Not much of an optimization, but IMO it looks cleaner and we don't remove and add the same tasks.
@mjcarroll @fujitatomoya @sloretz
@mjcarroll @fujitatomoya @sloretz @wjwwood
Is there a timeline for when this gets merged? Can I help pick up some work here if needed to get this across the finish line?
Hey, we have completed most of the work, and only have some minor details to iron out.
@bjsowa
My thought process:
The only way to solve this was to keep the same behavior as it is now: filtering the tasks list on each iteration of the executor.
I also tried to solve this issue that annoyed me by cancelling all tasks related to a node when it is removed from the executor. I think this should be the default behavior, but rclpy.spin_once currently adds and removes the node from the executor on each call, which means my fix would change its behavior because async tasks would get cancelled. For now I left it out of my PR and I'm thinking of another solution, but that should not block this PR from proceeding. For now we maintain the current behavior.
Thanks @nadavelkabets. I'll try to take a look later today. Just a quick thought: couldn't we check if the task is done after calling its handler and remove it from
Good thought, I think this might work. I'll try to implement that.
Follow-up: Unfortunately, _pending_task is currently an internal implementation detail and not API, and it does not exist in EventsExecutor. We could add an empty function definition there, but I don't like that.
I was thinking just adding a check after calling the task in SingleThreadedExecutor:
handler()
if handler.done():
    del self._pending_tasks[handler]
Of course with added locks for thread safety.
I had this thought in mind. The issue I faced: the handler is yielded to and called from the inherited SingleThreadedExecutor and MultiThreadedExecutor classes, each implementing its own _spin_once_impl. This change would break behavior for wait_for_ready_callbacks users. While it's probably an uncommon use case, it's public API.
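For context, a minimal sketch of the public wait_for_ready_callbacks pattern being referred to (the demo node and timer are made up for illustration): the caller receives the handler and invokes it itself, so a done-check placed only in SingleThreadedExecutor._spin_once_impl would never run for such callers.
# Sketch assuming a throwaway node and timer, just to show the public pattern.
import rclpy
from rclpy.node import Node
from rclpy.executors import SingleThreadedExecutor

rclpy.init()
node = Node('wait_for_ready_demo')
node.create_timer(0.1, lambda: None)
executor = SingleThreadedExecutor()
executor.add_node(node)

# The caller, not the executor subclass, gets the handler and invokes it.
handler, entity, node_ = executor.wait_for_ready_callbacks(timeout_sec=1.0)
handler()

node.destroy_node()
rclpy.shutdown()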
Understood. I think I'm fine with checking the
If you're fine with my solution, let's merge it to this branch and I'll go over this PR again.
* Add TaskData to store source_entity and source_node
* Fix failing test
Signed-off-by: Błażej Sowa <[email protected]>
Force-pushed from 036b728 to 5ce6fbd
Done.
@nadavelkabets Any way I can help push this further? @mjcarroll Could we run the CI again?
Description
Fixes #1467
Fixes #1405
Fixes #1462
Is this user-facing behavior change?
The _pending() method in Future has been renamed to pending()
Did you use Generative AI?
Some of the comments and docstrings might be partially written by Github Copilot.
Additional Information
For #1467, I propagate a guard condition from a task to the awaited future and add this guard condition to the wait set in the executor. When a future is completed by an entity not controlled by the executor (e.g. a separate thread), the guard condition is triggered and wakes up the executor so it can resume the task.
For #1405, I just had to make sure the task is not yielded from _wait_for_ready_callbacks when it is not ready to be resumed, hence the Task.ready() method.
EDIT: Check this comment
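As an illustrative sketch of the wake-up mechanism described above (not the PR's actual code; the real change wires the guard condition up inside Task/Future rather than in user code), a guard condition created on a node can be triggered from a future's done callback so that a future completed from another thread wakes the waiting executor:
# Sketch only; assumes a throwaway node and a future completed from another thread.
import threading
import rclpy
from rclpy.node import Node
from rclpy.executors import SingleThreadedExecutor
from rclpy.task import Future

rclpy.init()
node = Node('guard_condition_wakeup_demo')
executor = SingleThreadedExecutor()
executor.add_node(node)

# Guard condition that becomes part of the executor's wait set via the node.
guard = node.create_guard_condition(lambda: None)

future = Future()
# When the future completes, trigger the guard condition so the executor
# wakes up from its wait and can notice the completed future.
future.add_done_callback(lambda _: guard.trigger())

# Complete the future from a separate thread, outside the executor's control.
threading.Timer(0.5, lambda: future.set_result('done')).start()

executor.spin_until_future_complete(future, timeout_sec=5.0)
print(future.result())

node.destroy_guard_condition(guard)
node.destroy_node()
rclpy.shutdown()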