Skip to content

Commit 50bd0d5

Browse files
author
Bulat Shakirzyanov
committed
make token-aware policy use local replicas only
1 parent 1a703a0 commit 50bd0d5

File tree

2 files changed

+71
-37
lines changed

2 files changed

+71
-37
lines changed

lib/cassandra/load_balancing/policies/token_aware.rb

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,22 @@ def initialize(hosts, policy, keyspace, statement, options)
3232
end
3333

3434
def has_next?
35-
return true unless @hosts.empty?
35+
until @hosts.empty?
36+
host = @hosts.shift
37+
38+
if @policy.distance(host) == :local
39+
@seen[host] = true
40+
@next = host
41+
break
42+
end
43+
end
44+
45+
return true if @next
46+
47+
@plan ||= @policy.plan(@keyspace, @statement, @options)
3648

37-
while plan.has_next?
38-
host = plan.next
49+
while @plan.has_next?
50+
host = @plan.next
3951

4052
unless @seen[host]
4153
@next = host
@@ -47,20 +59,9 @@ def has_next?
4759
end
4860

4961
def next
50-
unless @hosts.empty?
51-
host = @hosts.shift
52-
@seen[host] = true
53-
54-
return host
55-
end
56-
57-
@next
58-
end
59-
60-
private
61-
62-
def plan
63-
@plan ||= @policy.plan(@keyspace, @statement, @options)
62+
host = @next
63+
@next = nil
64+
host
6465
end
6566
end
6667

spec/cassandra/load_balancing/policies/token_aware_spec.rb

Lines changed: 53 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,15 @@ module Cassandra
2222
module LoadBalancing
2323
module Policies
2424
describe(TokenAware) do
25-
let(:policy) { double('load balancing policy').as_null_object }
25+
let(:policy) { double('load balancing policy') }
2626
let(:cluster) { double('cassandra cluster') }
2727

2828
subject { TokenAware.new(policy) }
2929

30+
before do
31+
allow(policy).to receive(:respond_to?) { true }
32+
end
33+
3034
describe('#setup') do
3135
it 'sets up wrapped policy' do
3236
expect(policy).to receive(:setup).once.with(cluster)
@@ -67,6 +71,7 @@ module Policies
6771

6872
context('when set up') do
6973
before do
74+
allow(policy).to receive(:setup)
7075
subject.setup(cluster)
7176
expect(cluster).to receive(:find_replicas).once.with(keyspace, statement).and_return(replicas)
7277
end
@@ -91,21 +96,57 @@ module Policies
9196
}
9297
let(:plan) { subject.plan(keyspace, statement, options) }
9398

94-
it 'prioritizes found replicas' do
95-
expect(plan.next).to eq(replicas[0])
96-
expect(plan.next).to eq(replicas[1])
97-
expect(plan.next).to eq(replicas[2])
99+
context('and all replicas are local') do
100+
before do
101+
allow(policy).to receive(:distance) { :local }
102+
end
103+
104+
it 'prioritizes found replicas' do
105+
expect(plan.has_next?).to eq(true)
106+
expect(plan.next).to eq(replicas[0])
107+
expect(plan.has_next?).to eq(true)
108+
expect(plan.next).to eq(replicas[1])
109+
expect(plan.has_next?).to eq(true)
110+
expect(plan.next).to eq(replicas[2])
111+
end
112+
113+
context('and all replicas failed') do
114+
before do
115+
replicas.size.times do
116+
expect(plan.has_next?).to eq(true)
117+
plan.next
118+
end
119+
120+
allow(child_plan).to receive(:next).and_return(next_host)
121+
allow(child_plan).to receive(:has_next?).and_return(true, false)
122+
allow(policy).to receive(:plan).and_return(child_plan)
123+
end
124+
125+
let(:next_host) { double('next host from the wrapped policy plan') }
126+
let(:child_plan) { double('wrapped policy plan') }
127+
128+
it 'delegates to the wrapped policy' do
129+
expect(plan.has_next?).to be_truthy
130+
expect(plan.next).to eq(next_host)
131+
end
132+
133+
context('and replica host returned from the child plan') do
134+
let(:next_host) { replicas.sample }
135+
136+
it 'is ignored' do
137+
expect(plan.has_next?).to be_falsey
138+
end
139+
end
140+
end
98141
end
99142

100-
context('and all replicas failed') do
143+
context('and all replicas are not local') do
101144
before do
102-
replicas.size.times do
103-
plan.next
104-
end
145+
allow(policy).to receive(:distance) { :remote }
146+
allow(policy).to receive(:plan) { child_plan }
105147

106-
allow(child_plan).to receive(:next).and_return(next_host)
107-
allow(child_plan).to receive(:has_next?).and_return(true, false)
108-
allow(policy).to receive(:plan).and_return(child_plan)
148+
allow(child_plan).to receive(:next) { next_host }
149+
allow(child_plan).to receive(:has_next?) { true }
109150
end
110151

111152
let(:next_host) { double('next host from the wrapped policy plan') }
@@ -115,14 +156,6 @@ module Policies
115156
expect(plan.has_next?).to be_truthy
116157
expect(plan.next).to eq(next_host)
117158
end
118-
119-
context('and replica host returned from the child plan') do
120-
let(:next_host) { replicas.sample }
121-
122-
it 'is ignored' do
123-
expect(plan.has_next?).to be_falsey
124-
end
125-
end
126159
end
127160
end
128161
end

0 commit comments

Comments
 (0)