Skip to content

Commit 359d5fd

Browse files
authored
fix: interruption handling for worker threads (#445)
1 parent ab153ac commit 359d5fd

File tree

2 files changed

+7
-7
lines changed

2 files changed

+7
-7
lines changed

lib/redis_client/cluster/concurrent_worker/pooled.rb

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ module ConcurrentWorker
1111
# So it consumes memory 1 MB multiplied a number of workers.
1212
class Pooled
1313
IO_ERROR_NEVER = { IOError => :never }.freeze
14-
IO_ERROR_ON_BLOCKING = { IOError => :on_blocking }.freeze
15-
private_constant :IO_ERROR_NEVER, :IO_ERROR_ON_BLOCKING
14+
IO_ERROR_IMMEDIATE = { IOError => :immediate }.freeze
15+
private_constant :IO_ERROR_NEVER, :IO_ERROR_IMMEDIATE
1616

1717
def initialize(size:)
1818
raise ArgumentError, "size must be positive: #{size}" unless size.positive?
@@ -73,7 +73,7 @@ def spawn_worker
7373
Thread.new(@q) do |q|
7474
Thread.handle_interrupt(IO_ERROR_NEVER) do
7575
loop do
76-
Thread.handle_interrupt(IO_ERROR_ON_BLOCKING) do
76+
Thread.handle_interrupt(IO_ERROR_IMMEDIATE) do
7777
q.pop.exec
7878
end
7979
end

lib/redis_client/cluster/pub_sub.rb

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ class Cluster
88
class PubSub
99
class State
1010
IO_ERROR_NEVER = { IOError => :never }.freeze
11-
IO_ERROR_ON_BLOCKING = { IOError => :on_blocking }.freeze
12-
private_constant :IO_ERROR_NEVER, :IO_ERROR_ON_BLOCKING
11+
IO_ERROR_IMMEDIATE = { IOError => :immediate }.freeze
12+
private_constant :IO_ERROR_NEVER, :IO_ERROR_IMMEDIATE
1313

1414
def initialize(client, queue)
1515
@client = client
@@ -45,12 +45,12 @@ def spawn_worker(client, queue)
4545
Thread.new(client, queue, nil) do |pubsub, q, prev_err|
4646
Thread.handle_interrupt(IO_ERROR_NEVER) do
4747
loop do
48-
Thread.handle_interrupt(IO_ERROR_ON_BLOCKING) { q << pubsub.next_event }
48+
Thread.handle_interrupt(IO_ERROR_IMMEDIATE) { q << pubsub.next_event }
4949
prev_err = nil
5050
rescue StandardError => e
5151
next sleep 0.005 if e.instance_of?(prev_err.class) && e.message == prev_err&.message
5252

53-
Thread.handle_interrupt(IO_ERROR_ON_BLOCKING) { q << e }
53+
Thread.handle_interrupt(IO_ERROR_IMMEDIATE) { q << e }
5454
prev_err = e
5555
end
5656
end

0 commit comments

Comments
 (0)