Skip to content

Commit 1b7b3b8

Browse files
committed
Add callSoonThreadSafe
1 parent ecc0944 commit 1b7b3b8

File tree

4 files changed

+60
-6
lines changed

4 files changed

+60
-6
lines changed

src/asynchronous/events.d

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -395,15 +395,17 @@ abstract class EventLoop
395395
return callback;
396396
}
397397

398-
/+
399398
/**
400399
* Like $(D_PSYMBOL call_soon()), but thread safe.
401400
*/
402-
final auto callSoonThreadsafe(alias callback, Args...)(Args args)
401+
final auto callSoonThreadSafe(Dg, Args...)(Dg dg, Args args)
403402
{
404-
return callLater!callback(0, args);
403+
auto callback = new Callback!(Dg, Args)(this, dg, args);
404+
405+
scheduleCallbackThreadSafe(callback);
406+
407+
return callback;
405408
}
406-
+/
407409

408410
// Delayed calls
409411

@@ -1180,6 +1182,8 @@ protected:
11801182

11811183
void scheduleCallback(CallbackHandle callback);
11821184

1185+
void scheduleCallbackThreadSafe(CallbackHandle callback);
1186+
11831187
void scheduleCallback(Duration delay, CallbackHandle callback);
11841188

11851189
void scheduleCallback(SysTime when, CallbackHandle callback);

src/asynchronous/libasync/events.d

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import std.string;
1010
import std.traits;
1111
import std.typecons;
1212
import libasync.events : EventLoop_ = EventLoop, NetworkAddress;
13+
import libasync.signal : AsyncSignal;
1314
import libasync.timer : AsyncTimer;
1415
import libasync.tcp : AsyncTCPConnection, AsyncTCPListener, TCPEvent;
1516
import libasync.threads : destroyAsyncThreads, gs_threads;
@@ -39,6 +40,9 @@ package class LibasyncEventLoop : EventLoop
3940
private Appender!(CallbackHandle[]) nextCallbacks2;
4041
private Appender!(CallbackHandle[])* currentAppender;
4142

43+
private Appender!(CallbackHandle[]) nextThreadSafeCallbacks;
44+
private shared AsyncSignal newThreadSafeCallbacks;
45+
4246
private LibasyncTransport[] pendingConnections;
4347

4448
alias Listener = Tuple!(ServerImpl, "server", AsyncTCPListener, "listener");
@@ -50,6 +54,9 @@ package class LibasyncEventLoop : EventLoop
5054
this.eventLoop = new EventLoop_;
5155
this.timers = new Timers(this.eventLoop);
5256
this.currentAppender = &nextCallbacks1;
57+
58+
this.newThreadSafeCallbacks = new shared AsyncSignal(this.eventLoop);
59+
this.newThreadSafeCallbacks.run(&scheduleThreadSafeCallbacks);
5360
}
5461

5562
override void runForever()
@@ -97,6 +104,27 @@ package class LibasyncEventLoop : EventLoop
97104
currentAppender.put(callback);
98105
}
99106

107+
private void scheduleThreadSafeCallbacks()
108+
{
109+
synchronized (this)
110+
{
111+
foreach (callback; nextThreadSafeCallbacks.data)
112+
scheduleCallback(callback);
113+
114+
nextThreadSafeCallbacks.clear;
115+
}
116+
}
117+
118+
override void scheduleCallbackThreadSafe(CallbackHandle callback)
119+
{
120+
synchronized (this)
121+
{
122+
nextThreadSafeCallbacks ~= callback;
123+
}
124+
125+
newThreadSafeCallbacks.trigger;
126+
}
127+
100128
override void scheduleCallback(Duration delay, CallbackHandle callback)
101129
{
102130
if (delay <= Duration.zero)

test/tcp.d

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,8 @@ import std.array;
33
import std.conv;
44
import std.datetime;
55
import std.string;
6-
76
import asynchronous;
87

9-
108
class TestHelper
119
{
1210
private Appender!(string[]) actualEvents_;

test/threads.d

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import core.thread;
2+
import std.datetime : msecs;
3+
import asynchronous;
4+
5+
unittest
6+
{
7+
auto loop = getEventLoop;
8+
auto future = new Future!int;
9+
10+
void stopLoop()
11+
{
12+
future.setResult(10);
13+
}
14+
15+
auto thread1 = new Thread({
16+
Thread.sleep(2.msecs);
17+
loop.callSoonThreadSafe(&stopLoop);
18+
});
19+
thread1.start;
20+
21+
loop.runUntilComplete(future);
22+
assert(future.done);
23+
assert(future.result == 10);
24+
}

0 commit comments

Comments
 (0)