[client] Pre-allocate memories for ArrowLogWriteBatch to fix the BufferExhaustedException (#290)

This closes #267
swuferhong authored and wuchong committed Jan 14, 2025
1 parent c25c3b0 commit 71f97f1
Showing 13 changed files with 211 additions and 41 deletions.
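The gist of the change: instead of handing each Arrow batch a single segment and letting it pull more pages from the pool while the sender thread serializes it, the writer now reserves all batchSize / pageSize pages when the batch is created. Below is a minimal standalone sketch of why that helps, assuming (per the new comment in allocateMemorySegments() further down) that the failure mode was in-flight batches each holding part of the pool while blocking on the rest; all names here are hypothetical, not Fluss APIs:

import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration, not Fluss code: a fixed-capacity page pool.
class PagePoolSketch {
    private final List<byte[]> free = new ArrayList<>();
    private final int pageSize;

    PagePoolSketch(int pageCount, int pageSize) {
        this.pageSize = pageSize;
        for (int i = 0; i < pageCount; i++) {
            free.add(new byte[pageSize]);
        }
    }

    // Lazy style: one page at a time; concurrent batches that each need many
    // pages can starve one another here and surface "buffer exhausted".
    synchronized byte[] nextPage() {
        if (free.isEmpty()) {
            throw new IllegalStateException("buffer exhausted");
        }
        return free.remove(free.size() - 1);
    }

    // Eager style (what this commit moves to): reserve everything a batch may
    // need before serialization starts, so the sender thread never blocks on
    // the pool mid-batch.
    synchronized List<byte[]> preAllocate(int batchSize) {
        List<byte[]> reserved = new ArrayList<>();
        for (int i = 0; i < batchSize / pageSize; i++) {
            reserved.add(nextPage());
        }
        return reserved;
    }

    synchronized void returnAll(List<byte[]> pages) {
        free.addAll(pages);
    }
}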
@@ -18,9 +18,10 @@

import com.alibaba.fluss.annotation.Internal;
import com.alibaba.fluss.exception.FlussRuntimeException;
import com.alibaba.fluss.memory.ManagedPagedOutputView;
import com.alibaba.fluss.memory.AbstractPagedOutputView;
import com.alibaba.fluss.memory.MemorySegment;
import com.alibaba.fluss.memory.MemorySegmentPool;
import com.alibaba.fluss.memory.PreAllocatedManagedPagedOutputView;
import com.alibaba.fluss.metadata.PhysicalTablePath;
import com.alibaba.fluss.metadata.TableBucket;
import com.alibaba.fluss.record.MemoryLogRecordsArrowBuilder;
@@ -48,19 +48,23 @@
@Internal
public class ArrowLogWriteBatch extends WriteBatch {
private final MemoryLogRecordsArrowBuilder recordsBuilder;
private final ManagedPagedOutputView outputView;
private final AbstractPagedOutputView outputView;
private final List<MemorySegment> preAllocatedMemorySegments;

public ArrowLogWriteBatch(
TableBucket tableBucket,
PhysicalTablePath physicalTablePath,
int schemaId,
ArrowWriter arrowWriter,
MemorySegment initMemorySegment,
List<MemorySegment> preAllocatedMemorySegments,
MemorySegmentPool memorySegmentSource) {
super(tableBucket, physicalTablePath);
this.outputView = new ManagedPagedOutputView(initMemorySegment, memorySegmentSource);
this.outputView =
new PreAllocatedManagedPagedOutputView(
preAllocatedMemorySegments, memorySegmentSource);
this.recordsBuilder =
MemoryLogRecordsArrowBuilder.builder(schemaId, arrowWriter, outputView);
this.preAllocatedMemorySegments = preAllocatedMemorySegments;
}

@Override
Expand Down Expand Up @@ -124,9 +129,15 @@ public int sizeInBytes() {

@Override
public List<MemorySegment> memorySegments() {
return outputView.getSegmentBytesViewList().stream()
.map(MemorySegmentBytesView::getMemorySegment)
.collect(Collectors.toList());
List<MemorySegment> usedMemorySegments =
outputView.getSegmentBytesViewList().stream()
.map(MemorySegmentBytesView::getMemorySegment)
.collect(Collectors.toList());
if (usedMemorySegments.size() > preAllocatedMemorySegments.size()) {
return usedMemorySegments;
} else {
return preAllocatedMemorySegments;
}
}

@Override
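A note on the memorySegments() change just above: assuming PreAllocatedManagedPagedOutputView serves the pre-allocated pages before falling back to the pool, the used list is a superset of the reservation when the batch overflowed it and a subset when the batch closed early. Returning whichever list is longer is what keeps untouched reserved pages from leaking. A hedged sketch of that invariant (class and parameter names are hypothetical):

import com.alibaba.fluss.memory.MemorySegment;

import java.util.List;

// If serialization overflowed the reservation, `used` already contains every
// reserved page plus the extra pool pages, so it is the right set to release;
// if the batch closed early, the untouched reserved pages appear only in
// `preAllocated`, so that list must be returned instead.
class SegmentsToReleaseSketch {
    static List<MemorySegment> toRelease(
            List<MemorySegment> used, List<MemorySegment> preAllocated) {
        return used.size() > preAllocated.size() ? used : preAllocated;
    }
}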
@@ -46,8 +46,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
@@ -183,7 +182,7 @@ public RecordAppendResult append(
// We keep track of the number of appending threads to make sure we do not miss batches in
// abortIncompleteBatches().
appendsInProgress.incrementAndGet();
MemorySegment memorySegment = null;
List<MemorySegment> memorySegments = new ArrayList<>();
WriteBatch.WriteBatchType writeBatchType = null;
List<WriteBatch> batchesToBuild = new ArrayList<>(1);
try {
@@ -207,7 +206,7 @@

TableInfo tableInfo = cluster.getTableOrElseThrow(physicalTablePath.getTablePath());
writeBatchType = getWriteBatchType(writeRecord, tableInfo);
memorySegment = allocateMemorySegment(writeRecord, writeBatchType);
memorySegments = allocateMemorySegments(writeRecord, writeBatchType);
synchronized (dq) {
RecordAppendResult appendResult =
appendNewBatch(
@@ -217,19 +216,19 @@
tableInfo,
writeBatchType,
dq,
memorySegment,
memorySegments,
cluster,
batchesToBuild);
if (appendResult.newBatchCreated) {
memorySegment = null;
memorySegments = new ArrayList<>();
}
return appendResult;
}
} finally {
// Other append operations by the Sender thread may have created a new batch, causing
// the temporarily allocated memorySegment here to go unused, and therefore, it needs to
// be released.
deallocateMemorySegment(memorySegment, writeBatchType);
// the temporarily allocated memorySegments here to go unused, and therefore they need
// to be released.
deallocateMemorySegment(memorySegments, writeBatchType);
appendsInProgress.decrementAndGet();

// we need to serialize the batch (may allocate memory segments) out of the
@@ -400,29 +399,37 @@ private WriteBatch.WriteBatchType getWriteBatchType(
}
}

private MemorySegment allocateMemorySegment(
private List<MemorySegment> allocateMemorySegments(
WriteRecord writeRecord, WriteBatch.WriteBatchType writeBatchType)
throws InterruptedException {
throws InterruptedException, IOException {
if (writeBatchType == WriteBatch.WriteBatchType.ARROW_LOG) {
return memorySegmentPool.nextSegment(true);
// pre-allocate a list of memory segments to be held by the arrow batch, to
// avoid deadlock in the sender thread.
int pageSize = memorySegmentPool.pageSize();
int memorySegmentSize = batchSize / pageSize;
List<MemorySegment> memorySegments = new ArrayList<>(memorySegmentSize);
for (int i = 0; i < memorySegmentSize; i++) {
memorySegments.add(memorySegmentPool.nextSegment(true));
}
return memorySegments;
} else {
// get the new size.
int size = Math.max(batchSize, writeRecord.getEstimatedSizeInBytes());
// TODO check the remaining time to wait for allocating memory segment.
return writerMemoryBuffer.allocate(size, Long.MAX_VALUE);
return Collections.singletonList(writerMemoryBuffer.allocate(size, Long.MAX_VALUE));
}
}

private void deallocateMemorySegment(
@Nullable MemorySegment memorySegment, WriteBatch.WriteBatchType writeBatchType) {
if (memorySegment == null) {
List<MemorySegment> memorySegmentList, WriteBatch.WriteBatchType writeBatchType) {
if (memorySegmentList.isEmpty()) {
return;
}

if (writeBatchType == WriteBatch.WriteBatchType.ARROW_LOG) {
memorySegmentPool.returnPage(memorySegment);
memorySegmentPool.returnAll(memorySegmentList);
} else {
writerMemoryBuffer.deallocate(memorySegment);
writerMemoryBuffer.deallocate(memorySegmentList.get(0));
}
}

@@ -505,7 +512,7 @@ private RecordAppendResult appendNewBatch(
TableInfo tableInfo,
WriteBatch.WriteBatchType writeBatchType,
Deque<WriteBatch> deque,
MemorySegment segment,
List<MemorySegment> segments,
Cluster cluster,
List<WriteBatch> batchesToBuild)
throws Exception {
@@ -521,14 +528,15 @@
// If the table is kv table we need to create a kv batch, otherwise we create a log batch.
WriteBatch batch;
if (writeBatchType == WriteBatch.WriteBatchType.KV) {
MemorySegment memorySegment = segments.get(0);
batch =
new KvWriteBatch(
tb,
physicalTablePath,
DefaultKvRecordBatch.Builder.builder(
tableInfo.getSchemaId(),
segment.size(),
new MemorySegmentOutputView(segment),
memorySegment.size(),
new MemorySegmentOutputView(memorySegment),
tableInfo.getTableDescriptor().getKvFormat()),
writeRecord.getTargetColumns());
} else if (writeBatchType == WriteBatch.WriteBatchType.ARROW_LOG) {
@@ -544,15 +552,16 @@
physicalTablePath,
tableInfo.getSchemaId(),
arrowWriter,
segment,
segments,
memorySegmentPool);
} else {
MemorySegment memorySegment = segments.get(0);
batch =
new IndexedLogWriteBatch(
tb,
physicalTablePath,
MemoryLogRecordsIndexedBuilder.builder(
tableInfo.getSchemaId(), segment.size(), segment));
tableInfo.getSchemaId(), memorySegment.size(), memorySegment));
}

batch.tryAppend(writeRecord, callback);
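The ownership rule running through append(), allocateMemorySegments(), and deallocateMemorySegment() above: pages are reserved before the deque lock is taken, handed off to the new batch on success, and returned in full from the finally block when another appender won the race. A condensed, self-contained sketch under those assumptions (all names are hypothetical):

import java.util.ArrayList;
import java.util.List;

// Condensed, hypothetical model of append()'s ownership rules.
class ReserveHandOffSketch {
    static List<byte[]> reserve(int pages, int pageSize) {
        List<byte[]> segments = new ArrayList<>();
        for (int i = 0; i < pages; i++) {
            segments.add(new byte[pageSize]);
        }
        return segments;
    }

    static void appendLike(boolean newBatchCreated) {
        List<byte[]> segments = reserve(4, 256);
        try {
            if (newBatchCreated) {
                // Ownership moved to the new batch: empty the local list so
                // the finally block releases nothing.
                segments = new ArrayList<>();
            }
        } finally {
            // Mirrors deallocateMemorySegment(): a no-op when the list is
            // empty, otherwise pages go back to the pool (ARROW_LOG) or the
            // writer buffer (everything else).
            release(segments);
        }
    }

    static void release(List<byte[]> segments) {
        // Placeholder for the real return-to-pool logic.
    }
}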
@@ -34,6 +34,10 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static com.alibaba.fluss.record.LogRecordReadContext.createArrowReadContext;
import static com.alibaba.fluss.record.TestData.DATA1_PHYSICAL_TABLE_PATH;
import static com.alibaba.fluss.record.TestData.DATA1_ROW_TYPE;
@@ -102,6 +106,65 @@ void testAppend() throws Exception {
}
}

@Test
void testAppendWithPreAllocatedMemorySegments() throws Exception {
int bucketId = 0;
int maxSizeInBytes = 1024;
int pageSize = 128;
List<MemorySegment> memorySegmentList = new ArrayList<>();
for (int i = 0; i < maxSizeInBytes / pageSize; i++) {
memorySegmentList.add(MemorySegment.wrap(new byte[pageSize]));
}

TableBucket tb = new TableBucket(DATA1_TABLE_ID, bucketId);
ArrowLogWriteBatch arrowLogWriteBatch =
new ArrowLogWriteBatch(
tb,
DATA1_PHYSICAL_TABLE_PATH,
DATA1_TABLE_INFO.getSchemaId(),
writerProvider.getOrCreateWriter(
tb.getTableId(),
DATA1_TABLE_INFO.getSchemaId(),
maxSizeInBytes,
DATA1_ROW_TYPE),
memorySegmentList,
new TestingMemorySegmentPool(128));
int count = 0;
while (arrowLogWriteBatch.tryAppend(
createWriteRecord(row(DATA1_ROW_TYPE, new Object[] {count, "a" + count})),
newWriteCallback())) {
count++;
}

// batch full.
boolean appendResult =
arrowLogWriteBatch.tryAppend(
createWriteRecord(row(DATA1_ROW_TYPE, new Object[] {1, "a"})),
newWriteCallback());
assertThat(appendResult).isFalse();

// close this batch.
arrowLogWriteBatch.close();
arrowLogWriteBatch.serialize();
BytesView bytesView = arrowLogWriteBatch.build();
MemoryLogRecords records =
MemoryLogRecords.pointToByteBuffer(bytesView.getByteBuf().nioBuffer());
LogRecordBatch batch = records.batches().iterator().next();
assertThat(batch.getRecordCount()).isEqualTo(count);
try (LogRecordReadContext readContext =
createArrowReadContext(DATA1_ROW_TYPE, DATA1_TABLE_INFO.getSchemaId());
CloseableIterator<LogRecord> recordsIter = batch.records(readContext)) {
int readCount = 0;
while (recordsIter.hasNext()) {
LogRecord record = recordsIter.next();
assertThat(record.getRow().getInt(0)).isEqualTo(readCount);
assertThat(record.getRow().getString(1).toString()).isEqualTo("a" + readCount);
readCount++;
}
assertThat(readCount).isEqualTo(count);
}
}

private WriteRecord createWriteRecord(IndexedRow row) {
return new WriteRecord(DATA1_PHYSICAL_TABLE_PATH, WriteKind.APPEND, row, null);
}
@@ -116,7 +179,7 @@ private ArrowLogWriteBatch createArrowLogWriteBatch(TableBucket tb, int maxSizeI
DATA1_TABLE_INFO.getSchemaId(),
maxSizeInBytes,
DATA1_ROW_TYPE),
MemorySegment.wrap(new byte[10 * 1024]),
Collections.singletonList(MemorySegment.wrap(new byte[10 * 1024])),
new TestingMemorySegmentPool(10 * 1024));
}

@@ -57,6 +57,7 @@ final class SenderTest {
private static final int TOTAL_MEMORY_SIZE = 1024 * 1024;
private static final int MAX_REQUEST_SIZE = 1024 * 1024;
private static final int BATCH_SIZE = 16 * 1024;
private static final int PAGE_SIZE = 256;
private static final int REQUEST_TIMEOUT = 5000;
private static final short ACKS_ALL = -1;
private static final int MAX_INFLIGHT_REQUEST_PER_BUCKET = 5;
@@ -583,6 +584,7 @@ private Sender setupWithIdempotenceState(
Configuration conf = new Configuration();
conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE, new MemorySize(TOTAL_MEMORY_SIZE));
conf.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, new MemorySize(BATCH_SIZE));
conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_PAGE_SIZE, new MemorySize(PAGE_SIZE));
conf.set(ConfigOptions.CLIENT_WRITER_BATCH_TIMEOUT, Duration.ofMillis(batchTimeoutMs));
accumulator = new RecordAccumulator(conf, idempotenceManager, writerMetricGroup);
return new Sender(
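The new PAGE_SIZE constant makes the pre-allocation arithmetic concrete. A worked example using the test's sizes (these are test values, not documented defaults):

// A 16 KiB Arrow batch over 256-byte pages reserves 64 pages up front in
// allocateMemorySegments().
class PageMathExample {
    public static void main(String[] args) {
        int batchSize = 16 * 1024; // CLIENT_WRITER_BATCH_SIZE in the test
        int pageSize = 256;        // CLIENT_WRITER_BUFFER_PAGE_SIZE in the test
        System.out.println(batchSize / pageSize); // 64 pages per Arrow batch
    }
}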
@@ -17,6 +17,7 @@
package com.alibaba.fluss.memory;

import com.alibaba.fluss.record.bytesview.MemorySegmentBytesView;
import com.alibaba.fluss.utils.Preconditions;

import java.io.IOException;
import java.util.ArrayList;
@@ -55,6 +56,8 @@ protected AbstractPagedOutputView(MemorySegment initialSegment, int pageSize) {
if (initialSegment == null) {
throw new NullPointerException("Initial Segment may not be null");
}
Preconditions.checkArgument(
initialSegment.size() == pageSize, "Initial segment size must match page size.");
this.pageSize = pageSize;
this.currentSegment = initialSegment;
this.positionInSegment = 0;
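The added checkArgument gives the paged view a fail-fast guarantee: every segment it manages is exactly one page, so page arithmetic cannot drift. A tiny illustration of the invariant (hypothetical driver code, using only the MemorySegment API visible elsewhere in this diff):

import com.alibaba.fluss.memory.MemorySegment;

// A 10 KiB initial segment is only legal when the pool's page size is also
// 10 KiB, which is exactly the pairing the updated ArrowLogWriteBatchTest
// helper uses.
class PageSizeCheckExample {
    public static void main(String[] args) {
        MemorySegment segment = MemorySegment.wrap(new byte[10 * 1024]);
        int pageSize = 10 * 1024;
        System.out.println(segment.size() == pageSize); // true: passes the check
    }
}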
@@ -20,13 +20,14 @@
import com.alibaba.fluss.annotation.VisibleForTesting;
import com.alibaba.fluss.config.ConfigOptions;
import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.exception.BufferExhaustedException;
import com.alibaba.fluss.exception.FlussRuntimeException;

import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;

import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
@@ -49,7 +50,7 @@
public class LazyMemorySegmentPool implements MemorySegmentPool, Closeable {

private static final long PER_REQUEST_MEMORY_SIZE = 16 * 1024 * 1024;
private static final long DEFAULT_WAIT_TIMEOUT_MS = 1000 * 60 * 2;
private static final long DEFAULT_WAIT_TIMEOUT_MS = 1000 * 60 * 5;

/** The lock to guard the memory pool. */
private final ReentrantLock lock = new ReentrantLock();
@@ -106,7 +107,7 @@ public static LazyMemorySegmentPool create(Configuration conf) {
}

@Override
public MemorySegment nextSegment(boolean waiting) {
public MemorySegment nextSegment(boolean waiting) throws IOException {
return inLock(
lock,
() -> {
@@ -132,14 +133,14 @@ public MemorySegment nextSegment(boolean waiting) {
});
}

private MemorySegment waitForSegment() {
private MemorySegment waitForSegment() throws EOFException {
Condition moreMemory = lock.newCondition();
waiters.addLast(moreMemory);
try {
while (cachePages.isEmpty()) {
boolean success = moreMemory.await(maxTimeToBlockMs, TimeUnit.MILLISECONDS);
if (!success) {
throw new BufferExhaustedException(
throw new EOFException(
"Failed to allocate new segment within the configured max blocking time "
+ maxTimeToBlockMs
+ " ms. Total memory: "
@@ -152,6 +153,8 @@ private MemorySegment waitForSegment() {
}
checkClosed();
}

this.pageUsage++;
return cachePages.remove(cachePages.size() - 1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
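The signature change ripples outward: nextSegment(true) now declares IOException (an EOFException once maxTimeToBlockMs elapses) instead of throwing the unchecked BufferExhaustedException, which is why ManagedPagedOutputView below and allocateMemorySegments() above pick up throws clauses. A hedged sketch of the caller-side consequence (PoolCallerSketch and takePage are hypothetical):

import com.alibaba.fluss.memory.MemorySegment;
import com.alibaba.fluss.memory.MemorySegmentPool;

import java.io.IOException;

// nextSegment(true) may block up to maxTimeToBlockMs and then throw
// EOFException, so the checked exception has to be declared or handled all
// the way up the append path.
class PoolCallerSketch {
    static MemorySegment takePage(MemorySegmentPool pool) throws IOException {
        return pool.nextSegment(true);
    }
}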
@@ -27,7 +27,7 @@
public class ManagedPagedOutputView extends AbstractPagedOutputView {
private final MemorySegmentPool segmentPool;

public ManagedPagedOutputView(MemorySegmentPool segmentPool) {
public ManagedPagedOutputView(MemorySegmentPool segmentPool) throws IOException {
this(segmentPool.nextSegment(true), segmentPool);
}

Expand Down
Loading

0 comments on commit 71f97f1

Please sign in to comment.