Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions hyperactor_mesh/src/proc_mesh/mesh_agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use enum_as_inner::EnumAsInner;
use hyperactor::Actor;
use hyperactor::ActorHandle;
use hyperactor::ActorId;
use hyperactor::Bind;
use hyperactor::Context;
use hyperactor::Data;
use hyperactor::HandleClient;
Expand All @@ -31,6 +32,7 @@ use hyperactor::PortHandle;
use hyperactor::PortRef;
use hyperactor::ProcId;
use hyperactor::RefClient;
use hyperactor::Unbind;
use hyperactor::actor::ActorStatus;
use hyperactor::actor::remote::Remote;
use hyperactor::channel;
Expand Down Expand Up @@ -167,7 +169,7 @@ impl State {
handlers=[
MeshAgentMessage,
resource::CreateOrUpdate<ActorSpec>,
resource::GetState<ActorState>
resource::GetState<ActorState> { cast = true },
]
)]
pub struct ProcMeshAgent {
Expand Down Expand Up @@ -425,7 +427,7 @@ pub struct ActorSpec {
}

/// Actor state.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named)]
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named, Bind, Unbind)]
pub struct ActorState {
/// The actor's ID.
pub actor_id: ActorId,
Expand Down
37 changes: 37 additions & 0 deletions hyperactor_mesh/src/resource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ use hyperactor::Handler;
use hyperactor::Named;
use hyperactor::PortRef;
use hyperactor::RefClient;
use hyperactor::RemoteMessage;
use hyperactor::message::Bind;
use hyperactor::message::Bindings;
use hyperactor::message::Unbind;
use serde::Deserialize;
use serde::Serialize;

Expand Down Expand Up @@ -70,3 +74,36 @@ pub struct GetState<S> {
#[reply]
pub reply: PortRef<State<S>>,
}

// Cannot derive Bind and Unbind for this generic, implement manually.
impl<S> Unbind for GetState<S>
where
S: RemoteMessage,
S: Unbind,
{
fn unbind(&self, bindings: &mut Bindings) -> anyhow::Result<()> {
self.reply.unbind(bindings)
}
}

impl<S> Bind for GetState<S>
where
S: RemoteMessage,
S: Bind,
{
fn bind(&mut self, bindings: &mut Bindings) -> anyhow::Result<()> {
self.reply.bind(bindings)
}
}

impl<S> Clone for GetState<S>
where
S: RemoteMessage,
{
fn clone(&self) -> Self {
Self {
name: self.name.clone(),
reply: self.reply.clone(),
}
}
}
67 changes: 50 additions & 17 deletions hyperactor_mesh/src/v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,27 +167,51 @@ pub type Result<T> = std::result::Result<T, Error>;
Serialize,
Deserialize
)]
pub struct Name(pub String, pub ShortUuid);
pub enum Name {
/// Normal names for most actors.
Suffixed(String, ShortUuid),
/// Reserved names for system actors without UUIDs.
Reserved(String),
}

impl Name {
/// Create a new `Name` from a user-provided base name.
pub fn new(name: impl Into<String>) -> Self {
Self::new_with_uuid(name, Some(ShortUuid::generate()))
}

/// Create a Reserved `Name` with no uuid. Only for use by system actors.
pub(crate) fn new_reserved(name: impl Into<String>) -> Self {
Self::new_with_uuid(name, None)
}

fn new_with_uuid(name: impl Into<String>, uuid: Option<ShortUuid>) -> Self {
let mut name = name.into();
if name.is_empty() {
name = "unnamed".to_string();
}
let uuid = ShortUuid::generate();
Self(name, uuid)
if let Some(uuid) = uuid {
Self::Suffixed(name, uuid)
} else {
Self::Reserved(name)
}
}

/// The name portion of this `Name`.
pub fn name(&self) -> &str {
&self.0
match self {
Self::Suffixed(n, _) => n,
Self::Reserved(n) => n,
}
}

/// The UUID portion of this `Name`.
/// Only valid for Name::Suffixed, if called on Name::Reserved it'll panic.
pub fn uuid(&self) -> &ShortUuid {
&self.1
match self {
Self::Suffixed(_, uuid) => uuid,
Self::Reserved(_) => panic!("Reserved name has no UUID"),
}
}
}

Expand All @@ -211,24 +235,33 @@ impl FromStr for Name {
type Err = NameParseError;

fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
let (name, uuid) = s.split_once('-').ok_or(NameParseError::MissingSeparator)?;
if name.is_empty() {
return Err(NameParseError::MissingName);
if let Some((name, uuid)) = s.split_once('-') {
if name.is_empty() {
return Err(NameParseError::MissingName);
}
if uuid.is_empty() {
return Err(NameParseError::MissingName);
}

Ok(Name::new_with_uuid(name.to_string(), Some(uuid.parse()?)))
} else {
if s.is_empty() {
return Err(NameParseError::MissingName);
}
Ok(Name::new_reserved(s))
}
if uuid.is_empty() {
return Err(NameParseError::MissingName);
}

let name = name.to_string();
let uuid = uuid.parse()?;
Ok(Name(name, uuid))
}
}

impl std::fmt::Display for Name {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}-", self.name())?;
self.uuid().format(f, true /*raw*/)
match self {
Self::Suffixed(n, uuid) => {
write!(f, "{}-", n)?;
uuid.format(f, true /*raw*/)
}
Self::Reserved(n) => write!(f, "{}", n),
}
}
}

Expand Down
13 changes: 5 additions & 8 deletions hyperactor_mesh/src/v1/actor_mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,10 @@ impl<A: Actor + RemoteActor> ActorMeshRef<A> {
pub async fn supervision_events(
&self,
cx: &impl context::Actor,
name: Name,
) -> v1::Result<ValueMesh<Vec<ActorSupervisionEvent>>> {
self.proc_mesh.supervision_events(cx, name).await
self.proc_mesh
.supervision_events(cx, self.name.clone())
.await
}
}

Expand Down Expand Up @@ -455,7 +456,7 @@ mod tests {
}

#[async_timed_test(timeout_secs = 30)]
async fn test_status() {
async fn test_supervision_events() {
hyperactor_telemetry::initialize_logging_for_test();

let instance = testing::instance().await;
Expand Down Expand Up @@ -489,12 +490,8 @@ mod tests {
// Now that all ranks have completed, set up a continuous poll of the
// status such that when a process switches to unhealthy it sets a
// supervision event.
let child_name_clone = child_name.clone();
let supervision_task = tokio::spawn(async move {
match actor_mesh
.supervision_events(instance, child_name_clone)
.await
{
match actor_mesh.supervision_events(&instance).await {
Ok(events) => {
for event_list in events.values() {
assert!(!event_list.is_empty());
Expand Down
43 changes: 36 additions & 7 deletions hyperactor_mesh/src/v1/proc_mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use hyperactor::supervision::ActorSupervisionEvent;
use ndslice::Extent;
use ndslice::ViewExt as _;
use ndslice::view;
use ndslice::view::CollectMeshExt;
use ndslice::view::MapIntoExt;
use ndslice::view::Ranked;
use ndslice::view::Region;
Expand All @@ -48,6 +49,7 @@ use crate::proc_mesh::mesh_agent::ProcMeshAgent;
use crate::resource;
use crate::v1;
use crate::v1::ActorMesh;
use crate::v1::ActorMeshRef;
use crate::v1::Error;
use crate::v1::HostMeshRef;
use crate::v1::Name;
Expand Down Expand Up @@ -93,6 +95,7 @@ impl ProcRef {
}

/// Get the supervision events for one actor with the given name.
#[allow(dead_code)]
async fn supervision_events(
&self,
cx: &impl context::Actor,
Expand Down Expand Up @@ -453,18 +456,45 @@ impl ProcMeshRef {
vm.join().await.transpose()
}

fn agent_mesh(&self) -> ActorMeshRef<ProcMeshAgent> {
let agent_name = self.ranks.first().unwrap().agent.actor_id().name();
// This name must match the ProcMeshAgent name, which can change depending on the allocator.
ActorMeshRef::new(Name::new_reserved(agent_name), self.clone())
}

/// The supervision events of procs in this mesh.
pub async fn supervision_events(
&self,
cx: &impl context::Actor,
name: Name,
) -> v1::Result<ValueMesh<Vec<ActorSupervisionEvent>>> {
let vm: ValueMesh<_> = self.map_into(|proc_ref| {
let proc_ref = proc_ref.clone();
let name = name.clone();
async move { proc_ref.supervision_events(cx, name).await }
});
vm.join().await.transpose()
let agent_mesh = self.agent_mesh();
let (port, mut rx) = cx.mailbox().open_port::<resource::State<ActorState>>();
agent_mesh.cast(
cx,
resource::GetState::<ActorState> {
name: name.clone(),
reply: port.bind(),
},
)?;
let expected = self.ranks.len();
let mut states = Vec::with_capacity(expected);
for _ in 0..expected {
let state = rx.recv().await?;
states.push(state);
}
let vm = states
.into_iter()
.map(|state| {
if let Some(state) = state.state {
state.supervision_events
} else {
// Empty vec for ranks with no supervision events.
Vec::new()
}
})
.collect_mesh::<ValueMesh<Vec<_>>>(self.region.clone())?;
Ok(vm)
}

/// Spawn an actor on all of the procs in this mesh, returning a new ActorMesh.
Expand Down Expand Up @@ -600,7 +630,6 @@ mod tests {
use timed_test::async_timed_test;

use crate::v1::ActorMesh;
use crate::v1::ActorMeshRef;
use crate::v1::testactor;
use crate::v1::testing;

Expand Down