Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
sidekiq-queue-pause (0.1.0)
sidekiq-queue-pause (0.1.2)
sidekiq (>= 6.0, < 7.0)

GEM
Expand All @@ -13,7 +13,7 @@ GEM
backport (1.2.0)
benchmark (0.2.1)
coderay (1.1.3)
connection_pool (2.3.0)
connection_pool (2.4.1)
diff-lcs (1.5.0)
docile (1.4.0)
e2mmap (0.1.0)
Expand Down Expand Up @@ -56,9 +56,11 @@ GEM
lumberjack (1.2.8)
method_source (1.0.0)
mini_mime (1.1.2)
mini_portile2 (2.8.5)
multi_xml (0.6.0)
nenv (0.3.0)
nokogiri (1.14.0-x86_64-linux)
nokogiri (1.14.0)
mini_portile2 (~> 2.8.0)
racc (~> 1.4)
notiffany (0.1.3)
nenv (~> 0.1)
Expand Down Expand Up @@ -89,12 +91,12 @@ GEM
method_source (~> 1.0)
public_suffix (5.0.1)
racc (1.6.2)
rack (2.2.6.2)
rack (2.2.8.1)
rainbow (3.1.1)
rb-fsevent (0.11.2)
rb-inotify (0.10.1)
ffi (~> 1.0)
redis (4.8.0)
redis (4.8.1)
regexp_parser (2.6.2)
reverse_markdown (2.1.1)
nokogiri
Expand Down Expand Up @@ -134,7 +136,7 @@ GEM
addressable (>= 2.3.5)
faraday (>= 0.17.3, < 3)
shellany (0.0.1)
sidekiq (6.5.8)
sidekiq (6.5.12)
connection_pool (>= 2.2.5, < 3)
rack (~> 2.0)
redis (>= 4.5.0, < 5)
Expand Down Expand Up @@ -176,6 +178,7 @@ GEM
webrick (~> 1.7.0)

PLATFORMS
x86_64-darwin-23
x86_64-linux

DEPENDENCIES
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ Initializer:

```ruby
Sidekiq.configure_server do |config|
Sidekiq.options[:fetch] = Sidekiq::QueuePause::PausingFetch.new(Sidekiq.options)
Sidekiq.options[:fetch] = Sidekiq::QueuePause::PausingFetch.new(Sidekiq)

# Optionally, you may set some unique key identifying the
# Sidekiq process you want to control. This (server) process will
Expand Down
5 changes: 3 additions & 2 deletions lib/sidekiq-queue-pause.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ def retrieve_work
end

def retrieve_work_for_queues(qcmd)
work = Sidekiq.redis { |conn| conn.brpop(*qcmd) }
UnitOfWork.new(*work) if work
#queue, job = redis { |conn| conn.blocking_call(conn.read_timeout + TIMEOUT, "brpop", *qcmd, TIMEOUT) }
queue, job = redis { |conn| conn.brpop(*qcmd) }
UnitOfWork.new(queue, job, config) if queue
end

# Returns the list of unpause queue names.
Expand Down
2 changes: 1 addition & 1 deletion sidekiq-queue-pause.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = "sidekiq-queue-pause"
s.version = "0.1.1"
s.version = "0.1.2"
s.summary = "Pause a Sidekiq queue"
s.description = "Let's you pause/unpause individual sidekiq queues."
s.license = "MIT"
Expand Down
52 changes: 44 additions & 8 deletions spec/sidekiq-queue-pause_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,70 @@

describe Sidekiq::QueuePause do
describe Sidekiq::QueuePause::PausingFetch do
describe "#unpause_queues_cmd" do
let(:queuename) { "some_queue" }
let(:config) { {queues: [queuename], strict: true} }
let(:pausing_fetch) { described_class.new(config) }
let(:queue_name) { "some_queue" }
let(:logger) { double("logger") }
let(:job) { {queue: "some_queue", retry: true} }
let(:queue) { "queue:#{queue_name}" }
let(:queue_and_work) { [queue, job.to_json] }
let(:conn) { double("redis connection", read_timeout: 5, blocking_call: queue_and_work, brpop: queue_and_work) }
let(:config) { OpenStruct.new(queues: [queue_name], strict: true, logger: logger, redis: conn) }

subject(:pausing_fetch) { described_class.new(config) }

describe "instance methods from Component" do
it "responds to `logger`" do
expect(pausing_fetch).to respond_to(:logger)
end

it "responds to `redis`" do
expect(pausing_fetch).to respond_to(:redis)
end

it "config is not a `#{Hash}`" do
expect(pausing_fetch.config).not_to be_a(Hash)
end
end

describe "#unpause_queues_cmd" do
context "with Sidekiq > 6.5.6 the queues list can contain Hashes" do
let(:queue_list) { ["queue:#{queuename}", {timeout: 2}] }
let(:queue_list) { ["queue:#{queue_name}", {timeout: 2}] }

before { allow(pausing_fetch).to receive(:queues_cmd).and_return(queue_list) }

it "does not checked whether the Hash is paused" do
expect(Sidekiq::QueuePause).to receive(:paused?).with(queuename, Sidekiq::QueuePause.process_key).and_return(false)
expect(Sidekiq::QueuePause).to receive(:paused?).with(queue_name, Sidekiq::QueuePause.process_key).and_return(false)

expect(pausing_fetch.unpaused_queues_cmd).to match_array(queue_list)
end
end

context "with Sidekiq < 6.5.6 the queues list can contain an Integer" do
let(:queue_list) { ["queue:#{queuename}", 2] }
let(:queue_list) { ["queue:#{queue_name}", 2] }

before { allow(pausing_fetch).to receive(:queues_cmd).and_return(queue_list) }

it "does not check whether the Integer is paused" do
expect(Sidekiq::QueuePause).to receive(:paused?).with(queuename, Sidekiq::QueuePause.process_key).and_return(false)
expect(Sidekiq::QueuePause).to receive(:paused?).with(queue_name, Sidekiq::QueuePause.process_key).and_return(false)

expect(pausing_fetch.unpaused_queues_cmd).to match_array(queue_list)
end
end
end

describe "reenqueueing a unit of work" do

it "does not raise a `NoMethodError: undefined method `redis' for nil:NilClass` due to lack of `config`" do
expect(config).to receive(:redis).and_yield(conn).twice #one for fetch, one for requeue

expect(described_class::UnitOfWork).to receive(:new).with(queue, job.to_json, config).and_call_original
#expect(conn).to receive(:blocking_call).with(conn.read_timeout + described_class::TIMEOUT, "brpop", queue, described_class::TIMEOUT)
expect(conn).to receive(:brpop).with(queue)
expect(conn).to receive(:rpush).with(queue, job.to_json)

unit_of_work = pausing_fetch.retrieve_work_for_queues(queue)

expect { unit_of_work.requeue }.to_not raise_error
end
end
end
end