diff --git a/s3stream/src/main/java/com/automq/stream/s3/StreamDataBlock.java b/s3stream/src/main/java/com/automq/stream/s3/StreamDataBlock.java index 00b8472fe0..d26374809a 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/StreamDataBlock.java +++ b/s3stream/src/main/java/com/automq/stream/s3/StreamDataBlock.java @@ -81,6 +81,13 @@ public CompletableFuture getDataCf() { return this.dataCf; } + public ByteBuf getAndReleaseData() { + if (refCount.getAndDecrement() == 0) { + throw new IllegalStateException("Data has already been released"); + } + return this.dataCf.join(); + } + public void releaseRef() { refCount.decrementAndGet(); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java index 45c3d6f256..f3321dfbeb 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java @@ -128,8 +128,7 @@ public CompletableFuture close() { private CompositeByteBuf groupWaitingBlocks() { CompositeByteBuf buf = ByteBufAlloc.compositeByteBuffer(); for (StreamDataBlock block : waitingUploadBlocks) { - buf.addComponent(true, block.getDataCf().join()); - block.releaseRef(); + buf.addComponent(true, block.getAndReleaseData()); completedBlocks.add(block); nextDataBlockPosition += block.getBlockSize(); }