Skip to content

Commit

Permalink
Use rb_mutex_synchronize
Browse files Browse the repository at this point in the history
  • Loading branch information
Watson1978 committed Feb 10, 2024
1 parent 8f88171 commit 6bf7a32
Showing 1 changed file with 46 additions and 36 deletions.
82 changes: 46 additions & 36 deletions ext/ilios/future.c
Original file line number Diff line number Diff line change
Expand Up @@ -195,31 +195,12 @@ VALUE future_create(CassFuture *future, VALUE session, future_kind kind)
return cassandra_future_obj;
}

/**
* Run block when future resolves to a value.
*
* @yieldparam value [Cassandra::Statement, Cassandra::Result] A value.
* Yields +Cassandra::Statement+ object when future was created by +Cassandra::Session#prepare_async+.
* Yields +Cassandra::Result+ object when future was created by +Cassandra::Session#execute_async+.
* @return [Cassandra::Future] self.
* @raise [Cassandra::ExecutionError] If this method will be called twice.
* @raise [ArgumentError] If no block was given.
*/
static VALUE future_on_success(VALUE self)
static VALUE future_on_success_synchronize(VALUE future)
{
CassandraFuture *cassandra_future;
bool wakeup_thread = false;

GET_FUTURE(self, cassandra_future);

if (cassandra_future->on_success_block) {
rb_raise(eExecutionError, "It should not call twice");
}
if (!rb_block_given_p()) {
rb_raise(rb_eArgError, "no block given");
}

rb_mutex_lock(cassandra_future->proc_mutex);
GET_FUTURE(future, cassandra_future);

if (!cassandra_future->on_failure_block) {
// Invoke the callback with thread pool only once
Expand All @@ -229,44 +210,52 @@ static VALUE future_on_success(VALUE self)
cassandra_future->on_success_block = rb_block_proc();

if (cass_future_ready(cassandra_future->future)) {
rb_mutex_unlock(cassandra_future->proc_mutex);
uv_sem_post(&cassandra_future->sem);
if (cass_future_error_code(cassandra_future->future) == CASS_OK) {
future_result_success_yield(cassandra_future);
}
return self;
return future;
}

if (wakeup_thread) {
future_queue_push(future_thread_pool_get(cassandra_future), self);
future_queue_push(future_thread_pool_get(cassandra_future), future);
}
rb_mutex_unlock(cassandra_future->proc_mutex);

return self;
return future;
}

/**
* Run block when future resolves to error.
* Run block when future resolves to a value.
*
* @yieldparam value [Cassandra::Statement, Cassandra::Result] A value.
* Yields +Cassandra::Statement+ object when future was created by +Cassandra::Session#prepare_async+.
* Yields +Cassandra::Result+ object when future was created by +Cassandra::Session#execute_async+.
* @return [Cassandra::Future] self.
* @raise [Cassandra::ExecutionError] If this method will be called twice.
* @raise [ArgumentError] If no block was given.
*/
static VALUE future_on_failure(VALUE self)
static VALUE future_on_success(VALUE self)
{
CassandraFuture *cassandra_future;
bool wakeup_thread = false;

GET_FUTURE(self, cassandra_future);

if (cassandra_future->on_failure_block) {
if (cassandra_future->on_success_block) {
rb_raise(eExecutionError, "It should not call twice");
}
if (!rb_block_given_p()) {
rb_raise(rb_eArgError, "no block given");
}

rb_mutex_lock(cassandra_future->proc_mutex);
return rb_mutex_synchronize(cassandra_future->proc_mutex, future_on_success_synchronize, self);
}

static VALUE future_on_failure_synchronize(VALUE future)
{
CassandraFuture *cassandra_future;
bool wakeup_thread = false;

GET_FUTURE(future, cassandra_future);

if (!cassandra_future->on_success_block) {
// Invoke the callback with thread pool only once
Expand All @@ -276,20 +265,41 @@ static VALUE future_on_failure(VALUE self)
cassandra_future->on_failure_block = rb_block_proc();

if (cass_future_ready(cassandra_future->future)) {
rb_mutex_unlock(cassandra_future->proc_mutex);
uv_sem_post(&cassandra_future->sem);
if (cass_future_error_code(cassandra_future->future) != CASS_OK) {
future_result_failure_yield(cassandra_future);
}
return self;
return future;
}

if (wakeup_thread) {
future_queue_push(future_thread_pool_get(cassandra_future), self);
future_queue_push(future_thread_pool_get(cassandra_future), future);
}
rb_mutex_unlock(cassandra_future->proc_mutex);

return self;
return future;
}

/**
* Run block when future resolves to error.
*
* @return [Cassandra::Future] self.
* @raise [Cassandra::ExecutionError] If this method will be called twice.
* @raise [ArgumentError] If no block was given.
*/
static VALUE future_on_failure(VALUE self)
{
CassandraFuture *cassandra_future;

GET_FUTURE(self, cassandra_future);

if (cassandra_future->on_failure_block) {
rb_raise(eExecutionError, "It should not call twice");
}
if (!rb_block_given_p()) {
rb_raise(rb_eArgError, "no block given");
}

return rb_mutex_synchronize(cassandra_future->proc_mutex, future_on_failure_synchronize, self);
}

/**
Expand Down

0 comments on commit 6bf7a32

Please sign in to comment.