Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 35 additions & 24 deletions src/Curl.jl
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ const NOCHANNEL = NoChannel()


Base.isopen(req::NoChannel) = false
Base.isempty(req::NoChannel) = true
Base.isempty(req::NoChannel) = true
Base.put!(req::NoChannel, ::IOBuffer) = false
Base.take!(req::NoChannel) = nothing
Base.close(req::NoChannel) = false
Base.iterate(req::NoChannel) = Iterators.Stateful(Iterators.flatten(Iterators.repeated(nothing, 0)))
Base.iterate(req::NoChannel) =
Iterators.Stateful(Iterators.flatten(Iterators.repeated(nothing, 0)))


function write_callback(
Expand Down Expand Up @@ -197,8 +198,8 @@ mutable struct gRPCRequest
response::IOBuffer

# These are only used when the request or response is streaming
request_c::Union{Channel{IOBuffer}, NoChannel}
response_c::Union{Channel{IOBuffer}, NoChannel}
request_c::Union{Channel{IOBuffer},NoChannel}
response_c::Union{Channel{IOBuffer},NoChannel}

# The task making the request can block on this until the request is complete
ready::Event
Expand Down Expand Up @@ -231,13 +232,20 @@ mutable struct gRPCRequest
url::String,
request::IOBuffer,
response::IOBuffer,
request_c::Union{Channel{IOBuffer}, NoChannel},
response_c::Union{Channel{IOBuffer}, NoChannel};
request_c::Union{Channel{IOBuffer},NoChannel},
response_c::Union{Channel{IOBuffer},NoChannel};
deadline = 10,
keepalive = 60,
max_send_message_length = 4 * 1024 * 1024,
max_recieve_message_length = 4 * 1024 * 1024,
)
!isready(grpc) && throw(
gRPCServiceCallException(
GRPC_FAILED_PRECONDITION,
"gRPCCURL backend is not running, did you forget to call grpc_init()?",
),
)

# If the grpc handle is shutdown avoid acquiring the request semaphore and immediately throw an exception
if !grpc.running
throw(
Expand All @@ -259,8 +267,12 @@ mutable struct gRPCRequest
# curl_easy_setopt(easy_handle, CURLOPT_VERBOSE, UInt32(1))

curl_easy_setopt(easy_handle, CURLOPT_URL, url)
curl_easy_setopt(easy_handle, CURLOPT_TIMEOUT_MS, Clong(ceil(1000*deadline)))
curl_easy_setopt(easy_handle, CURLOPT_CONNECTTIMEOUT_MS, Clong(ceil(1000*deadline)))
curl_easy_setopt(easy_handle, CURLOPT_TIMEOUT_MS, Clong(ceil(1000 * deadline)))
curl_easy_setopt(
easy_handle,
CURLOPT_CONNECTTIMEOUT_MS,
Clong(ceil(1000 * deadline)),
)
curl_easy_setopt(easy_handle, CURLOPT_PIPEWAIT, Clong(1))
curl_easy_setopt(easy_handle, CURLOPT_POST, Clong(1))
curl_easy_setopt(easy_handle, CURLOPT_CUSTOMREQUEST, "POST")
Expand Down Expand Up @@ -702,30 +714,27 @@ mutable struct gRPCCURL
running::Bool
requests::Vector{gRPCRequest}
# Allows for controlling the maximum number of concurrent gRPC requests/streams
events::Channel{Event}
sem::Channel{Event}

function gRPCCURL(max_streams::Int = GRPC_MAX_STREAMS)
function gRPCCURL(; max_streams::Int = GRPC_MAX_STREAMS, running = true)
grpc = new(
Ptr{Cvoid}(0),
ReentrantLock(),
nothing,
Dict{curl_socket_t,CURLWatcher}(),
ReentrantLock(),
true,
running,
Vector{gRPCRequest}(),
Channel{Event}(max_streams),
)

# We use a channel as a Semaphore which also acts as a way to reuse Events to reduce allocations
for _ = 1:max_streams
put!(grpc.events, Event())
end

preserve_handle(grpc)
finalizer((x) -> close(x), grpc)

grpc_multi_init(grpc)
# This is used for the global const gRPCCURL handle
# The user is expected to call grpc_init() in order to use it
!running && return grpc

finalizer((x) -> close(x), grpc)
open(grpc)

return grpc
end
Expand Down Expand Up @@ -774,9 +783,9 @@ function Base.open(grpc::gRPCCURL)
grpc.watchers = Dict{curl_socket_t,CURLWatcher}()
end

grpc.events = Channel{Event}(grpc.events.sz_max)
for _ = 1:grpc.events.sz_max
put!(grpc.events, Event())
grpc.sem = Channel{Event}(grpc.sem.sz_max)
for _ = 1:grpc.sem.sz_max
put!(grpc.sem, Event())
end

grpc.requests = Vector{gRPCRequest}()
Expand All @@ -789,11 +798,13 @@ function Base.open(grpc::gRPCCURL)
end
end

max_reqs_dec(grpc::gRPCCURL) = take!(grpc.events)
isready(grpc::gRPCCURL) = grpc.running

max_reqs_dec(grpc::gRPCCURL) = take!(grpc.sem)
function max_reqs_inc(grpc::gRPCCURL, req::gRPCRequest)
# Reset before we recycle
reset(req.curl_done_reading)
put!(grpc.events, req.curl_done_reading)
put!(grpc.sem, req.curl_done_reading)
end

function cleanup_request(grpc::gRPCCURL, req::gRPCRequest)
Expand Down
4 changes: 2 additions & 2 deletions src/ProtoBuf.jl
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ function service_cb(io, t::CodeGenerators.ServiceType, ctx::CodeGenerators.Conte
end

import_cb(io, ctx, definitions) =
mapreduce(x->x isa CodeGenerators.ServiceType ? 1 : 0, +, values(definitions)) > 0 &&
println(io, "import gRPCClient")
mapreduce(x -> x isa CodeGenerators.ServiceType ? 1 : 0, +, values(definitions)) > 0 &&
println(io, "import gRPCClient")


grpc_register_service_codegen() = CodeGenerators.register_external_codegen_handler(
Expand Down
48 changes: 24 additions & 24 deletions src/Streaming.jl
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,10 @@ test_response = grpc_async_await(client, req)
function grpc_async_request(
client::gRPCServiceClient{TRequest,true,TResponse,false},
request::Channel{TRequest};
deadline=client.deadline,
keepalive=client.keepalive,
max_send_message_length=client.max_send_message_length,
max_recieve_message_length=client.max_recieve_message_length,
deadline = client.deadline,
keepalive = client.keepalive,
max_send_message_length = client.max_send_message_length,
max_recieve_message_length = client.max_recieve_message_length,
) where {TRequest<:Any,TResponse<:Any}

req = gRPCRequest(
Expand All @@ -176,10 +176,10 @@ function grpc_async_request(
IOBuffer(),
Channel{IOBuffer}(16),
NOCHANNEL;
deadline=deadline,
keepalive=keepalive,
max_send_message_length=max_send_message_length,
max_recieve_message_length=max_recieve_message_length,
deadline = deadline,
keepalive = keepalive,
max_send_message_length = max_send_message_length,
max_recieve_message_length = max_recieve_message_length,
)

request_task = Threads.@spawn grpc_async_stream_request(req, request)
Expand Down Expand Up @@ -249,10 +249,10 @@ function grpc_async_request(
client::gRPCServiceClient{TRequest,false,TResponse,true},
request::TRequest,
response::Channel{TResponse};
deadline=client.deadline,
keepalive=client.keepalive,
max_send_message_length=client.max_send_message_length,
max_recieve_message_length=client.max_recieve_message_length
deadline = client.deadline,
keepalive = client.keepalive,
max_send_message_length = client.max_send_message_length,
max_recieve_message_length = client.max_recieve_message_length,
) where {TRequest<:Any,TResponse<:Any}

request_buf = grpc_encode_request_iobuffer(
Expand All @@ -268,10 +268,10 @@ function grpc_async_request(
IOBuffer(),
NOCHANNEL,
Channel{IOBuffer}(16);
deadline=deadline,
keepalive=keepalive,
max_send_message_length=max_send_message_length,
max_recieve_message_length=max_recieve_message_length,
deadline = deadline,
keepalive = keepalive,
max_send_message_length = max_send_message_length,
max_recieve_message_length = max_recieve_message_length,
)

response_task = Threads.@spawn grpc_async_stream_response(req, response)
Expand Down Expand Up @@ -353,10 +353,10 @@ function grpc_async_request(
client::gRPCServiceClient{TRequest,true,TResponse,true},
request::Channel{TRequest},
response::Channel{TResponse};
deadline=client.deadline,
keepalive=client.keepalive,
max_send_message_length=client.max_send_message_length,
max_recieve_message_length=client.max_recieve_message_length,
deadline = client.deadline,
keepalive = client.keepalive,
max_send_message_length = client.max_send_message_length,
max_recieve_message_length = client.max_recieve_message_length,
) where {TRequest<:Any,TResponse<:Any}

req = gRPCRequest(
Expand All @@ -366,10 +366,10 @@ function grpc_async_request(
IOBuffer(),
Channel{IOBuffer}(16),
Channel{IOBuffer}(16);
deadline=deadline,
keepalive=keepalive,
max_send_message_length=max_send_message_length,
max_recieve_message_length=max_recieve_message_length,
deadline = deadline,
keepalive = keepalive,
max_send_message_length = max_send_message_length,
max_recieve_message_length = max_recieve_message_length,
)

request_task = Threads.@spawn grpc_async_stream_request(req, request)
Expand Down
28 changes: 18 additions & 10 deletions src/Unary.jl
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,11 @@ end
"""
function grpc_async_request(
client::gRPCServiceClient{TRequest,false,TResponse,false},
request::TRequest,
request::TRequest;
deadline = client.deadline,
keepalive = client.keepalive,
max_send_message_length = client.max_send_message_length,
max_recieve_message_length = client.max_recieve_message_length,
) where {TRequest<:Any,TResponse<:Any}

request_buf = grpc_encode_request_iobuffer(
Expand All @@ -76,10 +80,10 @@ function grpc_async_request(
IOBuffer(),
NOCHANNEL,
NOCHANNEL;
deadline = client.deadline,
keepalive = client.keepalive,
max_send_message_length = client.max_send_message_length,
max_recieve_message_length = client.max_recieve_message_length,
deadline = deadline,
keepalive = keepalive,
max_send_message_length = max_send_message_length,
max_recieve_message_length = max_recieve_message_length,
)

req
Expand Down Expand Up @@ -154,7 +158,11 @@ function grpc_async_request(
client::gRPCServiceClient{TRequest,false,TResponse,false},
request::TRequest,
channel::Channel{gRPCAsyncChannelResponse{TResponse}},
index::Int64,
index::Int64;
deadline = client.deadline,
keepalive = client.keepalive,
max_send_message_length = client.max_send_message_length,
max_recieve_message_length = client.max_recieve_message_length,
) where {TRequest<:Any,TResponse<:Any}

request_buf = grpc_encode_request_iobuffer(
Expand All @@ -170,10 +178,10 @@ function grpc_async_request(
IOBuffer(),
NOCHANNEL,
NOCHANNEL;
deadline = client.deadline,
keepalive = client.keepalive,
max_send_message_length = client.max_send_message_length,
max_recieve_message_length = client.max_recieve_message_length,
deadline = deadline,
keepalive = keepalive,
max_send_message_length = max_send_message_length,
max_recieve_message_length = max_recieve_message_length,
)

Threads.@spawn begin
Expand Down
2 changes: 1 addition & 1 deletion src/gRPC.jl
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const _grpc = gRPCCURL()
const _grpc = gRPCCURL(running = false)

"""
grpc_global_handle()
Expand Down