Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions iroh/examples/locally-discovered-nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
//! This is an async, non-determinate process, so the number of NodeIDs discovered each time may be different. If you have other iroh endpoints or iroh nodes with [`MdnsDiscovery`] enabled, it may discover those nodes as well.
use std::time::Duration;

use iroh::{Endpoint, NodeId, node_info::UserData};
use iroh::{Endpoint, NodeId, discovery::DiscoveryEvent, node_info::UserData};
use n0_future::StreamExt;
use n0_snafu::Result;
use tokio::task::JoinSet;
Expand All @@ -32,7 +32,7 @@ async fn main() -> Result<()> {
tracing::error!("{e}");
return;
}
Ok(item) => {
Ok(DiscoveryEvent::Discovered(item)) => {
// if there is no user data, or the user data
// does not indicate that the discovered node
// is a part of the example, ignore it
Expand All @@ -53,6 +53,7 @@ async fn main() -> Result<()> {
println!("Found node {}!", item.node_id().fmt_short());
}
}
Ok(DiscoveryEvent::Expired(_)) => {}
};
}
});
Expand Down
65 changes: 43 additions & 22 deletions iroh/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,13 +352,23 @@ pub trait Discovery: std::fmt::Debug + Send + Sync + 'static {
/// The [`crate::endpoint::Endpoint`] will `subscribe` to the discovery system
/// and add the discovered addresses to the internal address book as they arrive
/// on this stream.
fn subscribe(&self) -> Option<BoxStream<DiscoveryItem>> {
fn subscribe(&self) -> Option<BoxStream<DiscoveryEvent>> {
None
}
}

impl<T: Discovery> Discovery for Arc<T> {}

/// An event emitted from [`Discovery`] services.
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum DiscoveryEvent {
/// A peer was discovered or it's information was updated.
Discovered(DiscoveryItem),
/// A peer was expired due to being inactive, unreachable, or otherwise
/// unavailable.
Expired(NodeId),
}

/// Node discovery results from [`Discovery`] services.
///
/// This is the item in the streams returned from [`Discovery::resolve`] and
Expand All @@ -367,7 +377,7 @@ impl<T: Discovery> Discovery for Arc<T> {}
///
/// This struct derefs to [`NodeData`], so you can access the methods from [`NodeData`]
/// directly from [`DiscoveryItem`].
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct DiscoveryItem {
/// The node info for the node, as discovered by the the discovery service.
node_info: NodeInfo,
Expand Down Expand Up @@ -498,7 +508,7 @@ impl Discovery for ConcurrentDiscovery {
Some(Box::pin(streams))
}

fn subscribe(&self) -> Option<BoxStream<DiscoveryItem>> {
fn subscribe(&self) -> Option<BoxStream<DiscoveryEvent>> {
let mut streams = vec![];
for service in self.services.iter() {
if let Some(stream) = service.subscribe() {
Expand Down Expand Up @@ -649,11 +659,13 @@ impl DiscoveryTask {
}
debug!(%provenance, addr = ?node_addr, "new address found");
ep.add_node_addr_with_source(node_addr, provenance).ok();

if let Some(tx) = on_first_tx.take() {
tx.send(Ok(())).ok();
}
// Send the discovery item to the subscribers of the discovery broadcast stream.
ep.discovery_subscribers().send(r);
ep.discovery_subscribers()
.send(DiscoveryEvent::Discovered(r));
}
Some(Err(err)) => {
warn!(?err, "discovery service produced error");
Expand Down Expand Up @@ -685,7 +697,7 @@ pub struct Lagged {

#[derive(Clone, Debug)]
pub(super) struct DiscoverySubscribers {
inner: tokio::sync::broadcast::Sender<DiscoveryItem>,
inner: tokio::sync::broadcast::Sender<DiscoveryEvent>,
}

impl DiscoverySubscribers {
Expand All @@ -699,13 +711,13 @@ impl DiscoverySubscribers {
}
}

pub(crate) fn subscribe(&self) -> impl Stream<Item = Result<DiscoveryItem, Lagged>> + use<> {
pub(crate) fn subscribe(&self) -> impl Stream<Item = Result<DiscoveryEvent, Lagged>> + use<> {
use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError};
let recv = self.inner.subscribe();
BroadcastStream::new(recv).map_err(|BroadcastStreamRecvError::Lagged(n)| Lagged { val: n })
}

pub(crate) fn send(&self, item: DiscoveryItem) {
pub(crate) fn send(&self, item: DiscoveryEvent) {
// `broadcast::Sender::send` returns an error if the channel has no subscribers,
// which we don't care about.
self.inner.send(item).ok();
Expand Down Expand Up @@ -737,7 +749,7 @@ mod tests {
#[derive(Debug, Clone)]
struct TestDiscoveryShared {
nodes: Arc<Mutex<InfoStore>>,
watchers: tokio::sync::broadcast::Sender<DiscoveryItem>,
watchers: tokio::sync::broadcast::Sender<DiscoveryEvent>,
}

impl Default for TestDiscoveryShared {
Expand Down Expand Up @@ -770,7 +782,7 @@ mod tests {
}
}

pub fn send_passive(&self, item: DiscoveryItem) {
pub fn send_passive(&self, item: DiscoveryEvent) {
self.watchers.send(item).ok();
}
}
Expand Down Expand Up @@ -831,7 +843,7 @@ mod tests {
Some(stream)
}

fn subscribe(&self) -> Option<BoxStream<DiscoveryItem>> {
fn subscribe(&self) -> Option<BoxStream<DiscoveryEvent>> {
let recv = self.shared.watchers.subscribe();
let stream =
tokio_stream::wrappers::BroadcastStream::new(recv).filter_map(|item| item.ok());
Expand Down Expand Up @@ -1016,25 +1028,34 @@ mod tests {
ep2.node_addr().initialized().await;
let _ = ep1.connect(ep2.node_id(), TEST_ALPN).await?;

let item = tokio::time::timeout(Duration::from_secs(1), stream.next())
.await
.expect("timeout")
.expect("stream closed")
.expect("stream lagged");
let DiscoveryEvent::Discovered(item) =
tokio::time::timeout(Duration::from_secs(1), stream.next())
.await
.expect("timeout")
.expect("stream closed")
.expect("stream lagged")
else {
panic!("Returned unexpected discovery event!");
};

assert_eq!(item.node_id(), ep2.node_id());
assert_eq!(item.provenance(), "test-disco");

// inject item into discovery passively
let passive_node_id = SecretKey::generate(rand::thread_rng()).public();
let node_info = NodeInfo::new(passive_node_id);
let passive_item = DiscoveryItem::new(node_info, "test-disco-passive", None);
disco_shared.send_passive(passive_item.clone());

let item = tokio::time::timeout(Duration::from_secs(1), stream.next())
.await
.expect("timeout")
.expect("stream closed")
.expect("stream lagged");
disco_shared.send_passive(DiscoveryEvent::Discovered(passive_item.clone()));

let DiscoveryEvent::Discovered(item) =
tokio::time::timeout(Duration::from_secs(1), stream.next())
.await
.expect("timeout")
.expect("stream closed")
.expect("stream lagged")
else {
panic!("Returned unexpected discovery event!");
};
assert_eq!(item.node_id(), passive_node_id);
assert_eq!(item.provenance(), "test-disco-passive");

Expand Down
Loading
Loading