Skip to content

Commit 24e1d27

Browse files
authored
fix: interruption handling in worker thread (#442)
1 parent 17a2e2d commit 24e1d27

File tree

1 file changed

+13
-1
lines changed
  • lib/redis_client/cluster/concurrent_worker

1 file changed

+13
-1
lines changed

lib/redis_client/cluster/concurrent_worker/pooled.rb

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ module ConcurrentWorker
1010
# It is a fixed size but we can modify the size with some environment variables.
1111
# So it consumes memory 1 MB multiplied a number of workers.
1212
class Pooled
13+
IO_ERROR_NEVER = { IOError => :never }.freeze
14+
IO_ERROR_ON_BLOCKING = { IOError => :on_blocking }.freeze
15+
private_constant :IO_ERROR_NEVER, :IO_ERROR_ON_BLOCKING
16+
1317
def initialize(size:)
1418
raise ArgumentError, "size must be positive: #{size}" unless size.positive?
1519

@@ -65,7 +69,15 @@ def ensure_workers
6569

6670
def spawn_worker
6771
Thread.new(@q) do |q|
68-
loop { q.pop.exec }
72+
Thread.handle_interrupt(IO_ERROR_NEVER) do
73+
loop do
74+
Thread.handle_interrupt(IO_ERROR_ON_BLOCKING) do
75+
q.pop.exec
76+
end
77+
end
78+
end
79+
rescue IOError
80+
# stream closed in another thread
6981
end
7082
end
7183
end

0 commit comments

Comments
 (0)