Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions hyperactor/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,13 @@ pub trait Tx<M: RemoteMessage>: std::fmt::Debug {
/// message is either delivered, or we eventually discover that
/// the channel has failed and it will be sent back on `return_channel`.
#[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `SendError`.
#[hyperactor::instrument_infallible]
fn try_post(&self, message: M, return_channel: oneshot::Sender<SendError<M>>) {
self.do_post(message, Some(return_channel));
}

/// Enqueue a message to be sent on the channel.
#[hyperactor::instrument_infallible]
fn post(&self, message: M) {
self.do_post(message, None);
}
Expand Down Expand Up @@ -803,6 +805,7 @@ enum ChannelRxKind<M: RemoteMessage> {

#[async_trait]
impl<M: RemoteMessage> Rx<M> for ChannelRx<M> {
#[hyperactor::instrument]
async fn recv(&mut self) -> Result<M, ChannelError> {
match &mut self.inner {
ChannelRxKind::Local(rx) => rx.recv().await,
Expand Down
9 changes: 7 additions & 2 deletions hyperactor/src/channel/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ use tokio::time::Duration;
use tokio::time::Instant;
use tokio_util::net::Listener;
use tokio_util::sync::CancellationToken;
use tracing::Instrument;

use super::*;
use crate::RemoteMessage;
Expand Down Expand Up @@ -652,7 +653,11 @@ impl<M: RemoteMessage> NetTx<M> {
unacked,
}),
conn,
) if outbox.is_empty() && unacked.is_empty() => match receiver.recv().await {
) if outbox.is_empty() && unacked.is_empty() => match receiver
.recv()
.instrument(tracing::debug_span!("receiver.recv"))
.await
{
Some(msg) => match outbox.push_back(msg) {
Ok(()) => {
let running = State::Running(Deliveries { outbox, unacked });
Expand Down Expand Up @@ -828,7 +833,7 @@ impl<M: RemoteMessage> NetTx<M> {
// UnboundedReceiver::recv() is cancel safe.
// Only checking mspc channel when outbox is empty. In this way, we prioritize
// sending messages already in outbox.
work_result = receiver.recv(), if outbox.is_empty() => {
work_result = receiver.recv().instrument(tracing::debug_span!("receiver.recv")), if outbox.is_empty() => {
match work_result {
Some(msg) => {
match outbox.push_back(msg) {
Expand Down
2 changes: 2 additions & 0 deletions hyperactor/src/channel/net/framed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ impl<R: AsyncRead + Unpin> FrameReader<R> {
/// `max_frame_length`. **This error is fatal:** once returned,
/// the `FrameReader` must be dropped; the underlying connection
/// is no longer valid.
#[tracing::instrument(skip_all)]
pub async fn next(&mut self) -> io::Result<Option<Bytes>> {
loop {
match &mut self.state {
Expand Down Expand Up @@ -218,6 +219,7 @@ impl<W: AsyncWrite + Unpin, B: Buf> FrameWrite<W, B> {
/// returned futures at any time. Upon completion, the frame is guaranteed to be
/// written, unless an error was encountered, in which case the underlying stream
/// is in an undefined state.
#[tracing::instrument(skip_all)]
pub async fn send(&mut self) -> io::Result<()> {
loop {
if self.len_buf.has_remaining() {
Expand Down
1 change: 1 addition & 0 deletions hyperactor/src/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1161,6 +1161,7 @@ impl MailboxClient {
}

impl MailboxSender for MailboxClient {
#[hyperactor::instrument_infallible]
fn post_unchecked(
&self,
envelope: MessageEnvelope,
Expand Down
1 change: 1 addition & 0 deletions hyperactor_mesh/src/actor_mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ declare_attrs! {
/// Common implementation for `ActorMesh`s and `ActorMeshRef`s to cast
/// an `M`-typed message
#[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `CastError`.
#[hyperactor::instrument]
pub(crate) fn actor_mesh_cast<A, M>(
cx: &impl context::Actor,
actor_mesh_id: ActorMeshId,
Expand Down
2 changes: 2 additions & 0 deletions hyperactor_mesh/src/comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,7 @@ impl Handler<CommActorMode> for CommActor {
// TODO(T218630526): reliable casting for mutable topology
#[async_trait]
impl Handler<CastMessage> for CommActor {
#[hyperactor::instrument]
async fn handle(&mut self, cx: &Context<Self>, cast_message: CastMessage) -> Result<()> {
// Always forward the message to the root rank of the slice, casting starts from there.
let slice = cast_message.dest.slice.clone();
Expand Down Expand Up @@ -375,6 +376,7 @@ impl Handler<CastMessage> for CommActor {

#[async_trait]
impl Handler<ForwardMessage> for CommActor {
#[hyperactor::instrument]
async fn handle(&mut self, cx: &Context<Self>, fwd_message: ForwardMessage) -> Result<()> {
let ForwardMessage {
sender,
Expand Down
1 change: 1 addition & 0 deletions monarch_hyperactor/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,7 @@ impl PanicFlag {

#[async_trait]
impl Handler<PythonMessage> for PythonActor {
#[hyperactor::instrument]
async fn handle(
&mut self,
cx: &Context<PythonActor>,
Expand Down
1 change: 1 addition & 0 deletions monarch_hyperactor/src/actor_mesh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ pub(crate) fn to_hy_sel(selection: &str) -> PyResult<Selection> {

#[pymethods]
impl PythonActorMesh {
#[hyperactor::instrument]
fn cast(
&self,
message: &PythonMessage,
Expand Down
Loading