Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Manual Compaction to HaloDB (#45) #48

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 54 additions & 48 deletions src/main/java/com/oath/halodb/CompactionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,18 @@

package com.oath.halodb;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.RateLimiter;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.util.concurrent.RateLimiter;

class CompactionManager {
private static final Logger logger = LoggerFactory.getLogger(CompactionManager.class);

Expand All @@ -41,7 +41,11 @@ class CompactionManager {
private volatile long totalSizeOfRecordsCopied = 0;
private volatile long compactionStartTime = System.currentTimeMillis();

private static final int STOP_SIGNAL = -10101;
// These are purposely 'newed' up because we use reference equality to check the signals and the value does not matter
// signal for the compactor to top its thread after finishing already queued tasks
private static final Integer STOP_SIGNAL = new Integer(-1);
// signal for the compactor thread to stop its thread after finishing any active task but not taking more tasks;
private static final Integer WAKE_SIGNAL = new Integer(-1);

private final ReentrantLock startStopLock = new ReentrantLock();
private volatile boolean stopInProgress = false;
Expand All @@ -53,16 +57,21 @@ class CompactionManager {
}

// If a file is being compacted we wait for it complete before stopping.
boolean stopCompactionThread(boolean closeCurrentWriteFile) throws IOException {
boolean stopCompactionThread(boolean closeCurrentWriteFile, boolean awaitPending) throws IOException {
stopInProgress = true;
startStopLock.lock();
try {
isRunning = false;
if (isCompactionRunning()) {
// We don't want to call interrupt on compaction thread as it
// may interrupt IO operations and leave files in an inconsistent state.
// instead we use -10101 as a stop signal.
compactionQueue.put(STOP_SIGNAL);
if (awaitPending) {
// we send a stop signal that will stop the thread after existing items in the queue complete
compactionQueue.put(STOP_SIGNAL);
} else {
// set the running flag to false, then send the wake signal. If the queue is empty it will immediately
// consume the signal to wake up the thread and stop.
// if the queue is not empty, then after the current task completes the 'isRunning' flag will stop it
isRunning = false;
compactionQueue.put(WAKE_SIGNAL);
}
compactionThread.join();
if (closeCurrentWriteFile && currentWriteFile != null) {
currentWriteFile.flushToDisk();
Expand Down Expand Up @@ -95,9 +104,14 @@ void startCompactionThread() {
}
}

void pauseCompactionThread() throws IOException {
/**
* Stop the compaction thread, blocking until it has stopped.
* If awaitPending is true, stops after all outstanding compaction tasks in the queue
* have completed. Otherwise, stops after the current task completes.
**/
void pauseCompactionThread(boolean awaitPending) throws IOException {
logger.info("Pausing compaction thread ...");
stopCompactionThread(false);
stopCompactionThread(false, awaitPending);
}

void resumeCompaction() {
Expand All @@ -109,7 +123,7 @@ int getCurrentWriteFileId() {
return currentWriteFile != null ? currentWriteFile.getFileId() : -1;
}

boolean submitFileForCompaction(int fileId) {
boolean submitFileForCompaction(Integer fileId) {
return compactionQueue.offer(fileId);
}

Expand Down Expand Up @@ -178,39 +192,53 @@ private class CompactionThread extends Thread {
startStopLock.lock();
try {
compactionThread = null;
startCompactionThread();
if (isRunning) {
startCompactionThread();
}
} finally {
startStopLock.unlock();
}
}
else {
logger.info("Not restarting thread as the lock is held by stop compaction method.");
}

});
}

@Override
public void run() {
logger.info("Starting compaction thread ...");
int fileToCompact = -1;

while (isRunning) {
Integer fileToCompact = null;
try {
fileToCompact = compactionQueue.take();
if (fileToCompact == STOP_SIGNAL) {
fileToCompact = compactionQueue.poll(1, TimeUnit.SECONDS);
if (fileToCompact == STOP_SIGNAL) { // reference, not value equality on purpose, these are sentinel objects
logger.debug("Received a stop signal.");
// skip rest of the steps and check status of isRunning flag.
// while pausing/stopping compaction isRunning flag must be set to false.
// in this case, isRunning was not set to false already. The signal had to work its way through the
// queue behind the other tasks. So set 'isRunning' to false and break out of the loop to halt.
isRunning = false;
break;
}
if (fileToCompact == WAKE_SIGNAL || fileToCompact == null) {
// scenario: the queue has a long list of files to compact. We add this signal to the queue after
// setting 'isRunning' to false, so all we need to do is break out of the loop and it will shut down
// without processing more tasks.
// If we do break out of this loop with tasks in the queue, then this signal may still be in the queue
// behind those tasks.
// If the thread is resumed later, the signal will be processed after resuming the compactor.
// If we were to set 'isRunning' to false here, that would shut down the
// recently resumed thread when this signal arrived.
continue;
}
logger.debug("Compacting {} ...", fileToCompact);
copyFreshRecordsToNewFile(fileToCompact);
logger.debug("Completed compacting {} to {}", fileToCompact, getCurrentWriteFileId());
dbInternal.markFileAsCompacted(fileToCompact);
dbInternal.deleteHaloDBFile(fileToCompact);
}
catch (Exception e) {
fileToCompact = null;
} catch (InterruptedException ie) {
break;
} catch (Exception e) {
logger.error(String.format("Error while compacting file %d to %d", fileToCompact, getCurrentWriteFileId()), e);
}
}
Expand Down Expand Up @@ -322,26 +350,4 @@ void forceRolloverCurrentWriteFile() throws IOException {
dbInternal.getDbDirectory().syncMetaData();
currentWriteFileOffset = 0;
}

// Used only for tests. to be called only after all writes in the test have been performed.
@VisibleForTesting
synchronized boolean isCompactionComplete() {

if (!isCompactionRunning())
return true;

if (compactionQueue.isEmpty()) {
try {
isRunning = false;
submitFileForCompaction(STOP_SIGNAL);
compactionThread.join();
} catch (InterruptedException e) {
logger.error("Error in isCompactionComplete", e);
}

return true;
}

return false;
}
}
28 changes: 17 additions & 11 deletions src/main/java/com/oath/halodb/HaloDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,10 @@

package com.oath.halodb;

import com.google.common.annotations.VisibleForTesting;

import java.io.File;
import java.io.IOException;
import java.util.Set;

import com.google.common.annotations.VisibleForTesting;

public final class HaloDB {

Expand Down Expand Up @@ -80,14 +79,27 @@ public HaloDBIterator newIterator() throws HaloDBException {
return new HaloDBIterator(dbInternal);
}

public void pauseCompaction() throws HaloDBException {
/**
* Force a compaction on all data files that have more stale data than the provided threshold ratio.
* A compactionThreshold of 0 would force all files that have any stale data to compact,
* 0.1 would force those that have more than 10% space stale to compact.
**/
public void forceCompaction(float compactionThreshold) {
dbInternal.forceCompaction(compactionThreshold);
}

public void pauseCompaction(boolean awaitPending) throws HaloDBException {
try {
dbInternal.pauseCompaction();
dbInternal.pauseCompaction(awaitPending);
} catch (IOException e) {
throw new HaloDBException("Error while trying to pause compaction thread", e);
}
}

public void pauseCompaction() throws HaloDBException {
pauseCompaction(false);
}

public boolean snapshot() {
return dbInternal.takeSnapshot();
}
Expand All @@ -105,12 +117,6 @@ public void resumeCompaction() {
}

// methods used in tests.

@VisibleForTesting
boolean isCompactionComplete() {
return dbInternal.isCompactionComplete();
}

@VisibleForTesting
boolean isTombstoneFilesMerging() {
return dbInternal.isTombstoneFilesMerging();
Expand Down
24 changes: 14 additions & 10 deletions src/main/java/com/oath/halodb/HaloDBInternal.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class HaloDBInternal {

private volatile Thread tombstoneMergeThread;

private Map<Integer, HaloDBFile> readFileMap = new ConcurrentHashMap<>();
private final Map<Integer, HaloDBFile> readFileMap = new ConcurrentHashMap<>();

HaloDBOptions options;

Expand Down Expand Up @@ -167,7 +167,7 @@ synchronized void close() throws IOException {
isClosing = true;

try {
if(!compactionManager.stopCompactionThread(true))
if(!compactionManager.stopCompactionThread(true, false))
setIOErrorFlag();
} catch (IOException e) {
logger.error("Error while stopping compaction thread. Setting IOError flag", e);
Expand Down Expand Up @@ -314,7 +314,7 @@ synchronized boolean takeSnapshot() {

try {
final int currentWriteFileId;
compactionManager.pauseCompactionThread();
compactionManager.pauseCompactionThread(false);

// Only support one snapshot now
// TODO: support multiple snapshots if needed
Expand Down Expand Up @@ -417,8 +417,8 @@ void setIOErrorFlag() throws IOException {
metaData.storeToFile();
}

void pauseCompaction() throws IOException {
compactionManager.pauseCompactionThread();
void pauseCompaction(boolean awaitPending) throws IOException {
compactionManager.pauseCompactionThread(awaitPending);
}

void resumeCompaction() {
Expand Down Expand Up @@ -481,6 +481,15 @@ private void markPreviousVersionAsStale(byte[] key, InMemoryIndexMetaData record
addFileToCompactionQueueIfThresholdCrossed(recordMetaData.getFileId(), staleRecordSize);
}

void forceCompaction(float compactionThreshold) {
staleDataPerFileMap.forEach((fileId, staleData) -> {
HaloDBFile file = readFileMap.get(fileId);
if (staleData > 0 && staleData >= file.getSize() * compactionThreshold) {
compactionManager.submitFileForCompaction(fileId);
}
});
}

void addFileToCompactionQueueIfThresholdCrossed(int fileId, int staleRecordSize) {
HaloDBFile file = readFileMap.get(fileId);
if (file == null)
Expand Down Expand Up @@ -955,11 +964,6 @@ private Map<Integer, Double> computeStaleDataMapForStats() {
}

// Used only in tests.
@VisibleForTesting
boolean isCompactionComplete() {
return compactionManager.isCompactionComplete();
}

@VisibleForTesting
boolean isTombstoneFilesMerging() {
return isTombstoneFilesMerging;
Expand Down
Loading