From 0fccba8286bf0124185cff7d1e6fa0272ac2c315 Mon Sep 17 00:00:00 2001 From: Michael Nitschinger Date: Wed, 9 Dec 2020 18:47:15 +0100 Subject: [PATCH] Raise libcouchbase errors on encode This partially addresses #95 in that it covers the encode path. --- couchbase/src/io/lcb/encode.rs | 754 ++++++++++++++++++++++--------- couchbase/src/io/lcb/instance.rs | 6 +- couchbase/src/io/lcb/mod.rs | 30 +- 3 files changed, 563 insertions(+), 227 deletions(-) diff --git a/couchbase/src/io/lcb/encode.rs b/couchbase/src/io/lcb/encode.rs index eff25dbe..d36af02a 100644 --- a/couchbase/src/io/lcb/encode.rs +++ b/couchbase/src/io/lcb/encode.rs @@ -1,16 +1,23 @@ -use crate::api::options::StoreSemantics; use crate::api::{LookupInSpec, MutateInSpec}; use crate::io::lcb::callbacks::{analytics_callback, query_callback, search_callback}; use crate::io::lcb::{AnalyticsCookie, HttpCookie, QueryCookie, SearchCookie}; use crate::io::request::*; -use log::warn; +use crate::{api::options::StoreSemantics, CouchbaseResult, ErrorContext}; +use futures::channel::oneshot::Sender; +use log::{debug, warn}; +use serde_json::Value; use couchbase_sys::*; -use std::ffi::CString; +use std::ffi::{CStr, CString}; use std::os::raw::c_void; use std::ptr; use uuid::Uuid; +use super::callbacks::couchbase_error_from_lcb_status; + +#[derive(Debug)] +pub struct EncodeFailure(lcb_STATUS); + /// Helper method to turn a string into a tuple of CString and its length. #[inline] pub fn into_cstring>>(input: T) -> (usize, CString) { @@ -21,12 +28,127 @@ pub fn into_cstring>>(input: T) -> (usize, CString) { ) } +/// Verifies the libcouchbase return status code and fails the original request. +fn verify( + status: lcb_STATUS, + sender: *mut Sender>, +) -> Result<(), EncodeFailure> { + if status != lcb_STATUS_LCB_SUCCESS { + if sender.is_null() { + warn!("Failed to notify request of encode failure because the pointer is null. This is a bug!"); + return Ok(()); + } + let sender = unsafe { Box::from_raw(sender) }; + let mut ctx = ErrorContext::default(); + if let Ok(msg) = unsafe { CStr::from_ptr(lcb_strerror_short(status)) }.to_str() { + ctx.insert("msg", Value::String(msg.to_string())); + } + let err = couchbase_error_from_lcb_status(status, ctx); + if let Err(_) = sender.send(Err(err)) { + debug!("Failed to notify request of encode failure, because the listener has been already dropped."); + } + return Err(EncodeFailure(status)); + } + Ok(()) +} + +fn verify_query(status: lcb_STATUS, sender: *mut QueryCookie) -> Result<(), EncodeFailure> { + if status != lcb_STATUS_LCB_SUCCESS { + if sender.is_null() { + warn!("Failed to notify request of encode failure because the pointer is null. This is a bug!"); + return Ok(()); + } + let mut sender = unsafe { Box::from_raw(sender) }; + let mut ctx = ErrorContext::default(); + if let Ok(msg) = unsafe { CStr::from_ptr(lcb_strerror_short(status)) }.to_str() { + ctx.insert("msg", Value::String(msg.to_string())); + } + let err = couchbase_error_from_lcb_status(status, ctx); + if let Err(_) = sender.sender.take().unwrap().send(Err(err)) { + debug!("Failed to notify request of encode failure, because the listener has been already dropped."); + } + // Close the rest that needs to be closed + sender.rows_sender.close_channel(); + return Err(EncodeFailure(status)); + } + Ok(()) +} + +fn verify_analytics(status: lcb_STATUS, sender: *mut AnalyticsCookie) -> Result<(), EncodeFailure> { + if status != lcb_STATUS_LCB_SUCCESS { + if sender.is_null() { + warn!("Failed to notify request of encode failure because the pointer is null. This is a bug!"); + return Ok(()); + } + let mut sender = unsafe { Box::from_raw(sender) }; + let mut ctx = ErrorContext::default(); + if let Ok(msg) = unsafe { CStr::from_ptr(lcb_strerror_short(status)) }.to_str() { + ctx.insert("msg", Value::String(msg.to_string())); + } + let err = couchbase_error_from_lcb_status(status, ctx); + if let Err(_) = sender.sender.take().unwrap().send(Err(err)) { + debug!("Failed to notify request of encode failure, because the listener has been already dropped."); + } + // Close the rest that needs to be closed + sender.rows_sender.close_channel(); + return Err(EncodeFailure(status)); + } + Ok(()) +} + +fn verify_search(status: lcb_STATUS, sender: *mut SearchCookie) -> Result<(), EncodeFailure> { + if status != lcb_STATUS_LCB_SUCCESS { + if sender.is_null() { + warn!("Failed to notify request of encode failure because the pointer is null. This is a bug!"); + return Ok(()); + } + let mut sender = unsafe { Box::from_raw(sender) }; + let mut ctx = ErrorContext::default(); + if let Ok(msg) = unsafe { CStr::from_ptr(lcb_strerror_short(status)) }.to_str() { + ctx.insert("msg", Value::String(msg.to_string())); + } + let err = couchbase_error_from_lcb_status(status, ctx); + if let Err(_) = sender.sender.take().unwrap().send(Err(err)) { + debug!("Failed to notify request of encode failure, because the listener has been already dropped."); + } + // Close the rest that needs to be closed + sender.rows_sender.close_channel(); + return Err(EncodeFailure(status)); + } + Ok(()) +} + +fn verify_http(status: lcb_STATUS, sender: *mut HttpCookie) -> Result<(), EncodeFailure> { + if status != lcb_STATUS_LCB_SUCCESS { + if sender.is_null() { + warn!("Failed to notify request of encode failure because the pointer is null. This is a bug!"); + return Ok(()); + } + let sender = unsafe { Box::from_raw(sender) }; + let mut ctx = ErrorContext::default(); + if let Ok(msg) = unsafe { CStr::from_ptr(lcb_strerror_short(status)) }.to_str() { + ctx.insert("msg", Value::String(msg.to_string())); + } + let err = couchbase_error_from_lcb_status(status, ctx); + match *sender { + HttpCookie::GenericManagementRequest { sender } => { + if let Err(_) = sender.send(Err(err)) { + debug!("Failed to notify request of encode failure, because the listener has been already dropped."); + } + } + } + + return Err(EncodeFailure(status)); + } + Ok(()) +} + /// Encodes a `GetRequest` into its libcouchbase `lcb_CMDGET` representation. /// /// Note that this method also handles get_and_lock and get_and_touch by looking /// at the ty (type) enum of the get request. If one of them is used their inner /// duration is passed down to libcouchbase either as a locktime or the expiry. -pub fn encode_get(instance: *mut lcb_INSTANCE, request: GetRequest) { +pub fn encode_get(instance: *mut lcb_INSTANCE, request: GetRequest) -> Result<(), EncodeFailure> { let (id_len, id) = into_cstring(request.id); let cookie = Box::into_raw(Box::new(request.sender)); let (scope_len, scope) = into_cstring(request.scope); @@ -34,46 +156,67 @@ pub fn encode_get(instance: *mut lcb_INSTANCE, request: GetRequest) { let mut command: *mut lcb_CMDGET = ptr::null_mut(); unsafe { - lcb_cmdget_create(&mut command); - lcb_cmdget_key(command, id.as_ptr(), id_len); - lcb_cmdget_collection( - command, - scope.as_ptr(), - scope_len, - collection.as_ptr(), - collection_len, - ); + verify(lcb_cmdget_create(&mut command), cookie)?; + verify(lcb_cmdget_key(command, id.as_ptr(), id_len), cookie)?; + verify( + lcb_cmdget_collection( + command, + scope.as_ptr(), + scope_len, + collection.as_ptr(), + collection_len, + ), + cookie, + )?; match request.ty { GetRequestType::Get { options } => { if let Some(timeout) = options.timeout { - lcb_cmdget_timeout(command, timeout.as_micros() as u32); + verify( + lcb_cmdget_timeout(command, timeout.as_micros() as u32), + cookie, + )?; } } GetRequestType::GetAndLock { lock_time, options } => { - lcb_cmdget_locktime(command, lock_time.as_micros() as u32); + verify( + lcb_cmdget_locktime(command, lock_time.as_micros() as u32), + cookie, + )?; if let Some(timeout) = options.timeout { - lcb_cmdget_timeout(command, timeout.as_micros() as u32); + verify( + lcb_cmdget_timeout(command, timeout.as_micros() as u32), + cookie, + )?; } } GetRequestType::GetAndTouch { expiry, options } => { - lcb_cmdget_expiry(command, expiry.as_micros() as u32); + verify( + lcb_cmdget_expiry(command, expiry.as_micros() as u32), + cookie, + )?; if let Some(timeout) = options.timeout { - lcb_cmdget_timeout(command, timeout.as_micros() as u32); + verify( + lcb_cmdget_timeout(command, timeout.as_micros() as u32), + cookie, + )?; } } }; - warn!("Sending now"); - lcb_get(instance, cookie as *mut c_void, command); - lcb_cmdget_destroy(command); + verify(lcb_get(instance, cookie as *mut c_void, command), cookie)?; + verify(lcb_cmdget_destroy(command), cookie)?; } + Ok(()) } /// Encodes a `ExistsRequest` into its libcouchbase `lcb_CMDEXISTS` representation. -pub fn encode_exists(instance: *mut lcb_INSTANCE, request: ExistsRequest) { +pub fn encode_exists( + instance: *mut lcb_INSTANCE, + request: ExistsRequest, +) -> Result<(), EncodeFailure> { let (id_len, id) = into_cstring(request.id); let cookie = Box::into_raw(Box::new(request.sender)); let (scope_len, scope) = into_cstring(request.scope); @@ -81,30 +224,41 @@ pub fn encode_exists(instance: *mut lcb_INSTANCE, request: ExistsRequest) { let mut command: *mut lcb_CMDEXISTS = ptr::null_mut(); unsafe { - lcb_cmdexists_create(&mut command); - lcb_cmdexists_key(command, id.as_ptr(), id_len); - lcb_cmdexists_collection( - command, - scope.as_ptr(), - scope_len, - collection.as_ptr(), - collection_len, - ); + verify(lcb_cmdexists_create(&mut command), cookie)?; + verify(lcb_cmdexists_key(command, id.as_ptr(), id_len), cookie)?; + verify( + lcb_cmdexists_collection( + command, + scope.as_ptr(), + scope_len, + collection.as_ptr(), + collection_len, + ), + cookie, + )?; if let Some(timeout) = request.options.timeout { - lcb_cmdexists_timeout(command, timeout.as_micros() as u32); + verify( + lcb_cmdexists_timeout(command, timeout.as_micros() as u32), + cookie, + )?; } - lcb_exists(instance, cookie as *mut c_void, command); - lcb_cmdexists_destroy(command); + verify(lcb_exists(instance, cookie as *mut c_void, command), cookie)?; + verify(lcb_cmdexists_destroy(command), cookie)?; } + + Ok(()) } /// Encodes a `MutateRequest` into its libcouchbase `lcb_CMDSTORE` representation. /// /// This method covers insert, upsert and replace since they are very similar and /// only differ on certain properties. -pub fn encode_mutate(instance: *mut lcb_INSTANCE, request: MutateRequest) { +pub fn encode_mutate( + instance: *mut lcb_INSTANCE, + request: MutateRequest, +) -> Result<(), EncodeFailure> { let (id_len, id) = into_cstring(request.id); let (value_len, value) = into_cstring(request.content); let cookie = Box::into_raw(Box::new(request.sender)); @@ -115,53 +269,91 @@ pub fn encode_mutate(instance: *mut lcb_INSTANCE, request: MutateRequest) { unsafe { match request.ty { MutateRequestType::Upsert { options } => { - lcb_cmdstore_create(&mut command, lcb_STORE_OPERATION_LCB_STORE_UPSERT); + verify( + lcb_cmdstore_create(&mut command, lcb_STORE_OPERATION_LCB_STORE_UPSERT), + cookie, + )?; if let Some(timeout) = options.timeout { - lcb_cmdstore_timeout(command, timeout.as_micros() as u32); + verify( + lcb_cmdstore_timeout(command, timeout.as_micros() as u32), + cookie, + )?; } if let Some(expiry) = options.expiry { - lcb_cmdstore_expiry(command, expiry.as_secs() as u32); + verify( + lcb_cmdstore_expiry(command, expiry.as_secs() as u32), + cookie, + )?; } } MutateRequestType::Insert { options } => { - lcb_cmdstore_create(&mut command, lcb_STORE_OPERATION_LCB_STORE_INSERT); + verify( + lcb_cmdstore_create(&mut command, lcb_STORE_OPERATION_LCB_STORE_INSERT), + cookie, + )?; if let Some(timeout) = options.timeout { - lcb_cmdstore_timeout(command, timeout.as_micros() as u32); + verify( + lcb_cmdstore_timeout(command, timeout.as_micros() as u32), + cookie, + )?; } if let Some(expiry) = options.expiry { - lcb_cmdstore_expiry(command, expiry.as_secs() as u32); + verify( + lcb_cmdstore_expiry(command, expiry.as_secs() as u32), + cookie, + )?; } } MutateRequestType::Replace { options } => { - lcb_cmdstore_create(&mut command, lcb_STORE_OPERATION_LCB_STORE_REPLACE); + verify( + lcb_cmdstore_create(&mut command, lcb_STORE_OPERATION_LCB_STORE_REPLACE), + cookie, + )?; if let Some(cas) = options.cas { - lcb_cmdstore_cas(command, cas); + verify(lcb_cmdstore_cas(command, cas), cookie)?; } if let Some(timeout) = options.timeout { - lcb_cmdstore_timeout(command, timeout.as_micros() as u32); + verify( + lcb_cmdstore_timeout(command, timeout.as_micros() as u32), + cookie, + )?; } if let Some(expiry) = options.expiry { - lcb_cmdstore_expiry(command, expiry.as_secs() as u32); + verify( + lcb_cmdstore_expiry(command, expiry.as_secs() as u32), + cookie, + )?; } } } - lcb_cmdstore_key(command, id.as_ptr(), id_len); - lcb_cmdstore_value(command, value.as_ptr(), value_len); - lcb_cmdstore_collection( - command, - scope.as_ptr(), - scope_len, - collection.as_ptr(), - collection_len, - ); - - lcb_store(instance, cookie as *mut c_void, command); - lcb_cmdstore_destroy(command); + verify(lcb_cmdstore_key(command, id.as_ptr(), id_len), cookie)?; + verify( + lcb_cmdstore_value(command, value.as_ptr(), value_len), + cookie, + )?; + verify( + lcb_cmdstore_collection( + command, + scope.as_ptr(), + scope_len, + collection.as_ptr(), + collection_len, + ), + cookie, + )?; + + verify(lcb_store(instance, cookie as *mut c_void, command), cookie)?; + verify(lcb_cmdstore_destroy(command), cookie)?; } + + Ok(()) } /// Encodes a `RemoveRequest` into its libcouchbase `lcb_CMDREMOVE` representation. -pub fn encode_remove(instance: *mut lcb_INSTANCE, request: RemoveRequest) { +pub fn encode_remove( + instance: *mut lcb_INSTANCE, + request: RemoveRequest, +) -> Result<(), EncodeFailure> { let (id_len, id) = into_cstring(request.id); let cookie = Box::into_raw(Box::new(request.sender)); let (scope_len, scope) = into_cstring(request.scope); @@ -169,30 +361,41 @@ pub fn encode_remove(instance: *mut lcb_INSTANCE, request: RemoveRequest) { let mut command: *mut lcb_CMDREMOVE = ptr::null_mut(); unsafe { - lcb_cmdremove_create(&mut command); - lcb_cmdremove_key(command, id.as_ptr(), id_len); - lcb_cmdremove_collection( - command, - scope.as_ptr(), - scope_len, - collection.as_ptr(), - collection_len, - ); + verify(lcb_cmdremove_create(&mut command), cookie)?; + verify(lcb_cmdremove_key(command, id.as_ptr(), id_len), cookie)?; + verify( + lcb_cmdremove_collection( + command, + scope.as_ptr(), + scope_len, + collection.as_ptr(), + collection_len, + ), + cookie, + )?; if let Some(cas) = request.options.cas { - lcb_cmdremove_cas(command, cas); + verify(lcb_cmdremove_cas(command, cas), cookie)?; } if let Some(timeout) = request.options.timeout { - lcb_cmdremove_timeout(command, timeout.as_micros() as u32); + verify( + lcb_cmdremove_timeout(command, timeout.as_micros() as u32), + cookie, + )?; } - lcb_remove(instance, cookie as *mut c_void, command); - lcb_cmdremove_destroy(command); + verify(lcb_remove(instance, cookie as *mut c_void, command), cookie)?; + verify(lcb_cmdremove_destroy(command), cookie)?; } + + Ok(()) } /// Encodes a `QueryRequest` into its libcouchbase `lcb_CMDQUERY` representation. -pub fn encode_query(instance: *mut lcb_INSTANCE, mut request: QueryRequest) { +pub fn encode_query( + instance: *mut lcb_INSTANCE, + mut request: QueryRequest, +) -> Result<(), EncodeFailure> { request.options.statement = Some(request.statement); let (payload_len, payload) = into_cstring(serde_json::to_vec(&request.options).unwrap()); @@ -208,21 +411,29 @@ pub fn encode_query(instance: *mut lcb_INSTANCE, mut request: QueryRequest) { let mut command: *mut lcb_CMDQUERY = ptr::null_mut(); unsafe { - lcb_cmdquery_create(&mut command); - lcb_cmdquery_payload(command, payload.as_ptr(), payload_len); + verify_query(lcb_cmdquery_create(&mut command), cookie)?; + verify_query( + lcb_cmdquery_payload(command, payload.as_ptr(), payload_len), + cookie, + )?; if let Some(a) = request.options.adhoc { - lcb_cmdquery_adhoc(command, a.into()); + verify_query(lcb_cmdquery_adhoc(command, a.into()), cookie)?; } - lcb_cmdquery_callback(command, Some(query_callback)); - lcb_query(instance, cookie as *mut c_void, command); - lcb_cmdquery_destroy(command); + verify_query(lcb_cmdquery_callback(command, Some(query_callback)), cookie)?; + verify_query(lcb_query(instance, cookie as *mut c_void, command), cookie)?; + verify_query(lcb_cmdquery_destroy(command), cookie)?; } + + Ok(()) } /// Encodes a `AnalyticsRequest` into its libcouchbase `lcb_CMDANALYTICS` representation. -pub fn encode_analytics(instance: *mut lcb_INSTANCE, mut request: AnalyticsRequest) { +pub fn encode_analytics( + instance: *mut lcb_INSTANCE, + mut request: AnalyticsRequest, +) -> Result<(), EncodeFailure> { request.options.statement = Some(request.statement); let (payload_len, payload) = into_cstring(serde_json::to_vec(&request.options).unwrap()); @@ -238,16 +449,30 @@ pub fn encode_analytics(instance: *mut lcb_INSTANCE, mut request: AnalyticsReque let mut command: *mut lcb_CMDANALYTICS = ptr::null_mut(); unsafe { - lcb_cmdanalytics_create(&mut command); - lcb_cmdanalytics_payload(command, payload.as_ptr(), payload_len); - lcb_cmdanalytics_callback(command, Some(analytics_callback)); - lcb_analytics(instance, cookie as *mut c_void, command); - lcb_cmdanalytics_destroy(command); + verify_analytics(lcb_cmdanalytics_create(&mut command), cookie)?; + verify_analytics( + lcb_cmdanalytics_payload(command, payload.as_ptr(), payload_len), + cookie, + )?; + verify_analytics( + lcb_cmdanalytics_callback(command, Some(analytics_callback)), + cookie, + )?; + verify_analytics( + lcb_analytics(instance, cookie as *mut c_void, command), + cookie, + )?; + verify_analytics(lcb_cmdanalytics_destroy(command), cookie)?; } + + Ok(()) } /// Encodes a `SearchRequest` into its libcouchbase `lcb_CMDSEARCH` representation. -pub fn encode_search(instance: *mut lcb_INSTANCE, mut request: SearchRequest) { +pub fn encode_search( + instance: *mut lcb_INSTANCE, + mut request: SearchRequest, +) -> Result<(), EncodeFailure> { request.options.index = Some(request.index); request.options.query = Some(request.query); @@ -265,12 +490,20 @@ pub fn encode_search(instance: *mut lcb_INSTANCE, mut request: SearchRequest) { let mut command: *mut lcb_CMDSEARCH = ptr::null_mut(); unsafe { - lcb_cmdsearch_create(&mut command); - lcb_cmdsearch_payload(command, payload.as_ptr(), payload_len); - lcb_cmdsearch_callback(command, Some(search_callback)); - lcb_search(instance, cookie as *mut c_void, command); - lcb_cmdsearch_destroy(command); + verify_search(lcb_cmdsearch_create(&mut command), cookie)?; + verify_search( + lcb_cmdsearch_payload(command, payload.as_ptr(), payload_len), + cookie, + )?; + verify_search( + lcb_cmdsearch_callback(command, Some(search_callback)), + cookie, + )?; + verify_search(lcb_search(instance, cookie as *mut c_void, command), cookie)?; + verify_search(lcb_cmdsearch_destroy(command), cookie)?; } + + Ok(()) } enum EncodedLookupSpec { @@ -280,7 +513,10 @@ enum EncodedLookupSpec { } /// Encodes a `LookupInRequest` into its libcouchbase `lcb_CMDSUBDOC` representation. -pub fn encode_lookup_in(instance: *mut lcb_INSTANCE, request: LookupInRequest) { +pub fn encode_lookup_in( + instance: *mut lcb_INSTANCE, + request: LookupInRequest, +) -> Result<(), EncodeFailure> { let (id_len, id) = into_cstring(request.id); let cookie = Box::into_raw(Box::new(request.sender)); let (scope_len, scope) = into_cstring(request.scope); @@ -308,46 +544,69 @@ pub fn encode_lookup_in(instance: *mut lcb_INSTANCE, request: LookupInRequest) { let mut command: *mut lcb_CMDSUBDOC = ptr::null_mut(); let mut specs: *mut lcb_SUBDOCSPECS = ptr::null_mut(); unsafe { - lcb_subdocspecs_create(&mut specs, lookup_specs.len()); + verify( + lcb_subdocspecs_create(&mut specs, lookup_specs.len()), + cookie, + )?; let mut idx = 0; for lookup_spec in &lookup_specs { match lookup_spec { EncodedLookupSpec::Get { path_len, path } => { - lcb_subdocspecs_get(specs, idx, 0, path.as_ptr(), *path_len); + verify( + lcb_subdocspecs_get(specs, idx, 0, path.as_ptr(), *path_len), + cookie, + )?; } EncodedLookupSpec::Exists { path_len, path } => { - lcb_subdocspecs_exists(specs, idx, 0, path.as_ptr(), *path_len); + verify( + lcb_subdocspecs_exists(specs, idx, 0, path.as_ptr(), *path_len), + cookie, + )?; } EncodedLookupSpec::Count { path_len, path } => { - lcb_subdocspecs_get_count(specs, idx, 0, path.as_ptr(), *path_len); + verify( + lcb_subdocspecs_get_count(specs, idx, 0, path.as_ptr(), *path_len), + cookie, + )?; } } idx += 1; } - lcb_cmdsubdoc_create(&mut command); - lcb_cmdsubdoc_key(command, id.as_ptr(), id_len); - lcb_cmdsubdoc_collection( - command, - scope.as_ptr(), - scope_len, - collection.as_ptr(), - collection_len, - ); + verify(lcb_cmdsubdoc_create(&mut command), cookie)?; + verify(lcb_cmdsubdoc_key(command, id.as_ptr(), id_len), cookie)?; + verify( + lcb_cmdsubdoc_collection( + command, + scope.as_ptr(), + scope_len, + collection.as_ptr(), + collection_len, + ), + cookie, + )?; if let Some(timeout) = request.options.timeout { - lcb_cmdsubdoc_timeout(command, timeout.as_micros() as u32); + verify( + lcb_cmdsubdoc_timeout(command, timeout.as_micros() as u32), + cookie, + )?; } if let Some(access_deleted) = request.options.access_deleted { - lcb_cmdsubdoc_access_deleted(command, if access_deleted { 1 } else { 0 }); + verify( + lcb_cmdsubdoc_access_deleted(command, if access_deleted { 1 } else { 0 }), + cookie, + )?; } - lcb_cmdsubdoc_specs(command, specs); - lcb_subdoc(instance, cookie as *mut c_void, command); - lcb_subdocspecs_destroy(specs); - lcb_cmdsubdoc_destroy(command); + verify(lcb_cmdsubdoc_specs(command, specs), cookie)?; + verify(lcb_subdoc(instance, cookie as *mut c_void, command), cookie)?; + verify(lcb_subdocspecs_destroy(specs), cookie)?; + verify(lcb_cmdsubdoc_destroy(command), cookie)?; } + + Ok(()) } pub enum EncodedMutateSpec { @@ -405,7 +664,10 @@ pub enum EncodedMutateSpec { } /// Encodes a `MutateInRequest` into its libcouchbase `lcb_CMDSUBDOC` representation. -pub fn encode_mutate_in(instance: *mut lcb_INSTANCE, request: MutateInRequest) { +pub fn encode_mutate_in( + instance: *mut lcb_INSTANCE, + request: MutateInRequest, +) -> Result<(), EncodeFailure> { let (id_len, id) = into_cstring(request.id); let cookie = Box::into_raw(Box::new(request.sender)); let (scope_len, scope) = into_cstring(request.scope); @@ -503,7 +765,10 @@ pub fn encode_mutate_in(instance: *mut lcb_INSTANCE, request: MutateInRequest) { let mut command: *mut lcb_CMDSUBDOC = ptr::null_mut(); let mut specs: *mut lcb_SUBDOCSPECS = ptr::null_mut(); unsafe { - lcb_subdocspecs_create(&mut specs, mutate_specs.len()); + verify( + lcb_subdocspecs_create(&mut specs, mutate_specs.len()), + cookie, + )?; let mut idx = 0; for mutate_spec in &mutate_specs { @@ -514,15 +779,18 @@ pub fn encode_mutate_in(instance: *mut lcb_INSTANCE, request: MutateInRequest) { value_len, value, } => { - lcb_subdocspecs_dict_add( - specs, - idx, - 0, - path.as_ptr(), - *path_len, - value.as_ptr(), - *value_len, - ); + verify( + lcb_subdocspecs_dict_add( + specs, + idx, + 0, + path.as_ptr(), + *path_len, + value.as_ptr(), + *value_len, + ), + cookie, + )?; } EncodedMutateSpec::Upsert { path_len, @@ -530,15 +798,18 @@ pub fn encode_mutate_in(instance: *mut lcb_INSTANCE, request: MutateInRequest) { value_len, value, } => { - lcb_subdocspecs_dict_upsert( - specs, - idx, - 0, - path.as_ptr(), - *path_len, - value.as_ptr(), - *value_len, - ); + verify( + lcb_subdocspecs_dict_upsert( + specs, + idx, + 0, + path.as_ptr(), + *path_len, + value.as_ptr(), + *value_len, + ), + cookie, + )?; } EncodedMutateSpec::Replace { path_len, @@ -546,25 +817,34 @@ pub fn encode_mutate_in(instance: *mut lcb_INSTANCE, request: MutateInRequest) { value_len, value, } => { - lcb_subdocspecs_replace( - specs, - idx, - 0, - path.as_ptr(), - *path_len, - value.as_ptr(), - *value_len, - ); + verify( + lcb_subdocspecs_replace( + specs, + idx, + 0, + path.as_ptr(), + *path_len, + value.as_ptr(), + *value_len, + ), + cookie, + )?; } EncodedMutateSpec::Remove { path_len, path } => { - lcb_subdocspecs_remove(specs, idx, 0, path.as_ptr(), *path_len); + verify( + lcb_subdocspecs_remove(specs, idx, 0, path.as_ptr(), *path_len), + cookie, + )?; } EncodedMutateSpec::Counter { path_len, path, delta, } => { - lcb_subdocspecs_counter(specs, idx, 0, path.as_ptr(), *path_len, *delta); + verify( + lcb_subdocspecs_counter(specs, idx, 0, path.as_ptr(), *path_len, *delta), + cookie, + )?; } EncodedMutateSpec::ArrayAppend { path_len, @@ -572,15 +852,18 @@ pub fn encode_mutate_in(instance: *mut lcb_INSTANCE, request: MutateInRequest) { value_len, value, } => { - lcb_subdocspecs_array_add_last( - specs, - idx, - 0, - path.as_ptr(), - *path_len, - value.as_ptr(), - *value_len, - ); + verify( + lcb_subdocspecs_array_add_last( + specs, + idx, + 0, + path.as_ptr(), + *path_len, + value.as_ptr(), + *value_len, + ), + cookie, + )?; } EncodedMutateSpec::ArrayPrepend { path_len, @@ -588,15 +871,18 @@ pub fn encode_mutate_in(instance: *mut lcb_INSTANCE, request: MutateInRequest) { value_len, value, } => { - lcb_subdocspecs_array_add_last( - specs, - idx, - 0, - path.as_ptr(), - *path_len, - value.as_ptr(), - *value_len, - ); + verify( + lcb_subdocspecs_array_add_last( + specs, + idx, + 0, + path.as_ptr(), + *path_len, + value.as_ptr(), + *value_len, + ), + cookie, + )?; } EncodedMutateSpec::ArrayAddUnique { path_len, @@ -604,15 +890,18 @@ pub fn encode_mutate_in(instance: *mut lcb_INSTANCE, request: MutateInRequest) { value_len, value, } => { - lcb_subdocspecs_array_add_unique( - specs, - idx, - 0, - path.as_ptr(), - *path_len, - value.as_ptr(), - *value_len, - ); + verify( + lcb_subdocspecs_array_add_unique( + specs, + idx, + 0, + path.as_ptr(), + *path_len, + value.as_ptr(), + *value_len, + ), + cookie, + )?; } EncodedMutateSpec::ArrayInsert { path_len, @@ -620,35 +909,44 @@ pub fn encode_mutate_in(instance: *mut lcb_INSTANCE, request: MutateInRequest) { value_len, value, } => { - lcb_subdocspecs_array_insert( - specs, - idx, - 0, - path.as_ptr(), - *path_len, - value.as_ptr(), - *value_len, - ); + verify( + lcb_subdocspecs_array_insert( + specs, + idx, + 0, + path.as_ptr(), + *path_len, + value.as_ptr(), + *value_len, + ), + cookie, + )?; } } idx += 1; } - lcb_cmdsubdoc_create(&mut command); - lcb_cmdsubdoc_key(command, id.as_ptr(), id_len); - lcb_cmdsubdoc_collection( - command, - scope.as_ptr(), - scope_len, - collection.as_ptr(), - collection_len, - ); + verify(lcb_cmdsubdoc_create(&mut command), cookie)?; + verify(lcb_cmdsubdoc_key(command, id.as_ptr(), id_len), cookie)?; + verify( + lcb_cmdsubdoc_collection( + command, + scope.as_ptr(), + scope_len, + collection.as_ptr(), + collection_len, + ), + cookie, + )?; if let Some(timeout) = request.options.timeout { - lcb_cmdsubdoc_timeout(command, timeout.as_micros() as u32); + verify( + lcb_cmdsubdoc_timeout(command, timeout.as_micros() as u32), + cookie, + )?; } if let Some(cas) = request.options.cas { - lcb_cmdsubdoc_cas(command, cas); + verify(lcb_cmdsubdoc_cas(command, cas), cookie)?; } if let Some(semantics) = request.options.store_semantics { let ss = match semantics { @@ -656,26 +954,34 @@ pub fn encode_mutate_in(instance: *mut lcb_INSTANCE, request: MutateInRequest) { StoreSemantics::Upsert => lcb_SUBDOC_STORE_SEMANTICS_LCB_SUBDOC_STORE_UPSERT, StoreSemantics::Insert => lcb_SUBDOC_STORE_SEMANTICS_LCB_SUBDOC_STORE_INSERT, }; - lcb_cmdsubdoc_store_semantics(command, ss); + verify(lcb_cmdsubdoc_store_semantics(command, ss), cookie)?; } if let Some(expiry) = request.options.expiry { - lcb_cmdsubdoc_expiry(command, expiry.as_micros() as u32); + verify( + lcb_cmdsubdoc_expiry(command, expiry.as_micros() as u32), + cookie, + )?; } if let Some(access_deleted) = request.options.access_deleted { - lcb_cmdsubdoc_access_deleted(command, if access_deleted { 1 } else { 0 }); + verify( + lcb_cmdsubdoc_access_deleted(command, if access_deleted { 1 } else { 0 }), + cookie, + )?; } - lcb_cmdsubdoc_specs(command, specs); - lcb_subdoc(instance, cookie as *mut c_void, command); - lcb_subdocspecs_destroy(specs); - lcb_cmdsubdoc_destroy(command); + verify(lcb_cmdsubdoc_specs(command, specs), cookie)?; + verify(lcb_subdoc(instance, cookie as *mut c_void, command), cookie)?; + verify(lcb_subdocspecs_destroy(specs), cookie)?; + verify(lcb_cmdsubdoc_destroy(command), cookie)?; } + + Ok(()) } pub fn encode_generic_management_request( instance: *mut lcb_INSTANCE, request: GenericManagementRequest, -) { +) -> Result<(), EncodeFailure> { let (path_len, path) = into_cstring(request.path); let cookie = Box::into_raw(Box::new(HttpCookie::GenericManagementRequest { sender: request.sender, @@ -687,7 +993,10 @@ pub fn encode_generic_management_request( let mut command: *mut lcb_CMDHTTP = ptr::null_mut(); unsafe { - lcb_cmdhttp_create(&mut command, lcb_HTTP_TYPE_LCB_HTTP_TYPE_MANAGEMENT); + verify_http( + lcb_cmdhttp_create(&mut command, lcb_HTTP_TYPE_LCB_HTTP_TYPE_MANAGEMENT), + cookie, + )?; let method = match request.method.as_str() { "get" => lcb_HTTP_METHOD_LCB_HTTP_METHOD_GET, "put" => lcb_HTTP_METHOD_LCB_HTTP_METHOD_PUT, @@ -695,28 +1004,39 @@ pub fn encode_generic_management_request( "delete" => lcb_HTTP_METHOD_LCB_HTTP_METHOD_DELETE, _ => panic!("Unknown HTTP method used"), }; - lcb_cmdhttp_method(command, method); - lcb_cmdhttp_path(command, path.as_ptr(), path_len); + verify_http(lcb_cmdhttp_method(command, method), cookie)?; + verify_http(lcb_cmdhttp_path(command, path.as_ptr(), path_len), cookie)?; if let Some(timeout) = request.timeout { - lcb_cmdhttp_timeout(command, timeout.as_micros() as u32); + verify_http( + lcb_cmdhttp_timeout(command, timeout.as_micros() as u32), + cookie, + )?; } if content_type_len > 0 { - lcb_cmdhttp_content_type(command, content_type.as_ptr(), content_type_len); + verify_http( + lcb_cmdhttp_content_type(command, content_type.as_ptr(), content_type_len), + cookie, + )?; } if body_len > 0 { - lcb_cmdhttp_body(command, body.as_ptr(), body_len); + verify_http(lcb_cmdhttp_body(command, body.as_ptr(), body_len), cookie)?; } - lcb_http(instance, cookie as *mut c_void, command); - lcb_cmdhttp_destroy(command); + verify_http(lcb_http(instance, cookie as *mut c_void, command), cookie)?; + verify_http(lcb_cmdhttp_destroy(command), cookie)?; } + + Ok(()) } #[cfg(feature = "volatile")] -pub fn encode_kv_stats(instance: *mut lcb_INSTANCE, request: KvStatsRequest) { +pub fn encode_kv_stats( + instance: *mut lcb_INSTANCE, + request: KvStatsRequest, +) -> Result<(), EncodeFailure> { let (scope_len, scope) = into_cstring(String::from("")); let (collection_len, collection) = into_cstring(String::from("")); let (key_len, key) = into_cstring(request.key.unwrap_or_default()); @@ -751,12 +1071,17 @@ pub fn encode_kv_stats(instance: *mut lcb_INSTANCE, request: KvStatsRequest) { stats_receiver: Some(stats_receiver), })); unsafe { - lcb_stats3(instance, cookie as *mut c_void, &command); + verify( + lcb_stats3(instance, cookie as *mut c_void, &command), + cookie, + )?; } + + Ok(()) } /// Encodes a `PingRequest` into its libcouchbase `lcb_CMDPING` representation. -pub fn encode_ping(instance: *mut lcb_INSTANCE, request: PingRequest) { +pub fn encode_ping(instance: *mut lcb_INSTANCE, request: PingRequest) -> Result<(), EncodeFailure> { let cookie = Box::into_raw(Box::new(request.sender)); let report_id = request @@ -767,10 +1092,15 @@ pub fn encode_ping(instance: *mut lcb_INSTANCE, request: PingRequest) { let mut command: *mut lcb_CMDPING = ptr::null_mut(); unsafe { - lcb_cmdping_create(&mut command); - lcb_cmdping_report_id(command, c_report_id.as_ptr(), report_id_len); - lcb_cmdping_all(command); - lcb_ping(instance, cookie as *mut c_void, command); - lcb_cmdping_destroy(command); + verify(lcb_cmdping_create(&mut command), cookie)?; + verify( + lcb_cmdping_report_id(command, c_report_id.as_ptr(), report_id_len), + cookie, + )?; + verify(lcb_cmdping_all(command), cookie)?; + verify(lcb_ping(instance, cookie as *mut c_void, command), cookie)?; + verify(lcb_cmdping_destroy(command), cookie)?; } + + Ok(()) } diff --git a/couchbase/src/io/lcb/instance.rs b/couchbase/src/io/lcb/instance.rs index 3082089f..7adbf923 100644 --- a/couchbase/src/io/lcb/instance.rs +++ b/couchbase/src/io/lcb/instance.rs @@ -171,8 +171,10 @@ impl LcbInstance { } pub fn handle_request(&mut self, request: Request) { - encode_request(self.inner, request); - self.increment_outstanding_requests(); + match encode_request(self.inner, request) { + Ok(_) => self.increment_outstanding_requests(), + Err(e) => warn!("Failed to encode request because of {:?}", e), + } } } diff --git a/couchbase/src/io/lcb/mod.rs b/couchbase/src/io/lcb/mod.rs index acb72a42..b9e76480 100644 --- a/couchbase/src/io/lcb/mod.rs +++ b/couchbase/src/io/lcb/mod.rs @@ -8,6 +8,8 @@ use crate::api::results::{ SearchMetaData, SearchResult, }; +use encode::EncodeFailure; + use crate::io::request::Request; use instance::{LcbInstance, LcbInstances}; @@ -173,24 +175,26 @@ pub enum IoRequest { Shutdown, } -fn encode_request(instance: *mut lcb_INSTANCE, request: Request) { +fn encode_request(instance: *mut lcb_INSTANCE, request: Request) -> Result<(), EncodeFailure> { match request { - Request::Get(r) => encode::encode_get(instance, r), - Request::Query(r) => encode::encode_query(instance, r), - Request::Analytics(r) => encode::encode_analytics(instance, r), - Request::Search(r) => encode::encode_search(instance, r), - Request::Mutate(r) => encode::encode_mutate(instance, r), - Request::Exists(r) => encode::encode_exists(instance, r), - Request::Remove(r) => encode::encode_remove(instance, r), - Request::LookupIn(r) => encode::encode_lookup_in(instance, r), - Request::MutateIn(r) => encode::encode_mutate_in(instance, r), + Request::Get(r) => encode::encode_get(instance, r)?, + Request::Query(r) => encode::encode_query(instance, r)?, + Request::Analytics(r) => encode::encode_analytics(instance, r)?, + Request::Search(r) => encode::encode_search(instance, r)?, + Request::Mutate(r) => encode::encode_mutate(instance, r)?, + Request::Exists(r) => encode::encode_exists(instance, r)?, + Request::Remove(r) => encode::encode_remove(instance, r)?, + Request::LookupIn(r) => encode::encode_lookup_in(instance, r)?, + Request::MutateIn(r) => encode::encode_mutate_in(instance, r)?, Request::GenericManagementRequest(r) => { - encode::encode_generic_management_request(instance, r) + encode::encode_generic_management_request(instance, r)? } #[cfg(feature = "volatile")] - Request::KvStatsRequest(r) => encode::encode_kv_stats(instance, r), - Request::Ping(r) => encode::encode_ping(instance, r), + Request::KvStatsRequest(r) => encode::encode_kv_stats(instance, r)?, + Request::Ping(r) => encode::encode_ping(instance, r)?, } + + Ok(()) } struct QueryCookie {