Skip to content

Commit 15bc231

Browse files
author
Dylan Wolff
committed
Adding semantic next_event information to Task struct
1 parent e121af1 commit 15bc231

29 files changed

+533
-120
lines changed

shuttle/src/future/batch_semaphore.rs

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//! A counting semaphore supporting both async and sync operations.
22
use crate::current;
33
use crate::runtime::execution::ExecutionState;
4-
use crate::runtime::task::{clock::VectorClock, TaskId};
4+
use crate::runtime::task::{clock::VectorClock, Event, TaskId};
55
use crate::runtime::thread;
66
use crate::sync::{ResourceSignature, ResourceType};
77
use std::cell::RefCell;
@@ -289,7 +289,6 @@ impl BatchSemaphoreState {
289289
pub struct BatchSemaphore {
290290
state: RefCell<BatchSemaphoreState>,
291291
fairness: Fairness,
292-
#[allow(unused)]
293292
signature: ResourceSignature,
294293
}
295294

@@ -392,8 +391,9 @@ impl BatchSemaphore {
392391

393392
/// Closes the semaphore. This prevents the semaphore from issuing new
394393
/// permits and notifies all pending waiters.
394+
#[track_caller]
395395
pub fn close(&self) {
396-
thread::switch();
396+
thread::switch(Event::batch_semaphore_rel(&self.signature));
397397

398398
self.init_object_id();
399399
let mut state = self.state.borrow_mut();
@@ -436,8 +436,9 @@ impl BatchSemaphore {
436436
/// If the permits are available, returns Ok(())
437437
/// If the semaphore is closed, returns `Err(TryAcquireError::Closed)`
438438
/// If there aren't enough permits, returns `Err(TryAcquireError::NoPermits)`
439+
#[track_caller]
439440
pub fn try_acquire(&self, num_permits: usize) -> Result<(), TryAcquireError> {
440-
thread::switch();
441+
thread::switch(Event::batch_semaphore_acq(&self.signature));
441442

442443
self.init_object_id();
443444
let mut state = self.state.borrow_mut();
@@ -538,22 +539,25 @@ impl BatchSemaphore {
538539
}
539540

540541
/// Acquire the specified number of permits (async API)
542+
#[track_caller]
541543
pub fn acquire(&self, num_permits: usize) -> Acquire<'_> {
542544
// No switch here; switch should be triggered on polling future
543545
self.init_object_id();
544546
Acquire::new(self, num_permits)
545547
}
546548

547549
/// Acquire the specified number of permits (blocking API)
550+
#[track_caller]
548551
pub fn acquire_blocking(&self, num_permits: usize) -> Result<(), AcquireError> {
549552
// No switch here; switch should be triggered on polling future
550553
self.init_object_id();
551554
crate::future::block_on(self.acquire(num_permits))
552555
}
553556

554557
/// Release `num_permits` back to the Semaphore
558+
#[track_caller]
555559
pub fn release(&self, num_permits: usize) {
556-
thread::switch();
560+
thread::switch(Event::batch_semaphore_rel(&self.signature));
557561

558562
self.init_object_id();
559563
if num_permits == 0 {
@@ -645,6 +649,7 @@ pub struct Acquire<'a> {
645649
}
646650

647651
impl<'a> Acquire<'a> {
652+
#[track_caller]
648653
fn new(semaphore: &'a BatchSemaphore, num_permits: usize) -> Self {
649654
let waiter = Arc::new(Waiter::new(num_permits));
650655
Self {
@@ -689,7 +694,7 @@ impl Future for Acquire<'_> {
689694
let blocking_is_not_commutative = self.semaphore.fairness == Fairness::StrictlyFair;
690695

691696
if self.never_polled && (will_succeed || blocking_is_not_commutative) {
692-
thread::switch();
697+
thread::switch(Event::batch_semaphore_acq(&self.semaphore.signature));
693698
}
694699
self.never_polled = false;
695700

@@ -778,12 +783,25 @@ impl Future for Acquire<'_> {
778783
self.waiter.is_queued.store(true, Ordering::SeqCst);
779784
}
780785
trace!("Acquire::poll for waiter {:?} that is enqueued", self.waiter);
786+
787+
let event = Event::batch_semaphore_acq(&self.semaphore.signature);
788+
ExecutionState::with(|s| unsafe { s.current_mut().set_next_event(event) });
789+
// SAFETY: This is safe because the current task immediately suspends after this future
790+
// returns Poll::Pending (src/future/mod.rs). Whenever a task resumes, the `next_event`
791+
// is unset, so there is no opportunity to corrupt the reference to our signature while
792+
// it is set as the `next_task`.
781793
Poll::Pending
782794
}
783795
Err(TryAcquireError::Closed) => unreachable!(),
784796
}
785797
} else {
786798
// No progress made, future is still pending.
799+
let event = Event::batch_semaphore_acq(&self.semaphore.signature);
800+
// SAFETY: This is safe because the current task immediately suspends after this future
801+
// returns Poll::Pending (src/future/mod.rs). Whenever a task resumes, the `next_event`
802+
// is unset, so there is no opportunity to corrupt the reference to our signature while
803+
// it is set as the `next_task`.
804+
ExecutionState::with(|s| unsafe { s.current_mut().set_next_event(event) });
787805
Poll::Pending
788806
}
789807
}

shuttle/src/future/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ pub fn block_on<F: Future>(future: F) -> F::Output {
251251
Poll::Ready(result) => break result,
252252
Poll::Pending => {
253253
ExecutionState::with(|state| state.current_mut().sleep_unless_woken());
254-
thread::switch();
254+
thread::switch_keep_event();
255255
}
256256
}
257257
}

shuttle/src/runtime/execution.rs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::runtime::failure::{init_panic_hook, persist_failure};
22
use crate::runtime::storage::{StorageKey, StorageMap};
33
use crate::runtime::task::clock::VectorClock;
44
use crate::runtime::task::labels::Labels;
5-
use crate::runtime::task::{ChildLabelFn, Task, TaskId, TaskName, TaskSignature, DEFAULT_INLINE_TASKS};
5+
use crate::runtime::task::{ChildLabelFn, Event, Task, TaskId, TaskName, TaskSignature, DEFAULT_INLINE_TASKS};
66
use crate::runtime::thread;
77
use crate::runtime::thread::continuation::PooledContinuation;
88
use crate::scheduler::{Schedule, Scheduler};
@@ -535,7 +535,8 @@ impl ExecutionState {
535535
where
536536
F: Future<Output = ()> + 'static,
537537
{
538-
thread::switch();
538+
let signature = ExecutionState::with(|state| state.current_mut().signature.new_child(caller));
539+
thread::switch(Event::Spawn(&signature));
539540
let task_id = Self::with(|state| {
540541
let schedule_len = CurrentSchedule::len();
541542
let parent_span_id = state.top_level_span.id();
@@ -558,7 +559,7 @@ impl ExecutionState {
558559
schedule_len,
559560
tag,
560561
Some(state.current().id()),
561-
state.current_mut().signature.new_child(caller),
562+
signature,
562563
);
563564

564565
state.tasks.push(task);
@@ -578,7 +579,8 @@ impl ExecutionState {
578579
mut initial_clock: Option<VectorClock>,
579580
caller: &'static Location<'static>,
580581
) -> TaskId {
581-
thread::switch();
582+
let signature = ExecutionState::with(|state| state.current_mut().signature.new_child(caller));
583+
thread::switch(Event::Spawn(&signature));
582584
let task_id = Self::with(|state| {
583585
let parent_span_id = state.top_level_span.id();
584586
let task_id = TaskId(state.tasks.len());
@@ -605,7 +607,7 @@ impl ExecutionState {
605607
CurrentSchedule::len(),
606608
tag,
607609
Some(state.current().id()),
608-
state.current_mut().signature.new_child(caller),
610+
signature,
609611
);
610612
state.tasks.push(task);
611613

@@ -754,6 +756,10 @@ impl ExecutionState {
754756
self.tasks.get(id.0)
755757
}
756758

759+
pub(crate) fn try_current_mut(&mut self) -> Option<&mut Task> {
760+
self.tasks.get_mut(self.current_task.id()?.0)
761+
}
762+
757763
pub(crate) fn in_cleanup(&self) -> bool {
758764
self.in_cleanup
759765
}
@@ -885,7 +891,7 @@ impl ExecutionState {
885891
.scheduler
886892
.borrow_mut()
887893
.next_task(task_refs, self.current_task.id(), is_yielding)
888-
.map(ScheduledTask::Some)
894+
.map(|task| ScheduledTask::Some(task.id()))
889895
.unwrap_or(ScheduledTask::Stopped);
890896

891897
// Tracing this `in_scope` is purely a matter of taste. We do it because

shuttle/src/runtime/runner.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -237,12 +237,12 @@ impl<S: Scheduler> Scheduler for PortfolioStoppableScheduler<S> {
237237
}
238238
}
239239

240-
fn next_task(
240+
fn next_task<'a>(
241241
&mut self,
242-
runnable_tasks: &[&Task],
242+
runnable_tasks: &'a [&'a Task],
243243
current_task: Option<TaskId>,
244244
is_yielding: bool,
245-
) -> Option<TaskId> {
245+
) -> Option<&'a Task> {
246246
if self.stop_signal.load(Ordering::SeqCst) {
247247
None
248248
} else {

shuttle/src/runtime/task/mod.rs

Lines changed: 143 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,132 @@ impl PartialEq for TaskSignature {
251251

252252
impl Eq for TaskSignature {}
253253

254+
pub(crate) type Loc = &'static Location<'static>;
255+
256+
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
257+
pub(crate) enum Event<'a> {
258+
AtomicRead(&'a ResourceSignature, Loc),
259+
AtomicWrite(&'a ResourceSignature, Loc),
260+
AtomicReadWrite(&'a ResourceSignature, Loc),
261+
BatchSemaphoreAcq(&'a ResourceSignature, Loc),
262+
BatchSemaphoreRel(&'a ResourceSignature, Loc),
263+
BarrierWait(&'a ResourceSignature, Loc),
264+
CondvarWait(&'a ResourceSignature, Loc),
265+
CondvarNotify(Loc),
266+
Park(Loc),
267+
Unpark(&'a TaskSignature, Loc),
268+
ChannelSend(&'a ResourceSignature, Loc),
269+
ChannelRecv(&'a ResourceSignature, Loc),
270+
Spawn(&'a TaskSignature),
271+
Yield(Loc),
272+
Sleep(Loc),
273+
Exit,
274+
Join(&'a TaskSignature, Loc),
275+
Unknown,
276+
}
277+
278+
impl<'a> std::fmt::Display for Event<'a> {
279+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
280+
match self {
281+
Event::AtomicRead(_, loc) => write!(f, "AtomicRead at {}", loc),
282+
Event::AtomicWrite(_, loc) => write!(f, "AtomicWrite at {}", loc),
283+
Event::AtomicReadWrite(_, loc) => write!(f, "AtomicReadWrite at {}", loc),
284+
Event::BatchSemaphoreAcq(_, loc) => write!(f, "BatchSemaphoreAcq at {}", loc),
285+
Event::BatchSemaphoreRel(_, loc) => write!(f, "BatchSemaphoreRel at {}", loc),
286+
Event::BarrierWait(_, loc) => write!(f, "BarrierWait at {}", loc),
287+
Event::CondvarWait(_, loc) => write!(f, "CondvarWait at {}", loc),
288+
Event::CondvarNotify(loc) => write!(f, "CondvarNotify at {}", loc),
289+
Event::Park(loc) => write!(f, "Park at {}", loc),
290+
Event::Unpark(_, loc) => write!(f, "Unpark at {}", loc),
291+
Event::ChannelSend(_, loc) => write!(f, "ChannelSend at {}", loc),
292+
Event::ChannelRecv(_, loc) => write!(f, "ChannelRecv at {}", loc),
293+
Event::Spawn(sig) => write!(f, "Spawn at {}", sig.task_creation_stack.last().unwrap().0),
294+
Event::Yield(loc) => write!(f, "Yield at {}", loc),
295+
Event::Sleep(loc) => write!(f, "Sleep at {}", loc),
296+
Event::Exit => write!(f, "Exit"),
297+
Event::Join(_, loc) => write!(f, "Join at {}", loc),
298+
Event::Unknown => write!(f, "Unknown"),
299+
}
300+
}
301+
}
302+
303+
impl<'a> Event<'a> {
304+
#[track_caller]
305+
pub(crate) fn atomic_read(sig: &'a ResourceSignature) -> Self {
306+
Self::AtomicRead(sig, Location::caller())
307+
}
308+
309+
#[track_caller]
310+
pub(crate) fn atomic_write(sig: &'a ResourceSignature) -> Self {
311+
Self::AtomicWrite(sig, Location::caller())
312+
}
313+
314+
#[track_caller]
315+
pub(crate) fn atomic_read_write(sig: &'a ResourceSignature) -> Self {
316+
Self::AtomicReadWrite(sig, Location::caller())
317+
}
318+
319+
#[track_caller]
320+
pub(crate) fn batch_semaphore_acq(sig: &'a ResourceSignature) -> Self {
321+
Self::BatchSemaphoreAcq(sig, Location::caller())
322+
}
323+
324+
#[track_caller]
325+
pub(crate) fn batch_semaphore_rel(sig: &'a ResourceSignature) -> Self {
326+
Self::BatchSemaphoreRel(sig, Location::caller())
327+
}
328+
329+
#[track_caller]
330+
pub(crate) fn barrier_wait(sig: &'a ResourceSignature) -> Self {
331+
Self::BarrierWait(sig, Location::caller())
332+
}
333+
334+
#[track_caller]
335+
pub(crate) fn condvar_wait(sig: &'a ResourceSignature) -> Self {
336+
Self::CondvarWait(sig, Location::caller())
337+
}
338+
339+
#[track_caller]
340+
pub(crate) fn condvar_notify() -> Self {
341+
Self::CondvarNotify(Location::caller())
342+
}
343+
344+
#[track_caller]
345+
pub(crate) fn park() -> Self {
346+
Self::Park(Location::caller())
347+
}
348+
349+
#[track_caller]
350+
pub(crate) fn unpark(sig: &'a TaskSignature) -> Self {
351+
Self::Unpark(sig, Location::caller())
352+
}
353+
354+
#[track_caller]
355+
pub(crate) fn channel_send(sig: &'a ResourceSignature) -> Self {
356+
Self::ChannelSend(sig, Location::caller())
357+
}
358+
359+
#[track_caller]
360+
pub(crate) fn channel_recv(sig: &'a ResourceSignature) -> Self {
361+
Self::ChannelRecv(sig, Location::caller())
362+
}
363+
364+
#[track_caller]
365+
pub(crate) fn yield_now() -> Self {
366+
Self::Yield(Location::caller())
367+
}
368+
369+
#[track_caller]
370+
pub(crate) fn sleep() -> Self {
371+
Self::Sleep(Location::caller())
372+
}
373+
374+
#[track_caller]
375+
pub(crate) fn join(sig: &'a TaskSignature) -> Self {
376+
Self::Join(sig, Location::caller())
377+
}
378+
}
379+
254380
/// A `Task` represents a user-level unit of concurrency. Each task has an `id` that is unique within
255381
/// the execution, and a `state` reflecting whether the task is runnable (enabled) or not.
256382
#[derive(Debug)]
@@ -271,6 +397,8 @@ pub struct Task {
271397
// Remember whether the waker was invoked while we were running
272398
woken: bool,
273399

400+
next_event: Event<'static>,
401+
274402
name: Option<String>,
275403

276404
local_storage: StorageMap,
@@ -342,6 +470,7 @@ impl Task {
342470
waiter: None,
343471
waker,
344472
woken: false,
473+
next_event: Event::Unknown,
345474
detached: false,
346475
park_state: ParkState::default(),
347476
name,
@@ -416,7 +545,7 @@ impl Task {
416545
let cx = &mut Context::from_waker(&waker);
417546
while future.as_mut().poll(cx).is_pending() {
418547
ExecutionState::with(|state| state.current_mut().sleep_unless_woken());
419-
thread::switch();
548+
thread::switch_keep_event();
420549
}
421550
}),
422551
stack_size,
@@ -664,6 +793,19 @@ impl Task {
664793
}
665794
)
666795
}
796+
797+
/// Get the next_event with a downcast lifetime tied to self.
798+
pub(crate) fn next_event(&self) -> &Event<'_> {
799+
unsafe { std::mem::transmute(&self.next_event) }
800+
}
801+
802+
pub(crate) unsafe fn set_next_event(&mut self, event: Event<'_>) {
803+
self.next_event = unsafe { std::mem::transmute::<Event<'_>, Event<'static>>(event) };
804+
}
805+
806+
pub(crate) fn unset_next_event(&mut self) {
807+
self.next_event = Event::Unknown;
808+
}
667809
}
668810

669811
#[derive(PartialEq, Eq, Clone, Copy, Debug)]

0 commit comments

Comments
 (0)