Skip to content

Commit a78c778

Browse files
Initial fault tolerance commit
1 parent 2278a71 commit a78c778

28 files changed

+2191
-555
lines changed

src/mca/errmgr/dvm/errmgr_dvm.c

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ static void proc_errors(int fd, short args, void *cbdata)
278278
/* remove from dependent routes, if it is one */
279279
prte_rml_route_lost(proc->rank);
280280
/* if all my routes and local children are gone, then terminate ourselves */
281-
if (0 == pmix_list_get_size(&prte_rml_base.children)) {
281+
if (0 == prte_rml_base.n_children) {
282282
for (i = 0; i < prte_local_children->size; i++) {
283283
proct = (prte_proc_t *) pmix_pointer_array_get_item(prte_local_children, i);
284284
if (NULL != proct &&
@@ -301,7 +301,7 @@ static void proc_errors(int fd, short args, void *cbdata)
301301
PMIX_OUTPUT_VERBOSE((5, prte_errmgr_base_framework.framework_output,
302302
"%s Comm failure: %d routes remain alive",
303303
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME),
304-
(int) pmix_list_get_size(&prte_rml_base.children)));
304+
prte_rml_base.n_children));
305305
}
306306
goto cleanup;
307307
}
@@ -316,6 +316,10 @@ static void proc_errors(int fd, short args, void *cbdata)
316316
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME), prte_process_info.nodename,
317317
PRTE_NAME_PRINT(proc), pptr->node->name);
318318
}
319+
320+
if(PRTE_SUCCESS == prte_rml_route_lost(proc->rank))
321+
goto cleanup;
322+
319323
/* mark the daemon job as failed */
320324
jdata->state = PRTE_JOB_STATE_COMM_FAILED;
321325
/* point to the lowest rank to cause the problem */
@@ -364,7 +368,7 @@ static void proc_errors(int fd, short args, void *cbdata)
364368
}
365369
/* if all my routes and children are gone, then terminate
366370
ourselves nicely (i.e., this is a normal termination) */
367-
if (0 == pmix_list_get_size(&prte_rml_base.children)) {
371+
if (0 == prte_rml_base.n_children) {
368372
PMIX_OUTPUT_VERBOSE((2, prte_errmgr_base_framework.framework_output,
369373
"%s errmgr:default:dvm all routes gone - exiting",
370374
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME)));

src/mca/errmgr/prted/errmgr_prted.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -435,7 +435,7 @@ static void proc_errors(int fd, short args, void *cbdata)
435435
}
436436
/* if all my routes and children are gone, then terminate
437437
ourselves nicely (i.e., this is a normal termination) */
438-
if (0 == pmix_list_get_size(&prte_rml_base.children)) {
438+
if (0 == prte_rml_base.n_children) {
439439
PMIX_OUTPUT_VERBOSE((2, prte_errmgr_base_framework.framework_output,
440440
"%s errmgr:default:prted all routes gone - exiting",
441441
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME)));
@@ -444,7 +444,7 @@ static void proc_errors(int fd, short args, void *cbdata)
444444
PMIX_OUTPUT_VERBOSE((2, prte_errmgr_base_framework.framework_output,
445445
"%s errmgr:default:prted not exiting, num_routes() == %d",
446446
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME),
447-
(int) pmix_list_get_size(&prte_rml_base.children)));
447+
prte_rml_base.n_children));
448448
}
449449
}
450450
/* if not, then we can continue */
@@ -627,7 +627,7 @@ static void proc_errors(int fd, short args, void *cbdata)
627627
}
628628
/* if all my routes and children are gone, then terminate
629629
ourselves nicely (i.e., this is a normal termination) */
630-
if (0 == pmix_list_get_size(&prte_rml_base.children)) {
630+
if (0 == prte_rml_base.n_children) {
631631
PMIX_OUTPUT_VERBOSE((2, prte_errmgr_base_framework.framework_output,
632632
"%s errmgr:default:prted all routes gone - exiting",
633633
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME)));

src/mca/filem/filem.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,12 @@ typedef int (*prte_filem_base_module_init_fn_t)(void);
209209
*/
210210
typedef int (*prte_filem_base_module_finalize_fn_t)(void);
211211

212+
/**
213+
* Recovers filem operations from given faults or else activates relevant job
214+
* failure state.
215+
*/
216+
typedef void (*prte_filem_base_fault_handler_fn_t)(const prte_rml_recovery_status_t *status);
217+
212218
/**
213219
* Put a file or directory on the remote machine
214220
*
@@ -350,6 +356,9 @@ struct prte_filem_base_module_1_0_0_t {
350356
/** Finalization Function */
351357
prte_filem_base_module_finalize_fn_t filem_finalize;
352358

359+
/** Respond to daemon failures */
360+
prte_filem_base_fault_handler_fn_t fault_handler;
361+
353362
/** Put a file on the remote machine */
354363
prte_filem_base_put_fn_t put;
355364
prte_filem_base_put_nb_fn_t put_nb;

src/mca/filem/raw/filem_raw_module.c

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,12 +63,14 @@
6363

6464
static int raw_init(void);
6565
static int raw_finalize(void);
66+
static void raw_fault_handler(const prte_rml_recovery_status_t *status);
6667
static int raw_preposition_files(prte_job_t *jdata, prte_filem_completion_cbfunc_t cbfunc,
6768
void *cbdata);
6869
static int raw_link_local_files(prte_job_t *jdata, prte_app_context_t *app);
6970

7071
prte_filem_base_module_t prte_filem_raw_module = {.filem_init = raw_init,
7172
.filem_finalize = raw_finalize,
73+
.fault_handler = raw_fault_handler,
7274
/* we don't use any of the following */
7375
.put = prte_filem_base_none_put,
7476
.put_nb = prte_filem_base_none_put_nb,
@@ -135,6 +137,20 @@ static int raw_finalize(void)
135137
return PRTE_SUCCESS;
136138
}
137139

140+
static void raw_fault_handler(const prte_rml_recovery_status_t *status){
141+
PRTE_HIDE_UNUSED_PARAMS(status);
142+
/* TODO: Make this actually resilient. Seems pretty trivial, since xcast is
143+
* already resilient */
144+
if (0 < pmix_list_get_size(&incoming_files) ||
145+
0 < pmix_list_get_size(&outbound_files)) {
146+
PMIX_OUTPUT_VERBOSE((0, prte_filem_base_framework.framework_output,
147+
"%s filem:raw daemon failed during active file"
148+
" transfer operation(s)",
149+
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME)));
150+
PRTE_ACTIVATE_JOB_STATE(NULL, PRTE_JOB_STATE_COMM_FAILED);
151+
}
152+
}
153+
138154
static void xfer_complete(int status, prte_filem_raw_xfer_t *xfer)
139155
{
140156
prte_filem_raw_outbound_t *outbound = xfer->outbound;

src/mca/grpcomm/direct/grpcomm_direct.c

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,13 @@
4040
/* Static API's */
4141
static int init(void);
4242
static void finalize(void);
43+
static void fault_handler(const prte_rml_recovery_status_t* status);
4344

4445
/* Module def */
4546
prte_grpcomm_base_module_t prte_grpcomm_direct_module = {
4647
.init = init,
4748
.finalize = finalize,
49+
.fault_handler = fault_handler,
4850
.xcast = prte_grpcomm_direct_xcast,
4951
.fence = prte_grpcomm_direct_fence,
5052
.group = prte_grpcomm_direct_group
@@ -56,12 +58,16 @@ prte_grpcomm_base_module_t prte_grpcomm_direct_module = {
5658
static int init(void)
5759
{
5860
/* setup the trackers */
61+
PMIX_CONSTRUCT(&prte_mca_grpcomm_direct_component.xcast_ops,
62+
prte_grpcomm_xcast_t);
5963
PMIX_CONSTRUCT(&prte_mca_grpcomm_direct_component.fence_ops, pmix_list_t);
6064
PMIX_CONSTRUCT(&prte_mca_grpcomm_direct_component.group_ops, pmix_list_t);
6165

62-
/* xcast receive */
66+
/* xcast receives */
6367
PRTE_RML_RECV(PRTE_NAME_WILDCARD, PRTE_RML_TAG_XCAST,
6468
PRTE_RML_PERSISTENT, prte_grpcomm_direct_xcast_recv, NULL);
69+
PRTE_RML_RECV(PRTE_NAME_WILDCARD, PRTE_RML_TAG_XCAST_ACK,
70+
PRTE_RML_PERSISTENT, prte_grpcomm_direct_xcast_ack, NULL);
6571

6672
/* fence receives */
6773
PRTE_RML_RECV(PRTE_NAME_WILDCARD, PRTE_RML_TAG_FENCE,
@@ -86,14 +92,22 @@ static int init(void)
8692
*/
8793
static void finalize(void)
8894
{
89-
95+
PMIX_DESTRUCT(&prte_mca_grpcomm_direct_component.xcast_ops);
9096
PMIX_LIST_DESTRUCT(&prte_mca_grpcomm_direct_component.fence_ops);
9197
PMIX_LIST_DESTRUCT(&prte_mca_grpcomm_direct_component.group_ops);
9298

9399
PRTE_RML_CANCEL(PRTE_NAME_WILDCARD, PRTE_RML_TAG_XCAST);
100+
PRTE_RML_CANCEL(PRTE_NAME_WILDCARD, PRTE_RML_TAG_XCAST_ACK);
94101
PRTE_RML_CANCEL(PRTE_NAME_WILDCARD, PRTE_RML_TAG_FENCE);
95102
PRTE_RML_CANCEL(PRTE_NAME_WILDCARD, PRTE_RML_TAG_FENCE_RELEASE);
96103
PRTE_RML_CANCEL(PRTE_NAME_WILDCARD, PRTE_RML_TAG_GROUP);
97104
PRTE_RML_CANCEL(PRTE_NAME_WILDCARD, PRTE_RML_TAG_GROUP_RELEASE);
98105
return;
99106
}
107+
108+
static void fault_handler(const prte_rml_recovery_status_t* status)
109+
{
110+
prte_grpcomm_direct_xcast_fault_handler(status);
111+
prte_grpcomm_direct_fence_fault_handler(status);
112+
prte_grpcomm_direct_group_fault_handler(status);
113+
}

src/mca/grpcomm/direct/grpcomm_direct.h

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,37 @@
2121

2222
BEGIN_C_DECLS
2323

24+
/* Tracks ongoing xcast operations to ensure all messages are delivered exactly
25+
* once to all daemons even in the presence of daemon failures */
26+
typedef struct {
27+
pmix_object_t super;
28+
// list of ongoing operations, defined in grpcomm_direct_xcast.c
29+
pmix_list_t ops;
30+
// list of operations sent to HNP to be started, but not seen since
31+
pmix_list_t pending_ops;
32+
// global op id of the last known completed (in our subtree) operation
33+
size_t op_id_completed;
34+
// global op id of what was completed (in our subtree) when we were last
35+
// promoted (meaning our subtree grew, so we can't assume completion in the
36+
// new subtree)
37+
size_t op_id_completed_at_promotion;
38+
// local op id of the last op generated here
39+
size_t op_id_local;
40+
// used by HNP to assign global op id
41+
size_t op_id_global;
42+
} prte_grpcomm_xcast_t;
43+
PRTE_MODULE_EXPORT PMIX_CLASS_DECLARATION(prte_grpcomm_xcast_t);
44+
2445
/*
2546
* Grpcomm interfaces
2647
*/
27-
2848
typedef struct {
29-
prte_grpcomm_base_component_t super;
30-
// track ongoing fence operations - list of prte_grpcomm_fence_t
31-
pmix_list_t fence_ops;
32-
// track ongoiong group operations - list of prte_grpcomm_group_t
33-
pmix_list_t group_ops;
49+
prte_grpcomm_base_component_t super;
50+
prte_grpcomm_xcast_t xcast_ops;
51+
// track ongoing fence operations - list of prte_grpcomm_fence_t
52+
pmix_list_t fence_ops;
53+
// track ongoing group operations - list of prte_grpcomm_group_t
54+
pmix_list_t group_ops;
3455
} prte_grpcomm_direct_component_t;
3556

3657
PRTE_MODULE_EXPORT extern prte_grpcomm_direct_component_t prte_mca_grpcomm_direct_component;
@@ -65,7 +86,6 @@ typedef struct {
6586
} prte_grpcomm_direct_group_signature_t;
6687
PRTE_MODULE_EXPORT PMIX_CLASS_DECLARATION(prte_grpcomm_direct_group_signature_t);
6788

68-
6989
/* Internal component object for tracking ongoing
7090
* allgather operations */
7191
typedef struct {
@@ -169,6 +189,14 @@ void prte_grpcomm_direct_xcast_recv(int status, pmix_proc_t *sender,
169189
pmix_data_buffer_t *buffer,
170190
prte_rml_tag_t tg, void *cbdata);
171191

192+
PRTE_MODULE_EXPORT extern
193+
void prte_grpcomm_direct_xcast_ack(int status, pmix_proc_t *sender,
194+
pmix_data_buffer_t *buffer,
195+
prte_rml_tag_t tg, void *cbdata);
196+
197+
PRTE_MODULE_EXPORT extern
198+
void prte_grpcomm_direct_xcast_fault_handler(const prte_rml_recovery_status_t* status);
199+
172200
/* fence functions */
173201
PRTE_MODULE_EXPORT extern
174202
int prte_grpcomm_direct_fence(const pmix_proc_t procs[], size_t nprocs,
@@ -185,6 +213,8 @@ void prte_grpcomm_direct_fence_release(int status, pmix_proc_t *sender,
185213
pmix_data_buffer_t *buffer,
186214
prte_rml_tag_t tag, void *cbdata);
187215

216+
PRTE_MODULE_EXPORT extern
217+
void prte_grpcomm_direct_fence_fault_handler(const prte_rml_recovery_status_t* status);
188218

189219
/* group functions */
190220
PRTE_MODULE_EXPORT extern
@@ -193,6 +223,9 @@ int prte_grpcomm_direct_group(pmix_group_operation_t op, char *grpid,
193223
const pmix_info_t directives[], size_t ndirs,
194224
pmix_info_cbfunc_t cbfunc, void *cbdata);
195225

226+
PRTE_MODULE_EXPORT extern
227+
void prte_grpcomm_direct_group_fault_handler(const prte_rml_recovery_status_t* status);
228+
196229
#if PMIX_NUMERIC_VERSION >= 0x00060000
197230

198231
PRTE_MODULE_EXPORT extern

src/mca/grpcomm/direct/grpcomm_direct_component.c

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,6 @@ PMIX_CLASS_INSTANCE(prte_grpcomm_group_t,
162162
pmix_list_item_t,
163163
gccon, gcdes);
164164

165-
166165
static void mdcon(prte_pmix_fence_caddy_t *p)
167166
{
168167
p->sig = NULL;

src/mca/grpcomm/direct/grpcomm_direct_fence.c

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,20 @@ int prte_grpcomm_direct_fence(const pmix_proc_t procs[], size_t nprocs,
8181
return PRTE_SUCCESS;
8282
}
8383

84+
void prte_grpcomm_direct_fence_fault_handler(const prte_rml_recovery_status_t* status)
85+
{
86+
PRTE_HIDE_UNUSED_PARAMS(status);
87+
/* TODO: make this actually resilient
88+
* For now, we'll just kill the job if any ops are active */
89+
if(0 < pmix_list_get_size(&prte_mca_grpcomm_direct_component.fence_ops)){
90+
PMIX_OUTPUT_VERBOSE((0, prte_grpcomm_base_framework.framework_output,
91+
"%s grpcomm:direct:fence daemon failed during"
92+
" active fence operation(s)",
93+
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME)));
94+
PRTE_ACTIVATE_JOB_STATE(NULL, PRTE_JOB_STATE_COMM_FAILED);
95+
}
96+
}
97+
8498
static void fence(int sd, short args, void *cbdata)
8599
{
86100
prte_pmix_fence_caddy_t *cd = (prte_pmix_fence_caddy_t *) cbdata;

src/mca/grpcomm/direct/grpcomm_direct_group.c

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,21 @@ int prte_grpcomm_direct_group(pmix_group_operation_t op, char *grpid,
9595
return PRTE_SUCCESS;
9696
}
9797

98+
void prte_grpcomm_direct_group_fault_handler(const prte_rml_recovery_status_t* status)
99+
{
100+
PRTE_HIDE_UNUSED_PARAMS(status);
101+
/* TODO: make this actually resilient
102+
* For now, we'll just kill the job if any ops are active */
103+
if(0 < pmix_list_get_size(&prte_mca_grpcomm_direct_component.group_ops)){
104+
PMIX_OUTPUT_VERBOSE((0, prte_grpcomm_base_framework.framework_output,
105+
"%s grpcomm:direct:group daemon failed during"
106+
" active group operation(s)",
107+
PRTE_NAME_PRINT(PRTE_PROC_MY_NAME)));
108+
PRTE_ACTIVATE_JOB_STATE(NULL, PRTE_JOB_STATE_COMM_FAILED);
109+
}
110+
}
111+
112+
98113
static void group(int sd, short args, void *cbdata)
99114
{
100115
prte_pmix_grp_caddy_t *cd = (prte_pmix_grp_caddy_t*)cbdata;

0 commit comments

Comments
 (0)