Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion hyperactor/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,12 @@ pub enum ActorStatus {
impl ActorStatus {
/// Tells whether the status is a terminal state.
///
/// A status is terminal when it is either `Stopped` or `Failed(_)`.
pub fn is_terminal(&self) -> bool {
matches!(self, Self::Stopped | Self::Failed(_))
self.is_stopped() || self.is_failed()
}

/// Tells whether the status is a Stopped state.
///
/// Returns `true` only for the `Stopped` variant; every other variant,
/// including `Failed(_)`, yields `false`.
pub fn is_stopped(&self) -> bool {
matches!(self, Self::Stopped)
}

/// Tells whether the status represents a failure.
Expand Down
65 changes: 46 additions & 19 deletions hyperactor/src/proc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1008,6 +1008,14 @@ impl<A: Actor> Instance<A> {
self.status_tx.send_replace(new.clone());
}

/// Reports whether the status currently held by `status_tx` is terminal,
/// as decided by `ActorStatus::is_terminal`.
fn is_terminal(&self) -> bool {
    let current = self.status_tx.borrow();
    current.is_terminal()
}

/// Reports whether the status currently held by `status_tx` is `Stopped`,
/// as decided by `ActorStatus::is_stopped`.
fn is_stopped(&self) -> bool {
    let current = self.status_tx.borrow();
    current.is_stopped()
}

/// This instance's actor ID.
pub fn self_id(&self) -> &ActorId {
self.mailbox.actor_id()
Expand Down Expand Up @@ -1099,21 +1107,35 @@ impl<A: Actor> Instance<A> {
.run_actor_tree(&mut actor, actor_loop_receivers, &mut work_rx)
.await;

let (actor_status, event) = match result {
Ok(_) => (ActorStatus::Stopped, None),
Err(ActorError {
kind: box ActorErrorKind::UnhandledSupervisionEvent(event),
..
}) => (event.actor_status.clone(), Some(event)),
Err(err) => (
ActorStatus::Failed(err.to_string()),
Some(ActorSupervisionEvent::new(
self.cell.actor_id().clone(),
ActorStatus::Failed(err.to_string()),
None,
None,
)),
),
let event = match result {
Ok(_) => {
// actor should have been stopped by run_actor_tree
assert!(self.is_stopped());
None
}
Err(err) => {
assert!(!self.is_terminal());
match *err.kind {
ActorErrorKind::UnhandledSupervisionEvent(event) => {
// Currently only terminated actors are allowed to raise supervision events.
// If we want to change that in the future, we need to modify the exit
// status here too, because we use event's actor_status as this actor's
// terminal status.
assert!(event.actor_status.is_terminal());
self.change_status(event.actor_status.clone());
Some(event)
}
_ => {
self.change_status(ActorStatus::Failed(err.to_string()));
Some(ActorSupervisionEvent::new(
self.cell.actor_id().clone(),
ActorStatus::Failed(err.to_string()),
None,
None,
))
}
}
}
};

if let Some(parent) = self.cell.maybe_unlink_parent() {
Expand Down Expand Up @@ -1141,7 +1163,6 @@ impl<A: Actor> Instance<A> {
self.proc.handle_supervision_event(event);
}
}
self.change_status(actor_status);
}

/// Runs the actor, and manages its supervision tree. When the function returns,
Expand Down Expand Up @@ -1179,10 +1200,16 @@ impl<A: Actor> Instance<A> {
}
};

if let Err(ref err) = result {
tracing::error!("{}: actor failure: {}", self.self_id(), err);
match &result {
Ok(_) => assert!(self.is_stopped()),
Err(err) => {
tracing::error!("{}: actor failure: {}", self.self_id(), err);
assert!(!self.is_terminal());
// Send Stopping instead of Failed, because we still need to
// unlink child actors.
self.change_status(ActorStatus::Stopping);
}
}
self.change_status(ActorStatus::Stopping);

// After this point, we know we won't spawn any more children,
// so we can safely read the current child keys.
Expand Down
Loading