Skip to content

Conversation

Matthew-Whitlock
Copy link

@Matthew-Whitlock Matthew-Whitlock commented Aug 26, 2025

There's still work to be done, and this is currently only very trivially tested. I'm opening this now to check in on the general reception to these changes.

The broad strokes:

  • When a daemon dies, its highest-rank child is promoted to its position in the routing tree
    • That child's highest-rank child is promoted up to promote the new hole, and so on
    • This approach minimizes the number of edge changes for each daemon, while still ensuring that each rank's understanding of the routing tree is only based on the failures it knows about, not the order they occurred in.
  • Xcasts are now inherently fault tolerant
    • They will execute in the same local order on all ranks
    • They are guaranteed to reach all ranks and their final message delivery occurs only once on each rank
  • Using the fault-tolerant xcasts, we can establish a globally consistent view of failure counts and ordering, which will simplify making the other operations resilient
    • For now, other components get a call notifying them of the failure and just mark the job state as COMM_FAILURE if they had any ongoing operations.

As it stands, this is only tested with small node counts and a very simple LAMMPS job, with Fenix used for process recovery. It works fault-free, with user process failures, and with daemon failures via killing the processes. Better testing will be important though.
(I also haven't tested since rebasing on some other commits. There didn't seem to be any big conflicts, but if there are obvious errors trying to build I'll fix that)

Copy link

Hello! The Git Commit Checker CI bot found a few problems with this PR:

a78c778: Initial fault tolerance commit

  • check_signed_off: does not contain a valid Signed-off-by line

Please fix these problems and, if necessary, force-push new commits back up to the PR branch. Thanks!

1 similar comment
Copy link

Hello! The Git Commit Checker CI bot found a few problems with this PR:

a78c778: Initial fault tolerance commit

  • check_signed_off: does not contain a valid Signed-off-by line

Please fix these problems and, if necessary, force-push new commits back up to the PR branch. Thanks!

@rhc54
Copy link
Contributor

rhc54 commented Aug 26, 2025

Intriguing! Definitely appreciated - confess I wasn't expecting it, but thanks for tackling it.

I haven't looked it over in-depth, but will try to do so as time permits. Offhand, I certainly have no objection to pursuing it.

Few thoughts, purely FWIW. In the past, there was a resilient form of PRRTE. Little different approach - what we did was to have all children of a failed parent reconnect to their grandparent. When the parent restarted, we simply had the children move back to it. The resilience, therefore, was at the individual message level and not the operation.

This required that we number the messages being sent so that the children could request a resend of any message lost when the parent died. Worked okay. Requires, however, that you return a failed status on any collective that required participation from the failed daemon since you couldn't know if the daemon had something to contribute.

You'll face the same issues here, of course. The xcast isn't impacted in that way because it is a one-way blast. However, fence and group construct operations are collectives that require contributions, so you might simply have to abort those with an appropriate error.

Then we have the direct modex exchange, and all the direct daemon-to-HNP messaging that occurs (e.g., triggering an HNP-led xcast to the other daemons, or updating proc status). These are simple direct messages and not collectives. Quite a few of those, really.

I'd be interested to hear any thoughts you have on how to apply this approach to those areas?

@Matthew-Whitlock
Copy link
Author

Matthew-Whitlock commented Aug 27, 2025

Few thoughts, purely FWIW. In the past, there was a resilient form of PRRTE. Little different approach - what we did was to have all children of a failed parent reconnect to their grandparent. When the parent restarted, we simply had the children move back to it. The resilience, therefore, was at the individual message level and not the operation.

I saw you mention in some closed issues about this that there was some support in the past. Can you tell me more about why that's no longer supported? I think a mix of message-level and operation-level resilience would be ideal. Something like a PRTE_RML_SEND_RELIABLY(...) would be helpful for, e.g., the IOF component.

This required that we number the messages being sent so that the children could request a resend of any message lost when the parent died. Worked okay. Requires, however, that you return a failed status on any collective that required participation from the failed daemon since you couldn't know if the daemon had something to contribute.

You'll face the same issues here, of course. The xcast isn't impacted in that way because it is a one-way blast. However, fence and group construct operations are collectives that require contributions, so you might simply have to abort those with an appropriate error.

Yeah, I think returning an error is the most straightforward default result. It also seems reasonable to support an attribute that requests continuing past faults and returns a list of nodes that died.

Then we have the direct modex exchange, and all the direct daemon-to-HNP messaging that occurs (e.g., triggering an HNP-led xcast to the other daemons, or updating proc status). These are simple direct messages and not collectives. Quite a few of those, really.

In some cases, like the message triggering an xcast, it seems reasonable to do a simple message replay. For the xcast trigger, I'm replaying it if the initiator hasn't seen the xcast yet. In that case, we can assert that the HNP will have enough information to simply discard it if the original message wasn't lost.

In most cases, I agree that launching reliable messages makes the most sense. The simplest approach would be to keep a 1:1 send/recv count for each remote node. If that seems too costly memory-wise, I have some mostly-formed thoughts on getting away with generating a unique, but not necessarily sequential, ID for each sent message and holding the most-recently-received ID from each node that is actively communicating with a given node. This would be relying on message ACKs, though, so we trade some memory usage for some network usage.
Here it becomes handy to have the concept of a globally-consistent failure state independent from the local failure knowledge/state - we can include a failure 'epoch' on these reliable messages to ensure correct ordering even when only some messages are lost due to a failure and/or the path between two nodes grows/shrinks while messages are in transit.

@rhc54
Copy link
Contributor

rhc54 commented Aug 27, 2025

I saw you mention in some closed issues about this that there was some support in the past. Can you tell me more about why that's no longer supported? I think a mix of message-level and operation-level resilience would be ideal. Something like a PRTE_RML_SEND_RELIABLY(...) would be helpful for, e.g., the IOF component.

Back in 2009, I was recruited to create the Open Resilient Cluster Manager (ORCM) - basically, took what was then ORTE and added resilient messaging (for all messages) and a "resilient" mapper that placed processes that died due to the local daemon failure so they could be restarted. Was eventually absorbed into a corporate product and I moved back to doing other things.

Once I started focusing on HPC again back in 2014, I took the then-current state of ORTE and made it into PRRTE. Looked at porting over the resilient messaging from ORCM, but there was zero demand for it, and so I left it behind. At this point, I'm not sure it would be worth the effort to try and bring that code forward.

Yeah, I think returning an error is the most straightforward default result. It also seems reasonable to support an attribute that requests continuing past faults and returns a list of nodes that died.

It's not too hard to deal with single faults - you get into significant problems when dealing with multiple simultaneous or cascade daemon failures. At some point, you just have to throw up your hands and surrender.

In most cases, I agree that launching reliable messages makes the most sense. The simplest approach would be to keep a 1:1 send/recv count for each remote node. If that seems too costly memory-wise, I have some mostly-formed thoughts on getting away with generating a unique, but not necessarily sequential, ID for each sent message and holding the most-recently-received ID from each node that is actively communicating with a given node. This would be relying on message ACKs, though, so we trade some memory usage for some network usage.

The radix fanout typically isn't all that large (default is 64, I believe), so what I had done was create a small ring buffer for each endpoint. Don't need much as the message dwell time in any process is small since there is an async progress thread sending/receiving them. Insert the message number in the header. When a daemon fails, the child connects to the grandparent and passes along the last message number it sent (and who it was going to) and received. Grandparent checks its ring buffer and sends anything the child missed (that should have gone there, even to relay), and requests resend of anything that it didn't receive (and should have received, even as a relay). This also preserves the routing tree hierarchy, so you know that any message that was coming to the child must come through the parent, who must have gotten it from the grandparent.

Lots of hand-waiving details in there. Bottom line is that it didn't consume much memory nor rely on ACKs (as the ACK is just as likely to be lost as the original message). Resiliency is therefore accomplished strictly at the pt-to-pt level, not source-to-dest or operation, to keep it simple.

Not advocating to reproduce it - just passing along what was done.

@Matthew-Whitlock
Copy link
Author

Matthew-Whitlock commented Aug 28, 2025

The radix fanout typically isn't all that large (default is 64, I believe), so what I had done was create a small ring buffer for each endpoint. Don't need much as the message dwell time in any process is small since there is an async progress thread sending/receiving them. ...
Lots of hand-waiving details in there. Bottom line is that it didn't consume much memory nor rely on ACKs (as the ACK is just as likely to be lost as the original message).

If I'm understanding correctly, you avoided acks by heuristically deciding when to assume the message had been delivered and therefore safely destroy the local copy? I like the idea of this as a means for improving performance by essentially having local caches of messages along their send paths. I'd be concerned about data loss if several consecutive nodes in the path failed at once.

How would you feel about a two-stage ACK process?

  • X sends the signature + message to Y, keeping both in a local list of ongoing resilient messages.
  • Y ACKS with the signature, processes & discards the message, but keeps the signature in a list of received resilient messages to avoid receiving it twice.
  • X deletes the signature + message from its list, and ACKS the final time with the signature.
  • Y cleans up its received messages list with the final ACK.

This ensures that messages can be lost at any point in the process without risking duplicate processing or truly lost messages.

After the baseline implementation of that is working, it can be optimized by having each node hold onto the signatures and ACK state of messages that pass through it (following the same rules as Y) for a few benefits:

  1. Continuing the ack messages as closely as possible to the final destinations after fault handling
  2. Easily understanding when an ACK/message must be replayed vs conservatively assuming after all failures or after all failures along the message path
  3. Reducing the number of individual ACK messages by bundling them per endpoint, i.e. each rank delays propagating ACKS until N ACKs are ready to be sent to/through a given child.

@rhc54
Copy link
Contributor

rhc54 commented Aug 29, 2025

Hmmm…I think maybe we could benefit from a quick recap of the problem. Let’s say we have three processes in a linear tree: A <--> B <--> C. We don’t need to “ack” messages between A and B or between B and C - we’ll see socket closures if either end fails.

If B fails, then C will reconnect to A. In that handshake, C needs to tell A the last message it received from B and the last message it attempted to send thru B. A checks its “send-to-B” ring buffer to see if that matches, and resends any messages that are missing. A then checks its “recvd-from-B” ring buffer to see if it is missing any messages from C, and requests that C resend those messages that are missing.

In both cases, there is no need for an “ack” - we’ll find what’s missing and fix it.

You are asking the question: what happens if I have a deeper tree like A <--> B <--> C <--> D, and both B and C simultaneously fail? A has no direct connection with C or D, so it has no ring buffers that cover those pathways. When D reconnects to A, it will report the last message it received from C. A can check its ring buffer for B to see if there are any messages in it that would have gone to/thru D but are missed by D, and resend those. Ditto for messages sent by D that would have routed thru B.

You can see the “hole” here - if a message to/from D would have routed thru C, but not via B (e.g., from another child of C - let’s call it E), then that message could be lost. So the handshake gets a little more complicated. Since C and B failed, E will also reconnect to A. In this case, E has to tell A what messages it was attempting to send thru C to get to D, and D needs to tell A the last message it got from E. A needs to compare those and ask for resends from E for any missing info.

You can see that the handshake gets increasingly complex with higher tree depth as you have to consider all the child-to-child interactions. Fortunately, there are very few child-to-child interactions outside of rollup (E sending to C to go thru B to A) and xcast (the other direction), and we tend to keep the tree depth shallow. So in practice this tends to be fairly manageable - E won't report attempting to send any messages to D, and D won't report having received any messages from E, so A can just ignore that combination.

The coding is rather gnarly, but manageable if you hierarchically structure it to mirror the tree. Nothing is perfect.

The issue with ACKs is that the ack itself is just as unreliable as the original message. So you fall into this spiral of needing to ack the ack, and then ack the ack-ack, etc. At some point, you have to cut it off and just trust that the last ack made it through - but that actually isn’t any more reliable than trusting that the original message made it. And after all that, you still have to cache the messages since you will need to replay them once you determine that something didn’t make it through.

Doing things at the more atomistic level avoids all that, but the reconnect handshake becomes more complex due to the complexity of the tree. Ultimately up to you as the implementer - I’d just be skeptical of basing it all on “acks” that have to pass thru relays. Also, I'd be wary of striving for perfection here. At some point, there are enough simultaneous failures to warrant declaring the system "dead" and in need of a restart - or at least some very serious attention!

@Matthew-Whitlock
Copy link
Author

My question about simultaneous failures was more about knowing when a daemon can safely release its copy of the messages it has sent.

Say we set our ring buffer to 5 messages, and A is messaging C along the path: A -> B -> C. Once A sends messages 1-6 to B, A's ring buffer will no longer have message 1. If there's been a burst of communication * -> B -> C, B may not have had an opportunity to send A's message 1. If B fails at this point, that message's data is entirely lost.

If, instead, A was messaging D along the path A -> B -> C -> D when B and C fail, we will lose message 1 unless it has passed all the way through both B and C. So the ring buffer's best size depends on things like how consistent network performance is, how bursty the communication is, how long the message paths might be, and how reliable each node is.

The ACK-based approach I'm pitching works largely the same way, but A serves as a guaranteed source for the original message data of any lost messages by only releasing its local copy once the final destination's ACK comes in. Ignoring the optimization details (avoiding unnecessary message replays, using the ring buffers as local replay caches, etc), here's my reasoning for why we can cover everything with an ACK-ACK.

From A's perspective:

  • A sends a message to D with some ID, keeping a local copy of it. If faults are detected at this point, A replays the message.
    • If D's ACK was lost, not the original message, then we've needlessly replayed the message. However, we can assert that D knows to simply ignore the duplicate messages because it has kept record of receiving that ID.
  • A receives an ACK from D for this message ID. It sends the ACK-ACK and releases the original message. If failures are detected here, A does nothing because it no longer has any memory of this message.
    • If the ACK-ACK was lost, D may replay the ACK and A will no longer have memory of that ID.
    • A infers that it sent a message with this ID since it received an ACK, and that it has already confirmed the message has been completed since it no longer remembers the message. It can safely repeat the ACK-ACK to that ID and otherwise ignore the duplicate ACK.

From D's perspective:

  • D receives a message from A with some ID. It ACKs, processes, and releases it, keeping a copy of the message's ID. If faults are detected here, D replays the ACK.
    • If D's ACK was lost, A will replay the message, but D checks its local received message IDs and knows to discard the duplicate message.
    • Else, if A's ACK-ACK was lost, this ACK is a duplicate, but A will know to replay its ACK-ACK
  • D receives an ACK-ACK from A for this message ID. It removes that ID from its list of received messages, confident that A won't try replaying it again.
    • If D receives a duplicate ACK-ACK due to message latencies, it will not find the ID in its local received messages list. It can infer that this is a duplicate and ignore it.

I'm confident in the correctness of this solution (though I am, of course, still happily open to discussion, being questioned, or being proven wrong). I was really aiming to double check that ACKs wouldn't be considered too much of a performance burden on the network before I take the time to implement/verify them. It sounds to me like that's not a concern of yours, though, so I'll get started on this.

Thanks for taking the time to read through all of this and being so responsive!

@rhc54
Copy link
Contributor

rhc54 commented Aug 29, 2025

Ultimately, I leave it up to you as the implementer and only try to provide guidance/advice. I'm not concerned about performance burden as the runtime is on the management network and has no impact on MPI or other high-performance messaging. The sys admins will get concerned if they see too much traffic on the management net as it can interfere with their own communications (e.g., usage stats), but the runtime is generally quiet except for brief times when needed to support the app - generally at startup (when we wire the application procs together), during error reporting, and when supporting the occasional dynamic operation.

So I doubt your proposed method will raise any eyebrows 😄

@bosilca
Copy link
Contributor

bosilca commented Aug 29, 2025

A little bit of literature about error detection and propagation.

Reliable message delivery is a non-scalable design, because without expensive consensus per message, an intermediary routing process would have to keep a copy of the message for a very long time. That's why in ULFM, we decided to go the other route, deliver fast what you can (basically the same messaging layer as without fault tolerance), return errors where possible and design algorithms where global status changes are validated by consensus, and if this fails restart the entire status change.

@rhc54
Copy link
Contributor

rhc54 commented Aug 29, 2025

IIUC, @Matthew-Whitlock proposes to avoid holding messages for long times by having A drop its cached message as soon as B acknowledges completing the relay of that message, if required (which means that the next layer down the tree has received it). So the message is only cached for the time it takes to reach B and be acknowledged. Will have to see his final design to fully analyze it and see how that works, and to ensure that the runtime can be guaranteed to tear down when it needs to or when recovery really isn't possible.

@Matthew-Whitlock
Copy link
Author

@bosilca I agree, which is why I'm aiming for a mix of reliable message delivery and reliable operations.

With this approach, fault-free xcasts get slightly more expensive (w/ a communication pattern that looks more like a fence or barrier). Fences should have no fault-free penalty and either abort on failure or have localized (and then propagating upward) delays due to failures by using a similar design as the xcasts. I haven't dug into group creation, but the process is described as very similar to fences so I'm hopeful I can make the same claim there. So we can avoid the more expensive generic reliable message delivery approach for what I understand to be the more time consuming and communication bound operations.

This design also keeps the ACKs out of the critical path - they're only used to release the data copies. Since PRRTE is all asynchronous, we can immediately continue with other work while the ACKs happen "in the background." As long as the generically reliable messages aren't bandwidth bound, we should hopefully have negligible overheads.

@rhc54 A will need to hold its message until the final destination acknowledges receipt, but only for the subset of 'generically reliable' messages described above (generally, messages not belonging to group operations). However, A only has to hold messages initiated locally, it can cache or drop forwarded messages freely.

@rhc54
Copy link
Contributor

rhc54 commented Aug 29, 2025

A will need to hold its message until the final destination acknowledges receipt

Sounds like you are back to the ack flowing through relays, which is known to not be any more reliable than the original message itself. Running ack's through relays feels like you are just begging for trouble. Still, I'm happy to wait and look at what you do.

@bosilca
Copy link
Contributor

bosilca commented Aug 29, 2025

If a message is directly delivered to the final destination it will work, a validated message can be dropped from the cache. However, if the message is routed through intermediary nodes (and B is an intermediary), these intermediaries cannot do the tracking because if they disappear the message (and its ongoing tracking) will be lost. Thus, if the message is routed, you can either do a tracking at each level, and remove the message while the reception confirmation is flowing back (which requires the routing to be symmetric), or track the message at the source, because if you lose the source while the message is in transit, who cares if the message is delivered or not, and if you don't lose the source you can always retransmit. In any case, a message can only be removed from the cache when a reception confirmation from the final destination is seen.

If only the source keeps track of the message, the only way to know a message can be removed from the cache is when you receive a confirmation from the message final destination. More scalable, as a single copy is required, but confirmation for each message is necessary. But then the question became: if the confirmation message is also routed, how do you guarantee it is never lost along the way ?

The answer is timeout: a message not validated in a well-defined amount of time will be resent. Double receives need to be tracked, and timeout for all message. False positive timeout need to be handled also.

I guess the only thing that is clear is that there is no simple solution.

@Matthew-Whitlock
Copy link
Author

I'm assuming that the TCP layer will ensure that messages only fail to send if a process has died (or had some network issue which will cause us to globally presume it is dead, functionally the same). This means ranks can handshake after faults to avoid relying on timeouts. The TCP layer does use timeouts and keepalives to ensure the socket is still open, but that's kernel-managed and peer-to-peer rather than per message.

See the above "From A's perspective" and "From D's perspective" for how double receives, lost ACKs, and duplicate ACKs are handled. But this kind of thing will likely be clearer once fully specified in code.

@rhc54
Copy link
Contributor

rhc54 commented Aug 29, 2025

Sounds fine. I'm still not convinced that you have to hold the message at the original sender until the final recipient ack's it all the way back through the relay chain - didn't have to do that in the prior implementation, and it worked fine. However, I admit we may not have exposed that to a broad range of scenarios, and maybe we missed something in our testing.

I will, however, agree with @bosilca - relying on routed ack's just seems unreliable. Maybe doing some kind of timeout on the ack will help, but I'm somewhat skeptical of a design that relies on a routed ack mechanism.

@rhc54
Copy link
Contributor

rhc54 commented Aug 29, 2025

I still feel like we're missing something as the ORCM design was simpler than what is being proposed, and worked fine. Key piece was that we know the routing for all messages. I'll try to dig up an old design doc and sanitize it over the next couple of days - will post it on a gist.

@Petter-Programs
Copy link

Hi,

I am working on a project in which fault tolerance like this would be great. I'd be more than happy to help you test -- I took the liberty of trying the code and found that, at least on my side, launches do not seem to work across >1 nodes with plm ssh.

/apps/GPP/DMR/install/prrte-shrinkable/bin/prterun --prtemca prte_plm_base_verbose 100 --runtime-options fwd-environment --prtemca ras ^slurm --prtemca plm ^slurm --host host1:2,host2:2 ./sleepOfJobs 16 4
[host1:1770849] mca: base: component_find: searching NULL for plm components
[host1:1770849] mca: base: find_dyn_components: checking NULL for plm components
[host1:1770849] pmix:mca: base: components_register: registering framework plm components
[host1:1770849] pmix:mca: base: components_register: found loaded component ssh
[host1:1770849] pmix:mca: base: components_register: component ssh register function successful
[host1:1770849] mca: base: components_open: opening plm components
[host1:1770849] mca: base: components_open: found loaded component ssh
[host1:1770849] mca: base: components_open: component ssh open function successful
[host1:1770849] mca:base:select: Auto-selecting plm components
[host1:1770849] mca:base:select:(  plm) Querying component [ssh]
[host1:1770849] [[INVALID],UNDEFINED] plm:ssh_lookup on agent ssh : rsh path NULL
[host1:1770849] mca:base:select:(  plm) Query of component [ssh] set priority to 10
[host1:1770849] mca:base:select:(  plm) Selected component [ssh]
[host1:1770849] [prterun-host1-1770849@0,0] plm:ssh_setup on agent ssh : rsh path NULL
[host1:1770849] [prterun-host1-1770849@0,0] plm:base:receive start comm
[host1:1770849] [prterun-host1-1770849@0,0] plm:base:setup_vm
[host1:1770849] [prterun-host1-1770849@0,0] plm:base:setup_vm creating map
[host1:1770849] [prterun-host1-1770849@0,0] setup:vm: working unmanaged allocation
[host1:1770849] [prterun-host1-1770849@0,0] using dash_host
[host1:1770849] [prterun-host1-1770849@0,0] checking node host1
[host1:1770849] [prterun-host1-1770849@0,0] ignoring myself
[host1:1770849] [prterun-host1-1770849@0,0] checking node host2
[host1:1770849] [prterun-host1-1770849@0,0] plm:base:setup_vm add new daemon [prterun-host1-1770849@0,1]
[host1:1770849] [prterun-host1-1770849@0,0] plm:base:setup_vm assigning new daemon [prterun-host1-1770849@0,1] to node host2
[host1:1770849] [prterun-host1-1770849@0,0] plm:ssh: launching vm
[host1:1770849] [prterun-host1-1770849@0,0] plm:ssh: local shell: 0 (bash)
[host1:1770849] [prterun-host1-1770849@0,0] plm:ssh: assuming same remote shell as local shell
[host1:1770849] [prterun-host1-1770849@0,0] plm:ssh: remote shell: 0 (bash)
[host1:1770849] [prterun-host1-1770849@0,0] plm:ssh: final template argv:
	/usr/bin/ssh <template> PRTE_PREFIX=/apps/GPP/DMR/install/prrte-shrinkable;export PRTE_PREFIX;LD_LIBRARY_PATH=/apps/GPP/DMR/install/prrte-shrinkable/lib:/apps/GPP/DMR/install/pmix-for-shrinkable-prrte/lib:$LD_LIBRARY_PATH;export LD_LIBRARY_PATH;DYLD_LIBRARY_PATH=/apps/GPP/DMR/install/prrte-shrinkable/lib:/apps/GPP/DMR/install/pmix-for-shrinkable-prrte/lib:$DYLD_LIBRARY_PATH;export DYLD_LIBRARY_PATH;/apps/GPP/DMR/install/prrte-shrinkable/bin/prted --prtemca ess "env" --prtemca ess_base_nspace "prterun-host1-1770849@0" --prtemca ess_base_vpid "<template>" --prtemca ess_base_num_procs "2" --prtemca prte_hnp_uri "[email protected];tcp://10.2.1.200,10.1.1.200,10.9.1.200:43391:16,16,16" --prtemca PREFIXES "errmgr,ess,filem,grpcomm,iof,odls,plm,prtebacktrace,prtedl,prteinstalldirs,prtereachable,ras,rmaps,schizo,state,hwloc,if,reachable" --prtemca prte_plm_base_verbose "100" --prtemca ras "^slurm" --prtemca plm "^slurm" --prtemca pmix_session_server "1" --prtemca plm "ssh" --tree-spawn --prtemca prte_parent_uri "[email protected];tcp://10.2.1.200,10.1.1.200,10.9.1.200:43391:16,16,16"
[host1:1770849:0:1770849] Caught signal 11 (Segmentation fault: address not mapped to object at address (nil))
==== backtrace (tid:1770849) ====
 0  /apps/GPP/UCX/1.16.0/GCC/lib/libucs.so.0(ucs_handle_error+0x294) [0x7f83f92bbcb4]
 1  /apps/GPP/UCX/1.16.0/GCC/lib/libucs.so.0(+0x34e77) [0x7f83f92bbe77]
 2  /apps/GPP/UCX/1.16.0/GCC/lib/libucs.so.0(+0x35146) [0x7f83f92bc146]
 3  /lib64/libc.so.6(+0x54df0) [0x7f83f9dbfdf0]
 4  /apps/GPP/DMR/install/prrte-shrinkable/lib/libprrte.so.4(+0xee607) [0x7f83fa44b607]
 5  /lib64/libevent_core-2.1.so.7(+0x21a98) [0x7f83f9ff3a98]
 6  /lib64/libevent_core-2.1.so.7(event_base_loop+0x577) [0x7f83f9ff57a7]
 7  /apps/GPP/DMR/install/prrte-shrinkable/bin/prterun() [0x405af7]
 8  /lib64/libc.so.6(+0x3feb0) [0x7f83f9daaeb0]
 9  /lib64/libc.so.6(__libc_start_main+0x80) [0x7f83f9daaf60]
10  /apps/GPP/DMR/install/prrte-shrinkable/bin/prterun() [0x407725]
=================================

@rhc54
Copy link
Contributor

rhc54 commented Sep 1, 2025

I'd hold off on testing and/or fixing things on this PR as I'm not convinced this is the path we should take. I found the old doc on ORCM and am working to create a sanitized version. Probably be another day or two as I have granddaughter duty today.

@rhc54
Copy link
Contributor

rhc54 commented Sep 1, 2025

FWIW: if you are interested, the PMIx/PRRTE developers meet on the first Thurs of each month for a one-hour discussion over topics such as this. You are welcome to join us this Thurs Sept 4 at 11am US Eastern if you like and have time. Connection details are here: https://openpmix.org/captcha/

@rhc54
Copy link
Contributor

rhc54 commented Sep 2, 2025

Okay, here is the doc on how ORCM did reliable messaging - hope it helps. Feel free to ask questions about it.

ResilientMessaging.pdf

@Matthew-Whitlock
Copy link
Author

I am working on a project in which fault tolerance like this would be great. I'd be more than happy to help you test -- I took the liberty of trying the code and found that, at least on my side, launches do not seem to work across >1 nodes with plm ssh.

I haven't tested this over SSH at all yet, just Slurm, but I'm interested in trying over SSH for my own work as well so I'll likely take a look at this soon. Thanks for the info!

@Matthew-Whitlock
Copy link
Author

I'd hold off on testing and/or fixing things on this PR as I'm not convinced this is the path we should take.

Thanks for the notice. I'll focus my efforts on the resilient versions of the group operations for now; they seem like less controversial upgrades.

I'll attend the developers meeting on Thursday as well, but in general I have two 'complaints' about the ORCM approach.

  1. Reliability
    The ORCM model is based on caching messages, which is inherently probabilistic. Messages are probably available to replay, as long as messages pass through each node quickly enough - but with more messages going through the system, the time each message sits in the cache decreases and the time each message takes to pass through the next node increases. I don't doubt that it is usable, but without a stronger proof of reliability you're left with trying to overestimate parameters like ring buffer size and eviction times.
  2. Memory Scalability
    PRRTE's routing tree means nodes at the top of the tree see many more messages than nodes at the bottom. With P ranks, rank 0 has ~O(P) messages passing to/through it. As we begin to allow for higher failure rates, I expect that rank 0 would need a ring buffer sized ~O(P). Rank 1 would need roughly 1/64 of that, and so on. So we either estimate sizes based on the worst-case supported scenarios, or try heuristically calculating based on a rank's subtree size and the system's MTTF.

I'm not completely opposed to the approach, since without testing I'm not sure how impactful those two issues will actually be. However, I think an ACK-based approach is stronger, especially combined with the special handling on group operations.

Is your primary concern with the reliability of the routed ACKs? Don't forget that the internet is largely built on top of routed ACKs. The message->ACK->ACK-ACK process looks fairly similar to TCP's three-way handshake, actually.

@rhc54
Copy link
Contributor

rhc54 commented Sep 2, 2025

resilient versions of the group operations for now

I'd really rather not have separate reliability for group ops - it just complicates things. If the messaging is reliable, then that's all we need.

I don't doubt that it is usable, but without a stronger proof of reliability you're left with trying to overestimate parameters like ring buffer size and eviction times.

Correct - and that's just fine. This is not a high message environment - the whole point of an RTE is to be quiet. Remember, every cycle used by the RTE is being taken away from the application, so our aim is to be quiescent as much as possible. Thus, we can safely assume very few messages are flying around. I found that ring buffer sizes of 32 for the send and 2048 for the receive were more than enough on even very large systems. The large message buffers can be even smaller when we are talking about mpirun (as opposed to a system daemon). Eviction times for both message types are usually a few seconds at most.

PRRTE's routing tree means nodes at the top of the tree see many more messages than nodes at the bottom

They see more, but you seem to think there is a significant amount of messaging going on. There is not - we have one xcast to start the job, one all gather to share connection info, and a couple of messages/daemon to report state transitions. That's it, minus some IO forwarding.

So the daemon at the very top of the tree (the DVM master) sees all those messages, while a daemon at the very bottom only sees the xcast, the allgather, and sends its state transitions. Remember, only messages that are relayed go into the ring buffers, so all those state transition messages arriving at the DVM master never see the ring buffer as the master is the destination.

Is your primary concern with the reliability of the routed ACKs?

ACKs are known to be unreliable when passing along the same transport as the original message, especially when traversing relays. So yeah - I'm opposed to them, having been burned before. You can ack-ack-ack-ack all you want, but it doesn't change the fundamentals of the problem. In some scenarios (e.g., the Internet), you have no choice but to use it. However, in this scenario, we don't need to do so.

@Matthew-Whitlock
Copy link
Author

I'd really rather not have separate reliability for group ops - it just complicates things. If the messaging is reliable, then that's all we need.

I think it's important to have special handling for group ops on some level, at least. If rank 0 sends an xcast message to rank 1, and then rank 1 fails, it's important to be able to continue the broadcast to rank 1's children. If that's treated as a normal message, the xcast will worst-case never propagate to those subtrees and best case delay all messages down those subtrees until rank 1 comes back online (if it does).
In essence, some way to describe messages sent to 'ranks [1-64]' differently than those sent to 'my children, who are currently ranks [1-64]', and the same for parent-bound messages.

This is not a high message environment - the whole point of an RTE is to be quiet.

They see more, but you seem to think there is a significant amount of messaging going on.

I hear you, and I agree that the specifics of this application make it more amenable to the ring buffer design. I only think that it is not much more difficult to implement an approach that can give some guarantees, and I'm happily volunteering the time to develop it.

Fundamentally, though, I'm only seeking the functionality of continuing past node failures - the implementation details are interesting, but not as important. I can roll with whatever the decision ultimately is.

ACKs are known to be unreliable when passing along the same transport as the original message, especially when traversing relays. So yeah - I'm opposed to them, having been burned before. You can ack-ack-ack-ack all you want, but it doesn't change the fundamentals of the problem.

Can you describe a general or specific scenario that would break my proposed MSG->ACK->ACK-ACK design? I've convinced myself that this is all the ACKing we could ever need - no need to ACK-ACK-ACK or further. If there are some details I've left unclear, or there's some issue I've missed, I'd love to discuss it.

@rhc54
Copy link
Contributor

rhc54 commented Sep 2, 2025

If rank 0 sends an xcast message to rank 1, and then rank 1 fails

There does need to be some operational support, but not to necessarily make the op reliable beyond messaging. In the scenario you describe, rank 0 is supposed to resend the message to rank 1's children (I didn't deal with the immediate send failure in the doc). However, it needs to alter the message to indicate that it may not have been a successful operation - e.g., if some of the participants were clients of rank 1.

< implement an approach that can give some guarantees,

Multiple ways to do that - the ORCM approach, the multi-routing of duplicate messages, etc. I'm just not convinced that the use of ACKs provides any additional guarantee.

Can you describe a general or specific scenario that would break my proposed MSG->ACK->ACK-ACK design

There is no need to ACK a message between two directly connected procs, so we are talking about ACKing a message that flows through one or more relays. All it takes to break the design is to have one or more of those relays pass the original message, and then fail during the ACK sequence. Now the ACK never arrives back at the partner, implying that the message failed to be delivered - but it was. So after reconnection, we wind up resending a message that didn't require it, which means we have to check all inbound messages to see if they are duplicates, and discard the duplicates.

Does that still work? Well, yes - but what's the point? The ring buffer method would have detected the failure of the relay, and then checked receipts to find who was missing. This only occurs when a failure actually happens, and without imposing additional messaging burden on the system during normal operation.

@rhc54
Copy link
Contributor

rhc54 commented Sep 2, 2025

Bit of a correction. You can't send an updated status as there is no way to renotify the participants that already received it. So you have to complete the original xcast and then send event notifications to the remaining participants of the failure of those who died.

@bosilca
Copy link
Contributor

bosilca commented Sep 2, 2025

@rhc54 the algorithm described in the 2-pagers is not resilient, and certainly does not guarantee reliable messaging. It provides an optimistic wish-me-luck algorithm that works if the system is synchronous, balanced, runs very few communications and is mostly fault-free.

The counterproof is actually very simple. If you keep the messages across all layers, the higher a node is in the routing tree the more messages it shall keep, in a size restrained circular ring. In your example daemon A will need to track messages for basically everyone, while C is only responsible for H. As this can be highly imbalanced, the circular ring keeping track of messages at A could overload and evict a message before you have confirmation of its safe arrival. In your example, when G ask for messages from A there is no guarantee that A still has them, if there was significant traffic between the rest of the participants. Moreover, if everyone on the path keeps a copy of all messages in transit, the wasted memory can be significant as directly proportional to the ring size and the tree depth. Even with some form of logarithmic overlay, this is quite a significant amount.

There is plenty of literature on how to do reliable messaging (message logging or not), in and outside MPI, but such a discussion is way larger than the scope of this issue.

@rhc54
Copy link
Contributor

rhc54 commented Sep 2, 2025

Like I said, this method was designed to operate in the constraints of the system it was intended to support. Your comments reflect that - it does not work well for arbitrary environments that operate outside of the specified constraints. Harsh words aside, it worked (and continues to work) exceptionally well within the target environment. Can you make it fail? Of course - by simply violating the constraints. Do you need to experimentally determine the buffer size? Yes, some experimentation is sometimes required, but again, within the constraints of this environment, some general rules support the majority of uses.

@Matthew-Whitlock
Copy link
Author

Let me sum up the concerns raised about my ACK-based suggestion as I understand them.

1) Can routed, unreliable ACKS support reliability?

Based on this message, my understanding is that we now agree that this approach is viable.

There is no need to ACK a message between two directly connected procs, so we are talking about ACKing a message that flows through one or more relays. All it takes to break the design is to have one or more of those relays pass the original message, and then fail during the ACK sequence. Now the ACK never arrives back at the partner, implying that the message failed to be delivered - but it was. So after reconnection, we wind up resending a message that didn't require it, which means we have to check all inbound messages to see if they are duplicates, and discard the duplicates.

Does that still work? Well, yes

2) Does this approach offer stronger guarantees than ring buffers?

Based on this message, I think we can agree that ACKing offers stronger guarantees, even if we disagree about how strong a guarantee we need to have.

Like I said, this method was designed to operate in the constraints of the system it was intended to support. Your comments reflect that - it does not work well for arbitrary environments that operate outside of the specified constraints. Harsh words aside, it worked (and continues to work) exceptionally well within the target environment. Can you make it fail? Of course - by simply violating the constraints. Do you need to experimentally determine the buffer size? Yes, some experimentation is sometimes required, but again, within the constraints of this environment, some general rules support the majority of uses.

3) Is this approach too expensive?

Based on this message, I think we are (or were) in agreement that the additional costs are generally negligible for this use-case.

Ultimately, I leave it up to you as the implementer and only try to provide guidance/advice. I'm not concerned about performance burden as the runtime is on the management network and has no impact on MPI or other high-performance messaging. The sys admins will get concerned if they see too much traffic on the management net as it can interfere with their own communications (e.g., usage stats), but the runtime is generally quiet except for brief times when needed to support the app - generally at startup (when we wire the application procs together), during error reporting, and when supporting the occasional dynamic operation.

So I doubt your proposed method will raise any eyebrows 😄


Is this a correct understanding of where we stand on the above concerns? Are there any further concerns to discuss?

@rhc54
Copy link
Contributor

rhc54 commented Sep 2, 2025

Is this a correct understanding of where we stand on the above concerns? Are there any further concerns to discuss?

Errr...no, I would tend to disagree with those conclusions. I'm rather opposed to ACKs for all the reasons already stated and don't believe that's a desirable approach. We can discuss a bit on Thurs and kick around the options.

Signed-off-by: Matthew Whitlock <[email protected]>
@Matthew-Whitlock
Copy link
Author

@Petter-Programs I have fixed the issue you reported on my end, give it a try now

@Matthew-Whitlock
Copy link
Author

Matthew-Whitlock commented Sep 3, 2025

Does message ordering need to be strictly maintained? I'm glancing through the PMIx specification, and it seems like a fence is the only guaranteed way to order events from the client perspective. So if a client does something like this:

PMIx_Notify_event( ... );
PMIx_Get( ... PMIX_GET_REFRESH_CACHE ... );

I'm not seeing anything that would enforce that any data newly put/committed in the remote's event handler will be returned by the get. Since the put/commit are asynchronous, it seems like they may not have completed by the time the get is handled by the remote even if we have strict message ordering.

Is this a correct reading? I'm specifically wondering if it's necessary to promise the ordering of separate operations like xcasts and gets.

edit: Nevermind, of course those operations aren't ordered - if you start an xcast as non-HNP, then do a get for a rank below you in the tree, the get will necessarily arrive first.

@rhc54
Copy link
Contributor

rhc54 commented Sep 3, 2025

I'm not entirely sure I understand what you are saying in the prior comment. PMIx_Get doesn't get data from anyone - it simply retrieves info from the local datastore that was populated by its host daemon, usually as a result of a job launch or fence operation. It has nothing to do with where that proc sits in the routing tree relative to the proc whose data is being requested.

In other words, "get" is a purely local operation, except in one minor edge case.

@Matthew-Whitlock
Copy link
Author

Matthew-Whitlock commented Sep 3, 2025

Just caught that I was looking at a 2023 v5.0 "release candidate," rather than the 2024 v4.2 release

@rhc54
Copy link
Contributor

rhc54 commented Sep 3, 2025

That doesn't make any difference with respect to how "get" operates.

@Matthew-Whitlock
Copy link
Author

I see, looks like it's that edge case that I was looking at. That'd be when calling get on a non-reserved key with a remote target process when the host server implements pmix_server_dmodex_req_fn_t, right?

Just looking through the spec for what kinds of behavior are important.

@rhc54
Copy link
Contributor

rhc54 commented Sep 4, 2025

Correct - that's the edge case. However, it only occurs in a very specific instance - when the app doesn't include connection info in the fence operation. There are some scenarios where that can make wireup be faster (e.g., very sparse connectivity), but it isn't widely used. Still, it is something requiring support.

@Matthew-Whitlock
Copy link
Author

Brief notes on our discussion today:

  • Not entirely entirely settled on how useful the ACKs will be
  • However, the two approaches have a lot of overlap in their required functionality
  • I'm going to get the shared foundations up and running first, then proceed with the design I've proposed
    • It will (ideally) be relatively straightforward to convert to the ORCM approach if later desired

@rhc54
Copy link
Contributor

rhc54 commented Sep 4, 2025

Sounds reasonable. I'd suggest using the pmix_hotel class for your caches as it automatically includes timed evictions. If you use lists, then you have to search them for the item to remove - however, with the hotel, you can just add the room number to the message header so you can directly retrieve the message to be deleted. I'd also go ahead and drop the separate large vs small message cache and just do it all in one as we are treating messages the same way. Probably want a separate hotel for send and receive, I imagine.

@rhc54
Copy link
Contributor

rhc54 commented Sep 4, 2025

This is why you were seeing such long timeouts on node failure - the default keepalive times are set very high:

    // Wait for this amount of time before sending the first keepalive probe
    prte_oob_base.keepalive_time = 300;
    (void)pmix_mca_base_var_register("prte", "prte", NULL, "keepalive_time",
                                        "Idle time in seconds before starting to send keepalives (keepalive_time <= 0 disables "
                                        "keepalive functionality)",
                                        PMIX_MCA_BASE_VAR_TYPE_INT,
                                        &prte_oob_base.keepalive_time);

    // Resend keepalive probe every INT seconds
    prte_oob_base.keepalive_intvl = 20;
    (void) pmix_mca_base_var_register("prte", "prte", NULL, "keepalive_intvl",
                                        "Time between successive keepalive pings when peer has not responded, in seconds (ignored "
                                        "if keepalive_time <= 0)",
                                        PMIX_MCA_BASE_VAR_TYPE_INT,
                                        &prte_oob_base.keepalive_intvl);

    // After sending PR probes every INT seconds consider the connection dead
    prte_oob_base.keepalive_probes = 9;
    (void) pmix_mca_base_var_register("prte", "prte", NULL, "keepalive_probes",
                                        "Number of keepalives that can be missed before "
                                        "declaring error (ignored if keepalive_time <= 0)",
                                        PMIX_MCA_BASE_VAR_TYPE_INT,
                                        &prte_oob_base.keepalive_probes);

When running for fault recovery, you probably want to dial those down a bunch.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants