Skip to content

Commit 9b5532f

Browse files
dulinrileyfacebook-github-bot
authored andcommitted
Add casting support to resource::GetState messages
Summary: Part of: #1209 Use casting to implement the `supervision_events` API instead of iterating over all ProcMeshAgents. This will scale better as the size of the ProcMesh increases. Unfortunately, the Actor name for the mesh agent is not compatible with the v1::Name struct due to the missing uuid. Make `v1::Name` an enum to allow reserved names to be used for things like ActorMeshes. Differential Revision: D82687236
1 parent 798942e commit 9b5532f

File tree

5 files changed

+117
-32
lines changed

5 files changed

+117
-32
lines changed

hyperactor_mesh/src/proc_mesh/mesh_agent.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use enum_as_inner::EnumAsInner;
2020
use hyperactor::Actor;
2121
use hyperactor::ActorHandle;
2222
use hyperactor::ActorId;
23+
use hyperactor::Bind;
2324
use hyperactor::Context;
2425
use hyperactor::Data;
2526
use hyperactor::HandleClient;
@@ -31,6 +32,7 @@ use hyperactor::PortHandle;
3132
use hyperactor::PortRef;
3233
use hyperactor::ProcId;
3334
use hyperactor::RefClient;
35+
use hyperactor::Unbind;
3436
use hyperactor::actor::ActorStatus;
3537
use hyperactor::actor::remote::Remote;
3638
use hyperactor::channel;
@@ -167,7 +169,7 @@ impl State {
167169
handlers=[
168170
MeshAgentMessage,
169171
resource::CreateOrUpdate<ActorSpec>,
170-
resource::GetState<ActorState>
172+
resource::GetState<ActorState> { cast = true },
171173
]
172174
)]
173175
pub struct ProcMeshAgent {
@@ -406,7 +408,7 @@ pub struct ActorSpec {
406408
}
407409

408410
/// Actor state.
409-
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named)]
411+
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Named, Bind, Unbind)]
410412
pub struct ActorState {
411413
/// The actor's ID.
412414
pub actor_id: ActorId,

hyperactor_mesh/src/resource.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@ use hyperactor::Handler;
1616
use hyperactor::Named;
1717
use hyperactor::PortRef;
1818
use hyperactor::RefClient;
19+
use hyperactor::RemoteMessage;
20+
use hyperactor::message::Bind;
21+
use hyperactor::message::Bindings;
22+
use hyperactor::message::Unbind;
1923
use serde::Deserialize;
2024
use serde::Serialize;
2125

@@ -70,3 +74,24 @@ pub struct GetState<S> {
7074
#[reply]
7175
pub reply: PortRef<State<S>>,
7276
}
77+
78+
// Cannot derive Bind and Unbind for this generic, implement manually.
79+
impl<T> Unbind for GetState<T>
80+
where
81+
T: RemoteMessage,
82+
T: Unbind,
83+
{
84+
fn unbind(&self, bindings: &mut Bindings) -> anyhow::Result<()> {
85+
self.reply.unbind(bindings)
86+
}
87+
}
88+
89+
impl<T> Bind for GetState<T>
90+
where
91+
T: RemoteMessage,
92+
T: Bind,
93+
{
94+
fn bind(&mut self, bindings: &mut Bindings) -> anyhow::Result<()> {
95+
self.reply.bind(bindings)
96+
}
97+
}

hyperactor_mesh/src/v1.rs

Lines changed: 50 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -164,27 +164,51 @@ pub type Result<T> = std::result::Result<T, Error>;
164164
Serialize,
165165
Deserialize
166166
)]
167-
pub struct Name(pub String, pub ShortUuid);
167+
pub enum Name {
168+
/// Normal names for most actors.
169+
Suffixed(String, ShortUuid),
170+
/// Reserved names for system actors without UUIDs.
171+
Reserved(String),
172+
}
168173

169174
impl Name {
170175
/// Create a new `Name` from a user-provided base name.
171176
pub fn new(name: impl Into<String>) -> Self {
177+
Self::new_with_uuid(name, Some(ShortUuid::generate()))
178+
}
179+
180+
/// Create a Reserved `Name` with no uuid. Only for use by system actors.
181+
pub(crate) fn new_reserved(name: impl Into<String>) -> Self {
182+
Self::new_with_uuid(name, None)
183+
}
184+
185+
fn new_with_uuid(name: impl Into<String>, uuid: Option<ShortUuid>) -> Self {
172186
let mut name = name.into();
173187
if name.is_empty() {
174188
name = "unnamed".to_string();
175189
}
176-
let uuid = ShortUuid::generate();
177-
Self(name, uuid)
190+
if let Some(uuid) = uuid {
191+
Self::Suffixed(name, uuid)
192+
} else {
193+
Self::Reserved(name)
194+
}
178195
}
179196

180197
/// The name portion of this `Name`.
181198
pub fn name(&self) -> &str {
182-
&self.0
199+
match self {
200+
Self::Suffixed(n, _) => n,
201+
Self::Reserved(n) => n,
202+
}
183203
}
184204

185205
/// The UUID portion of this `Name`.
206+
/// Only valid for Name::Suffixed, if called on Name::Reserved it'll panic.
186207
pub fn uuid(&self) -> &ShortUuid {
187-
&self.1
208+
match self {
209+
Self::Suffixed(_, uuid) => uuid,
210+
Self::Reserved(_) => panic!("Reserved name has no UUID"),
211+
}
188212
}
189213
}
190214

@@ -208,24 +232,33 @@ impl FromStr for Name {
208232
type Err = NameParseError;
209233

210234
fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
211-
let (name, uuid) = s.split_once('-').ok_or(NameParseError::MissingSeparator)?;
212-
if name.is_empty() {
213-
return Err(NameParseError::MissingName);
235+
if let Some((name, uuid)) = s.split_once('-') {
236+
if name.is_empty() {
237+
return Err(NameParseError::MissingName);
238+
}
239+
if uuid.is_empty() {
240+
return Err(NameParseError::MissingName);
241+
}
242+
243+
Ok(Name::new_with_uuid(name.to_string(), Some(uuid.parse()?)))
244+
} else {
245+
if s.is_empty() {
246+
return Err(NameParseError::MissingName);
247+
}
248+
Ok(Name::new_reserved(s))
214249
}
215-
if uuid.is_empty() {
216-
return Err(NameParseError::MissingName);
217-
}
218-
219-
let name = name.to_string();
220-
let uuid = uuid.parse()?;
221-
Ok(Name(name, uuid))
222250
}
223251
}
224252

225253
impl std::fmt::Display for Name {
226254
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
227-
write!(f, "{}-", self.name())?;
228-
self.uuid().format(f, true /*raw*/)
255+
match self {
256+
Self::Suffixed(n, uuid) => {
257+
write!(f, "{}-", n)?;
258+
uuid.format(f, true /*raw*/)
259+
}
260+
Self::Reserved(n) => write!(f, "{}", n),
261+
}
229262
}
230263
}
231264

hyperactor_mesh/src/v1/actor_mesh.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -159,9 +159,10 @@ impl<A: Actor + RemoteActor> ActorMeshRef<A> {
159159
pub async fn supervision_events(
160160
&self,
161161
cx: &impl context::Actor,
162-
name: Name,
163162
) -> v1::Result<ValueMesh<Vec<ActorSupervisionEvent>>> {
164-
self.proc_mesh.supervision_events(cx, name).await
163+
self.proc_mesh
164+
.supervision_events(cx, self.name.clone())
165+
.await
165166
}
166167
}
167168

@@ -452,12 +453,8 @@ mod tests {
452453
// status such that when a process switches to unhealthy it sets a
453454
// supervision event.
454455
let actor_mesh_ref = actor_mesh.freeze();
455-
let child_name_clone = child_name.clone();
456456
let supervision_task = tokio::spawn(async move {
457-
match actor_mesh_ref
458-
.supervision_events(&instance, child_name_clone)
459-
.await
460-
{
457+
match actor_mesh_ref.supervision_events(&instance).await {
461458
Ok(events) => {
462459
for event_list in events.values() {
463460
assert!(!event_list.is_empty());

hyperactor_mesh/src/v1/proc_mesh.rs

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -357,18 +357,46 @@ impl ProcMeshRef {
357357
vm.join().await.transpose()
358358
}
359359

360+
fn agent_mesh(&self) -> ActorMeshRef<ProcMeshAgent> {
361+
let actor_refs = self.values().map(|p| p.agent).collect::<Vec<_>>();
362+
let agent_name = actor_refs.first().unwrap().actor_id().name();
363+
// This name must match the ProcMeshAgent name, which can change depending on the allocator.
364+
ActorMeshRef::new(Name::new_reserved(agent_name), self.clone(), actor_refs)
365+
}
366+
360367
/// The supervision events of procs in this mesh.
361368
pub async fn supervision_events(
362369
&self,
363370
cx: &impl context::Actor,
364371
name: Name,
365372
) -> v1::Result<ValueMesh<Vec<ActorSupervisionEvent>>> {
366-
let vm: ValueMesh<_> = self.map_into(|proc_ref| {
367-
let proc_ref = proc_ref.clone();
368-
let name = name.clone();
369-
async move { proc_ref.supervision_events(cx, name).await }
370-
});
371-
vm.join().await.transpose()
373+
let agent_mesh = self.agent_mesh();
374+
let (port, mut rx) = cx.mailbox().open_port::<resource::State<ActorState>>();
375+
agent_mesh.cast(
376+
cx,
377+
resource::GetState::<ActorState> {
378+
name: name.clone(),
379+
reply: port.bind(),
380+
},
381+
)?;
382+
let expected = self.ranks.len();
383+
let mut states = Vec::with_capacity(expected);
384+
for _ in 0..expected {
385+
let state = rx.recv().await?;
386+
states.push(state);
387+
}
388+
let vm = states
389+
.into_iter()
390+
.map(|state| {
391+
if let Some(state) = state.state {
392+
state.supervision_events
393+
} else {
394+
// Empty vec for ranks with no supervision events.
395+
Vec::new()
396+
}
397+
})
398+
.collect_mesh::<ValueMesh<Vec<_>>>(self.region.clone())?;
399+
Ok(vm)
372400
}
373401

374402
/// Spawn an actor on all of the procs in this mesh, returning a new ActorMesh.

0 commit comments

Comments
 (0)