diff --git a/smallrye-reactive-messaging-connector-archetype/src/main/resources/archetype-resources/src/main/java/__connectorPrefix__IncomingChannel.java b/smallrye-reactive-messaging-connector-archetype/src/main/resources/archetype-resources/src/main/java/__connectorPrefix__IncomingChannel.java index 81ca2f10c9..fe15898095 100644 --- a/smallrye-reactive-messaging-connector-archetype/src/main/resources/archetype-resources/src/main/java/__connectorPrefix__IncomingChannel.java +++ b/smallrye-reactive-messaging-connector-archetype/src/main/resources/archetype-resources/src/main/java/__connectorPrefix__IncomingChannel.java @@ -1,5 +1,6 @@ package ${package}; +import java.util.Objects; import java.util.concurrent.Flow; import java.util.concurrent.atomic.AtomicBoolean; @@ -38,8 +39,9 @@ public class ${connectorPrefix}IncomingChannel { this.failureHandler = ${connectorPrefix}FailureHandler.create(this.client); this.tracingEnabled = cfg.getTracingEnabled(); Multi> receiveMulti = Multi.createBy().repeating() - .uni(() -> Uni.createFrom().completionStage(this.client.poll())) + .uni(() -> Uni.createFrom().completionStage(this.client == null ? null : this.client.poll())) .until(__ -> closed.get()) + .filter(Objects::nonNull) .emitOn(context::runOnContext) .map(consumed -> new ${connectorPrefix}Message<>(consumed, ackHandler, failureHandler));