Skip to content

Commit

Permalink
Merge pull request #508 from rdegnan/fix-leaks
Browse files Browse the repository at this point in the history
Ensure sendProcessor is disposed
  • Loading branch information
rdegnan authored Sep 17, 2018
2 parents 1a4844f + 73b9fd9 commit 08705ea
Show file tree
Hide file tree
Showing 11 changed files with 281 additions and 325 deletions.
4 changes: 4 additions & 0 deletions rsocket-core/src/main/java/io/rsocket/AbstractRSocket.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,19 @@ public abstract class AbstractRSocket implements RSocket {

@Override
public Mono<Void> fireAndForget(Payload payload) {
payload.release();
return Mono.error(new UnsupportedOperationException("Fire and forget not implemented."));
}

@Override
public Mono<Payload> requestResponse(Payload payload) {
payload.release();
return Mono.error(new UnsupportedOperationException("Request-Response not implemented."));
}

@Override
public Flux<Payload> requestStream(Payload payload) {
payload.release();
return Flux.error(new UnsupportedOperationException("Request-Stream not implemented."));
}

Expand All @@ -51,6 +54,7 @@ public Flux<Payload> requestChannel(Publisher<Payload> payloads) {

@Override
public Mono<Void> metadataPush(Payload payload) {
payload.release();
return Mono.error(new UnsupportedOperationException("Metadata-Push not implemented."));
}

Expand Down
72 changes: 22 additions & 50 deletions rsocket-core/src/main/java/io/rsocket/Frame.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static io.rsocket.frame.FrameHeaderFlyweight.FLAGS_M;

import io.netty.buffer.*;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.IllegalReferenceCountException;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
Expand All @@ -33,7 +34,6 @@
import io.rsocket.frame.VersionFlyweight;
import io.rsocket.framing.FrameType;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -43,7 +43,7 @@
*
* <p>This provides encoding, decoding and field accessors.
*/
public class Frame implements Payload, ByteBufHolder {
public class Frame extends AbstractReferenceCounted implements Payload, ByteBufHolder {
private static final Recycler<Frame> RECYCLER =
new Recycler<Frame>() {
protected Frame newObject(Handle<Frame> handle) {
Expand All @@ -58,12 +58,6 @@ private Frame(final Handle<Frame> handle) {
this.handle = handle;
}

/** Clear and recycle this instance. */
private void recycle() {
content = null;
handle.recycle(this);
}

/** Return the content which is held by this {@link Frame}. */
@Override
public ByteBuf content() {
Expand Down Expand Up @@ -105,26 +99,17 @@ public Frame replace(ByteBuf content) {
return from(content);
}

/**
* Returns the reference count of this object. If {@code 0}, it means this object has been
* deallocated.
*/
@Override
public int refCnt() {
return content.refCnt();
}

/** Increases the reference count by {@code 1}. */
@Override
public Frame retain() {
content.retain();
super.retain();
return this;
}

/** Increases the reference count by the specified {@code increment}. */
@Override
public Frame retain(int increment) {
content.retain(increment);
super.retain(increment);
return this;
}

Expand All @@ -150,36 +135,12 @@ public Frame touch(@Nullable Object hint) {
return this;
}

/**
* Decreases the reference count by {@code 1} and deallocates this object if the reference count
* reaches at {@code 0}.
*
* @return {@code true} if and only if the reference count became {@code 0} and this object has
* been deallocated
*/
@Override
public boolean release() {
if (content != null && content.release()) {
recycle();
return true;
}
return false;
}

/**
* Decreases the reference count by the specified {@code decrement} and deallocates this object if
* the reference count reaches at {@code 0}.
*
* @return {@code true} if and only if the reference count became {@code 0} and this object has
* been deallocated
*/
/** Called once {@link #refCnt()} is equals 0. */
@Override
public boolean release(int decrement) {
if (content != null && content.release(decrement)) {
recycle();
return true;
}
return false;
protected void deallocate() {
content.release();
content = null;
handle.recycle(this);
}

/**
Expand Down Expand Up @@ -239,6 +200,7 @@ public int flags() {
*/
public static Frame from(final ByteBuf content) {
final Frame frame = RECYCLER.get();
frame.setRefCnt(1);
frame.content = content;

return frame;
Expand Down Expand Up @@ -281,6 +243,7 @@ public static Frame from(
final ByteBuf data = payload.sliceData();

final Frame frame = RECYCLER.get();
frame.setRefCnt(1);
frame.content =
ByteBufAllocator.DEFAULT.buffer(
SetupFrameFlyweight.computeFrameLength(
Expand Down Expand Up @@ -347,6 +310,7 @@ public static Frame from(int streamId, final Throwable throwable, ByteBuf dataBu

final int code = ErrorFrameFlyweight.errorCodeFromException(throwable);
final Frame frame = RECYCLER.get();
frame.setRefCnt(1);
frame.content =
ByteBufAllocator.DEFAULT.buffer(
ErrorFrameFlyweight.computeFrameLength(dataBuffer.readableBytes()));
Expand Down Expand Up @@ -378,6 +342,7 @@ private Lease() {}

public static Frame from(int ttl, int numberOfRequests, ByteBuf metadata) {
final Frame frame = RECYCLER.get();
frame.setRefCnt(1);
frame.content =
ByteBufAllocator.DEFAULT.buffer(
LeaseFrameFlyweight.computeFrameLength(metadata.readableBytes()));
Expand Down Expand Up @@ -411,6 +376,7 @@ public static Frame from(int streamId, int requestN) {
}

final Frame frame = RECYCLER.get();
frame.setRefCnt(1);
frame.content = ByteBufAllocator.DEFAULT.buffer(RequestNFrameFlyweight.computeFrameLength());
frame.content.writerIndex(RequestNFrameFlyweight.encode(frame.content, streamId, requestN));
return frame;
Expand Down Expand Up @@ -438,6 +404,7 @@ public static Frame from(int streamId, FrameType type, Payload payload, int init
final ByteBuf data = payload.sliceData();

final Frame frame = RECYCLER.get();
frame.setRefCnt(1);
frame.content =
ByteBufAllocator.DEFAULT.buffer(
RequestFrameFlyweight.computeFrameLength(
Expand All @@ -464,6 +431,7 @@ public static Frame from(int streamId, FrameType type, Payload payload, int init

public static Frame from(int streamId, FrameType type, int flags) {
final Frame frame = RECYCLER.get();
frame.setRefCnt(1);
frame.content =
ByteBufAllocator.DEFAULT.buffer(RequestFrameFlyweight.computeFrameLength(type, null, 0));
frame.content.writerIndex(
Expand All @@ -480,6 +448,7 @@ public static Frame from(
int initialRequestN,
int flags) {
final Frame frame = RECYCLER.get();
frame.setRefCnt(1);
frame.content =
ByteBufAllocator.DEFAULT.buffer(
RequestFrameFlyweight.computeFrameLength(
Expand Down Expand Up @@ -543,6 +512,7 @@ public static Frame from(int streamId, FrameType type, Payload payload, int flag
public static Frame from(
int streamId, FrameType type, @Nullable ByteBuf metadata, ByteBuf data, int flags) {
final Frame frame = RECYCLER.get();
frame.setRefCnt(1);
frame.content =
ByteBufAllocator.DEFAULT.buffer(
FrameHeaderFlyweight.computeFrameHeaderLength(
Expand All @@ -559,6 +529,7 @@ private Cancel() {}

public static Frame from(int streamId) {
final Frame frame = RECYCLER.get();
frame.setRefCnt(1);
frame.content =
ByteBufAllocator.DEFAULT.buffer(
FrameHeaderFlyweight.computeFrameHeaderLength(FrameType.CANCEL, null, 0));
Expand All @@ -575,6 +546,7 @@ private Keepalive() {}

public static Frame from(ByteBuf data, boolean respond) {
final Frame frame = RECYCLER.get();
frame.setRefCnt(1);
frame.content =
ByteBufAllocator.DEFAULT.buffer(
KeepaliveFrameFlyweight.computeFrameLength(data.readableBytes()));
Expand Down Expand Up @@ -611,12 +583,12 @@ public boolean equals(Object o) {
return false;
}
final Frame frame = (Frame) o;
return Objects.equals(content, frame.content);
return content.equals(frame.content());
}

@Override
public int hashCode() {
return Objects.hash(content);
return content.hashCode();
}

@Override
Expand Down
18 changes: 7 additions & 11 deletions rsocket-core/src/main/java/io/rsocket/KeepAliveHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@
import reactor.core.publisher.MonoProcessor;
import reactor.core.publisher.UnicastProcessor;

abstract class KeepAliveHandler {
abstract class KeepAliveHandler implements Disposable {
private final KeepAlive keepAlive;
private final UnicastProcessor<Frame> sent = UnicastProcessor.create();
private final MonoProcessor<KeepAlive> timeout = MonoProcessor.create();
private final Flux<Long> interval;
private Disposable intervalDisposable;
private volatile long lastReceivedMillis;

Expand All @@ -26,20 +25,17 @@ static KeepAliveHandler ofClient(KeepAlive keepAlive) {

private KeepAliveHandler(KeepAlive keepAlive) {
this.keepAlive = keepAlive;
this.interval = Flux.interval(Duration.ofMillis(keepAlive.getTickPeriod()));
}

public void start() {
this.lastReceivedMillis = System.currentTimeMillis();
intervalDisposable = interval.subscribe(v -> onIntervalTick());
this.intervalDisposable =
Flux.interval(Duration.ofMillis(keepAlive.getTickPeriod()))
.subscribe(v -> onIntervalTick());
}

public void stop() {
@Override
public void dispose() {
sent.onComplete();
timeout.onComplete();
if (intervalDisposable != null) {
intervalDisposable.dispose();
}
intervalDisposable.dispose();
}

public void receive(Frame keepAliveFrame) {
Expand Down
Loading

0 comments on commit 08705ea

Please sign in to comment.