Skip to content

Commit 150b841

Browse files
feat(iroh)!: Emit mDNS expiry events (#3409)
## Description I'm working on a project where we want to have a list of peers that can be connected to without actually establishing a connection to each of them. The user can then later select a peer and we will establish a connection to the ones that are required. For this to work we need to be able to tell our frontend about the available peers on the network. Right now Iroh emits events when peers are discovered but it doesn't provide a mechanism to detect when those peers are no longer online so they remain in the frontend UI until the application is restarted. This PR implements a system for Iroh's discovery system to emit events when a peer is no-longer aviable. This is implemented into the core discovery system, however the only discovery mechanism that currently emits these are mDNS. This PR solves #3040. ## Breaking Changes All methods that previously retruned `DiscoveryItem` now return `DiscoveryEvent`. This includes the `Discovery` trait, `Endpoint::discovery_stream`, etc. ## Change checklist <!-- Remove any that are not relevant. --> - [x] Self-review. - [x] Documentation updates following the [style guide](https://rust-lang.github.io/rfcs/1574-more-api-documentation-conventions.html#appendix-a-full-conventions-text), if relevant. - [x] Tests if relevant. - [x] All breaking changes documented. - [x] List all breaking changes in the above "Breaking Changes" section.
1 parent f3da758 commit 150b841

File tree

5 files changed

+154
-57
lines changed

5 files changed

+154
-57
lines changed

iroh/examples/locally-discovered-nodes.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
//! This is an async, non-determinate process, so the number of NodeIDs discovered each time may be different. If you have other iroh endpoints or iroh nodes with [`MdnsDiscovery`] enabled, it may discover those nodes as well.
66
use std::time::Duration;
77

8-
use iroh::{Endpoint, NodeId, node_info::UserData};
8+
use iroh::{Endpoint, NodeId, discovery::DiscoveryEvent, node_info::UserData};
99
use n0_future::StreamExt;
1010
use n0_snafu::Result;
1111
use tokio::task::JoinSet;
@@ -32,7 +32,7 @@ async fn main() -> Result<()> {
3232
tracing::error!("{e}");
3333
return;
3434
}
35-
Ok(item) => {
35+
Ok(DiscoveryEvent::Discovered(item)) => {
3636
// if there is no user data, or the user data
3737
// does not indicate that the discovered node
3838
// is a part of the example, ignore it
@@ -53,6 +53,7 @@ async fn main() -> Result<()> {
5353
println!("Found node {}!", item.node_id().fmt_short());
5454
}
5555
}
56+
Ok(DiscoveryEvent::Expired(_)) => {}
5657
};
5758
}
5859
});

iroh/src/discovery.rs

Lines changed: 43 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -352,13 +352,23 @@ pub trait Discovery: std::fmt::Debug + Send + Sync + 'static {
352352
/// The [`crate::endpoint::Endpoint`] will `subscribe` to the discovery system
353353
/// and add the discovered addresses to the internal address book as they arrive
354354
/// on this stream.
355-
fn subscribe(&self) -> Option<BoxStream<DiscoveryItem>> {
355+
fn subscribe(&self) -> Option<BoxStream<DiscoveryEvent>> {
356356
None
357357
}
358358
}
359359

360360
impl<T: Discovery> Discovery for Arc<T> {}
361361

362+
/// An event emitted from [`Discovery`] services.
363+
#[derive(Debug, Clone, Eq, PartialEq)]
364+
pub enum DiscoveryEvent {
365+
/// A peer was discovered or it's information was updated.
366+
Discovered(DiscoveryItem),
367+
/// A peer was expired due to being inactive, unreachable, or otherwise
368+
/// unavailable.
369+
Expired(NodeId),
370+
}
371+
362372
/// Node discovery results from [`Discovery`] services.
363373
///
364374
/// This is the item in the streams returned from [`Discovery::resolve`] and
@@ -367,7 +377,7 @@ impl<T: Discovery> Discovery for Arc<T> {}
367377
///
368378
/// This struct derefs to [`NodeData`], so you can access the methods from [`NodeData`]
369379
/// directly from [`DiscoveryItem`].
370-
#[derive(Debug, Clone)]
380+
#[derive(Debug, Clone, Eq, PartialEq)]
371381
pub struct DiscoveryItem {
372382
/// The node info for the node, as discovered by the the discovery service.
373383
node_info: NodeInfo,
@@ -498,7 +508,7 @@ impl Discovery for ConcurrentDiscovery {
498508
Some(Box::pin(streams))
499509
}
500510

501-
fn subscribe(&self) -> Option<BoxStream<DiscoveryItem>> {
511+
fn subscribe(&self) -> Option<BoxStream<DiscoveryEvent>> {
502512
let mut streams = vec![];
503513
for service in self.services.iter() {
504514
if let Some(stream) = service.subscribe() {
@@ -649,11 +659,13 @@ impl DiscoveryTask {
649659
}
650660
debug!(%provenance, addr = ?node_addr, "new address found");
651661
ep.add_node_addr_with_source(node_addr, provenance).ok();
662+
652663
if let Some(tx) = on_first_tx.take() {
653664
tx.send(Ok(())).ok();
654665
}
655666
// Send the discovery item to the subscribers of the discovery broadcast stream.
656-
ep.discovery_subscribers().send(r);
667+
ep.discovery_subscribers()
668+
.send(DiscoveryEvent::Discovered(r));
657669
}
658670
Some(Err(err)) => {
659671
warn!(?err, "discovery service produced error");
@@ -685,7 +697,7 @@ pub struct Lagged {
685697

686698
#[derive(Clone, Debug)]
687699
pub(super) struct DiscoverySubscribers {
688-
inner: tokio::sync::broadcast::Sender<DiscoveryItem>,
700+
inner: tokio::sync::broadcast::Sender<DiscoveryEvent>,
689701
}
690702

691703
impl DiscoverySubscribers {
@@ -699,13 +711,13 @@ impl DiscoverySubscribers {
699711
}
700712
}
701713

702-
pub(crate) fn subscribe(&self) -> impl Stream<Item = Result<DiscoveryItem, Lagged>> + use<> {
714+
pub(crate) fn subscribe(&self) -> impl Stream<Item = Result<DiscoveryEvent, Lagged>> + use<> {
703715
use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError};
704716
let recv = self.inner.subscribe();
705717
BroadcastStream::new(recv).map_err(|BroadcastStreamRecvError::Lagged(n)| Lagged { val: n })
706718
}
707719

708-
pub(crate) fn send(&self, item: DiscoveryItem) {
720+
pub(crate) fn send(&self, item: DiscoveryEvent) {
709721
// `broadcast::Sender::send` returns an error if the channel has no subscribers,
710722
// which we don't care about.
711723
self.inner.send(item).ok();
@@ -737,7 +749,7 @@ mod tests {
737749
#[derive(Debug, Clone)]
738750
struct TestDiscoveryShared {
739751
nodes: Arc<Mutex<InfoStore>>,
740-
watchers: tokio::sync::broadcast::Sender<DiscoveryItem>,
752+
watchers: tokio::sync::broadcast::Sender<DiscoveryEvent>,
741753
}
742754

743755
impl Default for TestDiscoveryShared {
@@ -770,7 +782,7 @@ mod tests {
770782
}
771783
}
772784

773-
pub fn send_passive(&self, item: DiscoveryItem) {
785+
pub fn send_passive(&self, item: DiscoveryEvent) {
774786
self.watchers.send(item).ok();
775787
}
776788
}
@@ -831,7 +843,7 @@ mod tests {
831843
Some(stream)
832844
}
833845

834-
fn subscribe(&self) -> Option<BoxStream<DiscoveryItem>> {
846+
fn subscribe(&self) -> Option<BoxStream<DiscoveryEvent>> {
835847
let recv = self.shared.watchers.subscribe();
836848
let stream =
837849
tokio_stream::wrappers::BroadcastStream::new(recv).filter_map(|item| item.ok());
@@ -1016,25 +1028,34 @@ mod tests {
10161028
ep2.node_addr().initialized().await;
10171029
let _ = ep1.connect(ep2.node_id(), TEST_ALPN).await?;
10181030

1019-
let item = tokio::time::timeout(Duration::from_secs(1), stream.next())
1020-
.await
1021-
.expect("timeout")
1022-
.expect("stream closed")
1023-
.expect("stream lagged");
1031+
let DiscoveryEvent::Discovered(item) =
1032+
tokio::time::timeout(Duration::from_secs(1), stream.next())
1033+
.await
1034+
.expect("timeout")
1035+
.expect("stream closed")
1036+
.expect("stream lagged")
1037+
else {
1038+
panic!("Returned unexpected discovery event!");
1039+
};
1040+
10241041
assert_eq!(item.node_id(), ep2.node_id());
10251042
assert_eq!(item.provenance(), "test-disco");
10261043

10271044
// inject item into discovery passively
10281045
let passive_node_id = SecretKey::generate(rand::thread_rng()).public();
10291046
let node_info = NodeInfo::new(passive_node_id);
10301047
let passive_item = DiscoveryItem::new(node_info, "test-disco-passive", None);
1031-
disco_shared.send_passive(passive_item.clone());
1032-
1033-
let item = tokio::time::timeout(Duration::from_secs(1), stream.next())
1034-
.await
1035-
.expect("timeout")
1036-
.expect("stream closed")
1037-
.expect("stream lagged");
1048+
disco_shared.send_passive(DiscoveryEvent::Discovered(passive_item.clone()));
1049+
1050+
let DiscoveryEvent::Discovered(item) =
1051+
tokio::time::timeout(Duration::from_secs(1), stream.next())
1052+
.await
1053+
.expect("timeout")
1054+
.expect("stream closed")
1055+
.expect("stream lagged")
1056+
else {
1057+
panic!("Returned unexpected discovery event!");
1058+
};
10381059
assert_eq!(item.node_id(), passive_node_id);
10391060
assert_eq!(item.provenance(), "test-disco-passive");
10401061

iroh/src/discovery/mdns.rs

Lines changed: 92 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ use tokio::sync::mpsc::{self, error::TrySendError};
4848
use tracing::{Instrument, debug, error, info_span, trace, warn};
4949

5050
use super::{DiscoveryContext, DiscoveryError, IntoDiscovery, IntoDiscoveryError};
51-
use crate::discovery::{Discovery, DiscoveryItem, NodeData, NodeInfo};
51+
use crate::discovery::{Discovery, DiscoveryEvent, DiscoveryItem, NodeData, NodeInfo};
5252

5353
/// The n0 local swarm node discovery name
5454
const N0_LOCAL_SWARM: &str = "iroh.local.swarm";
@@ -83,27 +83,27 @@ enum Message {
8383
Discovery(String, Peer),
8484
Resolve(NodeId, mpsc::Sender<Result<DiscoveryItem, DiscoveryError>>),
8585
Timeout(NodeId, usize),
86-
Subscribe(mpsc::Sender<DiscoveryItem>),
86+
Subscribe(mpsc::Sender<DiscoveryEvent>),
8787
}
8888

8989
/// Manages the list of subscribers that are subscribed to this discovery service.
9090
#[derive(Debug)]
91-
struct Subscribers(Vec<mpsc::Sender<DiscoveryItem>>);
91+
struct Subscribers(Vec<mpsc::Sender<DiscoveryEvent>>);
9292

9393
impl Subscribers {
9494
fn new() -> Self {
9595
Self(vec![])
9696
}
9797

9898
/// Add the subscriber to the list of subscribers
99-
fn push(&mut self, subscriber: mpsc::Sender<DiscoveryItem>) {
99+
fn push(&mut self, subscriber: mpsc::Sender<DiscoveryEvent>) {
100100
self.0.push(subscriber);
101101
}
102102

103103
/// Sends the `node_id` and `item` to each subscriber.
104104
///
105105
/// Cleans up any subscribers that have been dropped.
106-
fn send(&mut self, item: DiscoveryItem) {
106+
fn send(&mut self, item: DiscoveryEvent) {
107107
let mut clean_up = vec![];
108108
for (i, subscriber) in self.0.iter().enumerate() {
109109
// assume subscriber was dropped
@@ -234,6 +234,7 @@ impl MdnsDiscovery {
234234
error!("MdnsDiscovery channel closed");
235235
error!("closing MdnsDiscovery");
236236
timeouts.abort_all();
237+
discovery.remove_all();
237238
return;
238239
}
239240
Some(msg) => msg,
@@ -266,6 +267,7 @@ impl MdnsDiscovery {
266267
"removing node from MdnsDiscovery address book"
267268
);
268269
node_addrs.remove(&discovered_node_id);
270+
subscribers.send(DiscoveryEvent::Expired(discovered_node_id));
269271
continue;
270272
}
271273

@@ -298,7 +300,7 @@ impl MdnsDiscovery {
298300
// in other words, nodes sent to the `subscribers` should only be the ones that
299301
// have been "passively" discovered
300302
if !resolved {
301-
subscribers.send(item);
303+
subscribers.send(DiscoveryEvent::Discovered(item));
302304
}
303305
}
304306
Message::Resolve(node_id, sender) => {
@@ -346,8 +348,8 @@ impl MdnsDiscovery {
346348
let handle = task::spawn(discovery_fut.instrument(info_span!("swarm-discovery.actor")));
347349
Ok(Self {
348350
handle: AbortOnDropHandle::new(handle),
349-
advertise,
350351
sender: send,
352+
advertise,
351353
local_addrs,
352354
})
353355
}
@@ -450,7 +452,7 @@ impl Discovery for MdnsDiscovery {
450452
}
451453
}
452454

453-
fn subscribe(&self) -> Option<BoxStream<DiscoveryItem>> {
455+
fn subscribe(&self) -> Option<BoxStream<DiscoveryEvent>> {
454456
use futures_util::FutureExt;
455457

456458
let (sender, recv) = mpsc::channel(20);
@@ -490,29 +492,99 @@ mod tests {
490492
let user_data: UserData = "foobar".parse()?;
491493
let node_data = NodeData::new(None, BTreeSet::from(["0.0.0.0:11111".parse().unwrap()]))
492494
.with_user_data(Some(user_data.clone()));
493-
println!("info {node_data:?}");
494495

495496
// resolve twice to ensure we can create separate streams for the same node_id
496-
let mut s1 = discovery_a.resolve(node_id_b).unwrap();
497-
let mut s2 = discovery_a.resolve(node_id_b).unwrap();
497+
let mut s1 = discovery_a
498+
.subscribe()
499+
.unwrap()
500+
.filter(|event| match event {
501+
DiscoveryEvent::Discovered(event) => event.node_id() == node_id_b,
502+
_ => false,
503+
});
504+
let mut s2 = discovery_a
505+
.subscribe()
506+
.unwrap()
507+
.filter(|event| match event {
508+
DiscoveryEvent::Discovered(event) => event.node_id() == node_id_b,
509+
_ => false,
510+
});
498511

499512
tracing::debug!(?node_id_b, "Discovering node id b");
500513
// publish discovery_b's address
501514
discovery_b.publish(&node_data);
502-
let s1_res = tokio::time::timeout(Duration::from_secs(5), s1.next())
503-
.await
504-
.context("timeout")?
505-
.unwrap()?;
506-
let s2_res = tokio::time::timeout(Duration::from_secs(5), s2.next())
507-
.await
508-
.context("timeout")?
509-
.unwrap()?;
515+
let DiscoveryEvent::Discovered(s1_res) =
516+
tokio::time::timeout(Duration::from_secs(5), s1.next())
517+
.await
518+
.context("timeout")?
519+
.unwrap()
520+
else {
521+
panic!("Received unexpected discovery event");
522+
};
523+
let DiscoveryEvent::Discovered(s2_res) =
524+
tokio::time::timeout(Duration::from_secs(5), s2.next())
525+
.await
526+
.context("timeout")?
527+
.unwrap()
528+
else {
529+
panic!("Received unexpected discovery event");
530+
};
510531
assert_eq!(s1_res.node_info().data, node_data);
511532
assert_eq!(s2_res.node_info().data, node_data);
512533

513534
Ok(())
514535
}
515536

537+
#[tokio::test]
538+
#[traced_test]
539+
async fn mdns_publish_expire() -> Result {
540+
let (_, discovery_a) = make_discoverer(false)?;
541+
let (node_id_b, discovery_b) = make_discoverer(true)?;
542+
543+
// publish discovery_b's address
544+
let node_data = NodeData::new(None, BTreeSet::from(["0.0.0.0:11111".parse().unwrap()]))
545+
.with_user_data(Some("".parse()?));
546+
discovery_b.publish(&node_data);
547+
548+
let mut s1 = discovery_a.subscribe().unwrap();
549+
tracing::debug!(?node_id_b, "Discovering node id b");
550+
551+
// Wait for the specific node to be discovered
552+
loop {
553+
let event = tokio::time::timeout(Duration::from_secs(5), s1.next())
554+
.await
555+
.context("timeout")?
556+
.expect("Stream should not be closed");
557+
558+
match event {
559+
DiscoveryEvent::Discovered(item) if item.node_info().node_id == node_id_b => {
560+
break;
561+
}
562+
_ => continue, // Ignore other discovery events
563+
}
564+
}
565+
566+
// Shutdown node B
567+
drop(discovery_b);
568+
tokio::time::sleep(Duration::from_secs(5)).await;
569+
570+
// Wait for the expiration event for the specific node
571+
loop {
572+
let event = tokio::time::timeout(Duration::from_secs(10), s1.next())
573+
.await
574+
.context("timeout waiting for expiration event")?
575+
.expect("Stream should not be closed");
576+
577+
match event {
578+
DiscoveryEvent::Expired(expired_node_id) if expired_node_id == node_id_b => {
579+
break;
580+
}
581+
_ => continue, // Ignore other events
582+
}
583+
}
584+
585+
Ok(())
586+
}
587+
516588
#[tokio::test]
517589
#[traced_test]
518590
async fn mdns_subscribe() -> Result {
@@ -537,7 +609,7 @@ mod tests {
537609
let test = async move {
538610
let mut got_ids = BTreeSet::new();
539611
while got_ids.len() != num_nodes {
540-
if let Some(item) = events.next().await {
612+
if let Some(DiscoveryEvent::Discovered(item)) = events.next().await {
541613
if node_ids.contains(&(item.node_id(), item.user_data())) {
542614
got_ids.insert((item.node_id(), item.user_data()));
543615
}

0 commit comments

Comments
 (0)