Skip to content

Commit

Permalink
Added a test decompression feature, bugfixed EnConfig, made a new rel…
Browse files Browse the repository at this point in the history
…ease jar
  • Loading branch information
marcusklang committed Jul 17, 2015
1 parent 599dbef commit 5672123
Show file tree
Hide file tree
Showing 7 changed files with 192 additions and 76 deletions.
Binary file removed dist/wikiforia-1.1.1.jar
Binary file not shown.
Binary file added dist/wikiforia-1.2.1.jar
Binary file not shown.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>se.lth.cs.nlp</groupId>
<artifactId>wikiforia</artifactId>
<version>1.2</version>
<version>1.2.1</version>
<packaging>jar</packaging>

<name>wikiforia</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,18 @@ public class MultistreamBzip2XmlDumpParser extends AbstractEmitter<Page,Void> im
* Represents a page block
*/
private final static class PageBlock {
private final Block block;
private final byte[] buffer;

public PageBlock(byte[] buffer) {
public PageBlock(Block block, byte[] buffer) {
this.block = block;
this.buffer = buffer;
}

public Block getBlock() {
return block;
}

public final byte[] getBuffer() {
return buffer;
}
Expand Down Expand Up @@ -102,19 +108,29 @@ public MultistreamBzip2XmlDumpParser(File index, File pages, int batchsize, int
this.batchsize = batchsize;
}

/**
 * An offset/length pair locating one bzip2 stream (a "block") inside the
 * multistream page-dump file.
 */
private static class Block {
// byte offset of the block within the pages file
public final long start;
// compressed size of the block in bytes (fits an int per the index format)
public final int size;

public Block(long start, int size) {
this.start = start;
this.size = size;
}
}

/**
* The index reader
*/
private static class IndexReader {
private BufferedReader indexReader;
private final long pageFileSize;
private final int bufferAhead;
private final ArrayDeque<Integer> buffer;
private final ArrayDeque<Block> buffer;

public IndexReader(File indexFile, File pageFile, int bufferAhead) {
try {
this.pageFileSize = pageFile.length();
this.buffer = new ArrayDeque<Integer>();
this.buffer = new ArrayDeque<Block>();
this.bufferAhead = bufferAhead;
this.indexReader =
new BufferedReader(
Expand All @@ -138,21 +154,21 @@ private void fillBuffer() {
String line;
while( (line = indexReader.readLine()) != null && counter < bufferAhead) {
if(line.isEmpty())
break;
continue;

int pos = line.indexOf(':');
long current = Long.parseLong(line.substring(0, pos));
if(start != current) {
long diff = current - start;
buffer.addLast((int) diff);
buffer.addLast(new Block(start, (int)diff));
start = current;

counter++;
}
}

if(counter != bufferAhead) {
buffer.addLast((int) (pageFileSize - start));
buffer.addLast(new Block(start, (int)(pageFileSize - start)));
indexReader.close();
indexReader = null;
}
Expand All @@ -161,22 +177,22 @@ private void fillBuffer() {
}
}

public int peek() {
public Block peek() {
if(buffer.isEmpty())
fillBuffer();

return buffer.isEmpty() ? -1 : buffer.peekFirst();
return buffer.isEmpty() ? null : buffer.peekFirst();
}

/**
* Get next block to read
* @return {@code null} if no more blocks to read, otherwise the next {@code Block} to read
*/
public int next() {
public Block next() {
if(buffer.isEmpty())
fillBuffer();

return buffer.isEmpty() ? -1 : buffer.removeFirst();
return buffer.isEmpty() ? null : buffer.removeFirst();
}
}

Expand Down Expand Up @@ -215,7 +231,7 @@ public static Header parseHeader(String xml) throws XMLStreamException
}

private Header readHeader() throws IOException {
byte[] header = next();
byte[] header = next().buffer;

ByteArrayInputStream bais = new ByteArrayInputStream(header);
BZip2CompressorInputStream bcis = new BZip2CompressorInputStream(bais);
Expand Down Expand Up @@ -247,36 +263,47 @@ public Header getHeader() {
* Read page blocks
* @return null if no more blocks, otherwise a {@code PageBlock} holding the block's compressed bytes
*/
public byte[] next() {
int size = indexReader.next();
if(size == -1)
public PageBlock next() {
Block block = indexReader.next();
if(block == null)
return null;

try
{
byte[] buffer = new byte[size];
int left = size;
byte[] buffer = new byte[block.size];
int left = block.size;
while(left > 0) {
int read = pageStream.read(buffer, size - left, left);
int read = pageStream.read(buffer, block.size - left, left);
if(read == -1)
throw new IOError(new EOFException("Unexpected end of file!"));

left -= read;
}

return buffer;
return new PageBlock(block, buffer);
} catch (IOException e) {
throw new IOError(e);
}
}
}

protected final InputStream getStream() {
protected final ParallelDumpStream getStream() {
return new ParallelDumpStream();
}

protected class ParallelDumpStream extends InputStream {

private PageBlock lastBlock = null;
private PageBlock currentBlock = null;

public PageBlock getLastBlock() {
return lastBlock;
}

public PageBlock getCurrentBlock() {
return currentBlock;
}

private byte[] buffer = new byte[0];
private int pos = 0;

Expand All @@ -291,6 +318,8 @@ private boolean getNext() {
else {
pos = 0;
this.buffer = block.getBuffer();
this.lastBlock = this.currentBlock;
this.currentBlock = block;
return true;
}
} catch (InterruptedException e) {
Expand Down Expand Up @@ -338,15 +367,41 @@ public class Worker extends Thread {
public void run()
{
try {
XmlDumpParser parser = new XmlDumpParser(pageReader.getHeader(), new BZip2CompressorInputStream(getStream(), true));
ParallelDumpStream dumpStream = getStream();
XmlDumpParser parser = new XmlDumpParser(pageReader.getHeader(), new BZip2CompressorInputStream(dumpStream, true));
ArrayList<Page> batch = new ArrayList<Page>(batchsize);

Page page;
while((page = parser.next()) != null) {
batch.add(page);
if(batch.size() == batchsize)
{
output(batch);
try {
output(batch);
}
catch (Exception ex) {
//Save prev block, current block
PageBlock currentBlock = dumpStream.getCurrentBlock();
PageBlock prevBlock = dumpStream.getLastBlock();

if(currentBlock != null && currentBlock.block != null) {
File output = new File("stream-current-" + currentBlock.block.start + "-" + currentBlock.block.size + ".xml.bz2");
FileOutputStream outputStream = new FileOutputStream(output);
outputStream.write(currentBlock.buffer);
outputStream.flush();
outputStream.close();
}

if(prevBlock != null && prevBlock.block != null) {
File output = new File("stream-prev-" + prevBlock.block.start + "-" + prevBlock.block.size + ".xml.bz2");
FileOutputStream outputStream = new FileOutputStream(output);
outputStream.write(prevBlock.buffer);
outputStream.flush();
outputStream.close();
}

throw new IOError(ex);
}
batch = new ArrayList<Page>(batchsize);
}
}
Expand Down Expand Up @@ -386,10 +441,10 @@ public void uncaughtException(Thread th, Throwable ex) {
}

//2. Seed them with data until there is no more
byte[] data;
PageBlock data;
while((data = pageReader.next()) != null && !terminate.get()) {
try {
blocks.put(new PageBlock(data));
blocks.put(data);
} catch (InterruptedException e) {
logger.error("Data put interrupted", e);
break;
Expand All @@ -398,7 +453,7 @@ public void uncaughtException(Thread th, Throwable ex) {

for (int i = 0; i < workers.length; i++) {
try {
blocks.put(new PageBlock(null));
blocks.put(new PageBlock(null, null));
} catch (InterruptedException e) {
logger.info("Termination interrupted", e);
break;
Expand Down
33 changes: 30 additions & 3 deletions src/main/java/se/lth/cs/nlp/wikiforia/App.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.io.File;
import java.io.IOError;
import java.io.IOException;
import java.nio.channels.Pipe;
import java.util.ArrayList;
import java.util.TreeSet;
import java.util.regex.Matcher;
Expand Down Expand Up @@ -94,6 +95,10 @@ public class App
.withDescription("set split size (defaults to 64 M UTF-8 chars), only applicable with hadoop, max value = 2 G")
.create("splitsize");

// -test flag: parse the whole dump without writing converted output, to verify
// that every compressed block decompresses and parses cleanly.
@SuppressWarnings("static-access")
private static final Option testDecompression = OptionBuilder.withLongOpt("test")
        // consistency fix: every other option supplies a description, so -help
        // previously printed nothing for -test
        .withDescription("test decompression and parsing of the dump; no output is written")
        .create("test");

@SuppressWarnings("static-access")
private static final Option output = OptionBuilder.withLongOpt("output")
.withDescription("xml output filepath")
Expand Down Expand Up @@ -234,6 +239,22 @@ public static void convert(
pipeline.run();
}

/**
 * Runs a decompression-only pass over the dump: all pages are parsed but the
 * Pipeline is given a null output target, so corrupt bz2 blocks or malformed
 * XML surface as errors without producing any converted files.
 *
 * @param config     language/template configuration for the parser
 * @param indexPath  multistream index file, or {@code null} to read the dump
 *                   as a single stream
 * @param pagesPath  the page dump file
 * @param numThreads number of worker threads for the multistream parser
 * @param batchsize  number of pages per emitted batch
 */
public static void test(TemplateConfig config,
                        File indexPath,
                        File pagesPath,
                        int numThreads,
                        int batchsize) {
    Source<Page,Void> source;

    // BUG FIX: the original tested the static commons-cli Option field "index"
    // (which is never null) instead of the indexPath argument, so the
    // single-stream fallback was unreachable and a missing index file would
    // send a null path into the multistream parser.
    if(indexPath == null)
        source = new SinglestreamXmlDumpParser(pagesPath, batchsize);
    else
        source = new MultistreamBzip2XmlDumpParser(indexPath, pagesPath, batchsize, numThreads);

    // NOTE(review): assumes Pipeline tolerates a null output target in this
    // test mode — confirm against Pipeline's constructor.
    Pipeline pipeline = new Pipeline(source, null, config, true);
    pipeline.run();
}

/**
* Application entrypoint
* @param args input arguments
Expand All @@ -242,7 +263,7 @@ public static void main( String[] args )
{
Logger logger = LoggerFactory.getLogger(App.class);

logger.info("Wikiforia v1.1.1 by Marcus Klang");
logger.info("Wikiforia v1.2.1 by Marcus Klang");

Options options = new Options();
options.addOption(index);
Expand All @@ -253,6 +274,7 @@ public static void main( String[] args )
options.addOption(lang);
options.addOption(hadoop);
options.addOption(gzip);
options.addOption(testDecompression);
options.addOption(filterNs);

CommandLineParser parser = new PosixParser();
Expand All @@ -278,7 +300,7 @@ public static void main( String[] args )
outputPath = new File(cmdline.getOptionValue(output.getOpt()));

//Create output directories if they do not exist
if(!outputPath.getParentFile().getAbsoluteFile().exists()) {
if(!outputPath.getAbsoluteFile().getParentFile().getAbsoluteFile().exists()) {
if (!outputPath.getParentFile().getAbsoluteFile().mkdirs()) {
throw new IOError(new IOException("Failed to create directories for " + outputPath.getParentFile().getAbsolutePath()));
}
Expand Down Expand Up @@ -384,7 +406,12 @@ public String toString() {
}
}
else {
convert(config,indexPath,pagesPath, outputPath, numThreads, batchsize, filters);
if(cmdline.hasOption(testDecompression.getOpt())) {
test(config, indexPath, pagesPath, numThreads, batchsize);
}
else {
convert(config,indexPath,pagesPath, outputPath, numThreads, batchsize, filters);
}
}

} catch (ParseException e) {
Expand Down
Loading

0 comments on commit 5672123

Please sign in to comment.