Skip to content

Commit 225d038

Browse files
committed
Move LimitedQueue into limited_queue.rb.
1 parent d1b1066 commit 225d038

File tree

2 files changed

+93
-76
lines changed

2 files changed

+93
-76
lines changed

lib/async/limited_queue.rb

Lines changed: 87 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,91 @@
33
# Released under the MIT License.
44
# Copyright, 2025, by Samuel Williams.
55

6-
# The implementation lives in `queue.rb` but later we may move it here for better autoload/inference.
76
require_relative "queue"
7+
8+
module Async
9+
# A queue which limits the number of items that can be enqueued.
10+
# @public Since *Async v1*.
11+
class LimitedQueue < Queue
12+
def self.new(...)
13+
super
14+
end
15+
16+
# Create a new limited queue.
17+
#
18+
# @parameter limit [Integer] The maximum number of items that can be enqueued.
19+
# @parameter full [Notification] The notification to use for signaling when the queue is full.
20+
def initialize(limit = 1, full: Notification.new, **options)
21+
super(**options)
22+
23+
@limit = limit
24+
@full = full
25+
end
26+
27+
# @attribute [Integer] The maximum number of items that can be enqueued.
28+
attr :limit
29+
30+
# Close the queue, causing all waiting tasks to return `nil`. Any subsequent calls to {enqueue} will raise an exception.
31+
# Also signals all tasks waiting for the queue to be full.
32+
def close
33+
super
34+
35+
while @full.waiting?
36+
@full.signal(nil)
37+
end
38+
end
39+
40+
# @returns [Boolean] Whether trying to enqueue an item would block.
41+
def limited?
42+
!@closed && @items.size >= @limit
43+
end
44+
45+
# Add an item to the queue.
46+
#
47+
# If the queue is full, this method will block until there is space available.
48+
#
49+
# @parameter item [Object] The item to add to the queue.
50+
def push(item)
51+
while limited?
52+
@full.wait
53+
end
54+
55+
super
56+
end
57+
58+
# Add multiple items to the queue.
59+
#
60+
# If the queue is full, this method will block until there is space available.
61+
#
62+
# @parameter items [Array] The items to add to the queue.
63+
def enqueue(*items)
64+
while !items.empty?
65+
while limited?
66+
@full.wait
67+
end
68+
69+
if @closed
70+
raise ClosedError, "Cannot enqueue items to a closed queue."
71+
end
72+
73+
available = @limit - @items.size
74+
@items.concat(items.shift(available))
75+
76+
@available.signal unless self.empty?
77+
end
78+
end
79+
80+
# Remove and return the next item from the queue.
81+
#
82+
# If the queue is empty, this method will block until an item is available.
83+
#
84+
# @returns [Object] The next item in the queue.
85+
def dequeue
86+
item = super
87+
88+
@full.signal
89+
90+
return item
91+
end
92+
end
93+
end

lib/async/queue.rb

Lines changed: 6 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -130,84 +130,15 @@ def wait
130130
end
131131
end
132132

133-
# A queue which limits the number of items that can be enqueued.
134-
# @public Since *Async v1*.
133+
# @private
135134
class LimitedQueue < Queue
136-
# Create a new limited queue.
137-
#
138-
# @parameter limit [Integer] The maximum number of items that can be enqueued.
139-
# @parameter full [Notification] The notification to use for signaling when the queue is full.
140-
def initialize(limit = 1, full: Notification.new, **options)
141-
super(**options)
142-
143-
@limit = limit
144-
@full = full
145-
end
146-
147-
# @attribute [Integer] The maximum number of items that can be enqueued.
148-
attr :limit
149-
150-
# Close the queue, causing all waiting tasks to return `nil`. Any subsequent calls to {enqueue} will raise an exception.
151-
# Also signals all tasks waiting for the queue to be full.
152-
def close
153-
super
154-
155-
while @full.waiting?
156-
@full.signal(nil)
157-
end
158-
end
159-
160-
# @returns [Boolean] Whether trying to enqueue an item would block.
161-
def limited?
162-
!@closed && @items.size >= @limit
163-
end
164-
165-
# Add an item to the queue.
166-
#
167-
# If the queue is full, this method will block until there is space available.
168-
#
169-
# @parameter item [Object] The item to add to the queue.
170-
def push(item)
171-
while limited?
172-
@full.wait
173-
end
174-
175-
super
176-
end
177-
178-
# Add multiple items to the queue.
179-
#
180-
# If the queue is full, this method will block until there is space available.
181-
#
182-
# @parameter items [Array] The items to add to the queue.
183-
def enqueue(*items)
184-
while !items.empty?
185-
while limited?
186-
@full.wait
187-
end
188-
189-
if @closed
190-
raise ClosedError, "Cannot enqueue items to a closed queue."
191-
end
192-
193-
available = @limit - @items.size
194-
@items.concat(items.shift(available))
195-
196-
@available.signal unless self.empty?
197-
end
198-
end
199-
200-
# Remove and return the next item from the queue.
201-
#
202-
# If the queue is empty, this method will block until an item is available.
203-
#
204-
# @returns [Object] The next item in the queue.
205-
def dequeue
206-
item = super
135+
# This gets redefined if you load `async/limited_queue`.
136+
def self.new(...)
137+
warn "Use `require 'async/limited_queue'` to use `Async::LimitedQueue` without a warning.", uplevel: 1
207138

208-
@full.signal
139+
require_relative "limited_queue"
209140

210-
return item
141+
super(...)
211142
end
212143
end
213144
end

0 commit comments

Comments
 (0)