From 0f4c110bd75f3ad31c9f2f9e2789ff66c85480ad Mon Sep 17 00:00:00 2001 From: George Barnett Date: Fri, 12 Jul 2024 09:02:04 +0100 Subject: [PATCH] Combine the two NIOAsyncChannel channel handlers (#2779) Motivation: The NIOAsyncChannel allocates 12 times on init. 4 of these allocations come from creating two channel handlers and two channel handler contexts. There's no inherent reason that these channel handlers can't be combined to eliminate two allocations (one handler and one context). Modifications: - Combine `NIOAsyncChannelInboundStreamChannelHandler` and `NIOAsyncChannelOutboundWriterHandler` into a single `NIOAsyncChannelHandler`. Most of this was straightforward as only a few handler operations were duplicated across both. - Add a 'NIOAsyncChannelHandlerWriterDelegate' in place of the 'NIOAsyncChannelOutboundWriterHandler.Delegate'. One knock on from this is that the new delegate stores callbacks rather than the concrete type of the handler. This is necessary to prevent the generics from the new channel handler bubbling up to the outbound writer (which would break API and be somewhat odd). Result: Fewer allocations --- ...reBenchmarks.NIOAsyncChannel.init.p90.json | 2 +- ...reBenchmarks.NIOAsyncChannel.init.p90.json | 2 +- ...reBenchmarks.NIOAsyncChannel.init.p90.json | 2 +- ...reBenchmarks.NIOAsyncChannel.init.p90.json | 2 +- ...reBenchmarks.NIOAsyncChannel.init.p90.json | 2 +- .../NIOCore/AsyncChannel/AsyncChannel.swift | 37 ++- ...andler.swift => AsyncChannelHandler.swift} | 311 +++++++++++++----- .../AsyncChannelInboundStream.swift | 61 +--- .../AsyncChannelOutboundWriter.swift | 17 +- .../AsyncChannelOutboundWriterHandler.swift | 232 ------------- .../AsyncChannel/AsyncChannelTests.swift | 2 +- 11 files changed, 292 insertions(+), 378 deletions(-) rename Sources/NIOCore/AsyncChannel/{AsyncChannelInboundStreamChannelHandler.swift => AsyncChannelHandler.swift} (56%) delete mode 100644 Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriterHandler.swift diff --git a/Benchmarks/Thresholds/5.10/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json b/Benchmarks/Thresholds/5.10/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json index 4a3c90c5f3..2554cd2c39 100644 --- a/Benchmarks/Thresholds/5.10/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json +++ b/Benchmarks/Thresholds/5.10/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json @@ -1,3 +1,3 @@ { - "mallocCountTotal" : 12 + "mallocCountTotal" : 10 } diff --git a/Benchmarks/Thresholds/5.8/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json b/Benchmarks/Thresholds/5.8/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json index 4a3c90c5f3..2554cd2c39 100644 --- a/Benchmarks/Thresholds/5.8/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json +++ b/Benchmarks/Thresholds/5.8/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json @@ -1,3 +1,3 @@ { - "mallocCountTotal" : 12 + "mallocCountTotal" : 10 } diff --git a/Benchmarks/Thresholds/5.9/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json b/Benchmarks/Thresholds/5.9/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json index 4a3c90c5f3..2554cd2c39 100644 --- a/Benchmarks/Thresholds/5.9/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json +++ b/Benchmarks/Thresholds/5.9/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json @@ -1,3 +1,3 @@ { - "mallocCountTotal" : 12 + "mallocCountTotal" : 10 } diff --git a/Benchmarks/Thresholds/nightly-main/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json b/Benchmarks/Thresholds/nightly-main/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json index 4a3c90c5f3..2554cd2c39 100644 --- a/Benchmarks/Thresholds/nightly-main/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json +++ b/Benchmarks/Thresholds/nightly-main/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json @@ -1,3 +1,3 @@ { - "mallocCountTotal" : 12 + "mallocCountTotal" : 10 } diff --git a/Benchmarks/Thresholds/nightly-next/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json b/Benchmarks/Thresholds/nightly-next/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json index 4a3c90c5f3..2554cd2c39 100644 --- a/Benchmarks/Thresholds/nightly-next/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json +++ b/Benchmarks/Thresholds/nightly-next/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json @@ -1,3 +1,3 @@ { - "mallocCountTotal" : 12 + "mallocCountTotal" : 10 } diff --git a/Sources/NIOCore/AsyncChannel/AsyncChannel.swift b/Sources/NIOCore/AsyncChannel/AsyncChannel.swift index 27ef318293..eda10858ab 100644 --- a/Sources/NIOCore/AsyncChannel/AsyncChannel.swift +++ b/Sources/NIOCore/AsyncChannel/AsyncChannel.swift @@ -315,16 +315,27 @@ extension Channel { ) throws -> (NIOAsyncChannelInboundStream, NIOAsyncChannelOutboundWriter) { self.eventLoop.assertInEventLoop() - let inboundStream = try NIOAsyncChannelInboundStream.makeWrappingHandler( - channel: self, + let handler = NIOAsyncChannelHandler( + eventLoop: self.eventLoop, + transformation: .syncWrapping { $0 }, + isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled + ) + + let inboundStream = try NIOAsyncChannelInboundStream( + eventLoop: self.eventLoop, + handler: handler, backPressureStrategy: backPressureStrategy, closeOnDeinit: closeOnDeinit ) + let writer = try NIOAsyncChannelOutboundWriter( - channel: self, + eventLoop: self.eventLoop, + handler: handler, isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled, closeOnDeinit: closeOnDeinit ) + + try self.pipeline.syncOperations.addHandler(handler) return (inboundStream, writer) } @@ -338,17 +349,27 @@ extension Channel { ) throws -> (NIOAsyncChannelInboundStream, NIOAsyncChannelOutboundWriter) { self.eventLoop.assertInEventLoop() - let inboundStream = try NIOAsyncChannelInboundStream.makeTransformationHandler( - channel: self, + let handler = NIOAsyncChannelHandler( + eventLoop: self.eventLoop, + transformation: .transformation(channelReadTransformation: channelReadTransformation), + isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled + ) + + let inboundStream = try NIOAsyncChannelInboundStream( + eventLoop: self.eventLoop, + handler: handler, backPressureStrategy: backPressureStrategy, - closeOnDeinit: closeOnDeinit, - channelReadTransformation: channelReadTransformation + closeOnDeinit: closeOnDeinit ) + let writer = try NIOAsyncChannelOutboundWriter( - channel: self, + eventLoop: self.eventLoop, + handler: handler, isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled, closeOnDeinit: closeOnDeinit ) + + try self.pipeline.syncOperations.addHandler(handler) return (inboundStream, writer) } } diff --git a/Sources/NIOCore/AsyncChannel/AsyncChannelInboundStreamChannelHandler.swift b/Sources/NIOCore/AsyncChannel/AsyncChannelHandler.swift similarity index 56% rename from Sources/NIOCore/AsyncChannel/AsyncChannelInboundStreamChannelHandler.swift rename to Sources/NIOCore/AsyncChannel/AsyncChannelHandler.swift index dff04c3f37..af0f659c38 100644 --- a/Sources/NIOCore/AsyncChannel/AsyncChannelInboundStreamChannelHandler.swift +++ b/Sources/NIOCore/AsyncChannel/AsyncChannelHandler.swift @@ -2,7 +2,7 @@ // // This source file is part of the SwiftNIO open source project // -// Copyright (c) 2022-2023 Apple Inc. and the SwiftNIO project authors +// Copyright (c) 2022-2024 Apple Inc. and the SwiftNIO project authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -12,11 +12,15 @@ // //===----------------------------------------------------------------------===// +import DequeModule + /// A ``ChannelHandler`` that is used to transform the inbound portion of a NIO -/// ``Channel`` into an asynchronous sequence that supports back-pressure. +/// ``Channel`` into an asynchronous sequence that supports back-pressure. It's also used +/// to write the outbound portion of a NIO ``Channel`` from Swift Concurrency with back-pressure +/// support. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) @usableFromInline -internal final class NIOAsyncChannelInboundStreamChannelHandler: ChannelDuplexHandler { +internal final class NIOAsyncChannelHandler { @usableFromInline enum _ProducingState { // Not .stopProducing @@ -29,18 +33,12 @@ internal final class NIOAsyncChannelInboundStreamChannelHandler.Source /// The source of the asynchronous sequence. @@ -77,49 +75,103 @@ internal final class NIOAsyncChannelInboundStreamChannelHandler + > + + @usableFromInline + typealias Sink = Writer.Sink + + /// The sink of the ``NIOAsyncWriter``. + @usableFromInline + var sink: Sink? + + /// The writer of the ``NIOAsyncWriter``. + /// + /// The reference is retained until `channelActive` is fired. This avoids situations + /// where `deinit` is called on the unfinished writer because the `Channel` was never returned + /// to the caller (e.g. because a connect failed or or happy-eyeballs created multiple + /// channels). + /// + /// Effectively `channelActive` is used at the point in time at which NIO cedes ownership of + /// the writer to the caller. + @usableFromInline + var writer: Writer? + + @usableFromInline + let isOutboundHalfClosureEnabled: Bool + @inlinable init( eventLoop: EventLoop, - transformation: Transformation + transformation: Transformation, + isOutboundHalfClosureEnabled: Bool ) { self.eventLoop = eventLoop self.transformation = transformation + self.isOutboundHalfClosureEnabled = isOutboundHalfClosureEnabled } +} + +@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) +extension NIOAsyncChannelHandler: ChannelInboundHandler { + @usableFromInline + typealias InboundIn = InboundIn - /// Creates a new ``NIOAsyncChannelInboundStreamChannelHandler`` which is used when the pipeline got synchronously wrapped. @inlinable - static func makeHandler( - eventLoop: EventLoop - ) -> NIOAsyncChannelInboundStreamChannelHandler where InboundIn == ProducerElement { - return .init( - eventLoop: eventLoop, - transformation: .syncWrapping { $0 } - ) + func handlerAdded(context: ChannelHandlerContext) { + self.context = context } - /// Creates a new ``NIOAsyncChannelInboundStreamChannelHandler`` which has hooks for transformations. @inlinable - static func makeHandlerWithTransformations( - eventLoop: EventLoop, - channelReadTransformation: @Sendable @escaping (InboundIn) -> EventLoopFuture - ) -> NIOAsyncChannelInboundStreamChannelHandler where InboundIn == Channel { - return .init( - eventLoop: eventLoop, - transformation: .transformation( - channelReadTransformation: channelReadTransformation - ) - ) + func handlerRemoved(context: ChannelHandlerContext) { + self._finishSource(context: context) + self.sink?.finish(error: ChannelError._ioOnClosedChannel) + self.context = nil + self.writer = nil } @inlinable - func handlerAdded(context: ChannelHandlerContext) { - self.context = context + func channelActive(context: ChannelHandlerContext) { + // Drop the writer ref, the caller is responsible for it now. + self.writer = nil + context.fireChannelActive() } @inlinable - func handlerRemoved(context: ChannelHandlerContext) { + func channelInactive(context: ChannelHandlerContext) { self._finishSource(context: context) - self.context = nil + self.sink?.finish(error: ChannelError._ioOnClosedChannel) + context.fireChannelInactive() + } + + + @inlinable + func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) { + switch event { + case ChannelEvent.inputClosed: + self._finishSource(context: context) + case ChannelEvent.outputClosed: + self.sink?.finish() + default: + break + } + + context.fireUserInboundEventTriggered(event) + } + + @inlinable + func channelWritabilityChanged(context: ChannelHandlerContext) { + self.sink?.setWritability(to: context.channel.isWritable) + context.fireChannelWritabilityChanged() + } + + @inlinable + func errorCaught(context: ChannelHandlerContext, error: Error) { + self._finishSource(with: error, context: context) + context.fireErrorCaught(error) } @inlinable @@ -153,43 +205,20 @@ internal final class NIOAsyncChannelInboundStreamChannelHandler - ) { - context.eventLoop.preconditionInEventLoop() - - switch result { - case .success(let transformed): - self.buffer.append(transformed) - // We are delivering out of band here since the future can complete at any point - self._deliverReads(context: context) - - case .failure: - // Transformation failed. Nothing to really do here this must be handled in the transformation - // futures themselves. - break - } - } - @inlinable func channelReadComplete(context: ChannelHandlerContext) { self._deliverReads(context: context) context.fireChannelReadComplete() } +} - @inlinable - func channelInactive(context: ChannelHandlerContext) { - self._finishSource(context: context) - context.fireChannelInactive() - } +@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) +extension NIOAsyncChannelHandler: ChannelOutboundHandler { + @usableFromInline + typealias OutboundIn = Any - @inlinable - func errorCaught(context: ChannelHandlerContext, error: Error) { - self._finishSource(with: error, context: context) - context.fireErrorCaught(error) - } + @usableFromInline + typealias OutboundOut = OutboundOut @inlinable func read(context: ChannelHandlerContext) { @@ -202,17 +231,28 @@ internal final class NIOAsyncChannelInboundStreamChannelHandler + ) { + context.eventLoop.preconditionInEventLoop() + + switch result { + case .success(let transformed): + self.buffer.append(transformed) + // We are delivering out of band here since the future can complete at any point + self._deliverReads(context: context) + + case .failure: + // Transformation failed. Nothing to really do here this must be handled in the transformation + // futures themselves. break } - - context.fireUserInboundEventTriggered(event) } @inlinable @@ -258,8 +298,9 @@ internal final class NIOAsyncChannelInboundStreamChannelHandler Void @inlinable - init(handler: NIOAsyncChannelInboundStreamChannelHandler) { + init(handler: NIOAsyncChannelHandler) { self.eventLoop = handler.eventLoop self._didTerminate = handler._didTerminate self._produceMore = handler._produceMore @@ -329,6 +370,126 @@ struct NIOAsyncChannelInboundStreamChannelHandlerProducerDelegate: @unchecked Se } } +@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) +@usableFromInline +struct NIOAsyncChannelHandlerWriterDelegate: NIOAsyncWriterSinkDelegate, @unchecked Sendable { + @usableFromInline + let eventLoop: EventLoop + + @usableFromInline + let _didYieldContentsOf: (Deque) -> Void + + @usableFromInline + let _didYield: (Element) -> Void + + @usableFromInline + let _didTerminate: ((any Error)?) -> Void + + @inlinable + init(handler: NIOAsyncChannelHandler) { + self.eventLoop = handler.eventLoop + self._didYieldContentsOf = handler._didYield(sequence:) + self._didYield = handler._didYield(element:) + self._didTerminate = handler._didTerminate(error:) + } + + @inlinable + func didYield(contentsOf sequence: Deque) { + if self.eventLoop.inEventLoop { + self._didYieldContentsOf(sequence) + } else { + self.eventLoop.execute { + self._didYieldContentsOf(sequence) + } + } + } + + @inlinable + func didYield(_ element: Element) { + if self.eventLoop.inEventLoop { + self._didYield(element) + } else { + self.eventLoop.execute { + self._didYield(element) + } + } + } + + @inlinable + func didTerminate(error: (any Error)?) { + if self.eventLoop.inEventLoop { + self._didTerminate(error) + } else { + self.eventLoop.execute { + self._didTerminate(error) + } + } + } +} + +@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) +extension NIOAsyncChannelHandler { + @inlinable + func _didYield(sequence: Deque) { + // This is always called from an async context, so we must loop-hop. + // Because we always loop-hop, we're always at the top of a stack frame. As this + // is the only source of writes for us, and as this channel handler doesn't implement + // func write(), we cannot possibly re-entrantly write. That means we can skip many of the + // awkward re-entrancy protections NIO usually requires, and can safely just do an iterative + // write. + self.eventLoop.preconditionInEventLoop() + guard let context = self.context else { + // Already removed from the channel by now, we can stop. + return + } + + self._doOutboundWrites(context: context, writes: sequence) + } + + @inlinable + func _didYield(element: OutboundOut) { + // This is always called from an async context, so we must loop-hop. + // Because we always loop-hop, we're always at the top of a stack frame. As this + // is the only source of writes for us, and as this channel handler doesn't implement + // func write(), we cannot possibly re-entrantly write. That means we can skip many of the + // awkward re-entrancy protections NIO usually requires, and can safely just do an iterative + // write. + self.eventLoop.preconditionInEventLoop() + guard let context = self.context else { + // Already removed from the channel by now, we can stop. + return + } + + self._doOutboundWrite(context: context, write: element) + } + + @inlinable + func _didTerminate(error: Error?) { + self.eventLoop.preconditionInEventLoop() + + if self.isOutboundHalfClosureEnabled { + self.context?.close(mode: .output, promise: nil) + } + + self.sink = nil + } + + @inlinable + func _doOutboundWrites(context: ChannelHandlerContext, writes: Deque) { + for write in writes { + context.write(self.wrapOutboundOut(write), promise: nil) + } + + context.flush() + } + + @inlinable + func _doOutboundWrite(context: ChannelHandlerContext, write: OutboundOut) { + context.write(self.wrapOutboundOut(write), promise: nil) + context.flush() + } +} + @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) @available(*, unavailable) -extension NIOAsyncChannelInboundStreamChannelHandler: Sendable {} +extension NIOAsyncChannelHandler: Sendable {} diff --git a/Sources/NIOCore/AsyncChannel/AsyncChannelInboundStream.swift b/Sources/NIOCore/AsyncChannel/AsyncChannelInboundStream.swift index 0a672dc3ed..7134359a2c 100644 --- a/Sources/NIOCore/AsyncChannel/AsyncChannelInboundStream.swift +++ b/Sources/NIOCore/AsyncChannel/AsyncChannelInboundStream.swift @@ -18,7 +18,12 @@ @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) public struct NIOAsyncChannelInboundStream: Sendable { @usableFromInline - typealias Producer = NIOThrowingAsyncSequenceProducer + typealias Producer = NIOThrowingAsyncSequenceProducer< + Inbound, + Error, + NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, + NIOAsyncChannelHandlerProducerDelegate + > /// A source used for driving a ``NIOAsyncChannelInboundStream`` during tests. public struct TestSource { @@ -77,13 +82,13 @@ public struct NIOAsyncChannelInboundStream: Sendable { } @inlinable - init( - channel: Channel, + init( + eventLoop: any EventLoop, + handler: NIOAsyncChannelHandler, backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark?, - closeOnDeinit: Bool, - handler: NIOAsyncChannelInboundStreamChannelHandler + closeOnDeinit: Bool ) throws { - channel.eventLoop.preconditionInEventLoop() + eventLoop.preconditionInEventLoop() let strategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark if let userProvided = backPressureStrategy { @@ -97,52 +102,12 @@ public struct NIOAsyncChannelInboundStream: Sendable { let sequence = Producer.makeSequence( backPressureStrategy: strategy, finishOnDeinit: closeOnDeinit, - delegate: NIOAsyncChannelInboundStreamChannelHandlerProducerDelegate(handler: handler) + delegate: NIOAsyncChannelHandlerProducerDelegate(handler: handler) ) + handler.source = sequence.source - try channel.pipeline.syncOperations.addHandler(handler) self._backing = .producer(sequence.sequence) } - - /// Creates a new ``NIOAsyncChannelInboundStream`` which is used when the pipeline got synchronously wrapped. - @inlinable - static func makeWrappingHandler( - channel: Channel, - backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark?, - closeOnDeinit: Bool - ) throws -> NIOAsyncChannelInboundStream { - let handler = NIOAsyncChannelInboundStreamChannelHandler.makeHandler( - eventLoop: channel.eventLoop - ) - - return try .init( - channel: channel, - backPressureStrategy: backPressureStrategy, - closeOnDeinit: closeOnDeinit, - handler: handler - ) - } - - /// Creates a new ``NIOAsyncChannelInboundStream`` which has hooks for transformations. - @inlinable - static func makeTransformationHandler( - channel: Channel, - backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark?, - closeOnDeinit: Bool, - channelReadTransformation: @Sendable @escaping (Channel) -> EventLoopFuture - ) throws -> NIOAsyncChannelInboundStream { - let handler = NIOAsyncChannelInboundStreamChannelHandler.makeHandlerWithTransformations( - eventLoop: channel.eventLoop, - channelReadTransformation: channelReadTransformation - ) - - return try .init( - channel: channel, - backPressureStrategy: backPressureStrategy, - closeOnDeinit: closeOnDeinit, - handler: handler - ) - } } @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) diff --git a/Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriter.swift b/Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriter.swift index 37731b6f37..8ef5f12cf8 100644 --- a/Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriter.swift +++ b/Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriter.swift @@ -20,7 +20,10 @@ @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) public struct NIOAsyncChannelOutboundWriter: Sendable { @usableFromInline - typealias _Writer = NIOAsyncChannelOutboundWriterHandler.Writer + typealias _Writer = NIOAsyncWriter< + OutboundOut, + NIOAsyncChannelHandlerWriterDelegate + > /// An `AsyncSequence` backing a ``NIOAsyncChannelOutboundWriter`` for testing purposes. public struct TestSink: AsyncSequence { @@ -82,15 +85,13 @@ public struct NIOAsyncChannelOutboundWriter: Sendable { } @inlinable - init( - channel: Channel, + init( + eventLoop: any EventLoop, + handler: NIOAsyncChannelHandler, isOutboundHalfClosureEnabled: Bool, closeOnDeinit: Bool ) throws { - let handler = NIOAsyncChannelOutboundWriterHandler( - eventLoop: channel.eventLoop, - isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled - ) + eventLoop.preconditionInEventLoop() let writer = _Writer.makeWriter( elementType: OutboundOut.self, isWritable: true, @@ -101,8 +102,6 @@ public struct NIOAsyncChannelOutboundWriter: Sendable { handler.sink = writer.sink handler.writer = writer.writer - try channel.pipeline.syncOperations.addHandler(handler) - self._backing = .writer(writer.writer) } diff --git a/Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriterHandler.swift b/Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriterHandler.swift deleted file mode 100644 index 0e73fde347..0000000000 --- a/Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriterHandler.swift +++ /dev/null @@ -1,232 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the SwiftNIO open source project -// -// Copyright (c) 2022-2023 Apple Inc. and the SwiftNIO project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of SwiftNIO project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import DequeModule - -/// A ``ChannelHandler`` that is used to write the outbound portion of a NIO -/// ``Channel`` from Swift Concurrency with back-pressure support. -@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) -@usableFromInline -internal final class NIOAsyncChannelOutboundWriterHandler: ChannelDuplexHandler { - @usableFromInline typealias InboundIn = Any - @usableFromInline typealias InboundOut = Any - @usableFromInline typealias OutboundIn = Any - @usableFromInline typealias OutboundOut = OutboundOut - - @usableFromInline - typealias Writer = NIOAsyncWriter< - OutboundOut, - NIOAsyncChannelOutboundWriterHandler.Delegate - > - - @usableFromInline - typealias Sink = Writer.Sink - - /// The sink of the ``NIOAsyncWriter``. - @usableFromInline - var sink: Sink? - - /// The writer of the ``NIOAsyncWriter``. - /// - /// The reference is retained until `channelActive` is fired. This avoids situations - /// where `deinit` is called on the unfinished writer because the `Channel` was never returned - /// to the caller (e.g. because a connect failed or or happy-eyeballs created multiple - /// channels). - /// - /// Effectively `channelActive` is used at the point in time at which NIO cedes ownership of - /// the writer to the caller. - @usableFromInline - var writer: Writer? - - /// The channel handler context. - @usableFromInline - var context: ChannelHandlerContext? - - /// The event loop. - @usableFromInline - let eventLoop: EventLoop - - @usableFromInline - let isOutboundHalfClosureEnabled: Bool - - @inlinable - init( - eventLoop: EventLoop, - isOutboundHalfClosureEnabled: Bool - ) { - self.eventLoop = eventLoop - self.isOutboundHalfClosureEnabled = isOutboundHalfClosureEnabled - } - - @inlinable - func _didYield(sequence: Deque) { - // This is always called from an async context, so we must loop-hop. - // Because we always loop-hop, we're always at the top of a stack frame. As this - // is the only source of writes for us, and as this channel handler doesn't implement - // func write(), we cannot possibly re-entrantly write. That means we can skip many of the - // awkward re-entrancy protections NIO usually requires, and can safely just do an iterative - // write. - self.eventLoop.preconditionInEventLoop() - guard let context = self.context else { - // Already removed from the channel by now, we can stop. - return - } - - self._doOutboundWrites(context: context, writes: sequence) - } - - @inlinable - func _didYield(element: OutboundOut) { - // This is always called from an async context, so we must loop-hop. - // Because we always loop-hop, we're always at the top of a stack frame. As this - // is the only source of writes for us, and as this channel handler doesn't implement - // func write(), we cannot possibly re-entrantly write. That means we can skip many of the - // awkward re-entrancy protections NIO usually requires, and can safely just do an iterative - // write. - self.eventLoop.preconditionInEventLoop() - guard let context = self.context else { - // Already removed from the channel by now, we can stop. - return - } - - self._doOutboundWrite(context: context, write: element) - } - - @inlinable - func _didTerminate(error: Error?) { - self.eventLoop.preconditionInEventLoop() - - if self.isOutboundHalfClosureEnabled { - self.context?.close(mode: .output, promise: nil) - } - - self.sink = nil - } - - @inlinable - func _doOutboundWrites(context: ChannelHandlerContext, writes: Deque) { - for write in writes { - context.write(self.wrapOutboundOut(write), promise: nil) - } - - context.flush() - } - - @inlinable - func _doOutboundWrite(context: ChannelHandlerContext, write: OutboundOut) { - context.write(self.wrapOutboundOut(write), promise: nil) - context.flush() - } - - @inlinable - func handlerAdded(context: ChannelHandlerContext) { - self.context = context - } - - @inlinable - func handlerRemoved(context: ChannelHandlerContext) { - self.context = nil - self.sink?.finish(error: ChannelError._ioOnClosedChannel) - self.writer = nil - } - - @inlinable - func channelActive(context: ChannelHandlerContext) { - // Drop the writer ref, the caller is responsible for it now. - self.writer = nil - context.fireChannelActive() - } - - @inlinable - func channelInactive(context: ChannelHandlerContext) { - self.sink?.finish(error: ChannelError._ioOnClosedChannel) - context.fireChannelInactive() - } - - @inlinable - func channelWritabilityChanged(context: ChannelHandlerContext) { - self.sink?.setWritability(to: context.channel.isWritable) - context.fireChannelWritabilityChanged() - } - - @inlinable - func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) { - switch event { - case ChannelEvent.outputClosed: - self.sink?.finish() - default: - break - } - - context.fireUserInboundEventTriggered(event) - } -} - -@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) -extension NIOAsyncChannelOutboundWriterHandler { - @usableFromInline - struct Delegate: @unchecked Sendable, NIOAsyncWriterSinkDelegate { - @usableFromInline - typealias Element = OutboundOut - - @usableFromInline - let eventLoop: EventLoop - - @usableFromInline - let handler: NIOAsyncChannelOutboundWriterHandler - - @inlinable - init(handler: NIOAsyncChannelOutboundWriterHandler) { - self.eventLoop = handler.eventLoop - self.handler = handler - } - - @inlinable - func didYield(contentsOf sequence: Deque) { - if self.eventLoop.inEventLoop { - self.handler._didYield(sequence: sequence) - } else { - self.eventLoop.execute { - self.handler._didYield(sequence: sequence) - } - } - } - - @inlinable - func didYield(_ element: OutboundOut) { - if self.eventLoop.inEventLoop { - self.handler._didYield(element: element) - } else { - self.eventLoop.execute { - self.handler._didYield(element: element) - } - } - } - - @inlinable - func didTerminate(error: Error?) { - if self.eventLoop.inEventLoop { - self.handler._didTerminate(error: error) - } else { - self.eventLoop.execute { - self.handler._didTerminate(error: error) - } - } - } - } -} - -@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) -@available(*, unavailable) -extension NIOAsyncChannelOutboundWriterHandler: Sendable {} diff --git a/Tests/NIOCoreTests/AsyncChannel/AsyncChannelTests.swift b/Tests/NIOCoreTests/AsyncChannel/AsyncChannelTests.swift index cbaec06640..4115b9dee8 100644 --- a/Tests/NIOCoreTests/AsyncChannel/AsyncChannelTests.swift +++ b/Tests/NIOCoreTests/AsyncChannel/AsyncChannelTests.swift @@ -251,7 +251,7 @@ final class AsyncChannelTests: XCTestCase { do { let strongSentinel: Sentinel? = Sentinel() sentinel = strongSentinel! - try await XCTAsyncAssertNotNil(await channel.pipeline.handler(type: NIOAsyncChannelInboundStreamChannelHandler.self).get()) + try await XCTAsyncAssertNotNil(await channel.pipeline.handler(type: NIOAsyncChannelHandler.self).get()) try await channel.writeInbound(strongSentinel!) _ = try await channel.readInbound(as: Sentinel.self) }