Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
983 changes: 525 additions & 458 deletions src/Cargo.lock

Large diffs are not rendered by default.

696 changes: 520 additions & 176 deletions src/init/Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/init/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ edition = "2021"
publish = false

[dependencies]
libc = "0.2.149"
libc = "=0.2.172"
qos_aws = { path = "../qos_aws"}
qos_system = { path = "../qos_system"}
qos_core = { path = "../qos_core", features = ["vm"], default-features = false }
Expand Down
2 changes: 1 addition & 1 deletion src/qos_aws/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ publish = false
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
libc = "0.2.149"
libc = "=0.2.172"
qos_system = { path = "../qos_system"}
8 changes: 6 additions & 2 deletions src/qos_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ qos_p256 = { path = "../qos_p256" }
qos_nsm = { path = "../qos_nsm", default-features = false }

nix = { version = "0.26", features = ["socket"], default-features = false }
libc = "=0.2.155"
libc = "=0.2.172"
borsh = { version = "1.0", features = ["std", "derive"] , default-features = false}
vsss-rs = { version = "5.1", default-features = false, features = ["std", "zeroize"] }

Expand All @@ -21,6 +21,9 @@ aws-nitro-enclaves-nsm-api = { version = "0.4", default-features = false }
serde_bytes = { version = "0.11", default-features = false }
serde = { version = "1", features = ["derive"], default-features = false }

tokio = { version = "1.38.0", features = ["io-util", "macros", "net", "rt-multi-thread", "time"], default-features = false }
tokio-vsock = { version = "0.7.1", optional = true }

[dev-dependencies]
qos_test_primitives = { path = "../qos_test_primitives" }
qos_p256 = { path = "../qos_p256", features = ["mock"] }
Expand All @@ -29,7 +32,8 @@ rustls = { version = "0.23.5" }
webpki-roots = { version = "0.26.1" }

[features]
default = ["vm"]
# Support for VSOCK
vm = []
vm = ["tokio-vsock"]
# Never use in production - support for mock NSM
mock = ["qos_nsm/mock"]
50 changes: 50 additions & 0 deletions src/qos_core/src/async_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
//! Streaming socket based server for use in an enclave. Listens for connections
//! from [`crate::client::Client`].

use std::marker::PhantomData;

use crate::{
io::{AsyncListener, SocketAddress},
server::SocketServerError,
};

/// Something that can process requests in an async way.
pub trait AsyncRequestProcessor {
/// Process an incoming request and return a response in async.
///
/// The request and response are raw bytes. Likely this should be encoded
/// data and logic inside of this function should take care of decoding the
/// request and encoding a response.
fn process(
&mut self,
request: Vec<u8>,
) -> impl std::future::Future<Output = Vec<u8>>;
}

/// A bare bones, socket based server.
pub struct AsyncSocketServer<R: AsyncRequestProcessor> {
_phantom: PhantomData<R>,
}

impl<R: AsyncRequestProcessor> AsyncSocketServer<R> {
/// Listen and respond to incoming requests with the given `processor`.
pub async fn listen(
addr: SocketAddress,
mut processor: R,
) -> Result<(), SocketServerError> {
println!("`SocketServer` listening on {addr:?}");

let listener = AsyncListener::listen(addr).await?;

loop {
let mut stream = listener.accept().await?;
match stream.recv().await {
Ok(payload) => {
let response = processor.process(payload).await;
let _ = stream.send(&response).await?;
}
Err(err) => eprintln!("AsyncServer::listen error: {err:?}"),
}
}
}
}
169 changes: 169 additions & 0 deletions src/qos_core/src/io/async_stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
//! Abstractions to handle connection based socket streams.

use std::time::Duration;

#[cfg(feature = "vm")]
pub use nix::sys::time::TimeVal;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
net::{UnixListener, UnixSocket, UnixStream},
};
use tokio_vsock::{VsockListener, VsockStream};

use super::{IOError, SocketAddress};

enum InnerListener {
Unix(UnixListener),
#[cfg(feature = "vm")]
Vsock(VsockListener),
}

enum InnerStream {
Unix(UnixStream),
#[cfg(feature = "vm")]
Vsock(VsockStream),
}

/// Handle on a stream
pub struct AsyncStream(InnerStream);

impl AsyncStream {
fn unix_accepted(stream: UnixStream) -> Self {
Self(InnerStream::Unix(stream))
}

fn vsock_accepted(stream: VsockStream) -> Self {
Self(InnerStream::Vsock(stream))
}

/// Create a new `Stream` from a `SocketAddress` and a timeout and connect using async
pub async fn connect(
addr: &SocketAddress,
timeout: TimeVal,
) -> Result<AsyncStream, IOError> {
match addr {
SocketAddress::Unix(uaddr) => {
let path =
uaddr.path().ok_or(IOError::ConnectAddressInvalid)?;

let socket = UnixSocket::new_stream()?;
let timeout = Duration::new(
timeout.tv_sec() as u64,
timeout.tv_usec() as u32 * 1000,
);

let inner =
tokio::time::timeout(timeout.into(), socket.connect(path))
.await??;

Ok(Self(InnerStream::Unix(inner)))
}
SocketAddress::Vsock(vaddr) => {
let vaddr =
tokio_vsock::VsockAddr::new(vaddr.cid(), vaddr.port());
let inner = VsockStream::connect(vaddr).await?;

Ok(Self(InnerStream::Vsock(inner)))
}
}
}

/// Sends a buffer over the underlying socket using async
pub(crate) async fn send(&mut self, buf: &[u8]) -> Result<(), IOError> {
match &mut self.0 {
InnerStream::Unix(ref mut s) => send(s, buf).await,
InnerStream::Vsock(ref mut s) => send(s, buf).await,
}
}

/// Receive from the underlying socket using async
pub(crate) async fn recv(&mut self) -> Result<Vec<u8>, IOError> {
match &mut self.0 {
InnerStream::Unix(ref mut s) => recv(s).await,
InnerStream::Vsock(ref mut s) => recv(s).await,
}
}

/// Perform a "call" by sending the req_buf bytes and waiting for reply on the same socket.
pub async fn call(&mut self, req_buf: &[u8]) -> Result<Vec<u8>, IOError> {
self.send(req_buf).await?;
self.recv().await
}
}

async fn send<S: AsyncWriteExt + Unpin>(
stream: &mut S,
buf: &[u8],
) -> Result<(), IOError> {
let len = buf.len();
// First, send the length of the buffer
let len_buf: [u8; size_of::<u64>()] = (len as u64).to_le_bytes();
stream.write_all(&len_buf).await?;
// Send the actual contents of the buffer
stream.write_all(buf).await?;

Ok(())
}

async fn recv<S: AsyncReadExt + Unpin>(
stream: &mut S,
) -> Result<Vec<u8>, IOError> {
let length: usize = {
let mut buf = [0u8; size_of::<u64>()];
stream.read_exact(&mut buf).await?;
u64::from_le_bytes(buf)
.try_into()
// Should only be possible if we are on 32bit architecture
.map_err(|_| IOError::ArithmeticSaturation)?
};

// Read the buffer
let mut buf = vec![0; length];
stream.read_exact(&mut buf).await?;

Ok(buf)
}

/// Abstraction to listen for incoming stream connections.
pub struct AsyncListener {
inner: InnerListener,
// addr: SocketAddress,
}

impl AsyncListener {
/// Bind and listen on the given address.
pub(crate) async fn listen(addr: SocketAddress) -> Result<Self, IOError> {
let listener = match addr {
SocketAddress::Unix(uaddr) => {
let path =
uaddr.path().ok_or(IOError::ConnectAddressInvalid)?;
let inner = InnerListener::Unix(UnixListener::bind(path)?);
Self { inner }
}
SocketAddress::Vsock(vaddr) => {
let vaddr =
tokio_vsock::VsockAddr::new(vaddr.cid(), vaddr.port());
let inner = InnerListener::Vsock(VsockListener::bind(vaddr)?);
Self { inner }
}
};

Ok(listener)
}

/// Accept a new connection.
pub async fn accept(&self) -> Result<AsyncStream, IOError> {
let stream = match &self.inner {
InnerListener::Unix(l) => {
let (s, _) = l.accept().await?;
AsyncStream::unix_accepted(s)
}
InnerListener::Vsock(l) => {
let (s, _) = l.accept().await?;
AsyncStream::vsock_accepted(s)
}
};

Ok(stream)
}
}
21 changes: 21 additions & 0 deletions src/qos_core/src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,31 @@
//! NOTE TO MAINTAINERS: Interaction with any sys calls should be contained
//! within this module.

mod async_stream;
mod stream;

pub use stream::{
Listener, SocketAddress, Stream, TimeVal, TimeValLike, VMADDR_FLAG_TO_HOST,
VMADDR_NO_FLAGS,
};

pub use async_stream::*;

/// QOS I/O error
#[derive(Debug)]
pub enum IOError {
/// `std::io::Error` wrapper.
StdIoError(std::io::Error),
/// `nix::Error` wrapper.
NixError(nix::Error),
/// Arithmetic operation saturated.
ArithmeticSaturation,
/// Unknown error.
UnknownError,
/// Connect address invalid
ConnectAddressInvalid,
/// Timed out while claling `connect` over a socket.
ConnectTimeout,
/// Timed out while calling `recv` over a socket.
RecvTimeout,
/// The `recv` system call was interrupted while receiving over a socket.
Expand All @@ -38,3 +47,15 @@ impl From<nix::Error> for IOError {
Self::NixError(err)
}
}

impl From<std::io::Error> for IOError {
fn from(err: std::io::Error) -> Self {
Self::StdIoError(err)
}
}

impl From<tokio::time::error::Elapsed> for IOError {
fn from(_: tokio::time::error::Elapsed) -> Self {
Self::ConnectTimeout
}
}
1 change: 1 addition & 0 deletions src/qos_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ compile_error!(
"feature \"vm\" and feature \"mock\" cannot be enabled at the same time"
);

pub mod async_server;
pub mod cli;
pub mod client;
pub mod handles;
Expand Down
2 changes: 1 addition & 1 deletion src/qos_enclave/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ publish = false

[dependencies]
nitro-cli = { git = "https://github.com/aws/aws-nitro-enclaves-cli", version = "1.4.0" }
libc = "0.2.149"
libc = "=0.2.172"

[features]
default = []
8 changes: 5 additions & 3 deletions src/qos_net/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ edition = "2021"
publish = false

[dependencies]
qos_core = { path = "../qos_core", default-features = false }
qos_core = { path = "../qos_core", default-features = false, features = ["vm"] }

borsh = { version = "1.0", features = [
"std",
Expand All @@ -16,18 +16,20 @@ hickory-resolver = { version = "0.24.1", features = [
"tokio-runtime",
], default-features = false, optional = true }
rand = { version = "0.8.5", default-features = false, optional = true }
tokio = { version = "1.38.0", features = ["io-util", "macros", "net", "rt-multi-thread", "time"], default-features = false, optional = true }

[dev-dependencies]
qos_test_primitives = { path = "../qos_test_primitives" }
httparse = { version = "1.9.4", default-features = false }
chunked_transfer = { version = "1.5.0", default-features = false }
rustls = { version = "0.23.5" }
serde_json = { version = "1.0.121", features = [
"std",
], default-features = false }
rustls = { version = "0.23.5" }
webpki-roots = { version = "0.26.1" }

[features]
default = ["proxy"] # keep this as a default feature ensures we lint by default
default = ["async_proxy"] # keep this as a default feature ensures we lint by default
async_proxy = ["hickory-resolver", "rand", "tokio"]
proxy = ["rand", "hickory-resolver"]
vm = []
26 changes: 26 additions & 0 deletions src/qos_net/src/async_cli.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use crate::{
async_proxy::AsyncProxy,
cli::{ProxyOpts, CLI},
};

///! Async extension to the CLI

impl CLI {
/// Execute the enclave proxy CLI with the environment args in an async way.
pub async fn async_execute() {
use qos_core::async_server::AsyncSocketServer;

let mut args: Vec<String> = std::env::args().collect();
let opts = ProxyOpts::new(&mut args);

if opts.parsed.version() {
println!("version: {}", env!("CARGO_PKG_VERSION"));
} else if opts.parsed.help() {
println!("{}", opts.parsed.info());
} else {
AsyncSocketServer::listen(opts.addr(), AsyncProxy::new())
.await
.unwrap();
}
}
}
Loading
Loading