forked from apple/swift-nio-extras
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathRequestResponseHandler.swift
126 lines (112 loc) · 4.77 KB
/
RequestResponseHandler.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2017-2021 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
import NIOCore
/// ``RequestResponseHandler`` receives a `Request` alongside an `EventLoopPromise<Response>` from the `Channel`'s
/// outbound side. It will fulfill the promise with the `Response` once it's received from the `Channel`'s inbound
/// side.
///
/// ``RequestResponseHandler`` supports pipelining `Request`s and will send them pipelined further down the
/// `Channel`. Should ``RequestResponseHandler`` receive an error from the `Channel`, it will fail all promises meant for
/// the outstanding `Response`s and close the `Channel`. All requests enqueued after an error occurred will be
/// immediately failed with the first error the channel received.
///
/// If the `Channel` becomes inactive while the handler is still operational, all promises for outstanding
/// `Response`s are failed with `NIOExtrasErrors.ClosedBeforeReceivingResponse`, and so is every request enqueued
/// afterwards.
///
/// ``RequestResponseHandler`` requires that the `Response`s arrive on `Channel` in the same order as the `Request`s
/// were submitted.
public final class RequestResponseHandler<Request, Response>: ChannelDuplexHandler {
    /// `Response` is the type this class expects to receive inbound.
    public typealias InboundIn = Response
    /// Don't expect to pass anything on in-bound.
    public typealias InboundOut = Never
    /// Type this class expects to receive in an outbound direction.
    public typealias OutboundIn = (Request, EventLoopPromise<Response>)
    /// Type this class passes out.
    public typealias OutboundOut = Request

    /// Life-cycle of the handler.
    private enum State {
        /// Requests are forwarded and responses are matched to their promises.
        case operational
        /// A terminal error was recorded; every further request is failed with it.
        case error(Error)

        /// `true` only while no terminal error has been recorded.
        var isOperational: Bool {
            switch self {
            case .operational:
                return true
            case .error:
                return false
            }
        }
    }

    private var state: State = .operational

    /// Promises for the responses that are still outstanding, in the order their requests were written.
    private var promiseBuffer: CircularBuffer<EventLoopPromise<Response>>

    /// Create a new ``RequestResponseHandler``.
    ///
    /// - parameters:
    ///    - initialBufferCapacity: ``RequestResponseHandler`` saves the promises for all outstanding responses in a
    ///          buffer. `initialBufferCapacity` is the initial capacity for this buffer. You usually do not need to set
    ///          this parameter unless you intend to pipeline very deeply and don't want the buffer to resize.
    public init(initialBufferCapacity: Int = 4) {
        self.promiseBuffer = CircularBuffer(initialCapacity: initialBufferCapacity)
    }

    /// Fails every outstanding response promise because the channel went away, then forwards the event.
    ///
    /// The handler also moves into the error state, so that a request written after this point is failed
    /// immediately by `write` instead of being queued for a response that will never be matched.
    public func channelInactive(context: ChannelHandlerContext) {
        switch self.state {
        case .error:
            // We failed any outstanding promises when we entered the error state and will fail any
            // new promises in write.
            assert(self.promiseBuffer.isEmpty)
        case .operational:
            let closedError = NIOExtrasErrors.ClosedBeforeReceivingResponse()
            // Record the terminal state first: a late `write` must not enqueue a promise nobody will complete.
            self.state = .error(closedError)
            for promise in self.takeOutstandingPromises() {
                promise.fail(closedError)
            }
        }
        context.fireChannelInactive()
    }

    /// Completes the oldest outstanding promise with the received `Response`.
    ///
    /// Responses received while in the error state are dropped. Nothing is forwarded further inbound.
    public func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        guard self.state.isOperational else {
            // we're in an error state, ignore further responses
            assert(self.promiseBuffer.isEmpty)
            return
        }

        let response = self.unwrapInboundIn(data)
        // NOTE(review): `removeFirst()` is called unconditionally, so a response arriving while no request is
        // outstanding hits an empty buffer here. Confirm whether that should surface as an error instead.
        let promise = self.promiseBuffer.removeFirst()
        promise.succeed(response)
    }

    /// Records the first error, closes the channel and fails every outstanding response promise with that error.
    ///
    /// Errors received after the handler left the operational state are ignored. The error is not forwarded.
    public func errorCaught(context: ChannelHandlerContext, error: Error) {
        guard self.state.isOperational else {
            assert(self.promiseBuffer.isEmpty)
            return
        }

        self.state = .error(error)
        let outstanding = self.takeOutstandingPromises()
        context.close(promise: nil)
        for promise in outstanding {
            promise.fail(error)
        }
    }

    /// Forwards the `Request` and remembers the promise for its `Response`.
    ///
    /// In the error state nothing is written: both the response promise and the write promise are failed with the
    /// recorded error.
    public func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
        let (request, responsePromise) = self.unwrapOutboundIn(data)
        switch self.state {
        case .error(let error):
            assert(self.promiseBuffer.isEmpty)
            responsePromise.fail(error)
            promise?.fail(error)
        case .operational:
            self.promiseBuffer.append(responsePromise)
            context.write(self.wrapOutboundOut(request), promise: promise)
        }
    }

    /// Empties `promiseBuffer` and returns the promises it held.
    ///
    /// The buffer is cleared before the caller fails any promise, so the handler's own state is already
    /// consistent by the time promise callbacks run.
    private func takeOutstandingPromises() -> CircularBuffer<EventLoopPromise<Response>> {
        let outstanding = self.promiseBuffer
        self.promiseBuffer.removeAll()
        return outstanding
    }
}
// Explicitly marks `RequestResponseHandler` as not `Sendable`: the conformance is declared but made unavailable,
// so the compiler diagnoses any use of the handler where a `Sendable` type is required. The class keeps its
// `state` and `promiseBuffer` in plain mutable stored properties without any synchronization.
// Only compiled with Swift 5.6 or newer.
#if swift(>=5.6)
@available(*, unavailable)
extension RequestResponseHandler: Sendable {}
#endif