From f83a83e69158697f5eb21fd08ec18c6270334aea Mon Sep 17 00:00:00 2001
From: ageliver
Date: Tue, 22 May 2018 20:24:47 +0800
Subject: [PATCH 1/5] environment: hadoop 2.8.3, jdk 1.8

---
 build.xml                          | 2 +-
 ivy.xml                            | 2 +-
 src/bigfat/hadoop/HadoopUtils.java | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/build.xml b/build.xml
index cdb7a38..acbf575 100644
--- a/build.xml
+++ b/build.xml
@@ -60,7 +60,7 @@
-
+
diff --git a/ivy.xml b/ivy.xml
index 17a7e19..d347be5 100644
--- a/ivy.xml
+++ b/ivy.xml
@@ -4,7 +4,7 @@
-
+
diff --git a/src/bigfat/hadoop/HadoopUtils.java b/src/bigfat/hadoop/HadoopUtils.java
index d9e9600..49666b6 100644
--- a/src/bigfat/hadoop/HadoopUtils.java
+++ b/src/bigfat/hadoop/HadoopUtils.java
@@ -84,7 +84,7 @@ public static void runJob(Job job) throws IOException,
     }
 
     private static void printCounters(Job job) throws IOException {
-        Collection<String> groups = job.getCounters().getGroupNames();
+        Iterable<String> groups = job.getCounters().getGroupNames();
         for (String group : groups) {
             if (!BigFatLM.PROGRAM_NAME.equals(group)) {
                 continue;
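
Background for the HadoopUtils change above: in the Hadoop 2.x API,
Counters.getGroupNames() returns Iterable<String> rather than
Collection<String>, so the old declaration no longer compiles against 2.8.3.
A minimal sketch of the resulting iteration pattern (CounterDump and dump()
are hypothetical names for illustration, not code from this repository):

    import java.io.IOException;

    import org.apache.hadoop.mapreduce.Counter;
    import org.apache.hadoop.mapreduce.CounterGroup;
    import org.apache.hadoop.mapreduce.Job;

    public class CounterDump {
        public static void dump(Job job) throws IOException {
            // Hadoop 2.x: getGroupNames() yields Iterable<String>,
            // not Collection<String>
            Iterable<String> groups = job.getCounters().getGroupNames();
            for (String group : groups) {
                // CounterGroup is Iterable<Counter>, so each group's
                // counters can be walked directly
                CounterGroup counterGroup = job.getCounters().getGroup(group);
                for (Counter counter : counterGroup) {
                    System.out.println(group + "." + counter.getDisplayName()
                            + " = " + counter.getValue());
                }
            }
        }
    }
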
From 893c0397991a9bbb267c165ab60b10b747cfe268 Mon Sep 17 00:00:00 2001
From: Ageliver
Date: Thu, 24 May 2018 09:40:50 +0800
Subject: [PATCH 2/5] deal with EOFException caused by "GZIPInputStream"

---
 bigfat                                    | 4 ++--
 src/bigfat/hadoop/HDFSDirInputStream.java | 5 +++--
 src/bigfat/hadoop/HadoopUtils.java        | 4 ++--
 src/bigfat/step7/ArpaMerger.java          | 4 ++--
 src/bigfat/util/IOUtils.java              | 4 ++--
 5 files changed, 11 insertions(+), 10 deletions(-)

diff --git a/bigfat b/bigfat
index f1ddec9..f030c62 100755
--- a/bigfat
+++ b/bigfat
@@ -39,7 +39,7 @@ case "$subtask" in
     # MAPREDUCE-478 - Separate map/reduce jvm opts
     run="hadoop jar bigfat.jar \
         -Dmapred.job.map.memory.mb=3584 \
-        -Dmapred.job.queue.name=m45 \
+        -Dmapred.job.queue.name=default \
         -Dmapred.map.tasks=$numTasks \
         -Dmapred.reduce.tasks=$numTasks \
         -Dmapred.map.tasks.speculative.execution=True \
@@ -52,7 +52,7 @@
 *)
     run="hadoop jar $scriptDir/bigfat.jar \
         -Dmapred.job.map.memory.mb=3584 \
-        -Dmapred.job.queue.name=m45 \
+        -Dmapred.job.queue.name=default \
         -Dmapred.map.tasks=$numTasks \
         -Dmapred.reduce.tasks=$numTasks \
         -Dmapred.map.tasks.speculative.execution=True \
diff --git a/src/bigfat/hadoop/HDFSDirInputStream.java b/src/bigfat/hadoop/HDFSDirInputStream.java
index 9df75ac..37c93bc 100644
--- a/src/bigfat/hadoop/HDFSDirInputStream.java
+++ b/src/bigfat/hadoop/HDFSDirInputStream.java
@@ -9,7 +9,6 @@
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Map;
-import java.util.zip.GZIPInputStream;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -19,6 +18,8 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import bigfat.util.MultiMemberGZIPInputStream;
+
 /**
  * This file is taken from the Chaski project under the Apache license.
 *
@@ -33,7 +34,7 @@ protected InputStream getNextStream(String file) {
     try {
         FSDataInputStream stream = fs.open(new Path(file));
         if (file.endsWith(".gz")) {
-            return new GZIPInputStream(stream);
+            return new MultiMemberGZIPInputStream(stream);
         } else {
             return stream;
         }
diff --git a/src/bigfat/hadoop/HadoopUtils.java b/src/bigfat/hadoop/HadoopUtils.java
index 49666b6..e2689dd 100644
--- a/src/bigfat/hadoop/HadoopUtils.java
+++ b/src/bigfat/hadoop/HadoopUtils.java
@@ -14,7 +14,6 @@
 import java.util.Comparator;
 import java.util.List;
 import java.util.Random;
-import java.util.zip.GZIPInputStream;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
@@ -27,6 +26,7 @@
 import org.apache.hadoop.mapreduce.Reducer;
 
 import bigfat.BigFatLM;
+import bigfat.util.MultiMemberGZIPInputStream;
 
 import com.google.common.collect.Lists;
 
@@ -45,7 +45,7 @@ public static void copyHdfsDirToDiskFile(Path hdfsDir, File diskFile,
 
     InputStream is = new HDFSDirInputStream(hdfsDir.toString());
     if (gunzip) {
-        is = new GZIPInputStream(is);
+        is = new MultiMemberGZIPInputStream(is);
     }
     BufferedReader in = new BufferedReader(new InputStreamReader(is));
     PrintWriter out = new PrintWriter(diskFile);
diff --git a/src/bigfat/step7/ArpaMerger.java b/src/bigfat/step7/ArpaMerger.java
index a343651..7dcce14 100644
--- a/src/bigfat/step7/ArpaMerger.java
+++ b/src/bigfat/step7/ArpaMerger.java
@@ -14,7 +14,6 @@
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.PriorityQueue;
-import java.util.zip.GZIPInputStream;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -24,6 +23,7 @@
 import org.apache.hadoop.io.compress.BZip2Codec;
 
 import bigfat.BigFatLM;
+import bigfat.util.MultiMemberGZIPInputStream;
 import bigfat.BigFatLM.CompressionType;
 import bigfat.util.StringUtils;
 import bigfat.util.UncheckedLineIterable.UncheckedLineIterator;
@@ -118,7 +118,7 @@ public int compare(DArpaLineIterator o1, DArpaLineIterator o2) {
     FSDataInputStream is = fs.open(stat.getPath());
     InputStream compressedInputStream;
     if(compression == CompressionType.GZIP) {
-        compressedInputStream = new GZIPInputStream(is);
+        compressedInputStream = new MultiMemberGZIPInputStream(is);
     } else if(compression == CompressionType.BZIP) {
         compressedInputStream = new BZip2Codec().createInputStream(is);
     } else {
diff --git a/src/bigfat/util/IOUtils.java b/src/bigfat/util/IOUtils.java
index 5a75273..46206f0 100644
--- a/src/bigfat/util/IOUtils.java
+++ b/src/bigfat/util/IOUtils.java
@@ -6,13 +6,13 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStreamReader;
-import java.util.zip.GZIPInputStream;
+import bigfat.util.MultiMemberGZIPInputStream;
 
 public class IOUtils {
     public static BufferedReader getBufferedReader(File file)
             throws FileNotFoundException, IOException {
         if (file.getName().endsWith(".gz")) {
-            return new BufferedReader(new InputStreamReader(new GZIPInputStream(
+            return new BufferedReader(new InputStreamReader(new MultiMemberGZIPInputStream(
                 new FileInputStream(file))));
         } else {
             return new BufferedReader(new InputStreamReader(new FileInputStream(file)));
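
The swaps above all follow one pattern. When a directory of .gz part files is
read back as a single stream (as HDFSDirInputStream and copyHdfsDirToDiskFile
do), the bytes contain several gzip members back-to-back, and per this commit
message java.util.zip.GZIPInputStream failed on such input with EOFException.
A minimal usage sketch of the replacement pattern, using the
MultiMemberGZIPInputStream class added in the next patch; the file name
counts.gz is a placeholder, not a file from this repository:

    import java.io.BufferedReader;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStreamReader;

    import bigfat.util.MultiMemberGZIPInputStream;

    public class GzRead {
        public static void main(String[] args) throws IOException {
            // Reads every gzip member in the file, not just the first one
            try (BufferedReader in = new BufferedReader(new InputStreamReader(
                    new MultiMemberGZIPInputStream(new FileInputStream("counts.gz"))))) {
                String line;
                while ((line = in.readLine()) != null) {
                    System.out.println(line);
                }
            }
        }
    }
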
From e90705fa26ca5a6e2328a8d2ff194d7a42718cec Mon Sep 17 00:00:00 2001
From: Ageliver
Date: Thu, 24 May 2018 09:43:36 +0800
Subject: [PATCH 3/5] deal with "EOFException" caused by GZIPInputStream

---
 .../util/MultiMemberGZIPInputStream.java | 93 +++++++++++++++++++
 1 file changed, 93 insertions(+)
 create mode 100644 src/bigfat/util/MultiMemberGZIPInputStream.java

diff --git a/src/bigfat/util/MultiMemberGZIPInputStream.java b/src/bigfat/util/MultiMemberGZIPInputStream.java
new file mode 100644
index 0000000..e24af26
--- /dev/null
+++ b/src/bigfat/util/MultiMemberGZIPInputStream.java
@@ -0,0 +1,93 @@
+package bigfat.util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PushbackInputStream;
+import java.util.zip.GZIPInputStream;
+
+public class MultiMemberGZIPInputStream extends GZIPInputStream {
+    public MultiMemberGZIPInputStream(InputStream in, int size) throws IOException
+    {
+        // Wrap the stream in a PushbackInputStream…
+        super(new PushbackInputStream(in, size), size);
+        this.size=size;
+    }
+
+    public MultiMemberGZIPInputStream(InputStream in) throws IOException
+    {
+        // Wrap the stream in a PushbackInputStream…
+        super(new PushbackInputStream(in, 1024));
+        this.size=-1;
+    }
+
+    private MultiMemberGZIPInputStream(MultiMemberGZIPInputStream parent) throws IOException
+    {
+        super(parent.in);
+        this.size=-1;
+        this.parent=parent.parent==null ? parent : parent.parent;
+        this.parent.child=this;
+    }
+
+    private MultiMemberGZIPInputStream(MultiMemberGZIPInputStream parent, int size) throws IOException
+    {
+        super(parent.in, size);
+        this.size=size;
+        this.parent=parent.parent==null ? parent : parent.parent;
+        this.parent.child=this;
+    }
+
+    private MultiMemberGZIPInputStream parent;
+    private MultiMemberGZIPInputStream child;
+    private int size;
+    private boolean cvte_eos;
+
+    public int read(byte[] inputBuffer, int inputBufferOffset, int inputBufferLen) throws IOException {
+
+        if (cvte_eos) { return -1; }
+        if (this.child!=null)
+            return this.child.read(inputBuffer, inputBufferOffset, inputBufferLen);
+
+        int charsRead=super.read(inputBuffer, inputBufferOffset, inputBufferLen);
+        if (charsRead==-1)
+        {
+            // Push any remaining buffered data back onto the stream
+            // If the stream is then not empty, use it to construct
+            // a new instance of this class and delegate this and any
+            // future calls to it…
+            int n = inf.getRemaining() - 8;
+            if (n > 0)
+            {
+                // More than 8 bytes remaining in deflater
+                // First 8 are gzip trailer. Add the rest to
+                // any un-read data…
+                ((PushbackInputStream)this.in).unread(buf, len-n, n);
+            }
+            else
+            {
+                // Nothing in the buffer. We need to know whether or not
+                // there is unread data available in the underlying stream
+                // since the base class will not handle an empty file.
+                // Read a byte to see if there is data and if so,
+                // push it back onto the stream…
+                byte[] b=new byte[1];
+                int ret=in.read(b,0,1);
+                if (ret==-1)
+                {
+                    cvte_eos=true;
+                    return -1;
+                }
+                else
+                    ((PushbackInputStream)this.in).unread(b, 0, 1);
+            }
+
+            MultiMemberGZIPInputStream child;
+            if (this.size==-1)
+                child=new MultiMemberGZIPInputStream(this);
+            else
+                child=new MultiMemberGZIPInputStream(this, this.size);
+            return child.read(inputBuffer, inputBufferOffset, inputBufferLen);
+        }
+        else
+            return charsRead;
+    }
+}

From 60f021b8699aaaf58c266c62879f7cb6c38d9591 Mon Sep 17 00:00:00 2001
From: Ageliver
Date: Thu, 24 May 2018 10:40:29 +0800
Subject: [PATCH 4/5] fix the running environment description

---
 README | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/README b/README
index 369f138..d243f92 100644
--- a/README
+++ b/README
@@ -1,7 +1,7 @@
 BigFatLM V0.1
 By Jonathan Clark
 Carnegie-Mellon University -- School of Computer Science -- Language Technologies Institute
-Released: March 31, 2011
+Released: May 24, 2018
 Liscence: LGPL 3.0
 
 "Embiggen your language models"
@@ -34,7 +34,7 @@ and found to be within floating point error.
 REQUIREMENTS
 
 * Java 6+
-* A Hadoop cluster running Hadoop 20.1+ or, for much slower results, a local install of Hadoop.
+* A Hadoop cluster running Hadoop 2.8.3+ or, for much slower results, a local install of Hadoop.
 * If you wish to build BigFatLM from source, you'll need the Apache Ant build system
 * If you wish to filter the language model, you should also install KenLM's filter tool.
   See http://kheafield.com/code/mt/filter.html.

From 6c3c5463edeed690e719879a71892de1399bd064 Mon Sep 17 00:00:00 2001
From: Ageliver
Date: Tue, 12 Jun 2018 15:31:20 +0800
Subject: [PATCH 5/5] fix "hadoop-streaming.jar" path

---
 filter-lm.sh | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/filter-lm.sh b/filter-lm.sh
index edb967c..d314eee 100755
--- a/filter-lm.sh
+++ b/filter-lm.sh
@@ -63,8 +63,8 @@ EOF
 
 )>$localTmpDir/filter-mapper.sh
 
-frun="hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-    -Dmapred.job.queue.name=m45 \
+frun="hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming.jar \
+    -Dmapred.job.queue.name=default \
     -Dmapred.map.tasks=$numTasks \
     -Dmapred.reduce.tasks=$numTasks \
     -Dmapred.map.tasks.speculative.execution=True \
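
A quick way to exercise the new class: gzip two payloads independently,
concatenate the raw bytes, and check that both payloads come back out. This
is the multi-member case that, per the commit messages, made plain
GZIPInputStream fail with EOFException. The harness below is a hypothetical
sketch, not a test shipped with this patch:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.zip.GZIPOutputStream;

    import bigfat.util.MultiMemberGZIPInputStream;

    public class MultiMemberSmokeTest {
        // Compress one string into a standalone gzip member
        static byte[] gzip(String s) throws IOException {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
                gz.write(s.getBytes("UTF-8"));
            }
            return bos.toByteArray();
        }

        public static void main(String[] args) throws IOException {
            // Two members concatenated byte-for-byte, as in merged part files
            ByteArrayOutputStream concat = new ByteArrayOutputStream();
            concat.write(gzip("first member\n"));
            concat.write(gzip("second member\n"));

            MultiMemberGZIPInputStream in = new MultiMemberGZIPInputStream(
                    new ByteArrayInputStream(concat.toByteArray()));
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[256];
            int n;
            while ((n = in.read(buf, 0, buf.length)) != -1) {
                out.write(buf, 0, n);
            }
            System.out.print(out.toString("UTF-8")); // expect both lines
        }
    }
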