Skip to content

Commit

Permalink
Use SizedQueue to limit the number of requests for asynchronous proce…
Browse files Browse the repository at this point in the history
…ssing
  • Loading branch information
Watson1978 committed Mar 7, 2024
1 parent 7c7ebf2 commit 279a26e
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 80 deletions.
61 changes: 23 additions & 38 deletions example/benchmark_insert.rb
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ def run_execute(x)
end

def run_execute_async(x)
futures = []

x.report('cassandra-driver:execute_async') do
future = @session.execute_async(
statement,
Expand All @@ -78,16 +76,7 @@ def run_execute_async(x)
)
future.on_success do |rows|
end

futures << future

if futures.size > 100
tmp = futures.slice!(0..10)
Cassandra::Future.all(*tmp).get
end
end

Cassandra::Future.all(*futures).get
end

def statement
Expand Down Expand Up @@ -116,8 +105,6 @@ def run_execute(x)
end

def run_execute_async(x)
futures = []

x.report('ilios:execute_async') do
statement.bind(
{
Expand All @@ -129,13 +116,7 @@ def run_execute_async(x)
future = Ilios::Cassandra.session.execute_async(statement)
future.on_success do |results|
end

futures << future

futures.slice!(0..10).each(&:await) if futures.size > 100
end

futures.each(&:await)
end

def statement
Expand All @@ -149,22 +130,22 @@ def statement
end
end

Benchmark.ips do |x|
x.warmup = 0
x.time = 10
BenchmarkCassandra.new.run_execute(x)
BenchmarkCassandra.new.run_execute_async(x)
end

GC.start
sleep 20
case ENV['RUN']
when 'cassandra'
Benchmark.ips do |x|
x.warmup = 0
x.time = 20
BenchmarkCassandra.new.run_execute(x)
BenchmarkCassandra.new.run_execute_async(x)
end

puts ''
Benchmark.ips do |x|
x.warmup = 0
x.time = 10
BenchmarkIlios.new.run_execute(x)
BenchmarkIlios.new.run_execute_async(x)
when 'ilios', nil
Benchmark.ips do |x|
x.warmup = 0
x.time = 20
BenchmarkIlios.new.run_execute(x)
BenchmarkIlios.new.run_execute_async(x)
end
end

=begin
Expand All @@ -176,13 +157,17 @@ def statement
- Ruby: ruby 3.3.0
## Results
### cassandra-driver
$ RUN=cassandra ruby benchmark_insert.rb
Calculating -------------------------------------
cassandra-driver:execute
4.121k (±19.4%) i/s - 39.035k in 9.979254s
4.022k (±19.1%) i/s - 76.144k in 19.956985s
cassandra-driver:execute_async
18.461k (±20.5%) i/s - 132.226k in 9.951913s
45.162k (±20.6%) i/s - 514.720k in 19.872873s
### ilios
$ RUN=ilios ruby benchmark_insert.rb
Calculating -------------------------------------
ilios:execute 4.880k23.8%) i/s - 45.928k in 9.978697s
ilios:execute_async 348.102k53.6%) i/s - 966.952k in 9.745057s
ilios:execute 4.857k24.0%) i/s - 91.235k in 19.960159s
ilios:execute_async 351.393k55.5%) i/s - 2.029M in 19.462660s
=end
61 changes: 23 additions & 38 deletions example/benchmark_select.rb
Original file line number Diff line number Diff line change
Expand Up @@ -74,22 +74,11 @@ def run_execute(x)
end

def run_execute_async(x)
futures = []

x.report('cassandra-driver:execute_async') do
future = @session.execute_async(statement)
future.on_success do |rows|
end

futures << future

if futures.size > 100
tmp = futures.slice!(0..10)
Cassandra::Future.all(*tmp).get
end
end

Cassandra::Future.all(*futures).get
end

def statement
Expand All @@ -107,21 +96,13 @@ def run_execute(x)
end

def run_execute_async(x)
futures = []

x.report('ilios:execute_async') do
future = Ilios::Cassandra.session.execute_async(statement)
future.on_success do |results|
results.each do |row|
end
end

futures << future

futures.slice!(0..10).each(&:await) if futures.size > 100
end

futures.each(&:await)
end

def statement
Expand All @@ -131,22 +112,22 @@ def statement
end
end

Benchmark.ips do |x|
x.warmup = 0
x.time = 10
BenchmarkCassandra.new.run_execute(x)
BenchmarkCassandra.new.run_execute_async(x)
end

GC.start
sleep 20
case ENV['RUN']
when 'cassandra'
Benchmark.ips do |x|
x.warmup = 0
x.time = 20
BenchmarkCassandra.new.run_execute(x)
BenchmarkCassandra.new.run_execute_async(x)
end

puts ''
Benchmark.ips do |x|
x.warmup = 0
x.time = 10
BenchmarkIlios.new.run_execute(x)
BenchmarkIlios.new.run_execute_async(x)
when 'ilios', nil
Benchmark.ips do |x|
x.warmup = 0
x.time = 20
BenchmarkIlios.new.run_execute(x)
BenchmarkIlios.new.run_execute_async(x)
end
end

=begin
Expand All @@ -158,13 +139,17 @@ def statement
- Ruby: ruby 3.3.0
## Results
### cassandra-driver
$ RUN=cassandra ruby benchmark_select.rb
Calculating -------------------------------------
cassandra-driver:execute
139.5717.2%) i/s - 1.387k in 9.998314s
140.8979.2%) i/s - 2.794k in 19.995255s
cassandra-driver:execute_async
11.554k86.6%) i/s - 2.467k in 10.001188s
95.534k17.8%) i/s - 1.003M in 20.788894s
### ilios
RUN=ilios ruby benchmark_select.rb
Calculating -------------------------------------
ilios:execute 327.71516.5%) i/s - 3.187k in 9.994403s
ilios:execute_async 454.625k78.4%) i/s - 10.161k in 9.996146s
ilios:execute 327.60315.6%) i/s - 6.400k in 19.991734s
ilios:execute_async 421.849k80.8%) i/s - 19.441k in 19.987868s
=end
3 changes: 2 additions & 1 deletion ext/ilios/future.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "ilios.h"

#define THREAD_MAX 5
#define QUEUE_MAX 100

typedef struct
{
Expand Down Expand Up @@ -31,7 +32,7 @@ const rb_data_type_t cassandra_future_data_type = {

static void future_thread_pool_init(future_thread_pool *pool)
{
pool->queue = rb_funcall(cQueue, id_new, 0);
pool->queue = rb_funcall(cSizedQueue, id_new, 1, INT2NUM(QUEUE_MAX));
rb_gc_register_mark_object(pool->queue);
}

Expand Down
4 changes: 2 additions & 2 deletions ext/ilios/ilios.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ VALUE eConnectError;
VALUE eExecutionError;
VALUE eStatementError;

VALUE cQueue;
VALUE cSizedQueue;

VALUE id_to_time;
VALUE id_new;
Expand Down Expand Up @@ -85,7 +85,7 @@ void Init_ilios(void)
eExecutionError = rb_define_class_under(mCassandra, "ExecutionError", rb_eStandardError);
eStatementError = rb_define_class_under(mCassandra, "StatementError", rb_eStandardError);

cQueue = rb_const_get(rb_cThread, rb_intern("Queue"));
cSizedQueue = rb_const_get(rb_cThread, rb_intern("SizedQueue"));

id_to_time = rb_intern("to_time");
id_new = rb_intern("new");
Expand Down
2 changes: 1 addition & 1 deletion ext/ilios/ilios.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ extern VALUE eConnectError;
extern VALUE eExecutionError;
extern VALUE eStatementError;

extern VALUE cQueue;
extern VALUE cSizedQueue;

extern VALUE id_to_time;
extern VALUE id_new;
Expand Down

0 comments on commit 279a26e

Please sign in to comment.