Adapt to the latest environment: Hadoop 2.8.3, JDK 1.8 #2

Open · wants to merge 5 commits into base: master
4 changes: 2 additions & 2 deletions README
@@ -1,7 +1,7 @@
 BigFatLM V0.1
 By Jonathan Clark
 Carnegie-Mellon University -- School of Computer Science -- Language Technologies Institute
-Released: March 31, 2011
+Released: May 24, 2018
 Liscence: LGPL 3.0
 "Embiggen your language models"
 
@@ -34,7 +34,7 @@ and found to be within floating point error.
 REQUIREMENTS
 
 * Java 6+
-* A Hadoop cluster running Hadoop 20.1+ or, for much slower results, a local install of Hadoop.
+* A Hadoop cluster running Hadoop 2.8.3+ or, for much slower results, a local install of Hadoop.
 * If you wish to build BigFatLM from source, you'll need the Apache Ant build system
 * If you wish to filter the language model, you should also install KenLM's filter tool.
   See http://kheafield.com/code/mt/filter.html.
4 changes: 2 additions & 2 deletions bigfat
@@ -39,7 +39,7 @@ case "$subtask" in
 # MAPREDUCE-478 - Separate map/reduce jvm opts
 run="hadoop jar bigfat.jar \
 -Dmapred.job.map.memory.mb=3584 \
--Dmapred.job.queue.name=m45 \
+-Dmapred.job.queue.name=default \
 -Dmapred.map.tasks=$numTasks \
 -Dmapred.reduce.tasks=$numTasks \
 -Dmapred.map.tasks.speculative.execution=True \
@@ -52,7 +52,7 @@ case "$subtask" in
 *)
 run="hadoop jar $scriptDir/bigfat.jar \
 -Dmapred.job.map.memory.mb=3584 \
--Dmapred.job.queue.name=m45 \
+-Dmapred.job.queue.name=default \
 -Dmapred.map.tasks=$numTasks \
 -Dmapred.reduce.tasks=$numTasks \
 -Dmapred.map.tasks.speculative.execution=True \
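Context for the queue change: m45 is evidently the queue of Yahoo's M45 research cluster, which this CMU project originally targeted, while default is the stock queue name on an unconfigured cluster. The mapred.* keys in these scripts are Hadoop 1.x names that Hadoop 2.8.3 still accepts through its configuration deprecation table. A minimal sketch of the same settings under the native 2.x key names (the class and values here are illustrative, not part of this PR):

    import org.apache.hadoop.conf.Configuration;

    public class QueueConfigSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // Hadoop 2.x equivalents of the deprecated mapred.* keys used in the script:
            conf.set("mapreduce.job.queuename", "default");  // was mapred.job.queue.name
            conf.set("mapreduce.map.memory.mb", "3584");     // was mapred.job.map.memory.mb
            conf.set("mapreduce.map.speculative", "true");   // was mapred.map.tasks.speculative.execution
            System.out.println(conf.get("mapreduce.job.queuename"));
        }
    }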
2 changes: 1 addition & 1 deletion build.xml
@@ -60,7 +60,7 @@
 </target>
 
 <target name="compile" depends="init,resolve">
-<javac compiler="javac1.5" srcdir="${src}" destdir="${build}" classpath="lib/jannopts.jar" classpathref="lib.path.id" debug="on" encoding="utf8" />
+<javac includeantruntime="false" compiler="javac1.8" srcdir="${src}" destdir="${build}" classpath="lib/jannopts.jar" classpathref="lib.path.id" debug="on" encoding="utf8" />
 </target>
 
 
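A note on the javac change: includeantruntime="false" is the standard Ant 1.8+ attribute that keeps Ant's own runtime classes off the compile classpath and silences the "includeantruntime was not set" warning, and compiler="javac1.8" matches the JDK 8 environment named in the PR title.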
4 changes: 2 additions & 2 deletions filter-lm.sh
@@ -63,8 +63,8 @@ EOF
 )>$localTmpDir/filter-mapper.sh
 
 
-frun="hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
--Dmapred.job.queue.name=m45 \
+frun="hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming.jar \
+-Dmapred.job.queue.name=default \
 -Dmapred.map.tasks=$numTasks \
 -Dmapred.reduce.tasks=$numTasks \
 -Dmapred.map.tasks.speculative.execution=True \
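The new jar path follows the Hadoop 2.x tarball layout, in which the streaming jar moved under share/hadoop/tools/lib/. One caveat: a stock 2.8.3 distribution ships it version-suffixed as hadoop-streaming-2.8.3.jar, so the unversioned hadoop-streaming.jar used above resolves only if a symlink or renamed copy exists; a glob such as $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar would be more robust.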
2 changes: 1 addition & 1 deletion ivy.xml
@@ -4,7 +4,7 @@
 <dependency org="com.google.guava" name="guava" rev="r06"/>
 
 <!-- Hadoop and its dependencies (which should have been referenced in hadoop-core) -->
-<dependency org="org.apache.mahout.hadoop" name="hadoop-core" rev="0.20.1"/>
+<dependency org="org.apache.hadoop" name="hadoop-mapreduce-client-core" rev="2.8.3"/>
 <dependency org="commons-logging" name="commons-logging" rev="1.1.1"/>
 <dependency org="commons-cli" name="commons-cli" rev="1.2"/>
 <dependency org="commons-httpclient" name="commons-httpclient" rev="3.1"/>
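Hadoop 2.x split the monolithic hadoop-core artifact into per-module artifacts; hadoop-mapreduce-client-core carries the MapReduce client API (Job, Mapper, Reducer) used throughout this code. Classes such as org.apache.hadoop.conf.Configuration and org.apache.hadoop.fs.FileSystem live in hadoop-common, so if Ivy is configured to resolve without transitive dependencies, a hadoop-common 2.8.3 dependency would need to be declared as well.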
5 changes: 3 additions & 2 deletions src/bigfat/hadoop/HDFSDirInputStream.java
@@ -9,7 +9,6 @@
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Map;
-import java.util.zip.GZIPInputStream;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -19,6 +18,8 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import bigfat.util.MultiMemberGZIPInputStream;
+
 /**
  * This file is taken from the Chaski project under the Apache license.
  *
@@ -33,7 +34,7 @@ protected InputStream getNextStream(String file) {
 try {
 FSDataInputStream stream = fs.open(new Path(file));
 if (file.endsWith(".gz")) {
-return new GZIPInputStream(stream);
+return new MultiMemberGZIPInputStream(stream);
 } else {
 return stream;
 }
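This is the heart of the PR: HDFSDirInputStream splices a directory of part files into a single stream, so gzipped job output arrives as several gzip members back to back, one per part file. java.util.zip.GZIPInputStream has historically stopped after the first member on some JDK versions, silently truncating such input, which is why every .gz read path is switched to the MultiMemberGZIPInputStream class added at the end of this diff; a usage sketch follows that class.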
6 changes: 3 additions & 3 deletions src/bigfat/hadoop/HadoopUtils.java
@@ -14,7 +14,6 @@
 import java.util.Comparator;
 import java.util.List;
 import java.util.Random;
-import java.util.zip.GZIPInputStream;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
@@ -27,6 +26,7 @@
 import org.apache.hadoop.mapreduce.Reducer;
 
 import bigfat.BigFatLM;
+import bigfat.util.MultiMemberGZIPInputStream;
 
 import com.google.common.collect.Lists;
 
@@ -45,7 +45,7 @@ public static void copyHdfsDirToDiskFile(Path hdfsDir, File diskFile,
 
 InputStream is = new HDFSDirInputStream(hdfsDir.toString());
 if (gunzip) {
-is = new GZIPInputStream(is);
+is = new MultiMemberGZIPInputStream(is);
 }
 BufferedReader in = new BufferedReader(new InputStreamReader(is));
 PrintWriter out = new PrintWriter(diskFile);
@@ -84,7 +84,7 @@ public static void runJob(Job job) throws IOException,
 }
 
 private static void printCounters(Job job) throws IOException {
-Collection<String> groups = job.getCounters().getGroupNames();
+Iterable<String> groups = job.getCounters().getGroupNames();
 for (String group : groups) {
 if (!BigFatLM.PROGRAM_NAME.equals(group)) {
 continue;
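The Collection-to-Iterable change tracks the Hadoop 2.x counters API, in which getGroupNames() returns Iterable<String> rather than Collection<String>. A minimal sketch of the resulting pattern (the listGroups helper is illustrative, not part of this codebase):

    import java.io.IOException;
    import org.apache.hadoop.mapreduce.Job;

    class CounterGroupsSketch {
        // Hadoop 2.x: getGroupNames() yields an Iterable, so an enhanced-for loop suffices.
        static void listGroups(Job job) throws IOException {
            for (String group : job.getCounters().getGroupNames()) {
                System.out.println(group);
            }
        }
    }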
4 changes: 2 additions & 2 deletions src/bigfat/step7/ArpaMerger.java
@@ -14,7 +14,6 @@
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.PriorityQueue;
-import java.util.zip.GZIPInputStream;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -24,6 +23,7 @@
 import org.apache.hadoop.io.compress.BZip2Codec;
 
 import bigfat.BigFatLM;
+import bigfat.util.MultiMemberGZIPInputStream;
 import bigfat.BigFatLM.CompressionType;
 import bigfat.util.StringUtils;
 import bigfat.util.UncheckedLineIterable.UncheckedLineIterator;
@@ -118,7 +118,7 @@ public int compare(DArpaLineIterator o1, DArpaLineIterator o2) {
 FSDataInputStream is = fs.open(stat.getPath());
 InputStream compressedInputStream;
 if(compression == CompressionType.GZIP) {
-compressedInputStream = new GZIPInputStream(is);
+compressedInputStream = new MultiMemberGZIPInputStream(is);
 } else if(compression == CompressionType.BZIP) {
 compressedInputStream = new BZip2Codec().createInputStream(is);
 } else {
4 changes: 2 additions & 2 deletions src/bigfat/util/IOUtils.java
@@ -6,13 +6,13 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStreamReader;
-import java.util.zip.GZIPInputStream;
+import bigfat.util.MultiMemberGZIPInputStream;
 
 public class IOUtils {
 public static BufferedReader getBufferedReader(File file) throws FileNotFoundException,
 IOException {
 if (file.getName().endsWith(".gz")) {
-return new BufferedReader(new InputStreamReader(new GZIPInputStream(
+return new BufferedReader(new InputStreamReader(new MultiMemberGZIPInputStream(
 new FileInputStream(file))));
 } else {
 return new BufferedReader(new InputStreamReader(new FileInputStream(file)));
93 changes: 93 additions & 0 deletions src/bigfat/util/MultiMemberGZIPInputStream.java
@@ -0,0 +1,93 @@
package bigfat.util;

import java.io.IOException;
import java.io.InputStream;
import java.io.PushbackInputStream;
import java.util.zip.GZIPInputStream;

public class MultiMemberGZIPInputStream extends GZIPInputStream {
    public MultiMemberGZIPInputStream(InputStream in, int size) throws IOException
    {
        // Wrap the stream in a PushbackInputStream…
        super(new PushbackInputStream(in, size), size);
        this.size = size;
    }

    public MultiMemberGZIPInputStream(InputStream in) throws IOException
    {
        // Wrap the stream in a PushbackInputStream…
        super(new PushbackInputStream(in, 1024));
        this.size = -1;
    }

    private MultiMemberGZIPInputStream(MultiMemberGZIPInputStream parent) throws IOException
    {
        super(parent.in);
        this.size = -1;
        this.parent = parent.parent == null ? parent : parent.parent;
        this.parent.child = this;
    }

    private MultiMemberGZIPInputStream(MultiMemberGZIPInputStream parent, int size) throws IOException
    {
        super(parent.in, size);
        this.size = size;
        this.parent = parent.parent == null ? parent : parent.parent;
        this.parent.child = this;
    }

    private MultiMemberGZIPInputStream parent;
    private MultiMemberGZIPInputStream child;
    private int size;
    private boolean cvte_eos;

    public int read(byte[] inputBuffer, int inputBufferOffset, int inputBufferLen) throws IOException {

        if (cvte_eos) { return -1; }
        if (this.child != null)
            return this.child.read(inputBuffer, inputBufferOffset, inputBufferLen);

        int charsRead = super.read(inputBuffer, inputBufferOffset, inputBufferLen);
        if (charsRead == -1)
        {
            // Push any remaining buffered data back onto the stream.
            // If the stream is then not empty, use it to construct
            // a new instance of this class and delegate this and any
            // future calls to it…
            int n = inf.getRemaining() - 8;
            if (n > 0)
            {
                // More than 8 bytes remaining in the inflater:
                // the first 8 are the gzip trailer; push the rest back
                // as un-read data…
                ((PushbackInputStream) this.in).unread(buf, len - n, n);
            }
            else
            {
                // Nothing in the buffer. We need to know whether or not
                // there is unread data available in the underlying stream
                // since the base class will not handle an empty file.
                // Read a byte to see if there is data and, if so,
                // push it back onto the stream…
                byte[] b = new byte[1];
                int ret = in.read(b, 0, 1);
                if (ret == -1)
                {
                    cvte_eos = true;
                    return -1;
                }
                else
                    ((PushbackInputStream) this.in).unread(b, 0, 1);
            }

            MultiMemberGZIPInputStream child;
            if (this.size == -1)
                child = new MultiMemberGZIPInputStream(this);
            else
                child = new MultiMemberGZIPInputStream(this, this.size);
            return child.read(inputBuffer, inputBufferOffset, inputBufferLen);
        }
        else
            return charsRead;
    }
}
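A minimal usage sketch for the new class (not part of the PR): build an in-memory stream containing two gzip members, the same shape as concatenated part-file output, and read it back through the wrapper. A plain GZIPInputStream on older JDKs would stop after "first"; the wrapper keeps delegating to child streams until the underlying bytes run out.

    import java.io.BufferedReader;
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.InputStreamReader;
    import java.util.zip.GZIPOutputStream;

    import bigfat.util.MultiMemberGZIPInputStream;

    public class MultiMemberDemo {
        public static void main(String[] args) throws Exception {
            // Two gzip members back to back, like `cat part-00000.gz part-00001.gz`.
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (GZIPOutputStream member = new GZIPOutputStream(bytes)) {
                member.write("first\n".getBytes("UTF-8"));
            }
            try (GZIPOutputStream member = new GZIPOutputStream(bytes)) {
                member.write("second\n".getBytes("UTF-8"));
            }

            // Reads across the member boundary; prints "first" then "second".
            BufferedReader reader = new BufferedReader(new InputStreamReader(
                    new MultiMemberGZIPInputStream(new ByteArrayInputStream(bytes.toByteArray()))));
            for (String line = reader.readLine(); line != null; line = reader.readLine()) {
                System.out.println(line);
            }
        }
    }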