Skip to content

Commit daa561d

Browse files
pzhan9 authored and facebook-github-bot committed
Remove USE_STANDIN_ACTOR_MESH and any dead code along with that (#1273)
Summary: We have been using multicast for ~1 month without any issues. It is time to remove this gate.
Differential Revision: D82691879
1 parent fb77648 commit daa561d

File tree

8 files changed

+12
-278
lines changed

8 files changed

+12
-278
lines changed

monarch_hyperactor/src/mailbox.rs

Lines changed: 0 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ use hyperactor::mailbox::monitored_return_handle;
3333
use hyperactor::message::Bind;
3434
use hyperactor::message::Bindings;
3535
use hyperactor::message::Unbind;
36-
use hyperactor_mesh::comm::multicast::set_cast_info_on_headers;
3736
use monarch_types::PickledPyObject;
3837
use monarch_types::py_global;
3938
use pyo3::IntoPyObjectExt;
@@ -52,7 +51,6 @@ use crate::proc::PyActorId;
5251
use crate::pytokio::PyPythonTask;
5352
use crate::pytokio::PythonTask;
5453
use crate::runtime::signal_safe_block_on;
55-
use crate::shape::PyShape;
5654

5755
#[derive(Clone, Debug)]
5856
#[pyclass(
@@ -139,37 +137,6 @@ impl PyMailbox {
139137
Ok(())
140138
}
141139

142-
pub(super) fn post_cast(
143-
&self,
144-
dest: &PyActorId,
145-
rank: usize,
146-
shape: &PyShape,
147-
message: &PythonMessage,
148-
) -> PyResult<()> {
149-
let port_id = dest.inner.port_id(PythonMessage::port());
150-
let mut headers = Attrs::new();
151-
set_cast_info_on_headers(
152-
&mut headers,
153-
rank,
154-
shape.inner.clone(),
155-
self.inner.actor_id().clone(),
156-
);
157-
let message = Serialized::serialize(message).map_err(|err| {
158-
PyRuntimeError::new_err(format!(
159-
"failed to serialize message ({:?}) to Serialized: {}",
160-
message, err
161-
))
162-
})?;
163-
let envelope =
164-
MessageEnvelope::new(self.inner.actor_id().clone(), port_id, message, headers);
165-
let return_handle = self
166-
.inner
167-
.bound_return_handle()
168-
.unwrap_or(monitored_return_handle());
169-
self.inner.post(envelope, return_handle);
170-
Ok(())
171-
}
172-
173140
#[getter]
174141
pub(super) fn actor_id(&self) -> PyActorId {
175142
PyActorId {

monarch_hyperactor/src/proc_mesh.rs

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ use crate::mailbox::PyMailbox;
4949
use crate::pytokio::PyPythonTask;
5050
use crate::pytokio::PyShared;
5151
use crate::pytokio::PythonTask;
52-
use crate::runtime::get_tokio_runtime;
5352
use crate::shape::PyRegion;
5453
use crate::supervision::SupervisionError;
5554
use crate::supervision::Unhealthy;
@@ -340,7 +339,6 @@ impl PyProcMesh {
340339
proc_mesh: &mut PyShared,
341340
name: String,
342341
actor: Py<PyType>,
343-
emulated: bool,
344342
) -> PyResult<PyObject> {
345343
let task = proc_mesh.task()?.take_task()?;
346344
let meshimpl = async move {
@@ -368,18 +366,11 @@ impl PyProcMesh {
368366
actor_events,
369367
)))
370368
};
371-
if emulated {
372-
// we give up on doing mesh spawn async for the emulated old version
373-
// it is too complicated to make both work.
374-
let r = get_tokio_runtime().block_on(meshimpl)?;
375-
Python::with_gil(|py| r.into_py_any(py))
376-
} else {
377-
let r = PythonActorMesh::new(async move {
369+
let r = PythonActorMesh::new(async move {
378370
let meshimpl: Box<dyn ActorMeshProtocol> = meshimpl.await?;
379371
Ok(meshimpl)
380372
});
381-
Python::with_gil(|py| r.into_py_any(py))
382-
}
373+
Python::with_gil(|py| r.into_py_any(py))
383374
}
384375

385376
// User can call this to monitor the proc mesh events. This will override

python/monarch/_rust_bindings/monarch_hyperactor/actor_mesh.pyi

Lines changed: 1 addition & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ from typing_extensions import Self
1717

1818
class ActorMeshProtocol(Protocol):
1919
"""
20-
Protocol defining the common interface for actor mesh, mesh ref and _ActorMeshRefImpl.
20+
Protocol defining the common interface for actor mesh and mesh ref.
2121
"""
2222

2323
def cast(
@@ -35,34 +35,6 @@ class ActorMeshProtocol(Protocol):
3535
class PythonActorMesh(ActorMeshProtocol):
3636
pass
3737

38-
class PythonActorMeshImpl:
39-
def get_supervision_event(self) -> ActorSupervisionEvent | None:
40-
"""
41-
Returns supervision event if there is any.
42-
"""
43-
...
44-
45-
def get(self, rank: int) -> ActorId | None:
46-
"""
47-
Get the actor id for the actor at the given rank.
48-
"""
49-
...
50-
51-
def stop(self) -> PythonTask[None]:
52-
"""
53-
Stop all actors that are part of this mesh.
54-
Using this mesh after stop() is called will raise an Exception.
55-
"""
56-
...
57-
58-
def supervision_event(self) -> "Optional[Shared[Exception]]": ...
59-
@property
60-
def stopped(self) -> bool:
61-
"""
62-
If the mesh has been stopped.
63-
"""
64-
...
65-
6638
@final
6739
class ActorSupervisionEvent:
6840
@property

python/monarch/_rust_bindings/monarch_hyperactor/mailbox.pyi

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -162,15 +162,6 @@ class Mailbox:
162162
"""
163163
...
164164

165-
def post_cast(
166-
self, dest: ActorId, rank: int, shape: Shape, message: PythonMessage
167-
) -> None:
168-
"""
169-
Post a message to the provided actor. It will be handled using the handle_cast
170-
endpoint as if the destination was `rank` of `shape`.
171-
"""
172-
...
173-
174165
def undeliverable_receiver(self) -> UndeliverablePortReceiver:
175166
"""
176167
Open a port to receive undeliverable messages.

python/monarch/_rust_bindings/monarch_hyperactor/proc_mesh.pyi

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,7 @@ from typing import Any, AsyncIterator, final, Literal, overload, Type, TYPE_CHEC
1010

1111
if TYPE_CHECKING:
1212
from monarch._rust_bindings.monarch_hyperactor.actor import Actor
13-
from monarch._rust_bindings.monarch_hyperactor.actor_mesh import (
14-
PythonActorMesh,
15-
PythonActorMeshImpl,
16-
)
13+
from monarch._rust_bindings.monarch_hyperactor.actor_mesh import PythonActorMesh
1714

1815
from monarch._rust_bindings.monarch_hyperactor.alloc import Alloc
1916
from monarch._rust_bindings.monarch_hyperactor.context import Instance
@@ -50,7 +47,7 @@ class ProcMesh:
5047

5148
@staticmethod
5249
def spawn_async(
53-
proc_mesh: Shared["ProcMesh"], name: str, actor: Type["Actor"], emulated: bool
50+
proc_mesh: Shared["ProcMesh"], name: str, actor: Type["Actor"]
5451
) -> PythonActorMesh: ...
5552
async def monitor(self) -> ProcMeshMonitor:
5653
"""

python/monarch/_src/actor/actor_mesh.py

Lines changed: 2 additions & 172 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import inspect
1414
import itertools
1515
import logging
16-
import random
1716
from abc import abstractproperty
1817

1918
from dataclasses import dataclass
@@ -48,16 +47,11 @@
4847
PythonMessage,
4948
PythonMessageKind,
5049
)
51-
from monarch._rust_bindings.monarch_hyperactor.actor_mesh import (
52-
PythonActorMesh,
53-
PythonActorMeshImpl,
54-
)
50+
from monarch._rust_bindings.monarch_hyperactor.actor_mesh import PythonActorMesh
5551
from monarch._rust_bindings.monarch_hyperactor.context import Instance as HyInstance
5652
from monarch._rust_bindings.monarch_hyperactor.mailbox import (
5753
Mailbox,
58-
OncePortReceiver as HyOncePortReceiver, # noqa: F401
5954
OncePortRef,
60-
PortReceiver as HyPortReceiver, # noqa: F401
6155
PortRef,
6256
UndeliverableMessageEnvelope,
6357
)
@@ -71,8 +65,6 @@
7165
Region,
7266
Shape,
7367
)
74-
from monarch._rust_bindings.monarch_hyperactor.supervision import SupervisionError
75-
7668
from monarch._rust_bindings.monarch_hyperactor.value_mesh import (
7769
ValueMesh as HyValueMesh,
7870
)
@@ -251,10 +243,6 @@ def set(debug_context: "DebugContext") -> None:
251243
R = TypeVar("R")
252244
A = TypeVar("A")
253245

254-
# keep this load balancing deterministic, but
255-
# equally distributed.
256-
_load_balancing_seed = random.Random(4)
257-
258246

259247
class _SingletonActorAdapator:
260248
def __init__(self, inner: ActorId, region: Optional[Region] = None) -> None:
@@ -287,159 +275,6 @@ async def empty():
287275
return PythonTask.from_coroutine(empty())
288276

289277

290-
# standin class for whatever is the serializable python object we use
291-
# to name an actor mesh. Hacked up today because ActorMesh
292-
# isn't plumbed to non-clients
293-
class _ActorMeshRefImpl:
294-
def __init__(
295-
self,
296-
mailbox: Mailbox,
297-
hy_actor_mesh: Optional[PythonActorMeshImpl],
298-
proc_mesh: "Optional[ProcMesh]",
299-
shape: Shape,
300-
actor_ids: List[ActorId],
301-
) -> None:
302-
self._mailbox = mailbox
303-
self._actor_mesh = hy_actor_mesh
304-
# actor meshes do not have a way to look this up at the moment,
305-
# so we fake it here
306-
self._proc_mesh = proc_mesh
307-
self._shape = shape
308-
self._please_replace_me_actor_ids = actor_ids
309-
310-
@staticmethod
311-
def from_hyperactor_mesh(
312-
mailbox: Mailbox,
313-
shape: Shape,
314-
hy_actor_mesh: PythonActorMeshImpl,
315-
proc_mesh: "ProcMesh",
316-
) -> "_ActorMeshRefImpl":
317-
return _ActorMeshRefImpl(
318-
mailbox,
319-
hy_actor_mesh,
320-
proc_mesh,
321-
shape,
322-
[cast(ActorId, hy_actor_mesh.get(i)) for i in range(len(shape))],
323-
)
324-
325-
def __getstate__(
326-
self,
327-
) -> Tuple[Shape, List[ActorId], Mailbox]:
328-
return self._shape, self._please_replace_me_actor_ids, self._mailbox
329-
330-
def __setstate__(
331-
self,
332-
state: Tuple[Shape, List[ActorId], Mailbox],
333-
) -> None:
334-
self._actor_mesh = None
335-
self._shape, self._please_replace_me_actor_ids, self._mailbox = state
336-
337-
def _check_state(self) -> None:
338-
# This is temporary until we have real cast integration here. We need to actively check
339-
# supervision error here is because all communication is done through direct mailbox sending
340-
# and not through comm actor casting.
341-
# TODO: remove this when casting integration is done.
342-
if self._actor_mesh is not None:
343-
if self._actor_mesh.stopped:
344-
raise SupervisionError(
345-
"actor mesh is unhealthy with reason: actor mesh is stopped due to proc mesh shutdown. "
346-
"`PythonActorMesh` has already been stopped."
347-
)
348-
349-
event = self._actor_mesh.get_supervision_event()
350-
if event is not None:
351-
raise SupervisionError(f"actor mesh is unhealthy with reason: {event}")
352-
353-
def cast(
354-
self,
355-
message: PythonMessage,
356-
selection: str,
357-
instance: HyInstance,
358-
) -> None:
359-
self._check_state()
360-
361-
# TODO: use the actual actor mesh when available. We cannot currently use it
362-
# directly because we risk bifurcating the message delivery paths from the same
363-
# client, since slicing the mesh will produce a reference, which calls actors
364-
# directly. The reason these paths are bifurcated is that actor meshes will
365-
# use multicasting, while direct actor comms do not. Separately we need to decide
366-
# whether actor meshes are ordered with actor references.
367-
#
368-
# The fix is to provide a first-class reference into Python, and always call "cast"
369-
# on it, including for load balanced requests.
370-
if selection == "choose":
371-
idx = _load_balancing_seed.randrange(len(self._shape))
372-
actor_rank = self._shape.ndslice[idx]
373-
self._mailbox.post(self._please_replace_me_actor_ids[actor_rank], message)
374-
elif selection == "all":
375-
# replace me with actual remote actor mesh
376-
call_shape = Shape(
377-
self._shape.labels, NDSlice.new_row_major(self._shape.ndslice.sizes)
378-
)
379-
for i, rank in enumerate(self._shape.ranks()):
380-
self._mailbox.post_cast(
381-
self._please_replace_me_actor_ids[rank],
382-
i,
383-
call_shape,
384-
message,
385-
)
386-
elif isinstance(selection, int):
387-
try:
388-
self._mailbox.post(
389-
self._please_replace_me_actor_ids[selection], message
390-
)
391-
except IndexError:
392-
raise IndexError(
393-
f"Tried to send to an out-of-range rank {selection}: "
394-
f"mesh has {len(self._please_replace_me_actor_ids)} elements."
395-
)
396-
else:
397-
raise ValueError(f"invalid selection: {selection}")
398-
399-
def __len__(self) -> int:
400-
return len(self._shape)
401-
402-
@property
403-
def _name_pid(self):
404-
actor_id0 = self._please_replace_me_actor_ids[0]
405-
return actor_id0.actor_name, actor_id0.pid
406-
407-
@property
408-
def shape(self) -> Shape:
409-
return self._shape
410-
411-
@property
412-
def proc_mesh(self) -> Optional["ProcMesh"]:
413-
return self._proc_mesh
414-
415-
def new_with_region(self, region: Region) -> "_ActorMeshRefImpl":
416-
return _ActorMeshRefImpl(
417-
self._mailbox,
418-
None,
419-
None,
420-
region.as_shape(),
421-
self._please_replace_me_actor_ids,
422-
)
423-
424-
def supervision_event(self) -> "Optional[Shared[Exception]]":
425-
if self._actor_mesh is None:
426-
return None
427-
return self._actor_mesh.supervision_event()
428-
429-
def stop(self) -> PythonTask[None]:
430-
async def task():
431-
if self._actor_mesh is not None:
432-
self._actor_mesh.stop()
433-
434-
return PythonTask.from_coroutine(task())
435-
436-
def initialized(self) -> PythonTask[None]:
437-
async def task():
438-
pass
439-
440-
return PythonTask.from_coroutine(task())
441-
442-
443278
class ActorEndpoint(Endpoint[P, R]):
444279
def __init__(
445280
self,
@@ -1148,7 +983,7 @@ def _endpoint(
1148983
def _create(
1149984
cls,
1150985
Class: Type[T],
1151-
actor_mesh: "PythonActorMesh | PythonActorMeshImpl",
986+
actor_mesh: "PythonActorMesh",
1152987
mailbox: Mailbox,
1153988
shape: Shape,
1154989
proc_mesh: "ProcMesh",
@@ -1158,11 +993,6 @@ def _create(
1158993
*args: Any,
1159994
**kwargs: Any,
1160995
) -> "ActorMesh[T]":
1161-
if isinstance(actor_mesh, PythonActorMeshImpl):
1162-
actor_mesh = _ActorMeshRefImpl.from_hyperactor_mesh(
1163-
mailbox, shape, actor_mesh, proc_mesh
1164-
)
1165-
1166996
mesh = cls(Class, actor_mesh, shape, proc_mesh)
1167997

1168998
async def null_func(*_args: Iterable[Any], **_kwargs: Dict[str, Any]) -> None:

0 commit comments

Comments
 (0)