Skip to content

Commit 0bcb64d

Browse files
authored
feat(driver): determine current driver type at runtime (#471)
* refactor(driver): determine driver type at runtime * feat(driver): get current driver type * fix(driver): correct current driver type * fix(driver,fusion): warn instead of error * feat(driver): allow specifying driver type
1 parent 793888f commit 0bcb64d

File tree

11 files changed

+143
-122
lines changed

11 files changed

+143
-122
lines changed

compio-driver/src/buffer_pool/fusion.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ impl Debug for BufferPool {
2929
}
3030

3131
impl BufferPool {
32+
pub(crate) fn is_io_uring(&self) -> bool {
33+
matches!(&self.inner, BufferPollInner::IoUring(_))
34+
}
35+
3236
pub(crate) fn new_io_uring(buffer_pool: iour::BufferPool) -> Self {
3337
Self {
3438
inner: BufferPollInner::IoUring(buffer_pool),

compio-driver/src/driver_type.rs

Lines changed: 38 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -1,109 +1,64 @@
1-
use std::sync::atomic::{AtomicU8, Ordering};
2-
3-
const UNINIT: u8 = u8::MAX;
4-
const IO_URING: u8 = 0;
5-
const POLLING: u8 = 1;
6-
const IOCP: u8 = 2;
7-
8-
static DRIVER_TYPE: AtomicU8 = AtomicU8::new(UNINIT);
9-
101
/// Representing underlying driver type the fusion driver is using
112
#[repr(u8)]
123
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
134
pub enum DriverType {
145
/// Using `polling` driver
15-
Poll = POLLING,
16-
6+
Poll,
177
/// Using `io-uring` driver
18-
IoUring = IO_URING,
19-
8+
IoUring,
209
/// Using `iocp` driver
21-
IOCP = IOCP,
10+
IOCP,
2211
}
2312

2413
impl DriverType {
25-
fn from_num(n: u8) -> Self {
26-
match n {
27-
IO_URING => Self::IoUring,
28-
POLLING => Self::Poll,
29-
IOCP => Self::IOCP,
30-
_ => unreachable!("invalid driver type"),
31-
}
32-
}
33-
3414
/// Get the underlying driver type
35-
fn get() -> DriverType {
36-
cfg_if::cfg_if! {
37-
if #[cfg(windows)] {
38-
DriverType::IOCP
39-
} else if #[cfg(fusion)] {
40-
use io_uring::opcode::*;
15+
#[cfg(fusion)]
16+
pub(crate) fn suggest() -> DriverType {
17+
use io_uring::opcode::*;
4118

42-
// Add more opcodes here if used
43-
const USED_OP: &[u8] = &[
44-
Read::CODE,
45-
Readv::CODE,
46-
Write::CODE,
47-
Writev::CODE,
48-
Fsync::CODE,
49-
Accept::CODE,
50-
Connect::CODE,
51-
RecvMsg::CODE,
52-
SendMsg::CODE,
53-
AsyncCancel::CODE,
54-
OpenAt::CODE,
55-
Close::CODE,
56-
Shutdown::CODE,
57-
Socket::CODE,
58-
];
19+
// Add more opcodes here if used
20+
const USED_OP: &[u8] = &[
21+
Read::CODE,
22+
Readv::CODE,
23+
Write::CODE,
24+
Writev::CODE,
25+
Fsync::CODE,
26+
Accept::CODE,
27+
Connect::CODE,
28+
RecvMsg::CODE,
29+
SendMsg::CODE,
30+
AsyncCancel::CODE,
31+
OpenAt::CODE,
32+
Close::CODE,
33+
Shutdown::CODE,
34+
Socket::CODE,
35+
];
5936

60-
(|| {
61-
let uring = io_uring::IoUring::new(2)?;
62-
let mut probe = io_uring::Probe::new();
63-
uring.submitter().register_probe(&mut probe)?;
64-
if USED_OP.iter().all(|op| probe.is_supported(*op)) {
65-
std::io::Result::Ok(DriverType::IoUring)
66-
} else {
67-
Ok(DriverType::Poll)
68-
}
69-
})()
70-
.unwrap_or(DriverType::Poll) // Should we fail here?
71-
} else if #[cfg(io_uring)] {
72-
DriverType::IoUring
73-
} else if #[cfg(unix)] {
74-
DriverType::Poll
37+
(|| {
38+
let uring = io_uring::IoUring::new(2)?;
39+
let mut probe = io_uring::Probe::new();
40+
uring.submitter().register_probe(&mut probe)?;
41+
if USED_OP.iter().all(|op| probe.is_supported(*op)) {
42+
std::io::Result::Ok(DriverType::IoUring)
7543
} else {
76-
compile_error!("unsupported platform");
44+
Ok(DriverType::Poll)
7745
}
78-
}
79-
}
80-
81-
/// Get the underlying driver type and cache it. Following calls will return
82-
/// the cached value.
83-
pub fn current() -> DriverType {
84-
match DRIVER_TYPE.load(Ordering::Acquire) {
85-
UNINIT => {}
86-
x => return DriverType::from_num(x),
87-
}
88-
let dev_ty = Self::get();
89-
90-
DRIVER_TYPE.store(dev_ty as u8, Ordering::Release);
91-
92-
dev_ty
46+
})()
47+
.unwrap_or(DriverType::Poll) // Should we fail here?
9348
}
9449

9550
/// Check if the current driver is `polling`
96-
pub fn is_polling() -> bool {
97-
Self::current() == DriverType::Poll
51+
pub fn is_polling(&self) -> bool {
52+
*self == DriverType::Poll
9853
}
9954

10055
/// Check if the current driver is `io-uring`
101-
pub fn is_iouring() -> bool {
102-
Self::current() == DriverType::IoUring
56+
pub fn is_iouring(&self) -> bool {
57+
*self == DriverType::IoUring
10358
}
10459

10560
/// Check if the current driver is `iocp`
106-
pub fn is_iocp() -> bool {
107-
Self::current() == DriverType::IOCP
61+
pub fn is_iocp(&self) -> bool {
62+
*self == DriverType::IOCP
10863
}
10964
}

compio-driver/src/fusion/mod.rs

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ pub(crate) mod op;
1010
pub use std::os::fd::{AsFd, AsRawFd, BorrowedFd, OwnedFd, RawFd};
1111
use std::{io, task::Poll, time::Duration};
1212

13+
use compio_log::warn;
1314
pub use iour::{OpCode as IourOpCode, OpEntry};
1415
pub use poll::{Decision, OpCode as PollOpCode, OpType};
1516

@@ -37,17 +38,37 @@ pub(crate) struct Driver {
3738
impl Driver {
3839
/// Create a new fusion driver with given number of entries
3940
pub fn new(builder: &ProactorBuilder) -> io::Result<Self> {
40-
match DriverType::current() {
41+
let (ty, fallback) = match &builder.driver_type {
42+
Some(t) => (*t, false),
43+
None => (DriverType::suggest(), true),
44+
};
45+
match ty {
4146
DriverType::Poll => Ok(Self {
4247
fuse: FuseDriver::Poll(poll::Driver::new(builder)?),
4348
}),
44-
DriverType::IoUring => Ok(Self {
45-
fuse: FuseDriver::IoUring(iour::Driver::new(builder)?),
46-
}),
49+
DriverType::IoUring => match iour::Driver::new(builder) {
50+
Ok(driver) => Ok(Self {
51+
fuse: FuseDriver::IoUring(driver),
52+
}),
53+
Err(_e) if fallback => {
54+
warn!("Fail to create io-uring driver: {_e:?}, fallback to polling driver.");
55+
Ok(Self {
56+
fuse: FuseDriver::Poll(poll::Driver::new(builder)?),
57+
})
58+
}
59+
Err(e) => Err(e),
60+
},
4761
_ => unreachable!("Fuse driver will only be enabled on linux"),
4862
}
4963
}
5064

65+
pub fn driver_type(&self) -> DriverType {
66+
match &self.fuse {
67+
FuseDriver::Poll(driver) => driver.driver_type(),
68+
FuseDriver::IoUring(driver) => driver.driver_type(),
69+
}
70+
}
71+
5172
pub fn create_op<T: OpCode + 'static>(&self, op: T) -> Key<T> {
5273
match &self.fuse {
5374
FuseDriver::Poll(driver) => driver.create_op(op),

compio-driver/src/fusion/op.rs

Lines changed: 36 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::ffi::CString;
1+
use std::{ffi::CString, hint::unreachable_unchecked};
22

33
use compio_buf::{IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
44
use socket2::SockAddr;
@@ -10,24 +10,39 @@ macro_rules! op {
1010
(<$($ty:ident: $trait:ident),* $(,)?> $name:ident( $($arg:ident: $arg_t:ty),* $(,)? )) => {
1111
::paste::paste!{
1212
enum [< $name Inner >] <$($ty: $trait),*> {
13+
Uninit($($arg_t),*),
1314
Poll(poll::$name<$($ty),*>),
1415
IoUring(iour::$name<$($ty),*>),
1516
}
1617

1718
impl<$($ty: $trait),*> [< $name Inner >]<$($ty),*> {
1819
fn poll(&mut self) -> &mut poll::$name<$($ty),*> {
19-
debug_assert!(DriverType::current() == DriverType::Poll);
20-
2120
match self {
21+
Self::Uninit(..) => {
22+
unsafe {
23+
let Self::Uninit($($arg),*) = std::ptr::read(self) else {
24+
unreachable_unchecked()
25+
};
26+
std::ptr::write(self, Self::Poll(poll::$name::new($($arg),*)));
27+
}
28+
self.poll()
29+
},
2230
Self::Poll(ref mut op) => op,
2331
Self::IoUring(_) => unreachable!("Current driver is not `io-uring`"),
2432
}
2533
}
2634

2735
fn iour(&mut self) -> &mut iour::$name<$($ty),*> {
28-
debug_assert!(DriverType::current() == DriverType::IoUring);
29-
3036
match self {
37+
Self::Uninit(..) => {
38+
unsafe {
39+
let Self::Uninit($($arg),*) = std::ptr::read(self) else {
40+
unreachable_unchecked()
41+
};
42+
std::ptr::write(self, Self::IoUring(iour::$name::new($($arg),*)));
43+
}
44+
self.iour()
45+
},
3146
Self::IoUring(ref mut op) => op,
3247
Self::Poll(_) => unreachable!("Current driver is not `polling`"),
3348
}
@@ -42,8 +57,12 @@ macro_rules! op {
4257
impl<$($ty: $trait),*> IntoInner for $name <$($ty),*> {
4358
type Inner = <poll::$name<$($ty),*> as IntoInner>::Inner;
4459

45-
fn into_inner(self) -> Self::Inner {
60+
fn into_inner(mut self) -> Self::Inner {
4661
match self.inner {
62+
[< $name Inner >]::Uninit(..) => {
63+
self.inner.poll();
64+
self.into_inner()
65+
},
4766
[< $name Inner >]::Poll(op) => op.into_inner(),
4867
[< $name Inner >]::IoUring(op) => op.into_inner(),
4968
}
@@ -53,15 +72,7 @@ macro_rules! op {
5372
impl<$($ty: $trait),*> $name <$($ty),*> {
5473
#[doc = concat!("Create a new `", stringify!($name), "`.")]
5574
pub fn new($($arg: $arg_t),*) -> Self {
56-
match DriverType::current() {
57-
DriverType::Poll => Self {
58-
inner: [< $name Inner >]::Poll(poll::$name::new($($arg),*)),
59-
},
60-
DriverType::IoUring => Self {
61-
inner: [< $name Inner >]::IoUring(iour::$name::new($($arg),*)),
62-
},
63-
_ => unreachable!("Fuse driver will only be enabled on linux"),
64-
}
75+
Self { inner: [< $name Inner >]::Uninit($($arg),*) }
6576
}
6677
}
6778
}
@@ -103,7 +114,7 @@ op!(<S: AsFd> FileStat(fd: S));
103114
op!(<> PathStat(path: CString, follow_symlink: bool));
104115

105116
macro_rules! mop {
106-
(<$($ty:ident: $trait:ident),* $(,)?> $name:ident( $($arg:ident: $arg_t:ty),* $(,)? )) => {
117+
(<$($ty:ident: $trait:ident),* $(,)?> $name:ident( $($arg:ident: $arg_t:ty),* $(,)? ) with $pool:ident) => {
107118
::paste::paste!{
108119
enum [< $name Inner >] <$($ty: $trait),*> {
109120
Poll(crate::op::managed::$name<$($ty),*>),
@@ -112,17 +123,13 @@ macro_rules! mop {
112123

113124
impl<$($ty: $trait),*> [< $name Inner >]<$($ty),*> {
114125
fn poll(&mut self) -> &mut crate::op::managed::$name<$($ty),*> {
115-
debug_assert!(DriverType::current() == DriverType::Poll);
116-
117126
match self {
118127
Self::Poll(ref mut op) => op,
119128
Self::IoUring(_) => unreachable!("Current driver is not `io-uring`"),
120129
}
121130
}
122131

123132
fn iour(&mut self) -> &mut iour::$name<$($ty),*> {
124-
debug_assert!(DriverType::current() == DriverType::IoUring);
125-
126133
match self {
127134
Self::IoUring(ref mut op) => op,
128135
Self::Poll(_) => unreachable!("Current driver is not `polling`"),
@@ -138,14 +145,14 @@ macro_rules! mop {
138145
impl<$($ty: $trait),*> $name <$($ty),*> {
139146
#[doc = concat!("Create a new `", stringify!($name), "`.")]
140147
pub fn new($($arg: $arg_t),*) -> std::io::Result<Self> {
141-
Ok(match DriverType::current() {
142-
DriverType::Poll => Self {
143-
inner: [< $name Inner >]::Poll(crate::op::managed::$name::new($($arg),*)?),
144-
},
145-
DriverType::IoUring => Self {
148+
Ok(if $pool.is_io_uring() {
149+
Self {
146150
inner: [< $name Inner >]::IoUring(iour::$name::new($($arg),*)?),
147-
},
148-
_ => unreachable!("Fuse driver will only be enabled on linux"),
151+
}
152+
} else {
153+
Self {
154+
inner: [< $name Inner >]::Poll(crate::op::managed::$name::new($($arg),*)?),
155+
}
149156
})
150157
}
151158
}
@@ -196,5 +203,5 @@ macro_rules! mop {
196203
};
197204
}
198205

199-
mop!(<S: AsFd> ReadManagedAt(fd: S, offset: u64, pool: &BufferPool, len: usize));
200-
mop!(<S: AsFd> RecvManaged(fd: S, pool: &BufferPool, len: usize));
206+
mop!(<S: AsFd> ReadManagedAt(fd: S, offset: u64, pool: &BufferPool, len: usize) with pool);
207+
mop!(<S: AsFd> RecvManaged(fd: S, pool: &BufferPool, len: usize) with pool);

compio-driver/src/iocp/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use windows_sys::Win32::{
1717
System::IO::OVERLAPPED,
1818
};
1919

20-
use crate::{AsyncifyPool, BufferPool, Entry, Key, ProactorBuilder};
20+
use crate::{AsyncifyPool, BufferPool, DriverType, Entry, Key, ProactorBuilder};
2121

2222
pub(crate) mod op;
2323

@@ -301,6 +301,10 @@ impl Driver {
301301
})
302302
}
303303

304+
pub fn driver_type(&self) -> DriverType {
305+
DriverType::IOCP
306+
}
307+
304308
pub fn create_op<T: OpCode + 'static>(&self, op: T) -> Key<T> {
305309
Key::new(self.port.as_raw_handle() as _, op)
306310
}

compio-driver/src/iour/mod.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use io_uring::{
2727
};
2828
use slab::Slab;
2929

30-
use crate::{AsyncifyPool, BufferPool, Entry, Key, ProactorBuilder, syscall};
30+
use crate::{AsyncifyPool, BufferPool, DriverType, Entry, Key, ProactorBuilder, syscall};
3131

3232
pub(crate) mod op;
3333

@@ -121,6 +121,10 @@ impl Driver {
121121
})
122122
}
123123

124+
pub fn driver_type(&self) -> DriverType {
125+
DriverType::IoUring
126+
}
127+
124128
// Auto means that it choose to wait or not automatically.
125129
fn submit_auto(&mut self, timeout: Option<Duration>) -> io::Result<()> {
126130
instrument!(compio_log::Level::TRACE, "submit_auto", ?timeout);

0 commit comments

Comments
 (0)