Skip to content

Commit

Permalink
Delay client async writer starting (#1531)
Browse files Browse the repository at this point in the history
Motivation:

It's possible for async streaming clients to fail and drop messages in
some situations. The situation leading to this happens because streaming
calls set up state and then write out headers. While setting up state
the HTTP/2 stream channel is configured, when it becomes active gRPC
calls outs to enable the async writer to start emitting writes. This can
happen before the headers are written so if a write is already pending
then it can race the headers being written. If the message is written
first then the write promise is failed and the message is dropped.

Modifications:

Delay letting the async writer emit writes until the headers have been
written.

Result:

Correct ordering is enforced.
  • Loading branch information
glbrntt authored Dec 13, 2022
1 parent 524f06b commit 4312579
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 1 deletion.
5 changes: 4 additions & 1 deletion Sources/GRPC/Interceptor/ClientTransport.swift
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,6 @@ extension ClientTransport {
self._pipeline?.logger = self.logger
self.logger.debug("activated stream channel")
self.channel = channel
self.onStart()
self.unbuffer()

case .close:
Expand Down Expand Up @@ -943,6 +942,10 @@ extension ClientTransport {
case let .metadata(headers):
let head = self.makeRequestHead(with: headers)
channel.write(self.wrapOutboundOut(.head(head)), promise: promise)
// Messages are buffered by this class and in the async writer for async calls. Initially the
// async writer is not allowed to emit messages; the call to 'onStart()' signals that messages
// may be emitted. We call it here to avoid races between writing headers and messages.
self.onStart()

case let .message(request, metadata):
do {
Expand Down
39 changes: 39 additions & 0 deletions Tests/GRPCTests/ClientCallTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -206,4 +206,43 @@ class ClientCallTests: GRPCTestCase {
// Cancellation should now fail, we've already cancelled.
assertThat(try get.cancel().wait(), .throws(.instanceOf(GRPCError.AlreadyComplete.self)))
}

func testWriteMessageOnStart() throws {
// This test isn't deterministic so run a bunch of iterations.
for _ in 0 ..< 100 {
let call = self.update()
let promise = call.eventLoop.makePromise(of: Void.self)
let finished = call.eventLoop.makePromise(of: Void.self)

call.invokeStreamingRequests {
// Send in onStart.
call.send(
.message(.with { $0.text = "foo" }, .init(compress: false, flush: false)),
promise: promise
)
} onError: { _ in // ignore errors
} onResponsePart: {
switch $0 {
case .metadata, .message:
()
case .end:
finished.succeed(())
}
}

// End the stream.
promise.futureResult.whenComplete { _ in
call.send(.end, promise: nil)
}

do {
try promise.futureResult.wait()
try finished.futureResult.wait()
} catch {
// Stop on the first error.
XCTFail("Unexpected error: \(error)")
return
}
}
}
}

0 comments on commit 4312579

Please sign in to comment.