Skip to content

Commit 67c5f44

Browse files
Make Async::Queue thread safe.
1 parent 868b69c commit 67c5f44

File tree

1 file changed

+47
-75
lines changed

1 file changed

+47
-75
lines changed

lib/async/queue.rb

Lines changed: 47 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@
1010
require_relative "notification"
1111

1212
module Async
13-
# A queue which allows items to be processed in order.
13+
# A thread-safe queue which allows items to be processed in order.
14+
#
15+
# This implementation uses Thread::Queue internally for thread safety while
16+
# maintaining compatibility with the fiber scheduler.
1417
#
1518
# It has a compatible interface with {Notification} and {Condition}, except that it's multi-value.
1619
#
@@ -21,15 +24,14 @@ class Queue
2124
class ClosedError < RuntimeError
2225
end
2326

24-
# Create a new queue.
27+
# Create a new thread-safe queue.
2528
#
2629
# @parameter parent [Interface(:async) | Nil] The parent task to use for async operations.
27-
# @parameter available [Notification] The notification to use for signaling when items are available.
28-
def initialize(parent: nil, available: Notification.new)
29-
@items = []
30-
@closed = false
30+
# @parameter available [Notification] The notification to use for signaling when items are available. (ignored, for compatibility)
31+
def initialize(parent: nil, available: nil)
32+
@queue = Thread::Queue.new
3133
@parent = parent
32-
@available = available
34+
@closed = false
3335
end
3436

3537
# @returns [Boolean] Whether the queue is closed.
@@ -40,23 +42,23 @@ def closed?
4042
# Close the queue, causing all waiting tasks to return `nil`. Any subsequent calls to {enqueue} will raise an exception.
4143
def close
4244
@closed = true
43-
44-
while @available.waiting?
45-
@available.signal(nil)
46-
end
45+
@queue.close
4746
end
4847

4948
# @attribute [Array] The items in the queue.
50-
attr :items
49+
# @deprecated Use {#size} instead. This method is provided for compatibility but returns nil.
50+
def items
51+
nil # Thread::Queue doesn't expose internal items
52+
end
5153

5254
# @returns [Integer] The number of items in the queue.
5355
def size
54-
@items.size
56+
@queue.size
5557
end
5658

5759
# @returns [Boolean] Whether the queue is empty.
5860
def empty?
59-
@items.empty?
61+
@queue.empty?
6062
end
6163

6264
# Add an item to the queue.
@@ -65,9 +67,7 @@ def push(item)
6567
raise ClosedError, "Cannot push items to a closed queue."
6668
end
6769

68-
@items << item
69-
70-
@available.signal unless self.empty?
70+
@queue.push(item)
7171
end
7272

7373
# Compatibility with {::Queue#push}.
@@ -81,22 +81,26 @@ def enqueue(*items)
8181
raise ClosedError, "Cannot enqueue items to a closed queue."
8282
end
8383

84-
@items.concat(items)
85-
86-
@available.signal unless self.empty?
84+
items.each {|item| @queue.push(item)}
8785
end
8886

8987
# Remove and return the next item from the queue.
9088
def dequeue
91-
while @items.empty?
92-
if @closed
93-
return nil
94-
end
89+
return nil if @closed && @queue.empty?
90+
91+
begin
92+
# Try non-blocking first
93+
@queue.pop(true)
94+
rescue ThreadError
95+
# Queue is empty, check if closed
96+
return nil if @closed
9597

96-
@available.wait
98+
# Use blocking pop - the fiber scheduler will handle this properly
99+
# in Ruby's fiber scheduler implementation
100+
@queue.pop(false)
97101
end
98-
99-
@items.shift
102+
rescue ClosedQueueError
103+
nil
100104
end
101105

102106
# Compatibility with {::Queue#pop}.
@@ -136,7 +140,7 @@ def wait
136140
end
137141
end
138142

139-
# A queue which limits the number of items that can be enqueued.
143+
# A thread-safe queue which limits the number of items that can be enqueued.
140144
# @public Since *Async v1*.
141145
class LimitedQueue < Queue
142146
# @private This exists purely for emitting a warning.
@@ -149,30 +153,20 @@ def self.new(...)
149153
# Create a new limited queue.
150154
#
151155
# @parameter limit [Integer] The maximum number of items that can be enqueued.
152-
# @parameter full [Notification] The notification to use for signaling when the queue is full.
153-
def initialize(limit = 1, full: Notification.new, **options)
156+
# @parameter full [Notification] The notification to use for signaling when the queue is full. (ignored, for compatibility)
157+
def initialize(limit = 1, full: nil, **options)
154158
super(**options)
155-
156-
@limit = limit
157-
@full = full
159+
@queue = Thread::SizedQueue.new(limit)
158160
end
159161

160162
# @attribute [Integer] The maximum number of items that can be enqueued.
161-
attr :limit
162-
163-
# Close the queue, causing all waiting tasks to return `nil`. Any subsequent calls to {enqueue} will raise an exception.
164-
# Also signals all tasks waiting for the queue to be full.
165-
def close
166-
super
167-
168-
while @full.waiting?
169-
@full.signal(nil)
170-
end
163+
def limit
164+
@queue.max
171165
end
172166

173167
# @returns [Boolean] Whether trying to enqueue an item would block.
174168
def limited?
175-
!@closed && @items.size >= @limit
169+
!@closed && @queue.size >= @queue.max
176170
end
177171

178172
# Add an item to the queue.
@@ -181,46 +175,24 @@ def limited?
181175
#
182176
# @parameter item [Object] The item to add to the queue.
183177
def push(item)
184-
while limited?
185-
@full.wait
178+
if @closed
179+
raise ClosedError, "Cannot push items to a closed queue."
186180
end
187181

188-
super
182+
begin
183+
@queue.push(item) # This will block if queue is full
184+
rescue ClosedQueueError
185+
raise ClosedError, "Cannot push items to a closed queue."
186+
end
189187
end
190188

191189
# Add multiple items to the queue.
192190
#
193-
# If the queue is full, this method will block until there is space available.
191+
# If the queue is full, this method will block until there is space available.
194192
#
195193
# @parameter items [Array] The items to add to the queue.
196194
def enqueue(*items)
197-
while !items.empty?
198-
while limited?
199-
@full.wait
200-
end
201-
202-
if @closed
203-
raise ClosedError, "Cannot enqueue items to a closed queue."
204-
end
205-
206-
available = @limit - @items.size
207-
@items.concat(items.shift(available))
208-
209-
@available.signal unless self.empty?
210-
end
211-
end
212-
213-
# Remove and return the next item from the queue.
214-
#
215-
# If the queue is empty, this method will block until an item is available.
216-
#
217-
# @returns [Object] The next item in the queue.
218-
def dequeue
219-
item = super
220-
221-
@full.signal
222-
223-
return item
195+
items.each {|item| push(item)}
224196
end
225197
end
226198
end

0 commit comments

Comments
 (0)