Skip to content

Commit

Permalink
Ensure sendProcessor is disposed
Browse files Browse the repository at this point in the history
  • Loading branch information
rdegnan committed Sep 17, 2018
1 parent 03e0e4b commit 73b9fd9
Show file tree
Hide file tree
Showing 10 changed files with 238 additions and 258 deletions.
4 changes: 4 additions & 0 deletions rsocket-core/src/main/java/io/rsocket/AbstractRSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,19 @@ public abstract class AbstractRSocket implements RSocket {

@Override
public Mono<Void> fireAndForget(Payload payload) {
payload.release();
return Mono.error(new UnsupportedOperationException("Fire and forget not implemented."));
}

@Override
public Mono<Payload> requestResponse(Payload payload) {
payload.release();
return Mono.error(new UnsupportedOperationException("Request-Response not implemented."));
}

@Override
public Flux<Payload> requestStream(Payload payload) {
payload.release();
return Flux.error(new UnsupportedOperationException("Request-Stream not implemented."));
}

Expand All @@ -51,6 +54,7 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {

@Override
public Mono<Void> metadataPush(Payload payload) {
payload.release();
return Mono.error(new UnsupportedOperationException("Metadata-Push not implemented."));
}

Expand Down
4 changes: 1 addition & 3 deletions rsocket-core/src/main/java/io/rsocket/Frame.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,7 @@ public Frame touch(@Nullable Object hint) {
return this;
}

/**
* Called once {@link #refCnt()} is equals 0.
*/
/** Called once {@link #refCnt()} is equals 0. */
@Override
protected void deallocate() {
content.release();
Expand Down
18 changes: 7 additions & 11 deletions rsocket-core/src/main/java/io/rsocket/KeepAliveHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.UnicastProcessor;

abstract class KeepAliveHandler {
abstract class KeepAliveHandler implements Disposable {
private final KeepAlive keepAlive;
private final UnicastProcessor<Frame> sent = UnicastProcessor.create();
private final MonoProcessor<KeepAlive> timeout = MonoProcessor.create();
private final Flux<Long> interval;
private Disposable intervalDisposable;
private volatile long lastReceivedMillis;

Expand All @@ -26,20 +25,17 @@ static KeepAliveHandler ofClient(KeepAlive keepAlive) {

private KeepAliveHandler(KeepAlive keepAlive) {
this.keepAlive = keepAlive;
this.interval = Flux.interval(Duration.ofMillis(keepAlive.getTickPeriod()));
}

public void start() {
this.lastReceivedMillis = System.currentTimeMillis();
intervalDisposable = interval.subscribe(v -> onIntervalTick());
this.intervalDisposable =
Flux.interval(Duration.ofMillis(keepAlive.getTickPeriod()))
.subscribe(v -> onIntervalTick());
}

public void stop() {
@Override
public void dispose() {
sent.onComplete();
timeout.onComplete();
if (intervalDisposable != null) {
intervalDisposable.dispose();
}
intervalDisposable.dispose();
}

public void receive(Frame keepAliveFrame) {
Expand Down
Loading

0 comments on commit 73b9fd9

Please sign in to comment.