Skip to content

Commit 3667ab4

Browse files
committed
implement async qos_net::AsyncProxy
this includes: - Reaper::execute_async [qos_core] - AsyncStream [qos_core] - AsyncPool/AsyncStream [qos_core] - AsyncListener [qos_core] - AsyncRequestProcessor [qos_core] - AsyncProxyConnection [qos_net] - AsyncProxy [qos_net] - AsyncProxyStream [qos_net] as well as fixing qos_next's qos_core "vm" feature dependency issue by only requiring it if qos_net is requested with it
1 parent 5cb6afc commit 3667ab4

30 files changed

+2413
-642
lines changed

src/Cargo.lock

Lines changed: 436 additions & 453 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/qos_core/Cargo.toml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@ aws-nitro-enclaves-nsm-api = { version = "0.4", default-features = false }
2121
serde_bytes = { version = "0.11", default-features = false }
2222
serde = { version = "1", features = ["derive"], default-features = false }
2323

24-
tokio = { version = "1.38.0", features = ["io-util", "macros", "net", "rt-multi-thread", "time"], default-features = false }
24+
futures = { version = "0.3.31" }
25+
tokio = { version = "1.38.0", features = ["io-util", "macros", "net", "rt-multi-thread", "time"], default-features = false, optional = true}
2526
tokio-vsock = { version = "0.7.1", optional = true }
2627

2728
[dev-dependencies]
@@ -32,7 +33,9 @@ rustls = { version = "0.23.5" }
3233
webpki-roots = { version = "0.26.1" }
3334

3435
[features]
36+
default = ["async", "vm"]
37+
async = ["tokio", "tokio-vsock"]
3538
# Support for VSOCK
36-
vm = ["tokio-vsock"]
39+
vm = []
3740
# Never use in production - support for mock NSM
3841
mock = ["qos_nsm/mock"]

src/qos_core/src/async_client.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
//! Streaming socket based client to connect with
2+
//! [`crate::server::SocketServer`].
3+
4+
use crate::{client::ClientError, io::AsyncStreamPool};
5+
6+
/// Client for communicating with the enclave [`crate::server::SocketServer`].
7+
pub struct AsyncClient {
8+
pool: AsyncStreamPool,
9+
}
10+
11+
impl AsyncClient {
12+
/// Create a new client.
13+
#[must_use]
14+
pub fn new(pool: AsyncStreamPool) -> Self {
15+
Self { pool }
16+
}
17+
18+
/// Send raw bytes and wait for a response until the clients configured
19+
/// timeout.
20+
pub async fn call(&self, request: &[u8]) -> Result<Vec<u8>, ClientError> {
21+
let mut stream = self.pool.get().await;
22+
let resp = stream.call(request).await?;
23+
24+
Ok(resp)
25+
}
26+
}

src/qos_core/src/async_server.rs

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
//! Streaming socket based server for use in an enclave. Listens for connections
2+
//! from [`crate::client::Client`].
3+
4+
use std::marker::PhantomData;
5+
6+
use crate::{
7+
io::{AsyncListener, AsyncStreamPool},
8+
server::SocketServerError,
9+
};
10+
11+
/// Something that can process requests in an async way.
12+
pub trait AsyncRequestProcessor: Send {
13+
/// Process an incoming request and return a response in async.
14+
///
15+
/// The request and response are raw bytes. Likely this should be encoded
16+
/// data and logic inside of this function should take care of decoding the
17+
/// request and encoding a response.
18+
fn process(
19+
&self,
20+
request: Vec<u8>,
21+
) -> impl std::future::Future<Output = Vec<u8>> + Send;
22+
}
23+
24+
/// A bare bones, socket based server.
25+
pub struct AsyncSocketServer<R: AsyncRequestProcessor> {
26+
_phantom: PhantomData<R>,
27+
}
28+
29+
impl<R> AsyncSocketServer<R>
30+
where
31+
R: AsyncRequestProcessor + 'static + Clone,
32+
{
33+
/// Listen and respond to incoming requests on all the pool's addresses with the given `processor`.
34+
/// *NOTE*: the POOL_SIZE must match on both sides, since we expect ALL sockets to be connected
35+
/// to right away (e.g. not on first use). The client side connect (above) will always connect them all.
36+
pub async fn listen_all(
37+
pool: AsyncStreamPool,
38+
processor: R,
39+
) -> Result<(), SocketServerError> {
40+
println!("`AsyncSocketServer` listening on pool"); // TODO: add the addresses herei
41+
42+
let listeners = pool.listen().await?;
43+
44+
let mut tasks = Vec::new();
45+
for listener in listeners {
46+
let p = processor.clone();
47+
let task =
48+
tokio::spawn(async move { accept_loop(listener, p).await });
49+
50+
tasks.push(task);
51+
}
52+
53+
// wait for ALL pool connections
54+
let joined = futures::future::join_all(tasks).await;
55+
for outer_result in joined {
56+
match outer_result {
57+
Err(_join_err) => {
58+
return Err(SocketServerError::IOError(
59+
crate::io::IOError::UnknownError, // TODO: add a join error translation to IOError
60+
));
61+
} // this really shouldn't happen
62+
Ok(result) => result?,
63+
}
64+
}
65+
66+
Ok(())
67+
}
68+
}
69+
70+
async fn accept_loop<P>(
71+
listener: AsyncListener,
72+
processor: P,
73+
) -> Result<(), SocketServerError>
74+
where
75+
P: AsyncRequestProcessor + Clone,
76+
{
77+
loop {
78+
let mut stream = listener.accept().await?;
79+
loop {
80+
match stream.recv().await {
81+
Ok(payload) => {
82+
let response = processor.process(payload).await;
83+
let _ = stream.send(&response).await?;
84+
}
85+
Err(err) => return Err(SocketServerError::IOError(err)),
86+
}
87+
}
88+
}
89+
}

src/qos_core/src/cli.rs

Lines changed: 108 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,13 @@ use crate::{
99
io::SocketAddress,
1010
parser::{GetParserForOptions, OptionsParser, Parser, Token},
1111
reaper::Reaper,
12-
EPHEMERAL_KEY_FILE, MANIFEST_FILE, PIVOT_FILE, QUORUM_FILE, SEC_APP_SOCK,
12+
DEFAULT_POOL_SIZE, EPHEMERAL_KEY_FILE, MANIFEST_FILE, PIVOT_FILE,
13+
QUORUM_FILE, SEC_APP_SOCK,
1314
};
1415

16+
#[cfg(feature = "async")]
17+
use crate::io::AsyncStreamPool;
18+
1519
/// "cid"
1620
pub const CID: &str = "cid";
1721
/// "port"
@@ -28,6 +32,8 @@ pub const EPHEMERAL_FILE_OPT: &str = "ephemeral-file";
2832
/// Name for the option to specify the manifest file.
2933
pub const MANIFEST_FILE_OPT: &str = "manifest-file";
3034
const APP_USOCK: &str = "app-usock";
35+
/// Name for the option to specify the maximum AsyncPool size.
36+
pub const POOL_SIZE: &str = "pool-size";
3137

3238
/// CLI options for starting up the enclave server.
3339
#[derive(Default, Clone, Debug, PartialEq)]
@@ -44,11 +50,52 @@ impl EnclaveOpts {
4450
Self { parsed }
4551
}
4652

53+
/// Create a new [AsyncPool] of [AsyncStream] using the list of [SocketAddress] for the enclave server and
54+
/// return the new [AsyncPool]. Analogous to [Self::addr] and [Self::app_addr] depending on the [app] parameter.
55+
#[cfg(feature = "async")]
56+
fn async_pool(&self, app: bool) -> AsyncStreamPool {
57+
let pool_size: u32 = self
58+
.parsed
59+
.single(POOL_SIZE)
60+
.expect("invalid pool options")
61+
.parse()
62+
.expect("invalid pool_size specified");
63+
let usock_param = if app { APP_USOCK } else { USOCK };
64+
65+
match (
66+
self.parsed.single(CID),
67+
self.parsed.single(PORT),
68+
self.parsed.single(usock_param),
69+
) {
70+
#[cfg(feature = "vm")]
71+
(Some(c), Some(p), None) => {
72+
let c = c.parse::<u32>().unwrap();
73+
let start_port = p.parse::<u32>().unwrap();
74+
75+
let addresses = (start_port..start_port + pool_size).map(|p| {
76+
SocketAddress::new_vsock(c, p, crate::io::VMADDR_NO_FLAGS)
77+
});
78+
79+
AsyncStreamPool::new(addresses)
80+
}
81+
(None, None, Some(u)) => {
82+
let addresses = (0..pool_size).map(|i| {
83+
let u = format!("{}_{}", u, i); // add _X suffix for pooling
84+
SocketAddress::new_unix(&u)
85+
});
86+
87+
AsyncStreamPool::new(addresses)
88+
}
89+
_ => panic!("Invalid socket opts"),
90+
}
91+
}
92+
4793
/// Get the `SocketAddress` for the enclave server.
4894
///
4995
/// # Panics
5096
///
5197
/// Panics if the opts are not valid for exactly one of unix or vsock.
98+
#[allow(unused)]
5299
fn addr(&self) -> SocketAddress {
53100
match (
54101
self.parsed.single(CID),
@@ -66,6 +113,7 @@ impl EnclaveOpts {
66113
}
67114
}
68115

116+
#[allow(unused)]
69117
fn app_addr(&self) -> SocketAddress {
70118
SocketAddress::new_unix(
71119
self.parsed
@@ -135,20 +183,48 @@ impl CLI {
135183
} else if opts.parsed.help() {
136184
println!("{}", opts.parsed.info());
137185
} else {
138-
Reaper::execute(
139-
&Handles::new(
140-
opts.ephemeral_file(),
141-
opts.quorum_file(),
142-
opts.manifest_file(),
143-
opts.pivot_file(),
144-
),
145-
opts.nsm(),
146-
opts.addr(),
147-
opts.app_addr(),
148-
None,
149-
);
186+
Self::run(opts)
150187
}
151188
}
189+
190+
#[cfg(not(feature = "async"))]
191+
fn run(opts: EnclaveOpts) {
192+
Reaper::execute(
193+
&Handles::new(
194+
opts.ephemeral_file(),
195+
opts.quorum_file(),
196+
opts.manifest_file(),
197+
opts.pivot_file(),
198+
),
199+
opts.nsm(),
200+
opts.addr(),
201+
opts.app_addr(),
202+
None,
203+
);
204+
}
205+
206+
#[cfg(feature = "async")]
207+
fn run(opts: EnclaveOpts) {
208+
tokio::runtime::Builder::new_current_thread()
209+
.enable_all()
210+
.build()
211+
.expect("tokio main to run")
212+
.block_on(async {
213+
Reaper::async_execute(
214+
&Handles::new(
215+
opts.ephemeral_file(),
216+
opts.quorum_file(),
217+
opts.manifest_file(),
218+
opts.pivot_file(),
219+
),
220+
opts.nsm(),
221+
opts.async_pool(false),
222+
opts.async_pool(true),
223+
None,
224+
)
225+
.await;
226+
});
227+
}
152228
}
153229

154230
/// Parser for enclave CLI
@@ -201,6 +277,11 @@ impl GetParserForOptions for EnclaveParser {
201277
.takes_value(true)
202278
.default_value(SEC_APP_SOCK)
203279
)
280+
.token(
281+
Token::new(POOL_SIZE, "the pool size for use with all socket types")
282+
.takes_value(true)
283+
.default_value(DEFAULT_POOL_SIZE)
284+
)
204285
}
205286
}
206287

@@ -281,6 +362,20 @@ mod test {
281362
assert_eq!(opts.addr(), SocketAddress::new_unix("./test.sock"));
282363
}
283364

365+
#[test]
366+
#[cfg(feature = "async")]
367+
fn parse_pool_size() {
368+
let mut args: Vec<_> =
369+
vec!["binary", "--usock", "./test.sock", "--pool-size", "7"]
370+
.into_iter()
371+
.map(String::from)
372+
.collect();
373+
let opts = EnclaveOpts::new(&mut args);
374+
375+
let pool = opts.async_pool(false);
376+
assert_eq!(pool.len(), 7);
377+
}
378+
284379
#[test]
285380
fn parse_manifest_file() {
286381
let mut args: Vec<_> = vec!["binary", "--usock", "./test.sock"]

0 commit comments

Comments
 (0)