Skip to content

Commit

Permalink
Fix connector archetype test
Browse files Browse the repository at this point in the history
  • Loading branch information
ozangunalp committed Nov 13, 2023
1 parent 386fc06 commit e8dae05
Showing 1 changed file with 3 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package ${package};

import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -38,8 +39,9 @@ public class ${connectorPrefix}IncomingChannel {
this.failureHandler = ${connectorPrefix}FailureHandler.create(this.client);
this.tracingEnabled = cfg.getTracingEnabled();
Multi<? extends Message<?>> receiveMulti = Multi.createBy().repeating()
.uni(() -> Uni.createFrom().completionStage(this.client.poll()))
.uni(() -> Uni.createFrom().completionStage(this.client == null ? null : this.client.poll()))
.until(__ -> closed.get())
.filter(Objects::nonNull)
.emitOn(context::runOnContext)
.map(consumed -> new ${connectorPrefix}Message<>(consumed, ackHandler, failureHandler));

Expand Down

0 comments on commit e8dae05

Please sign in to comment.