From 04b53f4765ecfaca156d4e2cb5536bd9734b93f8 Mon Sep 17 00:00:00 2001 From: Shichao Nie Date: Tue, 5 Nov 2024 11:09:20 +0800 Subject: [PATCH] fix(compaction): prevent double release on compaction shutdown (#2115) Signed-off-by: Shichao Nie --- .../main/java/com/automq/stream/s3/StreamDataBlock.java | 7 +++++++ .../automq/stream/s3/compact/operator/DataBlockWriter.java | 3 +-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/StreamDataBlock.java b/s3stream/src/main/java/com/automq/stream/s3/StreamDataBlock.java index 00b8472fe0..d26374809a 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/StreamDataBlock.java +++ b/s3stream/src/main/java/com/automq/stream/s3/StreamDataBlock.java @@ -81,6 +81,13 @@ public CompletableFuture getDataCf() { return this.dataCf; } + public ByteBuf getAndReleaseData() { + if (refCount.getAndDecrement() == 0) { + throw new IllegalStateException("Data has already been released"); + } + return this.dataCf.join(); + } + public void releaseRef() { refCount.decrementAndGet(); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java index 45c3d6f256..f3321dfbeb 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java +++ b/s3stream/src/main/java/com/automq/stream/s3/compact/operator/DataBlockWriter.java @@ -128,8 +128,7 @@ public CompletableFuture close() { private CompositeByteBuf groupWaitingBlocks() { CompositeByteBuf buf = ByteBufAlloc.compositeByteBuffer(); for (StreamDataBlock block : waitingUploadBlocks) { - buf.addComponent(true, block.getDataCf().join()); - block.releaseRef(); + buf.addComponent(true, block.getAndReleaseData()); completedBlocks.add(block); nextDataBlockPosition += block.getBlockSize(); }