diff --git a/Benchmarks/Thresholds/5.10/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json b/Benchmarks/Thresholds/5.10/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json index 4a3c90c5f3..2554cd2c39 100644 --- a/Benchmarks/Thresholds/5.10/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json +++ b/Benchmarks/Thresholds/5.10/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json @@ -1,3 +1,3 @@ { - "mallocCountTotal" : 12 + "mallocCountTotal" : 10 } diff --git a/Benchmarks/Thresholds/5.8/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json b/Benchmarks/Thresholds/5.8/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json index 4a3c90c5f3..2554cd2c39 100644 --- a/Benchmarks/Thresholds/5.8/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json +++ b/Benchmarks/Thresholds/5.8/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json @@ -1,3 +1,3 @@ { - "mallocCountTotal" : 12 + "mallocCountTotal" : 10 } diff --git a/Benchmarks/Thresholds/5.9/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json b/Benchmarks/Thresholds/5.9/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json index 4a3c90c5f3..2554cd2c39 100644 --- a/Benchmarks/Thresholds/5.9/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json +++ b/Benchmarks/Thresholds/5.9/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json @@ -1,3 +1,3 @@ { - "mallocCountTotal" : 12 + "mallocCountTotal" : 10 } diff --git a/Benchmarks/Thresholds/nightly-main/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json b/Benchmarks/Thresholds/nightly-main/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json index 4a3c90c5f3..2554cd2c39 100644 --- a/Benchmarks/Thresholds/nightly-main/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json +++ b/Benchmarks/Thresholds/nightly-main/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json @@ -1,3 +1,3 @@ { - "mallocCountTotal" : 12 + "mallocCountTotal" : 10 } diff --git a/Benchmarks/Thresholds/nightly-next/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json b/Benchmarks/Thresholds/nightly-next/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json index 4a3c90c5f3..2554cd2c39 100644 --- a/Benchmarks/Thresholds/nightly-next/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json +++ b/Benchmarks/Thresholds/nightly-next/NIOCoreBenchmarks.NIOAsyncChannel.init.p90.json @@ -1,3 +1,3 @@ { - "mallocCountTotal" : 12 + "mallocCountTotal" : 10 } diff --git a/Sources/NIOCore/AsyncChannel/AsyncChannel.swift b/Sources/NIOCore/AsyncChannel/AsyncChannel.swift index 27ef318293..eda10858ab 100644 --- a/Sources/NIOCore/AsyncChannel/AsyncChannel.swift +++ b/Sources/NIOCore/AsyncChannel/AsyncChannel.swift @@ -315,16 +315,27 @@ extension Channel { ) throws -> (NIOAsyncChannelInboundStream, NIOAsyncChannelOutboundWriter) { self.eventLoop.assertInEventLoop() - let inboundStream = try NIOAsyncChannelInboundStream.makeWrappingHandler( - channel: self, + let handler = NIOAsyncChannelHandler( + eventLoop: self.eventLoop, + transformation: .syncWrapping { $0 }, + isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled + ) + + let inboundStream = try NIOAsyncChannelInboundStream( + eventLoop: self.eventLoop, + handler: handler, backPressureStrategy: backPressureStrategy, closeOnDeinit: closeOnDeinit ) + let writer = try NIOAsyncChannelOutboundWriter( - channel: self, + eventLoop: self.eventLoop, + handler: handler, isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled, closeOnDeinit: closeOnDeinit ) + + try self.pipeline.syncOperations.addHandler(handler) return (inboundStream, writer) } @@ -338,17 +349,27 @@ extension Channel { ) throws -> (NIOAsyncChannelInboundStream, NIOAsyncChannelOutboundWriter) { self.eventLoop.assertInEventLoop() - let inboundStream = try NIOAsyncChannelInboundStream.makeTransformationHandler( - channel: self, + let handler = NIOAsyncChannelHandler( + eventLoop: self.eventLoop, + transformation: .transformation(channelReadTransformation: channelReadTransformation), + isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled + ) + + let inboundStream = try NIOAsyncChannelInboundStream( + eventLoop: self.eventLoop, + handler: handler, backPressureStrategy: backPressureStrategy, - closeOnDeinit: closeOnDeinit, - channelReadTransformation: channelReadTransformation + closeOnDeinit: closeOnDeinit ) + let writer = try NIOAsyncChannelOutboundWriter( - channel: self, + eventLoop: self.eventLoop, + handler: handler, isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled, closeOnDeinit: closeOnDeinit ) + + try self.pipeline.syncOperations.addHandler(handler) return (inboundStream, writer) } } diff --git a/Sources/NIOCore/AsyncChannel/AsyncChannelInboundStreamChannelHandler.swift b/Sources/NIOCore/AsyncChannel/AsyncChannelHandler.swift similarity index 56% rename from Sources/NIOCore/AsyncChannel/AsyncChannelInboundStreamChannelHandler.swift rename to Sources/NIOCore/AsyncChannel/AsyncChannelHandler.swift index dff04c3f37..af0f659c38 100644 --- a/Sources/NIOCore/AsyncChannel/AsyncChannelInboundStreamChannelHandler.swift +++ b/Sources/NIOCore/AsyncChannel/AsyncChannelHandler.swift @@ -2,7 +2,7 @@ // // This source file is part of the SwiftNIO open source project // -// Copyright (c) 2022-2023 Apple Inc. and the SwiftNIO project authors +// Copyright (c) 2022-2024 Apple Inc. and the SwiftNIO project authors // Licensed under Apache License v2.0 // // See LICENSE.txt for license information @@ -12,11 +12,15 @@ // //===----------------------------------------------------------------------===// +import DequeModule + /// A ``ChannelHandler`` that is used to transform the inbound portion of a NIO -/// ``Channel`` into an asynchronous sequence that supports back-pressure. +/// ``Channel`` into an asynchronous sequence that supports back-pressure. It's also used +/// to write the outbound portion of a NIO ``Channel`` from Swift Concurrency with back-pressure +/// support. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) @usableFromInline -internal final class NIOAsyncChannelInboundStreamChannelHandler: ChannelDuplexHandler { +internal final class NIOAsyncChannelHandler { @usableFromInline enum _ProducingState { // Not .stopProducing @@ -29,18 +33,12 @@ internal final class NIOAsyncChannelInboundStreamChannelHandler.Source /// The source of the asynchronous sequence. @@ -77,49 +75,103 @@ internal final class NIOAsyncChannelInboundStreamChannelHandler + > + + @usableFromInline + typealias Sink = Writer.Sink + + /// The sink of the ``NIOAsyncWriter``. + @usableFromInline + var sink: Sink? + + /// The writer of the ``NIOAsyncWriter``. + /// + /// The reference is retained until `channelActive` is fired. This avoids situations + /// where `deinit` is called on the unfinished writer because the `Channel` was never returned + /// to the caller (e.g. because a connect failed or or happy-eyeballs created multiple + /// channels). + /// + /// Effectively `channelActive` is used at the point in time at which NIO cedes ownership of + /// the writer to the caller. + @usableFromInline + var writer: Writer? + + @usableFromInline + let isOutboundHalfClosureEnabled: Bool + @inlinable init( eventLoop: EventLoop, - transformation: Transformation + transformation: Transformation, + isOutboundHalfClosureEnabled: Bool ) { self.eventLoop = eventLoop self.transformation = transformation + self.isOutboundHalfClosureEnabled = isOutboundHalfClosureEnabled } +} + +@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) +extension NIOAsyncChannelHandler: ChannelInboundHandler { + @usableFromInline + typealias InboundIn = InboundIn - /// Creates a new ``NIOAsyncChannelInboundStreamChannelHandler`` which is used when the pipeline got synchronously wrapped. @inlinable - static func makeHandler( - eventLoop: EventLoop - ) -> NIOAsyncChannelInboundStreamChannelHandler where InboundIn == ProducerElement { - return .init( - eventLoop: eventLoop, - transformation: .syncWrapping { $0 } - ) + func handlerAdded(context: ChannelHandlerContext) { + self.context = context } - /// Creates a new ``NIOAsyncChannelInboundStreamChannelHandler`` which has hooks for transformations. @inlinable - static func makeHandlerWithTransformations( - eventLoop: EventLoop, - channelReadTransformation: @Sendable @escaping (InboundIn) -> EventLoopFuture - ) -> NIOAsyncChannelInboundStreamChannelHandler where InboundIn == Channel { - return .init( - eventLoop: eventLoop, - transformation: .transformation( - channelReadTransformation: channelReadTransformation - ) - ) + func handlerRemoved(context: ChannelHandlerContext) { + self._finishSource(context: context) + self.sink?.finish(error: ChannelError._ioOnClosedChannel) + self.context = nil + self.writer = nil } @inlinable - func handlerAdded(context: ChannelHandlerContext) { - self.context = context + func channelActive(context: ChannelHandlerContext) { + // Drop the writer ref, the caller is responsible for it now. + self.writer = nil + context.fireChannelActive() } @inlinable - func handlerRemoved(context: ChannelHandlerContext) { + func channelInactive(context: ChannelHandlerContext) { self._finishSource(context: context) - self.context = nil + self.sink?.finish(error: ChannelError._ioOnClosedChannel) + context.fireChannelInactive() + } + + + @inlinable + func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) { + switch event { + case ChannelEvent.inputClosed: + self._finishSource(context: context) + case ChannelEvent.outputClosed: + self.sink?.finish() + default: + break + } + + context.fireUserInboundEventTriggered(event) + } + + @inlinable + func channelWritabilityChanged(context: ChannelHandlerContext) { + self.sink?.setWritability(to: context.channel.isWritable) + context.fireChannelWritabilityChanged() + } + + @inlinable + func errorCaught(context: ChannelHandlerContext, error: Error) { + self._finishSource(with: error, context: context) + context.fireErrorCaught(error) } @inlinable @@ -153,43 +205,20 @@ internal final class NIOAsyncChannelInboundStreamChannelHandler - ) { - context.eventLoop.preconditionInEventLoop() - - switch result { - case .success(let transformed): - self.buffer.append(transformed) - // We are delivering out of band here since the future can complete at any point - self._deliverReads(context: context) - - case .failure: - // Transformation failed. Nothing to really do here this must be handled in the transformation - // futures themselves. - break - } - } - @inlinable func channelReadComplete(context: ChannelHandlerContext) { self._deliverReads(context: context) context.fireChannelReadComplete() } +} - @inlinable - func channelInactive(context: ChannelHandlerContext) { - self._finishSource(context: context) - context.fireChannelInactive() - } +@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) +extension NIOAsyncChannelHandler: ChannelOutboundHandler { + @usableFromInline + typealias OutboundIn = Any - @inlinable - func errorCaught(context: ChannelHandlerContext, error: Error) { - self._finishSource(with: error, context: context) - context.fireErrorCaught(error) - } + @usableFromInline + typealias OutboundOut = OutboundOut @inlinable func read(context: ChannelHandlerContext) { @@ -202,17 +231,28 @@ internal final class NIOAsyncChannelInboundStreamChannelHandler + ) { + context.eventLoop.preconditionInEventLoop() + + switch result { + case .success(let transformed): + self.buffer.append(transformed) + // We are delivering out of band here since the future can complete at any point + self._deliverReads(context: context) + + case .failure: + // Transformation failed. Nothing to really do here this must be handled in the transformation + // futures themselves. break } - - context.fireUserInboundEventTriggered(event) } @inlinable @@ -258,8 +298,9 @@ internal final class NIOAsyncChannelInboundStreamChannelHandler Void @inlinable - init(handler: NIOAsyncChannelInboundStreamChannelHandler) { + init(handler: NIOAsyncChannelHandler) { self.eventLoop = handler.eventLoop self._didTerminate = handler._didTerminate self._produceMore = handler._produceMore @@ -329,6 +370,126 @@ struct NIOAsyncChannelInboundStreamChannelHandlerProducerDelegate: @unchecked Se } } +@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) +@usableFromInline +struct NIOAsyncChannelHandlerWriterDelegate: NIOAsyncWriterSinkDelegate, @unchecked Sendable { + @usableFromInline + let eventLoop: EventLoop + + @usableFromInline + let _didYieldContentsOf: (Deque) -> Void + + @usableFromInline + let _didYield: (Element) -> Void + + @usableFromInline + let _didTerminate: ((any Error)?) -> Void + + @inlinable + init(handler: NIOAsyncChannelHandler) { + self.eventLoop = handler.eventLoop + self._didYieldContentsOf = handler._didYield(sequence:) + self._didYield = handler._didYield(element:) + self._didTerminate = handler._didTerminate(error:) + } + + @inlinable + func didYield(contentsOf sequence: Deque) { + if self.eventLoop.inEventLoop { + self._didYieldContentsOf(sequence) + } else { + self.eventLoop.execute { + self._didYieldContentsOf(sequence) + } + } + } + + @inlinable + func didYield(_ element: Element) { + if self.eventLoop.inEventLoop { + self._didYield(element) + } else { + self.eventLoop.execute { + self._didYield(element) + } + } + } + + @inlinable + func didTerminate(error: (any Error)?) { + if self.eventLoop.inEventLoop { + self._didTerminate(error) + } else { + self.eventLoop.execute { + self._didTerminate(error) + } + } + } +} + +@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) +extension NIOAsyncChannelHandler { + @inlinable + func _didYield(sequence: Deque) { + // This is always called from an async context, so we must loop-hop. + // Because we always loop-hop, we're always at the top of a stack frame. As this + // is the only source of writes for us, and as this channel handler doesn't implement + // func write(), we cannot possibly re-entrantly write. That means we can skip many of the + // awkward re-entrancy protections NIO usually requires, and can safely just do an iterative + // write. + self.eventLoop.preconditionInEventLoop() + guard let context = self.context else { + // Already removed from the channel by now, we can stop. + return + } + + self._doOutboundWrites(context: context, writes: sequence) + } + + @inlinable + func _didYield(element: OutboundOut) { + // This is always called from an async context, so we must loop-hop. + // Because we always loop-hop, we're always at the top of a stack frame. As this + // is the only source of writes for us, and as this channel handler doesn't implement + // func write(), we cannot possibly re-entrantly write. That means we can skip many of the + // awkward re-entrancy protections NIO usually requires, and can safely just do an iterative + // write. + self.eventLoop.preconditionInEventLoop() + guard let context = self.context else { + // Already removed from the channel by now, we can stop. + return + } + + self._doOutboundWrite(context: context, write: element) + } + + @inlinable + func _didTerminate(error: Error?) { + self.eventLoop.preconditionInEventLoop() + + if self.isOutboundHalfClosureEnabled { + self.context?.close(mode: .output, promise: nil) + } + + self.sink = nil + } + + @inlinable + func _doOutboundWrites(context: ChannelHandlerContext, writes: Deque) { + for write in writes { + context.write(self.wrapOutboundOut(write), promise: nil) + } + + context.flush() + } + + @inlinable + func _doOutboundWrite(context: ChannelHandlerContext, write: OutboundOut) { + context.write(self.wrapOutboundOut(write), promise: nil) + context.flush() + } +} + @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) @available(*, unavailable) -extension NIOAsyncChannelInboundStreamChannelHandler: Sendable {} +extension NIOAsyncChannelHandler: Sendable {} diff --git a/Sources/NIOCore/AsyncChannel/AsyncChannelInboundStream.swift b/Sources/NIOCore/AsyncChannel/AsyncChannelInboundStream.swift index 0a672dc3ed..7134359a2c 100644 --- a/Sources/NIOCore/AsyncChannel/AsyncChannelInboundStream.swift +++ b/Sources/NIOCore/AsyncChannel/AsyncChannelInboundStream.swift @@ -18,7 +18,12 @@ @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) public struct NIOAsyncChannelInboundStream: Sendable { @usableFromInline - typealias Producer = NIOThrowingAsyncSequenceProducer + typealias Producer = NIOThrowingAsyncSequenceProducer< + Inbound, + Error, + NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, + NIOAsyncChannelHandlerProducerDelegate + > /// A source used for driving a ``NIOAsyncChannelInboundStream`` during tests. public struct TestSource { @@ -77,13 +82,13 @@ public struct NIOAsyncChannelInboundStream: Sendable { } @inlinable - init( - channel: Channel, + init( + eventLoop: any EventLoop, + handler: NIOAsyncChannelHandler, backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark?, - closeOnDeinit: Bool, - handler: NIOAsyncChannelInboundStreamChannelHandler + closeOnDeinit: Bool ) throws { - channel.eventLoop.preconditionInEventLoop() + eventLoop.preconditionInEventLoop() let strategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark if let userProvided = backPressureStrategy { @@ -97,52 +102,12 @@ public struct NIOAsyncChannelInboundStream: Sendable { let sequence = Producer.makeSequence( backPressureStrategy: strategy, finishOnDeinit: closeOnDeinit, - delegate: NIOAsyncChannelInboundStreamChannelHandlerProducerDelegate(handler: handler) + delegate: NIOAsyncChannelHandlerProducerDelegate(handler: handler) ) + handler.source = sequence.source - try channel.pipeline.syncOperations.addHandler(handler) self._backing = .producer(sequence.sequence) } - - /// Creates a new ``NIOAsyncChannelInboundStream`` which is used when the pipeline got synchronously wrapped. - @inlinable - static func makeWrappingHandler( - channel: Channel, - backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark?, - closeOnDeinit: Bool - ) throws -> NIOAsyncChannelInboundStream { - let handler = NIOAsyncChannelInboundStreamChannelHandler.makeHandler( - eventLoop: channel.eventLoop - ) - - return try .init( - channel: channel, - backPressureStrategy: backPressureStrategy, - closeOnDeinit: closeOnDeinit, - handler: handler - ) - } - - /// Creates a new ``NIOAsyncChannelInboundStream`` which has hooks for transformations. - @inlinable - static func makeTransformationHandler( - channel: Channel, - backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark?, - closeOnDeinit: Bool, - channelReadTransformation: @Sendable @escaping (Channel) -> EventLoopFuture - ) throws -> NIOAsyncChannelInboundStream { - let handler = NIOAsyncChannelInboundStreamChannelHandler.makeHandlerWithTransformations( - eventLoop: channel.eventLoop, - channelReadTransformation: channelReadTransformation - ) - - return try .init( - channel: channel, - backPressureStrategy: backPressureStrategy, - closeOnDeinit: closeOnDeinit, - handler: handler - ) - } } @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) diff --git a/Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriter.swift b/Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriter.swift index 37731b6f37..8ef5f12cf8 100644 --- a/Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriter.swift +++ b/Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriter.swift @@ -20,7 +20,10 @@ @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) public struct NIOAsyncChannelOutboundWriter: Sendable { @usableFromInline - typealias _Writer = NIOAsyncChannelOutboundWriterHandler.Writer + typealias _Writer = NIOAsyncWriter< + OutboundOut, + NIOAsyncChannelHandlerWriterDelegate + > /// An `AsyncSequence` backing a ``NIOAsyncChannelOutboundWriter`` for testing purposes. public struct TestSink: AsyncSequence { @@ -82,15 +85,13 @@ public struct NIOAsyncChannelOutboundWriter: Sendable { } @inlinable - init( - channel: Channel, + init( + eventLoop: any EventLoop, + handler: NIOAsyncChannelHandler, isOutboundHalfClosureEnabled: Bool, closeOnDeinit: Bool ) throws { - let handler = NIOAsyncChannelOutboundWriterHandler( - eventLoop: channel.eventLoop, - isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled - ) + eventLoop.preconditionInEventLoop() let writer = _Writer.makeWriter( elementType: OutboundOut.self, isWritable: true, @@ -101,8 +102,6 @@ public struct NIOAsyncChannelOutboundWriter: Sendable { handler.sink = writer.sink handler.writer = writer.writer - try channel.pipeline.syncOperations.addHandler(handler) - self._backing = .writer(writer.writer) } diff --git a/Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriterHandler.swift b/Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriterHandler.swift deleted file mode 100644 index 0e73fde347..0000000000 --- a/Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriterHandler.swift +++ /dev/null @@ -1,232 +0,0 @@ -//===----------------------------------------------------------------------===// -// -// This source file is part of the SwiftNIO open source project -// -// Copyright (c) 2022-2023 Apple Inc. and the SwiftNIO project authors -// Licensed under Apache License v2.0 -// -// See LICENSE.txt for license information -// See CONTRIBUTORS.txt for the list of SwiftNIO project authors -// -// SPDX-License-Identifier: Apache-2.0 -// -//===----------------------------------------------------------------------===// - -import DequeModule - -/// A ``ChannelHandler`` that is used to write the outbound portion of a NIO -/// ``Channel`` from Swift Concurrency with back-pressure support. -@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) -@usableFromInline -internal final class NIOAsyncChannelOutboundWriterHandler: ChannelDuplexHandler { - @usableFromInline typealias InboundIn = Any - @usableFromInline typealias InboundOut = Any - @usableFromInline typealias OutboundIn = Any - @usableFromInline typealias OutboundOut = OutboundOut - - @usableFromInline - typealias Writer = NIOAsyncWriter< - OutboundOut, - NIOAsyncChannelOutboundWriterHandler.Delegate - > - - @usableFromInline - typealias Sink = Writer.Sink - - /// The sink of the ``NIOAsyncWriter``. - @usableFromInline - var sink: Sink? - - /// The writer of the ``NIOAsyncWriter``. - /// - /// The reference is retained until `channelActive` is fired. This avoids situations - /// where `deinit` is called on the unfinished writer because the `Channel` was never returned - /// to the caller (e.g. because a connect failed or or happy-eyeballs created multiple - /// channels). - /// - /// Effectively `channelActive` is used at the point in time at which NIO cedes ownership of - /// the writer to the caller. - @usableFromInline - var writer: Writer? - - /// The channel handler context. - @usableFromInline - var context: ChannelHandlerContext? - - /// The event loop. - @usableFromInline - let eventLoop: EventLoop - - @usableFromInline - let isOutboundHalfClosureEnabled: Bool - - @inlinable - init( - eventLoop: EventLoop, - isOutboundHalfClosureEnabled: Bool - ) { - self.eventLoop = eventLoop - self.isOutboundHalfClosureEnabled = isOutboundHalfClosureEnabled - } - - @inlinable - func _didYield(sequence: Deque) { - // This is always called from an async context, so we must loop-hop. - // Because we always loop-hop, we're always at the top of a stack frame. As this - // is the only source of writes for us, and as this channel handler doesn't implement - // func write(), we cannot possibly re-entrantly write. That means we can skip many of the - // awkward re-entrancy protections NIO usually requires, and can safely just do an iterative - // write. - self.eventLoop.preconditionInEventLoop() - guard let context = self.context else { - // Already removed from the channel by now, we can stop. - return - } - - self._doOutboundWrites(context: context, writes: sequence) - } - - @inlinable - func _didYield(element: OutboundOut) { - // This is always called from an async context, so we must loop-hop. - // Because we always loop-hop, we're always at the top of a stack frame. As this - // is the only source of writes for us, and as this channel handler doesn't implement - // func write(), we cannot possibly re-entrantly write. That means we can skip many of the - // awkward re-entrancy protections NIO usually requires, and can safely just do an iterative - // write. - self.eventLoop.preconditionInEventLoop() - guard let context = self.context else { - // Already removed from the channel by now, we can stop. - return - } - - self._doOutboundWrite(context: context, write: element) - } - - @inlinable - func _didTerminate(error: Error?) { - self.eventLoop.preconditionInEventLoop() - - if self.isOutboundHalfClosureEnabled { - self.context?.close(mode: .output, promise: nil) - } - - self.sink = nil - } - - @inlinable - func _doOutboundWrites(context: ChannelHandlerContext, writes: Deque) { - for write in writes { - context.write(self.wrapOutboundOut(write), promise: nil) - } - - context.flush() - } - - @inlinable - func _doOutboundWrite(context: ChannelHandlerContext, write: OutboundOut) { - context.write(self.wrapOutboundOut(write), promise: nil) - context.flush() - } - - @inlinable - func handlerAdded(context: ChannelHandlerContext) { - self.context = context - } - - @inlinable - func handlerRemoved(context: ChannelHandlerContext) { - self.context = nil - self.sink?.finish(error: ChannelError._ioOnClosedChannel) - self.writer = nil - } - - @inlinable - func channelActive(context: ChannelHandlerContext) { - // Drop the writer ref, the caller is responsible for it now. - self.writer = nil - context.fireChannelActive() - } - - @inlinable - func channelInactive(context: ChannelHandlerContext) { - self.sink?.finish(error: ChannelError._ioOnClosedChannel) - context.fireChannelInactive() - } - - @inlinable - func channelWritabilityChanged(context: ChannelHandlerContext) { - self.sink?.setWritability(to: context.channel.isWritable) - context.fireChannelWritabilityChanged() - } - - @inlinable - func userInboundEventTriggered(context: ChannelHandlerContext, event: Any) { - switch event { - case ChannelEvent.outputClosed: - self.sink?.finish() - default: - break - } - - context.fireUserInboundEventTriggered(event) - } -} - -@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) -extension NIOAsyncChannelOutboundWriterHandler { - @usableFromInline - struct Delegate: @unchecked Sendable, NIOAsyncWriterSinkDelegate { - @usableFromInline - typealias Element = OutboundOut - - @usableFromInline - let eventLoop: EventLoop - - @usableFromInline - let handler: NIOAsyncChannelOutboundWriterHandler - - @inlinable - init(handler: NIOAsyncChannelOutboundWriterHandler) { - self.eventLoop = handler.eventLoop - self.handler = handler - } - - @inlinable - func didYield(contentsOf sequence: Deque) { - if self.eventLoop.inEventLoop { - self.handler._didYield(sequence: sequence) - } else { - self.eventLoop.execute { - self.handler._didYield(sequence: sequence) - } - } - } - - @inlinable - func didYield(_ element: OutboundOut) { - if self.eventLoop.inEventLoop { - self.handler._didYield(element: element) - } else { - self.eventLoop.execute { - self.handler._didYield(element: element) - } - } - } - - @inlinable - func didTerminate(error: Error?) { - if self.eventLoop.inEventLoop { - self.handler._didTerminate(error: error) - } else { - self.eventLoop.execute { - self.handler._didTerminate(error: error) - } - } - } - } -} - -@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) -@available(*, unavailable) -extension NIOAsyncChannelOutboundWriterHandler: Sendable {} diff --git a/Tests/NIOCoreTests/AsyncChannel/AsyncChannelTests.swift b/Tests/NIOCoreTests/AsyncChannel/AsyncChannelTests.swift index cbaec06640..4115b9dee8 100644 --- a/Tests/NIOCoreTests/AsyncChannel/AsyncChannelTests.swift +++ b/Tests/NIOCoreTests/AsyncChannel/AsyncChannelTests.swift @@ -251,7 +251,7 @@ final class AsyncChannelTests: XCTestCase { do { let strongSentinel: Sentinel? = Sentinel() sentinel = strongSentinel! - try await XCTAsyncAssertNotNil(await channel.pipeline.handler(type: NIOAsyncChannelInboundStreamChannelHandler.self).get()) + try await XCTAsyncAssertNotNil(await channel.pipeline.handler(type: NIOAsyncChannelHandler.self).get()) try await channel.writeInbound(strongSentinel!) _ = try await channel.readInbound(as: Sentinel.self) }