Skip to content

Commit ae1ff6e

Browse files
Make Async::Queue thread safe.
1 parent 868b69c commit ae1ff6e

File tree

1 file changed

+45
-82
lines changed

1 file changed

+45
-82
lines changed

lib/async/queue.rb

Lines changed: 45 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@
1010
require_relative "notification"
1111

1212
module Async
13-
# A queue which allows items to be processed in order.
13+
# A thread-safe queue which allows items to be processed in order.
14+
#
15+
# This implementation uses Thread::Queue internally for thread safety while
16+
# maintaining compatibility with the fiber scheduler.
1417
#
1518
# It has a compatible interface with {Notification} and {Condition}, except that it's multi-value.
1619
#
@@ -21,53 +24,42 @@ class Queue
2124
class ClosedError < RuntimeError
2225
end
2326

24-
# Create a new queue.
27+
# Create a new thread-safe queue.
2528
#
2629
# @parameter parent [Interface(:async) | Nil] The parent task to use for async operations.
27-
# @parameter available [Notification] The notification to use for signaling when items are available.
28-
def initialize(parent: nil, available: Notification.new)
29-
@items = []
30-
@closed = false
30+
# @parameter available [Notification] The notification to use for signaling when items are available. (ignored, for compatibility)
31+
def initialize(parent: nil, available: nil, queue: Thread::Queue.new)
32+
@queue = queue
3133
@parent = parent
32-
@available = available
3334
end
3435

3536
# @returns [Boolean] Whether the queue is closed.
3637
def closed?
37-
@closed
38+
@queue.closed?
3839
end
3940

4041
# Close the queue, causing all waiting tasks to return `nil`. Any subsequent calls to {enqueue} will raise an exception.
4142
def close
42-
@closed = true
43-
44-
while @available.waiting?
45-
@available.signal(nil)
46-
end
43+
@queue.close
4744
end
4845

49-
# @attribute [Array] The items in the queue.
50-
attr :items
51-
5246
# @returns [Integer] The number of items in the queue.
5347
def size
54-
@items.size
48+
@queue.size
5549
end
5650

5751
# @returns [Boolean] Whether the queue is empty.
5852
def empty?
59-
@items.empty?
53+
@queue.empty?
6054
end
6155

6256
# Add an item to the queue.
6357
def push(item)
64-
if @closed
58+
if @queue.closed?
6559
raise ClosedError, "Cannot push items to a closed queue."
6660
end
6761

68-
@items << item
69-
70-
@available.signal unless self.empty?
62+
@queue.push(item)
7163
end
7264

7365
# Compatibility with {::Queue#push}.
@@ -77,26 +69,30 @@ def <<(item)
7769

7870
# Add multiple items to the queue.
7971
def enqueue(*items)
80-
if @closed
72+
if @queue.closed?
8173
raise ClosedError, "Cannot enqueue items to a closed queue."
8274
end
8375

84-
@items.concat(items)
85-
86-
@available.signal unless self.empty?
76+
items.each { |item| @queue.push(item) }
8777
end
8878

8979
# Remove and return the next item from the queue.
9080
def dequeue
91-
while @items.empty?
92-
if @closed
93-
return nil
94-
end
81+
return nil if @queue.closed? && @queue.empty?
82+
83+
begin
84+
# Try non-blocking first
85+
@queue.pop(true)
86+
rescue ThreadError
87+
# Queue is empty, check if closed
88+
return nil if @queue.closed?
9589

96-
@available.wait
90+
# Use blocking pop - the fiber scheduler will handle this properly
91+
# in Ruby's fiber scheduler implementation
92+
@queue.pop(false)
9793
end
98-
99-
@items.shift
94+
rescue ClosedQueueError
95+
nil
10096
end
10197

10298
# Compatibility with {::Queue#pop}.
@@ -136,7 +132,7 @@ def wait
136132
end
137133
end
138134

139-
# A queue which limits the number of items that can be enqueued.
135+
# A thread-safe queue which limits the number of items that can be enqueued.
140136
# @public Since *Async v1*.
141137
class LimitedQueue < Queue
142138
# @private This exists purely for emitting a warning.
@@ -149,30 +145,19 @@ def self.new(...)
149145
# Create a new limited queue.
150146
#
151147
# @parameter limit [Integer] The maximum number of items that can be enqueued.
152-
# @parameter full [Notification] The notification to use for signaling when the queue is full.
153-
def initialize(limit = 1, full: Notification.new, **options)
154-
super(**options)
155-
156-
@limit = limit
157-
@full = full
148+
# @parameter full [Notification] The notification to use for signaling when the queue is full. (ignored, for compatibility)
149+
def initialize(limit = 1, full: nil, **options)
150+
super(**options, queue: Thread::SizedQueue.new(limit))
158151
end
159152

160153
# @attribute [Integer] The maximum number of items that can be enqueued.
161-
attr :limit
162-
163-
# Close the queue, causing all waiting tasks to return `nil`. Any subsequent calls to {enqueue} will raise an exception.
164-
# Also signals all tasks waiting for the queue to be full.
165-
def close
166-
super
167-
168-
while @full.waiting?
169-
@full.signal(nil)
170-
end
154+
def limit
155+
@queue.max
171156
end
172157

173158
# @returns [Boolean] Whether trying to enqueue an item would block.
174159
def limited?
175-
!@closed && @items.size >= @limit
160+
!@queue.closed? && @queue.size >= @queue.max
176161
end
177162

178163
# Add an item to the queue.
@@ -181,46 +166,24 @@ def limited?
181166
#
182167
# @parameter item [Object] The item to add to the queue.
183168
def push(item)
184-
while limited?
185-
@full.wait
169+
if @queue.closed?
170+
raise ClosedError, "Cannot push items to a closed queue."
186171
end
187172

188-
super
173+
begin
174+
@queue.push(item) # This will block if queue is full
175+
rescue ClosedQueueError
176+
raise ClosedError, "Cannot push items to a closed queue."
177+
end
189178
end
190179

191180
# Add multiple items to the queue.
192181
#
193-
# If the queue is full, this method will block until there is space available.
182+
# If the queue is full, this method will block until there is space available.
194183
#
195184
# @parameter items [Array] The items to add to the queue.
196185
def enqueue(*items)
197-
while !items.empty?
198-
while limited?
199-
@full.wait
200-
end
201-
202-
if @closed
203-
raise ClosedError, "Cannot enqueue items to a closed queue."
204-
end
205-
206-
available = @limit - @items.size
207-
@items.concat(items.shift(available))
208-
209-
@available.signal unless self.empty?
210-
end
211-
end
212-
213-
# Remove and return the next item from the queue.
214-
#
215-
# If the queue is empty, this method will block until an item is available.
216-
#
217-
# @returns [Object] The next item in the queue.
218-
def dequeue
219-
item = super
220-
221-
@full.signal
222-
223-
return item
186+
items.each { |item| push(item) }
224187
end
225188
end
226189
end

0 commit comments

Comments
 (0)