1
1
package blocking_dequeue
2
2
3
3
import (
4
- "math/rand"
5
4
"sync"
6
5
"testing"
7
- "time"
8
6
)
9
7
10
8
// Tests that when inserting items concurrently, each item is inserted once and only once.
@@ -16,7 +14,7 @@ func TestSyncedPushes(t *testing.T) {
16
14
17
15
results := make ([]int , 0 , len (values ))
18
16
19
- dequeue := NewBlockingDequeue [ int ]( )
17
+ dequeue := NewBlockingDequeue ( make ([] int , 5 ) )
20
18
wg := sync.WaitGroup {}
21
19
22
20
// Consume all values that are inserted into the dequeue concurrently
@@ -61,7 +59,7 @@ func TestSyncedPops(t *testing.T) {
61
59
}
62
60
results := make ([]int , 0 , len (values ))
63
61
64
- dequeue := NewBlockingDequeue [ int ]( )
62
+ dequeue := NewBlockingDequeue ( make ([] int , 2000 ) )
65
63
wg := sync.WaitGroup {}
66
64
resultLock := sync.Mutex {}
67
65
@@ -103,7 +101,7 @@ func TestSyncedPops(t *testing.T) {
103
101
}
104
102
}
105
103
106
- // Test that when reading and writing items at the same time, no value is lost.
104
+ // Test that when reading and writing items at the same time, no value is lost. And that a small buffer is sufficient.
107
105
func TestSyncedMixedWrites (t * testing.T ) {
108
106
values := []int {}
109
107
for i := 1 ; i <= 1000 ; i ++ {
@@ -112,7 +110,7 @@ func TestSyncedMixedWrites(t *testing.T) {
112
110
results := make ([]int , 0 , len (values ))
113
111
resultLock := sync.Mutex {}
114
112
115
- dequeue := NewBlockingDequeue [ int ]( )
113
+ dequeue := NewBlockingDequeue ( make ([] int , 10 ) )
116
114
wg := sync.WaitGroup {}
117
115
118
116
// Concurrent producers
@@ -156,82 +154,3 @@ func TestSyncedMixedWrites(t *testing.T) {
156
154
}
157
155
}
158
156
}
159
-
160
- // Test that when capacity is increased, blocking producers are freed up.
161
- func TestCapacityChange (t * testing.T ) {
162
- values := make ([]int , 0 )
163
- for i := 1 ; i <= 1000 ; i ++ {
164
- values = append (values , i )
165
- }
166
-
167
- dequeue := NewBlockingDequeue [int ]()
168
- dequeue .SetCapacity (100 )
169
- wg := sync.WaitGroup {}
170
-
171
- // Insert the values concurrently
172
- for _ , value := range values {
173
- wg .Add (1 )
174
- go func (v int ) {
175
- defer wg .Done ()
176
- dequeue .PushBack (v )
177
- }(value )
178
- }
179
-
180
- // Sleep to allow all the goroutines to execute
181
- time .Sleep (100 * time .Millisecond )
182
-
183
- if dequeue .Size () != dequeue .Capacity () {
184
- t .Errorf ("Expected dequeue to have %d items, got %d" , dequeue .Capacity (), dequeue .Size ())
185
- }
186
-
187
- // Update the capacity
188
- err := dequeue .SetCapacity (len (values ))
189
- if err != nil {
190
- t .Errorf ("Expected no error, got %s" , err )
191
- }
192
-
193
- wg .Wait ()
194
-
195
- // Make sure that all the number from values are in the dequeue
196
- if dequeue .Size () != len (values ) {
197
- t .Errorf ("Expected dequeue to have %d items, got %d" , len (values ), dequeue .Size ())
198
- }
199
- }
200
-
201
- // Test that no race condition happens due to capacity changes
202
- func TestConcurrentCapacityChange (t * testing.T ) {
203
- dequeue := NewBlockingDequeue [int ]()
204
- wg := sync.WaitGroup {}
205
-
206
- // Read capacity concurrently
207
- for i := 0 ; i < 100 ; i ++ {
208
- wg .Add (1 )
209
- go func () {
210
- defer wg .Done ()
211
-
212
- // Do random sleep
213
- time .Sleep (time .Duration (rand .Int ()) % 100 * time .Millisecond )
214
-
215
- // Simulate reading the capacity
216
- dequeue .Capacity ()
217
- }()
218
- }
219
-
220
- // Write capacity concurrently
221
- for i := 0 ; i < 100 ; i ++ {
222
- wg .Add (1 )
223
- go func () {
224
- defer wg .Done ()
225
-
226
- // Do random sleep
227
- time .Sleep (time .Duration (rand .Int ()) % 100 * time .Millisecond )
228
-
229
- // Simulate setting the capacity
230
- dequeue .SetCapacity (rand .Int () % 100 )
231
- }()
232
- }
233
-
234
- wg .Wait ()
235
-
236
- // No checks need to be done, if the test is here without race conditions, it passed
237
- }
0 commit comments