Skip to content

Commit

Permalink
make token-aware policy use local replicas only
Browse files Browse the repository at this point in the history
  • Loading branch information
Bulat Shakirzyanov committed Oct 1, 2014
1 parent 1a703a0 commit 50bd0d5
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 37 deletions.
35 changes: 18 additions & 17 deletions lib/cassandra/load_balancing/policies/token_aware.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,22 @@ def initialize(hosts, policy, keyspace, statement, options)
end

def has_next?
return true unless @hosts.empty?
until @hosts.empty?
host = @hosts.shift

if @policy.distance(host) == :local
@seen[host] = true
@next = host
break
end
end

return true if @next

@plan ||= @policy.plan(@keyspace, @statement, @options)

while plan.has_next?
host = plan.next
while @plan.has_next?
host = @plan.next

unless @seen[host]
@next = host
Expand All @@ -47,20 +59,9 @@ def has_next?
end

def next
unless @hosts.empty?
host = @hosts.shift
@seen[host] = true

return host
end

@next
end

private

def plan
@plan ||= @policy.plan(@keyspace, @statement, @options)
host = @next
@next = nil
host
end
end

Expand Down
73 changes: 53 additions & 20 deletions spec/cassandra/load_balancing/policies/token_aware_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,15 @@ module Cassandra
module LoadBalancing
module Policies
describe(TokenAware) do
let(:policy) { double('load balancing policy').as_null_object }
let(:policy) { double('load balancing policy') }
let(:cluster) { double('cassandra cluster') }

subject { TokenAware.new(policy) }

before do
allow(policy).to receive(:respond_to?) { true }
end

describe('#setup') do
it 'sets up wrapped policy' do
expect(policy).to receive(:setup).once.with(cluster)
Expand Down Expand Up @@ -67,6 +71,7 @@ module Policies

context('when set up') do
before do
allow(policy).to receive(:setup)
subject.setup(cluster)
expect(cluster).to receive(:find_replicas).once.with(keyspace, statement).and_return(replicas)
end
Expand All @@ -91,21 +96,57 @@ module Policies
}
let(:plan) { subject.plan(keyspace, statement, options) }

it 'prioritizes found replicas' do
expect(plan.next).to eq(replicas[0])
expect(plan.next).to eq(replicas[1])
expect(plan.next).to eq(replicas[2])
context('and all replicas are local') do
before do
allow(policy).to receive(:distance) { :local }
end

it 'prioritizes found replicas' do
expect(plan.has_next?).to eq(true)
expect(plan.next).to eq(replicas[0])
expect(plan.has_next?).to eq(true)
expect(plan.next).to eq(replicas[1])
expect(plan.has_next?).to eq(true)
expect(plan.next).to eq(replicas[2])
end

context('and all replicas failed') do
before do
replicas.size.times do
expect(plan.has_next?).to eq(true)
plan.next
end

allow(child_plan).to receive(:next).and_return(next_host)
allow(child_plan).to receive(:has_next?).and_return(true, false)
allow(policy).to receive(:plan).and_return(child_plan)
end

let(:next_host) { double('next host from the wrapped policy plan') }
let(:child_plan) { double('wrapped policy plan') }

it 'delegates to the wrapped policy' do
expect(plan.has_next?).to be_truthy
expect(plan.next).to eq(next_host)
end

context('and replica host returned from the child plan') do
let(:next_host) { replicas.sample }

it 'is ignored' do
expect(plan.has_next?).to be_falsey
end
end
end
end

context('and all replicas failed') do
context('and all replicas are not local') do
before do
replicas.size.times do
plan.next
end
allow(policy).to receive(:distance) { :remote }
allow(policy).to receive(:plan) { child_plan }

allow(child_plan).to receive(:next).and_return(next_host)
allow(child_plan).to receive(:has_next?).and_return(true, false)
allow(policy).to receive(:plan).and_return(child_plan)
allow(child_plan).to receive(:next) { next_host }
allow(child_plan).to receive(:has_next?) { true }
end

let(:next_host) { double('next host from the wrapped policy plan') }
Expand All @@ -115,14 +156,6 @@ module Policies
expect(plan.has_next?).to be_truthy
expect(plan.next).to eq(next_host)
end

context('and replica host returned from the child plan') do
let(:next_host) { replicas.sample }

it 'is ignored' do
expect(plan.has_next?).to be_falsey
end
end
end
end
end
Expand Down

0 comments on commit 50bd0d5

Please sign in to comment.