From c9bc996173f1e2b3ddc5372a3d774209b30773a7 Mon Sep 17 00:00:00 2001 From: Shichao Nie Date: Tue, 5 Nov 2024 10:56:42 +0800 Subject: [PATCH] fix(compaction): prevent double release on compaction shutdown Signed-off-by: Shichao Nie --- .../main/java/com/automq/stream/s3/StreamDataBlock.java | 7 +++++++ .../automq/stream/s3/compact/operator/DataBlockWriter.java | 3 +-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/StreamDataBlock.java b/s3stream/src/main/java/com/automq/stream/s3/StreamDataBlock.java index 00b8472fe0..d26374809a 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/StreamDataBlock.java +++ b/s3stream/src/main/java/com/automq/stream/s3/StreamDataBlock.java @@ -81,6 +81,13 @@ public CompletableFuture getDataCf() { return this.dataCf; } + public ByteBuf getAndReleaseData() { + if (refCount.getAndDecrement() == 0) { + throw new IllegalStateException("Data has already been released"); + } + return this.dataCf.join(); + } + public void releaseRef() { refCount.decrementAndGet(); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java index 45c3d6f256..f3321dfbeb 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java @@ -128,8 +128,7 @@ public CompletableFuture close() { private CompositeByteBuf groupWaitingBlocks() { CompositeByteBuf buf = ByteBufAlloc.compositeByteBuffer(); for (StreamDataBlock block : waitingUploadBlocks) { - buf.addComponent(true, block.getDataCf().join()); - block.releaseRef(); + buf.addComponent(true, block.getAndReleaseData()); completedBlocks.add(block); nextDataBlockPosition += block.getBlockSize(); }