@@ -292,25 +292,29 @@ impl MagicSock {
292
292
}
293
293
294
294
pub ( crate ) fn register_connection ( & self , remote : NodeId , conn : & quinn:: Connection ) {
295
+ debug ! ( %remote, "register connection" ) ;
295
296
let weak_handle = conn. weak_handle ( ) ;
296
297
self . connection_map . insert ( remote, weak_handle) ;
297
298
298
299
// TODO: track task
299
300
// TODO: find a good home for this
300
301
let mut path_events = conn. path_events ( ) ;
301
- let _task = task:: spawn ( async move {
302
- loop {
303
- match path_events. recv ( ) . await {
304
- Ok ( event) => {
305
- info ! ( remote = %remote, "path event: {:?}" , event) ;
306
- }
307
- Err ( tokio:: sync:: broadcast:: error:: RecvError :: Lagged ( _) ) => {
308
- warn ! ( "lagged path events" ) ;
302
+ let _task = task:: spawn (
303
+ async move {
304
+ loop {
305
+ match path_events. recv ( ) . await {
306
+ Ok ( event) => {
307
+ info ! ( remote = %remote, "path event: {:?}" , event) ;
308
+ }
309
+ Err ( tokio:: sync:: broadcast:: error:: RecvError :: Lagged ( _) ) => {
310
+ warn ! ( "lagged path events" ) ;
311
+ }
312
+ Err ( tokio:: sync:: broadcast:: error:: RecvError :: Closed ) => break ,
309
313
}
310
- Err ( tokio:: sync:: broadcast:: error:: RecvError :: Closed ) => break ,
311
314
}
312
315
}
313
- } ) ;
316
+ . instrument ( info_span ! ( "path events" , %remote) ) ,
317
+ ) ;
314
318
315
319
// open additional paths
316
320
if let Some ( addr) = self . node_map . get_current_addr ( remote) {
@@ -474,43 +478,51 @@ impl MagicSock {
474
478
for addr in addr. direct_addresses ( ) {
475
479
let conn = conn. clone ( ) ;
476
480
let addr = * addr;
477
- task:: spawn ( async move {
478
- match conn
479
- . open_path_ensure ( addr, quinn_proto:: PathStatus :: Available )
480
- . await
481
- {
482
- Ok ( path) => {
483
- path. set_max_idle_timeout ( Some (
484
- ENDPOINTS_FRESH_ENOUGH_DURATION ,
485
- ) )
486
- . ok ( ) ;
487
- path. set_keep_alive_interval ( Some ( HEARTBEAT_INTERVAL ) ) . ok ( ) ;
488
- }
489
- Err ( err) => {
490
- warn ! ( "failed to open path {:?}" , err) ;
481
+ task:: spawn (
482
+ async move {
483
+ debug ! ( %addr, "open path IP" ) ;
484
+ match conn
485
+ . open_path_ensure ( addr, quinn_proto:: PathStatus :: Available )
486
+ . await
487
+ {
488
+ Ok ( path) => {
489
+ path. set_max_idle_timeout ( Some (
490
+ ENDPOINTS_FRESH_ENOUGH_DURATION ,
491
+ ) )
492
+ . ok ( ) ;
493
+ path. set_keep_alive_interval ( Some ( HEARTBEAT_INTERVAL ) ) . ok ( ) ;
494
+ }
495
+ Err ( err) => {
496
+ warn ! ( "failed to open path {:?}" , err) ;
497
+ }
491
498
}
492
499
}
493
- } ) ;
500
+ . instrument ( info_span ! ( "open path IP" ) ) ,
501
+ ) ;
494
502
}
495
503
// Insert the relay addr
496
504
if let Some ( addr) = self . get_mapping_addr ( addr. node_id ) {
497
505
let conn = conn. clone ( ) ;
498
506
let addr = addr. private_socket_addr ( ) ;
499
- task:: spawn ( async move {
500
- match conn
501
- . open_path_ensure ( addr, quinn_proto:: PathStatus :: Backup )
502
- . await
503
- {
504
- Ok ( path) => {
505
- // Keep the relay path open
506
- path. set_max_idle_timeout ( None ) . ok ( ) ;
507
- path. set_keep_alive_interval ( None ) . ok ( ) ;
508
- }
509
- Err ( err) => {
510
- warn ! ( "failed to open path {:?}" , err) ;
507
+ task:: spawn (
508
+ async move {
509
+ debug ! ( %addr, "open path relay" ) ;
510
+ match conn
511
+ . open_path_ensure ( addr, quinn_proto:: PathStatus :: Backup )
512
+ . await
513
+ {
514
+ Ok ( path) => {
515
+ // Keep the relay path open
516
+ path. set_max_idle_timeout ( None ) . ok ( ) ;
517
+ path. set_keep_alive_interval ( None ) . ok ( ) ;
518
+ }
519
+ Err ( err) => {
520
+ warn ! ( "failed to open path {:?}" , err) ;
521
+ }
511
522
}
512
523
}
513
- } ) ;
524
+ . instrument ( info_span ! ( "open path relay" ) ) ,
525
+ ) ;
514
526
}
515
527
} else {
516
528
to_delete. push ( i) ;
@@ -644,19 +656,21 @@ impl MagicSock {
644
656
self . ipv6_reported . load ( Ordering :: Relaxed ) ,
645
657
& self . metrics . magicsock ,
646
658
) {
647
- Some ( ( node_id, udp_addr , relay_url , ping_actions) ) => {
659
+ Some ( ( node_id, _udp_addr , _relay_url , ping_actions) ) => {
648
660
if !ping_actions. is_empty ( ) {
649
661
self . actor_sender
650
662
. try_send ( ActorMessage :: PingActions ( ping_actions) )
651
663
. ok ( ) ;
652
664
}
653
- // Mixed will send all available addrs
654
665
655
- if let Some ( url) = relay_url {
656
- active_paths. push ( transports:: Addr :: Relay ( url, node_id) ) ;
657
- }
658
- if let Some ( addr) = udp_addr {
659
- active_paths. push ( transports:: Addr :: Ip ( addr) ) ;
666
+ if let Some ( addr) = self . node_map . get_current_addr ( node_id) {
667
+ // Mixed will send all available addrs
668
+ if let Some ( ref url) = addr. relay_url {
669
+ active_paths. push ( transports:: Addr :: Relay ( url. clone ( ) , node_id) ) ;
670
+ }
671
+ for ip in addr. direct_addresses ( ) {
672
+ active_paths. push ( transports:: Addr :: Ip ( * ip) ) ;
673
+ }
660
674
}
661
675
}
662
676
None => {
@@ -3204,18 +3218,18 @@ mod tests {
3204
3218
let _accept_task = AbortOnDropHandle :: new ( accept_task) ;
3205
3219
3206
3220
// Add an empty entry in the NodeMap of ep_1
3207
- msock_1
3208
- . add_node_addr (
3209
- NodeAddr {
3210
- node_id : node_id_2 ,
3211
- relay_url : None ,
3212
- direct_addresses : Default :: default ( ) ,
3213
- } ,
3214
- Source :: NamedApp {
3215
- name : "test" . into ( ) ,
3216
- } ,
3217
- )
3218
- . unwrap ( ) ;
3221
+ msock_1. node_map . add_node_addr (
3222
+ NodeAddr {
3223
+ node_id : node_id_2 ,
3224
+ relay_url : None ,
3225
+ direct_addresses : Default :: default ( ) ,
3226
+ } ,
3227
+ Source :: NamedApp {
3228
+ name : "test" . into ( ) ,
3229
+ } ,
3230
+ & msock_1 . metrics . magicsock ,
3231
+ ) ;
3232
+
3219
3233
let addr_2 = msock_1. get_mapping_addr ( node_id_2) . unwrap ( ) ;
3220
3234
3221
3235
// Set a low max_idle_timeout so quinn gives up on this quickly and our test does
0 commit comments