Skip to content

Commit

Permalink
Merge pull request #166 from smallrye/fix/select-maintain-demand
Browse files Browse the repository at this point in the history
Ensure the selection operators maintains upstream demand
  • Loading branch information
jponge authored Feb 2, 2024
2 parents 8d7bb6a + f349945 commit 8933003
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public void request(long n) {

@Override
public void cancel() {
cancelled.set(false);
cancelled.set(true);
completableFuture.toCompletableFuture().cancel(false);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ protected boolean cancelled() {
return cancelled.get();
}

protected Flow.Subscription upstreamSubscription() {
return upstreamSubscription;
}

protected Flow.Subscriber<? super O> downstream() {
return downstream;
}
Expand Down
2 changes: 2 additions & 0 deletions mutiny-zero/src/main/java/mutiny/zero/operators/Select.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ public void onNext(T item) {
try {
if (predicate.test(item)) {
downstream().onNext(item);
} else {
upstreamSubscription().request(1L);
}
} catch (Throwable failure) {
cancel();
Expand Down
12 changes: 12 additions & 0 deletions mutiny-zero/src/test/java/mutiny/zero/operators/SelectTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,18 @@ void filterElements() {
sub.assertCompleted().assertItems(2, 4);
}

@Test
@DisplayName("Filter elements")
void maintainDemand() {
Flow.Publisher<Integer> source = ZeroPublisher.fromItems(1, 2, 3, 4);
Select<Integer> operator = new Select<>(source, n -> n % 2 == 0);

AssertSubscriber<Object> sub = AssertSubscriber.create(3L);
operator.subscribe(sub);

sub.assertCompleted().assertItems(2, 4);
}

@Test
@DisplayName("Reject a null source")
void rejectNullSource() {
Expand Down

0 comments on commit 8933003

Please sign in to comment.