From 2568a9f168193bba27cd40e1261645db9f445c9f Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Fri, 2 Feb 2024 18:24:51 +0100 Subject: [PATCH 1/2] Ensure the selection operators maintains upstream demand --- .../java/mutiny/zero/operators/ProcessorBase.java | 4 ++++ .../src/main/java/mutiny/zero/operators/Select.java | 2 ++ .../test/java/mutiny/zero/operators/SelectTest.java | 12 ++++++++++++ 3 files changed, 18 insertions(+) diff --git a/mutiny-zero/src/main/java/mutiny/zero/operators/ProcessorBase.java b/mutiny-zero/src/main/java/mutiny/zero/operators/ProcessorBase.java index 4d046a3..8be4c78 100644 --- a/mutiny-zero/src/main/java/mutiny/zero/operators/ProcessorBase.java +++ b/mutiny-zero/src/main/java/mutiny/zero/operators/ProcessorBase.java @@ -14,6 +14,10 @@ protected boolean cancelled() { return cancelled.get(); } + protected Flow.Subscription upstreamSubscription() { + return upstreamSubscription; + } + protected Flow.Subscriber downstream() { return downstream; } diff --git a/mutiny-zero/src/main/java/mutiny/zero/operators/Select.java b/mutiny-zero/src/main/java/mutiny/zero/operators/Select.java index 97ded05..b43e681 100644 --- a/mutiny-zero/src/main/java/mutiny/zero/operators/Select.java +++ b/mutiny-zero/src/main/java/mutiny/zero/operators/Select.java @@ -42,6 +42,8 @@ public void onNext(T item) { try { if (predicate.test(item)) { downstream().onNext(item); + } else { + upstreamSubscription().request(1L); } } catch (Throwable failure) { cancel(); diff --git a/mutiny-zero/src/test/java/mutiny/zero/operators/SelectTest.java b/mutiny-zero/src/test/java/mutiny/zero/operators/SelectTest.java index 2cc7e41..f0bd6a6 100644 --- a/mutiny-zero/src/test/java/mutiny/zero/operators/SelectTest.java +++ b/mutiny-zero/src/test/java/mutiny/zero/operators/SelectTest.java @@ -25,6 +25,18 @@ void filterElements() { sub.assertCompleted().assertItems(2, 4); } + @Test + @DisplayName("Filter elements") + void maintainDemand() { + Flow.Publisher source = ZeroPublisher.fromItems(1, 2, 3, 4); + Select operator = new Select<>(source, n -> n % 2 == 0); + + AssertSubscriber sub = AssertSubscriber.create(3L); + operator.subscribe(sub); + + sub.assertCompleted().assertItems(2, 4); + } + @Test @DisplayName("Reject a null source") void rejectNullSource() { From f34994508a5545d7f9503e8c734293b0ef6aa20a Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Fri, 2 Feb 2024 18:36:19 +0100 Subject: [PATCH 2/2] Fix CompletionStagePublisher cancellation --- .../java/mutiny/zero/internal/CompletionStagePublisher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mutiny-zero/src/main/java/mutiny/zero/internal/CompletionStagePublisher.java b/mutiny-zero/src/main/java/mutiny/zero/internal/CompletionStagePublisher.java index ea7d7c6..ede1a3d 100644 --- a/mutiny-zero/src/main/java/mutiny/zero/internal/CompletionStagePublisher.java +++ b/mutiny-zero/src/main/java/mutiny/zero/internal/CompletionStagePublisher.java @@ -86,7 +86,7 @@ public void request(long n) { @Override public void cancel() { - cancelled.set(false); + cancelled.set(true); completableFuture.toCompletableFuture().cancel(false); } }