Skip to content

Commit c93d7d4

Browse files
ice1000overvenus
authored andcommitted
Zero copy for receive (#222)
Signed-off-by: ice1000 <[email protected]>
1 parent f941e23 commit c93d7d4

File tree

14 files changed

+395
-134
lines changed

14 files changed

+395
-134
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,5 @@ Cargo.lock
33
*.rs.bk
44
*.rs.fmt
55
.vscode
6+
.idea
7+
*.o

benchmark/src/bench.rs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,15 @@
1414
// TODO: Remove it once Rust's tool_lints is stabilized.
1515
#![cfg_attr(feature = "cargo-clippy", allow(renamed_and_removed_lints))]
1616

17+
use std::io::Read;
1718
use std::sync::atomic::{AtomicBool, Ordering};
1819
use std::sync::Arc;
1920

2021
use futures::{Future, Sink, Stream};
2122
use grpc::{
22-
self, ClientStreamingSink, DuplexSink, Method, MethodType, RequestStream, RpcContext,
23-
RpcStatus, RpcStatusCode, ServerStreamingSink, ServiceBuilder, UnarySink, WriteFlags,
23+
self, ClientStreamingSink, DuplexSink, MessageReader, Method, MethodType, RequestStream,
24+
RpcContext, RpcStatus, RpcStatusCode, ServerStreamingSink, ServiceBuilder, UnarySink,
25+
WriteFlags,
2426
};
2527
use grpc_proto::testing::messages::{SimpleRequest, SimpleResponse};
2628
use grpc_proto::testing::services_grpc::BenchmarkService;
@@ -115,8 +117,10 @@ pub fn bin_ser(t: &Vec<u8>, buf: &mut Vec<u8>) {
115117
}
116118

117119
#[inline]
118-
pub fn bin_de(buf: &[u8]) -> grpc::Result<Vec<u8>> {
119-
Ok(buf.to_vec())
120+
pub fn bin_de(mut reader: MessageReader) -> grpc::Result<Vec<u8>> {
121+
let mut buf = vec![];
122+
reader.read_to_end(&mut buf).unwrap();
123+
Ok(buf)
120124
}
121125

122126
pub const METHOD_BENCHMARK_SERVICE_GENERIC_CALL: Method<Vec<u8>, Vec<u8>> = Method {

benchmark/src/client.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -325,11 +325,13 @@ fn execute<B: Backoff + Send + 'static>(
325325
}
326326
RequestExecutor::new(ctx, ch, cfg).execute_unary_async()
327327
}
328-
RpcType::STREAMING => if cfg.get_payload_config().has_bytebuf_params() {
329-
GenericExecutor::new(ctx, ch, cfg).execute_stream()
330-
} else {
331-
RequestExecutor::new(ctx, ch, cfg).execute_stream_ping_pong()
332-
},
328+
RpcType::STREAMING => {
329+
if cfg.get_payload_config().has_bytebuf_params() {
330+
GenericExecutor::new(ctx, ch, cfg).execute_stream()
331+
} else {
332+
RequestExecutor::new(ctx, ch, cfg).execute_stream_ping_pong()
333+
}
334+
}
333335
_ => unimplemented!(),
334336
},
335337
_ => unimplemented!(),

grpc-sys/grpc_wrap.cc

Lines changed: 29 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -297,42 +297,25 @@ grpcwrap_batch_context_recv_initial_metadata(
297297
return &(ctx->recv_initial_metadata);
298298
}
299299

300-
GPR_EXPORT size_t GPR_CALLTYPE
301-
grpcwrap_batch_context_recv_message_length(const grpcwrap_batch_context* ctx) {
302-
grpc_byte_buffer_reader reader;
303-
if (!ctx->recv_message) {
304-
return (size_t)-1;
305-
}
306-
307-
GPR_ASSERT(grpc_byte_buffer_reader_init(&reader, ctx->recv_message));
308-
size_t result = grpc_byte_buffer_length(reader.buffer_out);
309-
grpc_byte_buffer_reader_destroy(&reader);
300+
GPR_EXPORT const char *GPR_CALLTYPE
301+
grpcwrap_slice_raw_offset(const grpc_slice *slice, size_t offset, size_t *len) {
302+
*len = GRPC_SLICE_LENGTH(*slice) - offset;
303+
return (const char *)(GRPC_SLICE_START_PTR(*slice)) + offset;
304+
}
310305

311-
return result;
306+
GPR_EXPORT size_t GPR_CALLTYPE
307+
grpcwrap_slice_length(const grpc_slice *slice) {
308+
return GRPC_SLICE_LENGTH(*slice);
312309
}
313310

314-
/*
315-
* Copies data from recv_message to a buffer. Fatal error occurs if
316-
* buffer is too small.
317-
*/
318-
GPR_EXPORT void GPR_CALLTYPE grpcwrap_batch_context_recv_message_to_buffer(
319-
const grpcwrap_batch_context* ctx, char* buffer, size_t buffer_len) {
320-
grpc_byte_buffer_reader reader;
321-
grpc_slice slice;
322-
size_t offset = 0;
323-
324-
GPR_ASSERT(grpc_byte_buffer_reader_init(&reader, ctx->recv_message));
325-
326-
while (grpc_byte_buffer_reader_next(&reader, &slice)) {
327-
size_t len = GRPC_SLICE_LENGTH(slice);
328-
GPR_ASSERT(offset + len <= buffer_len);
329-
memcpy(buffer + offset, GRPC_SLICE_START_PTR(slice),
330-
GRPC_SLICE_LENGTH(slice));
331-
offset += len;
332-
grpc_slice_unref(slice);
311+
GPR_EXPORT grpc_byte_buffer *GPR_CALLTYPE
312+
grpcwrap_batch_context_take_recv_message(grpcwrap_batch_context *ctx) {
313+
grpc_byte_buffer *buf = NULL;
314+
if (ctx->recv_message) {
315+
buf = ctx->recv_message;
316+
ctx->recv_message = NULL;
333317
}
334-
335-
grpc_byte_buffer_reader_destroy(&reader);
318+
return buf;
336319
}
337320

338321
GPR_EXPORT grpc_status_code GPR_CALLTYPE
@@ -858,3 +841,17 @@ grpcwrap_ssl_server_credentials_create(
858841
}
859842

860843
#endif
844+
845+
/* Sanity check for complicated types */
846+
847+
GPR_EXPORT void GPR_CALLTYPE grpcwrap_sanity_check_slice(size_t size,
848+
size_t align) {
849+
GPR_ASSERT(sizeof(grpc_slice) == size);
850+
GPR_ASSERT(alignof(grpc_slice) == align);
851+
}
852+
853+
GPR_EXPORT void GPR_CALLTYPE
854+
grpcwrap_sanity_check_byte_buffer_reader(size_t size, size_t align) {
855+
GPR_ASSERT(sizeof(grpc_byte_buffer_reader) == size);
856+
GPR_ASSERT(alignof(grpc_byte_buffer_reader) == align);
857+
}

grpc-sys/src/lib.rs

Lines changed: 90 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,13 @@
1212
// limitations under the License.
1313

1414
#![allow(unknown_lints)]
15+
#![allow(renamed_and_removed_lints)]
16+
// remove this after Rust's tool_lints is stabilized
1517

1618
extern crate libc;
1719

18-
use libc::{c_char, c_int, c_uint, c_void, int32_t, int64_t, size_t, uint32_t};
20+
use libc::{c_char, c_int, c_uint, c_void, int32_t, int64_t, size_t, uint32_t, uint8_t};
21+
use std::mem;
1922
use std::time::Duration;
2023

2124
/// The clocks gRPC supports.
@@ -348,6 +351,52 @@ pub struct GrpcMetadataArray {
348351
pub metadata: *mut GrpcMetadata,
349352
}
350353

354+
#[repr(C)]
355+
#[derive(Clone, Copy)]
356+
pub struct GrpcSliceRefCounted {
357+
bytes: *mut uint8_t,
358+
length: size_t,
359+
}
360+
361+
#[repr(C)]
362+
#[derive(Clone, Copy)]
363+
pub struct GrpcSliceInlined {
364+
length: uint8_t,
365+
// TODO: use size_of when it becomes a const function.
366+
#[cfg(target_pointer_width = "64")]
367+
bytes: [uint8_t; 23],
368+
#[cfg(target_pointer_width = "32")]
369+
bytes: [uint8_t; 11],
370+
}
371+
372+
#[repr(C)]
373+
#[derive(Clone, Copy)]
374+
pub union GrpcSliceData {
375+
ref_counted: GrpcSliceRefCounted,
376+
inlined: GrpcSliceInlined,
377+
}
378+
379+
pub enum GrpcSliceRefCount {}
380+
381+
#[repr(C)]
382+
#[derive(Clone, Copy)]
383+
pub struct GrpcSlice {
384+
ref_count: *mut GrpcSliceRefCount,
385+
data: GrpcSliceData,
386+
}
387+
388+
#[repr(C)]
389+
pub union GrpcByteBufferReaderCurrent {
390+
index: c_uint,
391+
}
392+
393+
#[repr(C)]
394+
pub struct GrpcByteBufferReader {
395+
pub buffer_in: *mut GrpcByteBuffer,
396+
pub buffer_out: *mut GrpcByteBuffer,
397+
current: GrpcByteBufferReaderCurrent,
398+
}
399+
351400
pub const GRPC_INITIAL_METADATA_IDEMPOTENT_REQUEST: uint32_t = 0x0000_0010;
352401
pub const GRPC_INITIAL_METADATA_WAIT_FOR_READY: uint32_t = 0x0000_0020;
353402
pub const GRPC_INITIAL_METADATA_CACHEABLE_REQUEST: uint32_t = 0x0000_0040;
@@ -356,7 +405,6 @@ pub const GRPC_WRITE_BUFFER_HINT: uint32_t = 0x0000_0001;
356405
pub const GRPC_WRITE_NO_COMPRESS: uint32_t = 0x0000_0002;
357406

358407
pub enum GrpcMetadata {}
359-
pub enum GrpcSlice {}
360408
pub enum GrpcCallDetails {}
361409
pub enum GrpcCompletionQueue {}
362410
pub enum GrpcChannel {}
@@ -443,17 +491,33 @@ extern "C" {
443491
) -> *mut GrpcChannel;
444492
pub fn grpc_channel_destroy(channel: *mut GrpcChannel);
445493

494+
pub fn grpc_slice_unref(slice: GrpcSlice);
495+
pub fn grpc_byte_buffer_length(buf: *const GrpcByteBuffer) -> size_t;
496+
pub fn grpcwrap_slice_length(slice: *const GrpcSlice) -> size_t;
497+
pub fn grpcwrap_slice_raw_offset(
498+
slice: *const GrpcSlice,
499+
offset: size_t,
500+
len: *mut size_t,
501+
) -> *const c_char;
502+
pub fn grpc_byte_buffer_reader_init(
503+
reader: *mut GrpcByteBufferReader,
504+
buf: *mut GrpcByteBuffer,
505+
) -> c_int;
506+
pub fn grpc_byte_buffer_reader_next(
507+
reader: *mut GrpcByteBufferReader,
508+
buf: *mut GrpcSlice,
509+
) -> c_int;
510+
pub fn grpc_byte_buffer_reader_destroy(reader: *mut GrpcByteBufferReader);
511+
pub fn grpc_byte_buffer_destroy(buf: *mut GrpcByteBuffer);
512+
446513
pub fn grpcwrap_batch_context_create() -> *mut GrpcBatchContext;
447514
pub fn grpcwrap_batch_context_destroy(ctx: *mut GrpcBatchContext);
448515
pub fn grpcwrap_batch_context_recv_initial_metadata(
449516
ctx: *mut GrpcBatchContext,
450517
) -> *const GrpcMetadataArray;
451-
pub fn grpcwrap_batch_context_recv_message_length(ctx: *mut GrpcBatchContext) -> size_t;
452-
pub fn grpcwrap_batch_context_recv_message_to_buffer(
518+
pub fn grpcwrap_batch_context_take_recv_message(
453519
ctx: *mut GrpcBatchContext,
454-
buffer: *mut c_char,
455-
buffer_len: size_t,
456-
);
520+
) -> *mut GrpcByteBuffer;
457521
pub fn grpcwrap_batch_context_recv_status_on_client_status(
458522
ctx: *mut GrpcBatchContext,
459523
) -> GrpcStatusCode;
@@ -476,7 +540,7 @@ extern "C" {
476540
pub fn grpcwrap_call_start_unary(
477541
call: *mut GrpcCall,
478542
ctx: *mut GrpcBatchContext,
479-
send_bufer: *const c_char,
543+
send_buffer: *const c_char,
480544
send_buffer_len: size_t,
481545
write_flags: uint32_t,
482546
initial_metadata: *mut GrpcMetadataArray,
@@ -493,7 +557,7 @@ extern "C" {
493557
pub fn grpcwrap_call_start_server_streaming(
494558
call: *mut GrpcCall,
495559
ctx: *mut GrpcBatchContext,
496-
send_bufer: *const c_char,
560+
send_buffer: *const c_char,
497561
send_buffer_len: size_t,
498562
write_flags: uint32_t,
499563
initial_metadata: *mut GrpcMetadataArray,
@@ -515,7 +579,7 @@ extern "C" {
515579
pub fn grpcwrap_call_send_message(
516580
call: *mut GrpcCall,
517581
ctx: *mut GrpcBatchContext,
518-
send_bufer: *const c_char,
582+
send_buffer: *const c_char,
519583
send_buffer_len: size_t,
520584
write_flags: uint32_t,
521585
send_empty_initial_metadata: uint32_t,
@@ -570,7 +634,7 @@ extern "C" {
570634
server: *mut GrpcServer,
571635
method: *const c_char,
572636
host: *const c_char,
573-
paylod_handling: GrpcServerRegisterMethodPayloadHandling,
637+
payload_handling: GrpcServerRegisterMethodPayloadHandling,
574638
flags: uint32_t,
575639
) -> *mut c_void;
576640
pub fn grpc_server_create(
@@ -646,6 +710,19 @@ extern "C" {
646710
pub fn grpcwrap_metadata_array_cleanup(array: *mut GrpcMetadataArray);
647711

648712
pub fn gpr_free(p: *mut c_void);
713+
714+
pub fn grpcwrap_sanity_check_slice(size: size_t, align: size_t);
715+
pub fn grpcwrap_sanity_check_byte_buffer_reader(size: size_t, align: size_t);
716+
}
717+
718+
/// Make sure the complicated struct written in rust is the same with
719+
/// its C one.
720+
pub unsafe fn sanity_check() {
721+
grpcwrap_sanity_check_slice(mem::size_of::<GrpcSlice>(), mem::align_of::<GrpcSlice>());
722+
grpcwrap_sanity_check_byte_buffer_reader(
723+
mem::size_of::<GrpcByteBufferReader>(),
724+
mem::align_of::<GrpcByteBufferReader>(),
725+
);
649726
}
650727

651728
#[cfg(feature = "secure")]
@@ -655,6 +732,7 @@ mod secure_component {
655732
use super::{GrpcChannel, GrpcChannelArgs, GrpcServer};
656733

657734
pub enum GrpcChannelCredentials {}
735+
658736
pub enum GrpcServerCredentials {}
659737

660738
extern "C" {
@@ -703,6 +781,7 @@ mod tests {
703781
fn smoke() {
704782
unsafe {
705783
super::grpc_init();
784+
super::sanity_check();
706785
let cq = super::grpc_completion_queue_create_for_next(ptr::null_mut());
707786
super::grpc_completion_queue_destroy(cq);
708787
super::grpc_shutdown();

src/async/callback.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,8 @@ impl UnaryRequest {
6969
return;
7070
}
7171

72-
let data = self.ctx.batch_ctx().recv_message();
73-
self.ctx
74-
.handle(&mut rc, cq, data.as_ref().map(|v| v.as_slice()));
72+
let reader = self.ctx.batch_ctx_mut().recv_message();
73+
self.ctx.handle(&mut rc, cq, reader);
7574
server::request_call(rc, cq);
7675
}
7776
}

src/async/mod.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use self::callback::{Abort, Request as RequestCallback, UnaryRequest as UnaryReq
2626
use self::executor::SpawnNotify;
2727
use self::promise::{Batch as BatchPromise, Shutdown as ShutdownPromise};
2828
use call::server::RequestContext;
29-
use call::{BatchContext, Call};
29+
use call::{BatchContext, Call, MessageReader};
3030
use cq::CompletionQueue;
3131
use error::{Error, Result};
3232
use server::RequestCallContext;
@@ -115,9 +115,8 @@ impl<T> Future for CqFuture<T> {
115115
}
116116
}
117117

118-
pub type BatchMessage = Option<Vec<u8>>;
119118
/// Future object for batch jobs.
120-
pub type BatchFuture = CqFuture<BatchMessage>;
119+
pub type BatchFuture = CqFuture<Option<MessageReader>>;
121120

122121
/// A result holder for asynchronous execution.
123122
// This enum is going to be passed to FFI, so don't use trait or generic here.

src/async/promise.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414
use std::fmt::{self, Debug, Formatter};
1515
use std::sync::Arc;
1616

17-
use super::{BatchMessage, Inner};
18-
use call::{BatchContext, RpcStatusCode};
17+
use super::Inner;
18+
use call::{BatchContext, MessageReader, RpcStatusCode};
1919
use error::Error;
2020

2121
/// Batch job type.
@@ -33,11 +33,11 @@ pub enum BatchType {
3333
pub struct Batch {
3434
ty: BatchType,
3535
ctx: BatchContext,
36-
inner: Arc<Inner<BatchMessage>>,
36+
inner: Arc<Inner<Option<MessageReader>>>,
3737
}
3838

3939
impl Batch {
40-
pub fn new(ty: BatchType, inner: Arc<Inner<BatchMessage>>) -> Batch {
40+
pub fn new(ty: BatchType, inner: Arc<Inner<Option<MessageReader>>>) -> Batch {
4141
Batch {
4242
ty,
4343
ctx: BatchContext::new(),

0 commit comments

Comments
 (0)