Skip to content

Commit 68f6332

Browse files
author
Dylan Wolff
committed
Adding semantic next_event information to Task struct
1 parent 25c8836 commit 68f6332

29 files changed

+496
-120
lines changed

shuttle/src/future/batch_semaphore.rs

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
//! A counting semaphore supporting both async and sync operations.
22
use crate::current;
33
use crate::runtime::execution::ExecutionState;
4-
use crate::runtime::task::{clock::VectorClock, TaskId};
4+
use crate::runtime::task::{clock::VectorClock, Event, TaskId};
55
use crate::runtime::thread;
66
use crate::sync::{ResourceSignature, ResourceType};
77
use std::cell::RefCell;
@@ -289,7 +289,6 @@ impl BatchSemaphoreState {
289289
pub struct BatchSemaphore {
290290
state: RefCell<BatchSemaphoreState>,
291291
fairness: Fairness,
292-
#[allow(unused)]
293292
signature: ResourceSignature,
294293
}
295294

@@ -392,8 +391,9 @@ impl BatchSemaphore {
392391

393392
/// Closes the semaphore. This prevents the semaphore from issuing new
394393
/// permits and notifies all pending waiters.
394+
#[track_caller]
395395
pub fn close(&self) {
396-
thread::switch_task();
396+
thread::switch(Event::batch_semaphore_rel(&self.signature));
397397

398398
self.init_object_id();
399399
let mut state = self.state.borrow_mut();
@@ -436,8 +436,9 @@ impl BatchSemaphore {
436436
/// If the permits are available, returns Ok(())
437437
/// If the semaphore is closed, returns `Err(TryAcquireError::Closed)`
438438
/// If there aren't enough permits, returns `Err(TryAcquireError::NoPermits)`
439+
#[track_caller]
439440
pub fn try_acquire(&self, num_permits: usize) -> Result<(), TryAcquireError> {
440-
thread::switch_task();
441+
thread::switch(Event::batch_semaphore_acq(&self.signature));
441442

442443
self.init_object_id();
443444
let mut state = self.state.borrow_mut();
@@ -538,22 +539,25 @@ impl BatchSemaphore {
538539
}
539540

540541
/// Acquire the specified number of permits (async API)
542+
#[track_caller]
541543
pub fn acquire(&self, num_permits: usize) -> Acquire<'_> {
542544
// No switch here; switch should be triggered on polling future
543545
self.init_object_id();
544546
Acquire::new(self, num_permits)
545547
}
546548

547549
/// Acquire the specified number of permits (blocking API)
550+
#[track_caller]
548551
pub fn acquire_blocking(&self, num_permits: usize) -> Result<(), AcquireError> {
549552
// No switch here; switch should be triggered on polling future
550553
self.init_object_id();
551554
crate::future::block_on(self.acquire(num_permits))
552555
}
553556

554557
/// Release `num_permits` back to the Semaphore
558+
#[track_caller]
555559
pub fn release(&self, num_permits: usize) {
556-
thread::switch_task();
560+
thread::switch(Event::batch_semaphore_rel(&self.signature));
557561

558562
self.init_object_id();
559563
if num_permits == 0 {
@@ -645,6 +649,7 @@ pub struct Acquire<'a> {
645649
}
646650

647651
impl<'a> Acquire<'a> {
652+
#[track_caller]
648653
fn new(semaphore: &'a BatchSemaphore, num_permits: usize) -> Self {
649654
let waiter = Arc::new(Waiter::new(num_permits));
650655
Self {
@@ -675,7 +680,7 @@ impl Future for Acquire<'_> {
675680
// available, the extra waiter doesn't affect other tasks -- all waiters and active tasks will race
676681
// to acquire permits when the current holder releases.
677682
if self.never_polled && (will_succeed || blocking_changes_state) {
678-
thread::switch_task();
683+
thread::switch(Event::batch_semaphore_acq(&self.semaphore.signature));
679684
self.never_polled = false;
680685
}
681686

@@ -764,12 +769,25 @@ impl Future for Acquire<'_> {
764769
self.waiter.is_queued.store(true, Ordering::SeqCst);
765770
}
766771
trace!("Acquire::poll for waiter {:?} that is enqueued", self.waiter);
772+
773+
let event = Event::batch_semaphore_acq(&self.semaphore.signature);
774+
ExecutionState::with(|s| unsafe { s.current_mut().set_next_event(event) });
775+
// SAFETY: This is safe because the current task immediately suspends after this future
776+
// returns Poll::Pending (src/future/mod.rs). Whenever a task resumes, the `next_event`
777+
// is unset, so there is no opportunity to corrupt the reference to our signature while
778+
// it is set as the `next_task`.
767779
Poll::Pending
768780
}
769781
Err(TryAcquireError::Closed) => unreachable!(),
770782
}
771783
} else {
772784
// No progress made, future is still pending.
785+
let event = Event::batch_semaphore_acq(&self.semaphore.signature);
786+
// SAFETY: This is safe because the current task immediately suspends after this future
787+
// returns Poll::Pending (src/future/mod.rs). Whenever a task resumes, the `next_event`
788+
// is unset, so there is no opportunity to corrupt the reference to our signature while
789+
// it is set as the `next_task`.
790+
ExecutionState::with(|s| unsafe { s.current_mut().set_next_event(event) });
773791
Poll::Pending
774792
}
775793
}

shuttle/src/future/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,7 @@ pub fn block_on<F: Future>(future: F) -> F::Output {
251251
Poll::Ready(result) => break result,
252252
Poll::Pending => {
253253
ExecutionState::with(|state| state.current_mut().sleep_unless_woken());
254-
thread::switch_task();
254+
thread::switch_keep_event();
255255
}
256256
}
257257
}

shuttle/src/runtime/execution.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::runtime::failure::{init_panic_hook, persist_failure, persist_task_fai
22
use crate::runtime::storage::{StorageKey, StorageMap};
33
use crate::runtime::task::clock::VectorClock;
44
use crate::runtime::task::labels::Labels;
5-
use crate::runtime::task::{ChildLabelFn, Task, TaskId, TaskName, TaskSignature, DEFAULT_INLINE_TASKS};
5+
use crate::runtime::task::{ChildLabelFn, Event, Task, TaskId, TaskName, TaskSignature, DEFAULT_INLINE_TASKS};
66
use crate::runtime::thread;
77
use crate::runtime::thread::continuation::PooledContinuation;
88
use crate::scheduler::{Schedule, Scheduler};
@@ -472,7 +472,8 @@ impl ExecutionState {
472472
where
473473
F: Future<Output = ()> + 'static,
474474
{
475-
thread::switch_task();
475+
let signature = ExecutionState::with(|state| state.current_mut().signature.new_child(caller));
476+
thread::switch(Event::Spawn(&signature));
476477
let task_id = Self::with(|state| {
477478
let schedule_len = state.current_schedule.len();
478479
let parent_span_id = state.top_level_span.id();
@@ -495,7 +496,7 @@ impl ExecutionState {
495496
schedule_len,
496497
tag,
497498
Some(state.current().id()),
498-
state.current_mut().signature.new_child(caller),
499+
signature,
499500
);
500501

501502
state.tasks.push(task);
@@ -515,7 +516,8 @@ impl ExecutionState {
515516
mut initial_clock: Option<VectorClock>,
516517
caller: &'static Location<'static>,
517518
) -> TaskId {
518-
thread::switch_task();
519+
let signature = ExecutionState::with(|state| state.current_mut().signature.new_child(caller));
520+
thread::switch(Event::Spawn(&signature));
519521
let task_id = Self::with(|state| {
520522
let parent_span_id = state.top_level_span.id();
521523
let task_id = TaskId(state.tasks.len());
@@ -544,7 +546,7 @@ impl ExecutionState {
544546
schedule_len,
545547
tag,
546548
Some(state.current().id()),
547-
state.current_mut().signature.new_child(caller),
549+
signature,
548550
);
549551
state.tasks.push(task);
550552

@@ -819,7 +821,7 @@ impl ExecutionState {
819821
.scheduler
820822
.borrow_mut()
821823
.next_task(task_refs, self.current_task.id(), is_yielding)
822-
.map(ScheduledTask::Some)
824+
.map(|task| ScheduledTask::Some(task.id()))
823825
.unwrap_or(ScheduledTask::Stopped);
824826

825827
// Tracing this `in_scope` is purely a matter of taste. We do it because

shuttle/src/runtime/runner.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -237,12 +237,12 @@ impl<S: Scheduler> Scheduler for PortfolioStoppableScheduler<S> {
237237
}
238238
}
239239

240-
fn next_task(
240+
fn next_task<'a>(
241241
&mut self,
242-
runnable_tasks: &[&Task],
242+
runnable_tasks: &'a [&'a Task],
243243
current_task: Option<TaskId>,
244244
is_yielding: bool,
245-
) -> Option<TaskId> {
245+
) -> Option<&'a Task> {
246246
if self.stop_signal.load(Ordering::SeqCst) {
247247
None
248248
} else {

shuttle/src/runtime/task/mod.rs

Lines changed: 118 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,107 @@ impl PartialEq for TaskSignature {
251251

252252
impl Eq for TaskSignature {}
253253

254+
pub(crate) type Loc = &'static Location<'static>;
255+
256+
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
257+
pub(crate) enum Event<'a> {
258+
AtomicRead(&'a ResourceSignature, Loc),
259+
AtomicWrite(&'a ResourceSignature, Loc),
260+
AtomicReadWrite(&'a ResourceSignature, Loc),
261+
BatchSemaphoreAcq(&'a ResourceSignature, Loc),
262+
BatchSemaphoreRel(&'a ResourceSignature, Loc),
263+
BarrierWait(&'a ResourceSignature, Loc),
264+
CondvarWait(&'a ResourceSignature, Loc),
265+
CondvarNotify(Loc),
266+
Park(Loc),
267+
Unpark(&'a TaskSignature, Loc),
268+
ChannelSend(&'a ResourceSignature, Loc),
269+
ChannelRecv(&'a ResourceSignature, Loc),
270+
Spawn(&'a TaskSignature),
271+
Yield(Loc),
272+
Sleep(Loc),
273+
Exit,
274+
Join(&'a TaskSignature, Loc),
275+
Unknown,
276+
}
277+
278+
impl<'a> Event<'a> {
279+
#[track_caller]
280+
pub(crate) fn atomic_read(sig: &'a ResourceSignature) -> Self {
281+
Self::AtomicRead(sig, Location::caller())
282+
}
283+
284+
#[track_caller]
285+
pub(crate) fn atomic_write(sig: &'a ResourceSignature) -> Self {
286+
Self::AtomicWrite(sig, Location::caller())
287+
}
288+
289+
#[track_caller]
290+
pub(crate) fn atomic_read_write(sig: &'a ResourceSignature) -> Self {
291+
Self::AtomicReadWrite(sig, Location::caller())
292+
}
293+
294+
#[track_caller]
295+
pub(crate) fn batch_semaphore_acq(sig: &'a ResourceSignature) -> Self {
296+
Self::BatchSemaphoreAcq(sig, Location::caller())
297+
}
298+
299+
#[track_caller]
300+
pub(crate) fn batch_semaphore_rel(sig: &'a ResourceSignature) -> Self {
301+
Self::BatchSemaphoreRel(sig, Location::caller())
302+
}
303+
304+
#[track_caller]
305+
pub(crate) fn barrier_wait(sig: &'a ResourceSignature) -> Self {
306+
Self::BarrierWait(sig, Location::caller())
307+
}
308+
309+
#[track_caller]
310+
pub(crate) fn condvar_wait(sig: &'a ResourceSignature) -> Self {
311+
Self::CondvarWait(sig, Location::caller())
312+
}
313+
314+
#[track_caller]
315+
pub(crate) fn condvar_notify() -> Self {
316+
Self::CondvarNotify(Location::caller())
317+
}
318+
319+
#[track_caller]
320+
pub(crate) fn park() -> Self {
321+
Self::Park(Location::caller())
322+
}
323+
324+
#[track_caller]
325+
pub(crate) fn unpark(sig: &'a TaskSignature) -> Self {
326+
Self::Unpark(sig, Location::caller())
327+
}
328+
329+
#[track_caller]
330+
pub(crate) fn channel_send(sig: &'a ResourceSignature) -> Self {
331+
Self::ChannelSend(sig, Location::caller())
332+
}
333+
334+
#[track_caller]
335+
pub(crate) fn channel_recv(sig: &'a ResourceSignature) -> Self {
336+
Self::ChannelRecv(sig, Location::caller())
337+
}
338+
339+
#[track_caller]
340+
pub(crate) fn yield_now() -> Self {
341+
Self::Yield(Location::caller())
342+
}
343+
344+
#[track_caller]
345+
pub(crate) fn sleep() -> Self {
346+
Self::Sleep(Location::caller())
347+
}
348+
349+
#[track_caller]
350+
pub(crate) fn join(sig: &'a TaskSignature) -> Self {
351+
Self::Join(sig, Location::caller())
352+
}
353+
}
354+
254355
/// A `Task` represents a user-level unit of concurrency. Each task has an `id` that is unique within
255356
/// the execution, and a `state` reflecting whether the task is runnable (enabled) or not.
256357
#[derive(Debug)]
@@ -271,6 +372,8 @@ pub struct Task {
271372
// Remember whether the waker was invoked while we were running
272373
woken: bool,
273374

375+
next_event: Event<'static>,
376+
274377
name: Option<String>,
275378

276379
local_storage: StorageMap,
@@ -342,6 +445,7 @@ impl Task {
342445
waiter: None,
343446
waker,
344447
woken: false,
448+
next_event: Event::Unknown,
345449
detached: false,
346450
park_state: ParkState::default(),
347451
name,
@@ -416,7 +520,7 @@ impl Task {
416520
let cx = &mut Context::from_waker(&waker);
417521
while future.as_mut().poll(cx).is_pending() {
418522
ExecutionState::with(|state| state.current_mut().sleep_unless_woken());
419-
thread::switch_task();
523+
thread::switch_keep_event();
420524
}
421525
}),
422526
stack_size,
@@ -651,6 +755,19 @@ impl Task {
651755
TASK_ID_TO_TAGS.with(|cell| cell.borrow_mut().insert(self.id(), tag.clone()));
652756
self.tag.replace(tag)
653757
}
758+
759+
/// Get the next_event with a downcast lifetime tied to self.
760+
pub(crate) fn next_event(&self) -> &Event<'_> {
761+
unsafe { std::mem::transmute(&self.next_event) }
762+
}
763+
764+
pub(crate) unsafe fn set_next_event(&mut self, event: Event<'_>) {
765+
self.next_event = unsafe { std::mem::transmute::<Event<'_>, Event<'static>>(event) };
766+
}
767+
768+
pub(crate) fn unset_next_event(&mut self) {
769+
self.next_event = Event::Unknown;
770+
}
654771
}
655772

656773
#[derive(PartialEq, Eq, Clone, Copy, Debug)]

shuttle/src/runtime/thread/continuation.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#![allow(deprecated)]
66

77
use crate::runtime::execution::ExecutionState;
8+
use crate::runtime::task::Event;
89
use generator::{Generator, Gn};
910
use scoped_tls::scoped_thread_local;
1011
use std::cell::{Cell, RefCell};
@@ -277,13 +278,24 @@ unsafe impl Send for PooledContinuation {}
277278
/// is a visible operation, meaning that both scheduling points are necessary for complete
278279
/// exploration of all possible behaviors.
279280
#[track_caller]
280-
pub(crate) fn switch_task() {
281+
pub(crate) fn switch(event: Event<'_>) {
282+
// SAFETY we cast the lifetime of the Event to 'static when embedding it into the current Task
283+
// This is safe because (1) we have a valid reference to the Event's data for the scope of this function
284+
// and (2) the static reference is dropped at the end of the scope of this function when the next_event
285+
// is set back to Unknown
286+
ExecutionState::with(|s| unsafe { s.current_mut().set_next_event(event) });
287+
switch_keep_event()
288+
}
289+
290+
#[track_caller]
291+
pub(crate) fn switch_keep_event() {
281292
crate::annotations::record_tick();
282293
trace!("switch from {}", Location::caller());
283294
if ExecutionState::maybe_yield() {
284295
let r = generator::yield_(ContinuationOutput::Yielded).unwrap();
285296
assert!(matches!(r, ContinuationInput::Resume));
286297
}
298+
ExecutionState::with(|s| s.current_mut().unset_next_event());
287299
}
288300

289301
#[cfg(test)]

shuttle/src/runtime/thread/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
pub(crate) mod continuation;
2-
3-
pub(crate) use continuation::switch_task;
2+
pub(crate) use continuation::switch;
3+
pub(crate) use continuation::switch_keep_event;

0 commit comments

Comments
 (0)