Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keep track of iterators and close them before closing the DB #181

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
- Upgrade ache-dashboard npm dependencies
- Upgrade gradle wrapper to version 5.6.1
- Update Dockerfile to use openjdk:11-jdk (Java 11)
- Keep track of iterators and close them before closing the DB (fix #113)

## Version 0.11.0

Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
package focusedCrawler.util.persistence.rocksdb;

import com.google.common.base.Preconditions;
import focusedCrawler.util.CloseableIterator;
import focusedCrawler.util.KV;
import java.io.Closeable;
import java.io.File;
import java.io.UnsupportedEncodingException;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

import com.google.common.io.BaseEncoding;
import org.rocksdb.RocksIterator;

public abstract class AbstractRocksDbHashtable implements Closeable {

protected Options options;
protected RocksDB db;
private List<RocksDBIterator> iterators = new ArrayList<>();

static {
RocksDB.loadLibrary();
Expand Down Expand Up @@ -63,6 +70,11 @@ protected byte[] getBytes(byte[] keyBytes) {
@Override
public synchronized void close() {
if (db != null) {
for (Iterator<RocksDBIterator> listIt = this.iterators.iterator(); listIt.hasNext(); ) {
RocksDBIterator dbIt = listIt.next();
listIt.remove();
dbIt.close();
}
db.close();
db = null;
options.close();
Expand Down Expand Up @@ -107,4 +119,75 @@ static String bytesToString(byte[] bytes) {
}
}

protected RocksDBIterator openIterator() {
RocksDBIterator it = new RocksDBIterator(this.db);
this.iterators.add(it);
return it;
}

public class RocksDBIterator implements CloseableIterator<KV<byte[], byte[]>> {

private final RocksIterator cursor;
private boolean hasNext;
private boolean isOpen;
private byte[] value;
private byte[] key;
private RocksDB db;

private RocksDBIterator(RocksDB db) {
this.db = db;
this.cursor = db.newIterator();
this.cursor.seekToFirst();
this.isOpen = true;
readNextKV(true);
}

private void readNextKV(boolean firstEntry) {
if (!firstEntry) {
cursor.next();
}
if (cursor.isValid()) {
this.hasNext = true;
this.key = cursor.key();
this.value = cursor.value();
} else {
this.close();
}
}

@Override
public void close() {
if (this.isOpen) {
iterators.remove(this);
cursor.close();
this.isOpen = false;
this.hasNext = false;
}
}

@Override
public boolean hasNext() {
return hasNext;
}

@Override
public KV<byte[], byte[]> next() {
if (!hasNext) {
return null;
}
KV<byte[], byte[]> kv = new KV<>(this.key, this.value);
readNextKV(false);
return kv;
}

public void remove() {
try {
db.delete(key);
} catch (RocksDBException e) {
throw new RuntimeException("Failed to remove entry from RocksDb");
}
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public byte[] get(byte[] key) {

@Override
public CloseableIterator<KV<byte[], byte[]>> iterator() {
return new RocksDBIterator(this.db);
return super.openIterator();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public T get(byte[] key) {

@Override
public CloseableIterator<KV<byte[], T>> iterator() {
return new BytesObjectIterator(new RocksDBIterator(super.db));
return new BytesObjectIterator(super.openIterator());
}

private class BytesObjectIterator
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public T get(String key) {

@Override
public CloseableIterator<KV<String, T>> iterator() {
return new StringObjectIterator(new RocksDBIterator(super.db));
return new StringObjectIterator(super.openIterator());
}

protected class StringObjectIterator
Expand Down
27 changes: 13 additions & 14 deletions src/test/java/focusedCrawler/link/frontier/FrontierTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,20 @@
import focusedCrawler.util.persistence.PersistentHashtable.DB;

public class FrontierTest {

@Rule
public TemporaryFolder tempFolder = new TemporaryFolder();

public Path testPath;

private Frontier frontier;

@Before
public void setUp() throws IOException {
testPath = Paths.get(tempFolder.newFolder().toString());
frontier = new Frontier(testPath.toString(), 1000, DB.ROCKSDB);
}

@After
public void tearDown() throws IOException {
frontier.close();
Expand All @@ -45,44 +45,43 @@ public void shouldInsertUrl() throws Exception {
// given
LinkRelevance link1 = new LinkRelevance(new URL("http://www.example1.com/index.html"), 1);
LinkRelevance link2 = new LinkRelevance(new URL("http://www.example2.com/index.html"), 1);

// when
frontier.insert(link1);

// then
assertThat(frontier.exist(link1), is(1d));
assertThat(frontier.exist(link2), is(nullValue()));
}

@Test
public void shouldInsertUrlsAndSelectGivenNumberOfUrls() throws Exception {
// given
LinkRelevance link1 = new LinkRelevance(new URL("http://www.example1.com/index.html"), 1);
LinkRelevance link2 = new LinkRelevance(new URL("http://www.example2.com/index.html"), 2);

// when
frontier.insert(link1);
frontier.insert(link2);

// then
assertThat(frontier.exist(link1), is(notNullValue()));
assertThat(frontier.exist(link1), is(1d));

assertThat(frontier.exist(link2), is(notNullValue()));
assertThat(frontier.exist(link2), is(2d));
}



@Test
public void shouldInsertAndDeleteUrl() throws Exception {
// given
LinkRelevance link1 = new LinkRelevance(new URL("http://www.example1.com/index.html"), 1);

// when
frontier.insert(link1);
// then
assertThat(frontier.exist(link1), is(1d));

// when
frontier.delete(link1);
// then
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package focusedCrawler.util.persistence;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.Assert.assertThat;

import java.net.URL;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
Expand Down Expand Up @@ -159,4 +161,37 @@ public void shoudNotCrashWhenIterateOverEmptyHashtable() throws Exception {
}
}

@Test
public void shouldNotCrashWhenIterateOnClosedHashtable() throws Exception {
// given
PersistentHashtable<Integer> ht = new PersistentHashtable<>(
tempFolder.newFolder().toString(), 1000, Integer.class, database);

// when
ht.put("1", 1);
ht.put("2", 2);
ht.commit();

TupleIterator<Integer> it;

// when we open an iterator
it = ht.iterator();
// and close the hash table (not the iterator)
ht.close();

// (following calls used to crash the JVM before the bug-fix (issue #113)
boolean hasNext1 = it.hasNext();
Tuple<Integer> next1 = it.next();

boolean hasNext2 = it.hasNext();
Tuple<Integer> next2 = it.next();

// then
assertThat(hasNext1, is(false));
assertThat(next1, is(nullValue()));

assertThat(hasNext2, is(false));
assertThat(next2, is(nullValue()));
}

}