diff --git a/kite-morphlines/.gitignore b/kite-morphlines/.gitignore new file mode 100644 index 0000000000..db63eb984d --- /dev/null +++ b/kite-morphlines/.gitignore @@ -0,0 +1,3 @@ +/bin/ +/target/ +*.class diff --git a/kite-morphlines/kite-morphlines-misc/.gitignore b/kite-morphlines/kite-morphlines-misc/.gitignore new file mode 100644 index 0000000000..b83d22266a --- /dev/null +++ b/kite-morphlines/kite-morphlines-misc/.gitignore @@ -0,0 +1 @@ +/target/ diff --git a/kite-morphlines/kite-morphlines-misc/README.md b/kite-morphlines/kite-morphlines-misc/README.md new file mode 100644 index 0000000000..173ffb754a --- /dev/null +++ b/kite-morphlines/kite-morphlines-misc/README.md @@ -0,0 +1,9 @@ +# Kite - Morphlines Miscellaneous + +This module contains Morphline commands for reading files from the local file system. + +{ + openLocalFile{} +} + + diff --git a/kite-morphlines/kite-morphlines-misc/pom.xml b/kite-morphlines/kite-morphlines-misc/pom.xml new file mode 100644 index 0000000000..bc4c77fdb5 --- /dev/null +++ b/kite-morphlines/kite-morphlines-misc/pom.xml @@ -0,0 +1,67 @@ + + + + 4.0.0 + + + org.kitesdk + kite-morphlines + 1.1.1-SNAPSHOT + + + kite-morphlines-misc + Kite Morphlines Miscellaneous + + + + org.kitesdk + kite-morphlines-core + + + + org.kitesdk + kite-morphlines-core + test-jar + test + + + + com.typesafe + config + + + + org.apache.commons + commons-io + 1.3.2 + + + + junit + junit + test + + + org.slf4j + slf4j-api + + + + org.slf4j + slf4j-log4j12 + test + + + diff --git a/kite-morphlines/kite-morphlines-misc/src/main/java/org/kitesdk/morphlines/misc/OpenLocalFileBuilder.java b/kite-morphlines/kite-morphlines-misc/src/main/java/org/kitesdk/morphlines/misc/OpenLocalFileBuilder.java new file mode 100644 index 0000000000..615758b8c1 --- /dev/null +++ b/kite-morphlines/kite-morphlines-misc/src/main/java/org/kitesdk/morphlines/misc/OpenLocalFileBuilder.java @@ -0,0 +1,124 @@ +/* + * Copyright 2013 Cloudera Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.kitesdk.morphlines.misc;
+
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.zip.GZIPInputStream;
+
+import org.kitesdk.morphline.api.Command;
+import org.kitesdk.morphline.api.CommandBuilder;
+import org.kitesdk.morphline.api.MorphlineContext;
+import org.kitesdk.morphline.api.MorphlineRuntimeException;
+import org.kitesdk.morphline.api.Record;
+import org.kitesdk.morphline.base.AbstractCommand;
+import org.kitesdk.morphline.base.Fields;
+import org.kitesdk.morphline.shaded.com.google.common.io.Closeables;
+import org.kitesdk.morphline.stdio.AbstractParser;
+
+import com.typesafe.config.Config;
+
+/**
+ * Command that opens a file on the local file system for reading. Each value of the
+ * {@link Fields#ATTACHMENT_BODY} field is treated as a file path; an {@link InputStream} over
+ * that file is put into the same field of a copy of the record, which is then passed to the
+ * child command. Files whose path ends with ".gz" are read through a {@link GZIPInputStream}.
+ */
+public final class OpenLocalFileBuilder implements CommandBuilder {
+
+  @Override
+  public Collection<String> getNames() {
+    return Collections.singletonList("openLocalFile");
+  }
+
+  @Override
+  public Command build(Config config, Command parent, Command child, MorphlineContext context) {
+    return new OpenLocalFile(this, config, parent, child, context);
+  }
+
+  ///////////////////////////////////////////////////////////////////////////////
+  // Nested classes:
+  ///////////////////////////////////////////////////////////////////////////////
+  private static final class OpenLocalFile extends AbstractCommand {
+
+    private static final String PATH_SEPARATOR = "/";
+    private static final int GZIP_BUFFER_SIZE = 64 * 1024;
+
+    /** Optional directory prepended to every incoming path; empty means paths are used as given. */
+    private final String defaultPath;
+
+    public OpenLocalFile(CommandBuilder builder, Config config, Command parent, Command child, MorphlineContext context) {
+      super(builder, config, parent, child, context);
+      this.defaultPath = getConfigs().getString(config, "defaultPath", "");
+      validateArguments();
+    }
+
+    @Override
+    protected boolean doProcess(Record record) {
+      for (Object body : record.get(Fields.ATTACHMENT_BODY)) {
+        Record outputRecord = record.copy();
+        AbstractParser.removeAttachments(outputRecord);
+        String pathString = body.toString();
+        File file = new File(getFullPath(pathString));
+        if (!file.isFile()) { // isFile() is also false when the path does not exist
+          throw new MorphlineRuntimeException("Unable to read File [" + file.getAbsolutePath() + "]");
+        }
+
+        InputStream in = null;
+        try {
+          try {
+            in = new FileInputStream(file);
+            if (pathString.endsWith(".gz")) {
+              in = new GZIPInputStream(in, GZIP_BUFFER_SIZE);
+            }
+            in = new BufferedInputStream(in);
+          } catch (IOException e) {
+            throw new MorphlineRuntimeException(e);
+          }
+          outputRecord.put(Fields.ATTACHMENT_BODY, in);
+
+          // pass record to next command in chain:
+          if (!getChild().process(outputRecord)) {
+            return false;
+          }
+        } finally {
+          // the stream is only handed to the child chain; always release the file handle here
+          Closeables.closeQuietly(in);
+        }
+      }
+      return true;
+    }
+
+    /**
+     * Resolves {@code path} against the configured default directory, if any.
+     *
+     * @param path the path taken from the record
+     * @return {@code path} prefixed with the default directory, or unchanged when none is set
+     */
+    private String getFullPath(String path) {
+      if (defaultPath.trim().length() == 0) {
+        return path;
+      }
+      String separator = defaultPath.endsWith(PATH_SEPARATOR) ? "" : PATH_SEPARATOR;
+      return defaultPath + separator + path;
+    }
+  }
+}
diff --git a/kite-morphlines/kite-morphlines-misc/src/test/java/org/kitesdk/morphlines/misc/OpenFileMorphlineTest.java b/kite-morphlines/kite-morphlines-misc/src/test/java/org/kitesdk/morphlines/misc/OpenFileMorphlineTest.java
new file mode 100644
index 0000000000..0ae9768d58
--- /dev/null
+++ b/kite-morphlines/kite-morphlines-misc/src/test/java/org/kitesdk/morphlines/misc/OpenFileMorphlineTest.java
@@ -0,0 +1,115 @@
+/*
+ * Copyright 2013 Cloudera Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.kitesdk.morphlines.misc;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.commons.io.IOUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.kitesdk.morphline.api.Collector;
+import org.kitesdk.morphline.api.Command;
+import org.kitesdk.morphline.api.MorphlineContext;
+import org.kitesdk.morphline.api.Record;
+import org.kitesdk.morphline.base.Fields;
+
+import com.codahale.metrics.MetricRegistry;
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
+/**
+ * Tests the {@code openLocalFile} morphline command against files in a temporary directory.
+ */
+public class OpenFileMorphlineTest {
+
+  private static final String RESOURCES_DIR = "target/test-classes";
+
+  private File testDirectory;
+  private Collector collector;
+
+  @Before
+  public void setUp() throws IOException {
+    testDirectory = Files.createTempDir();
+    collector = new Collector();
+  }
+
+  @After
+  public void tearDown() {
+    // the tests only create plain files directly inside the temp directory
+    File[] files = testDirectory.listFiles();
+    if (files != null) {
+      for (File file : files) {
+        file.delete();
+      }
+    }
+    testDirectory.delete();
+  }
+
+  @Test
+  public void testBasic() throws IOException {
+    String msg = "hello world";
+    assertMorphlineReads(writeInputFile("foo.txt.gz", msg), msg);
+  }
+
+  @Test
+  public void testUncompressedFile() throws IOException {
+    String msg = "hello world";
+    assertMorphlineReads(writeInputFile("foo.txt", msg), msg);
+  }
+
+  /** Writes msg as UTF-8 into the temp directory, gzipping it when name ends with ".gz". */
+  private File writeInputFile(String name, String msg) throws IOException {
+    // resolve against the temp dir with File(parent, child) so the separator is never dropped
+    File inputFile = new File(testDirectory, name);
+    OutputStream out = new FileOutputStream(inputFile);
+    try {
+      if (name.endsWith(".gz")) {
+        out = new GZIPOutputStream(out);
+      }
+      IOUtils.copy(new ByteArrayInputStream(msg.getBytes(Charsets.UTF_8)), out);
+    } finally {
+      out.close(); // closing also writes the gzip trailer
+    }
+    Assert.assertTrue(inputFile.exists());
+    return inputFile;
+  }
+
+  /** Runs the openLocalFile test morphline on inputFile and checks the collected message. */
+  private void assertMorphlineReads(File inputFile, String expectedMessage) {
+    Command morphline = createMorphline("test-morphlines/openLocalFile");
+    Record record = new Record();
+    record.put(Fields.ATTACHMENT_BODY, inputFile.toString());
+    Assert.assertTrue(morphline.process(record));
+    Record expected = new Record();
+    expected.put(Fields.MESSAGE, expectedMessage);
+    Assert.assertEquals(expected, collector.getFirstRecord());
+  }
+
+  private Command createMorphline(String file) {
+    return new org.kitesdk.morphline.base.Compiler().compile(new File(RESOURCES_DIR + "/" + file + ".conf"), null, createMorphlineContext(), collector);
+  }
+
+  private MorphlineContext createMorphlineContext() {
+    return new MorphlineContext.Builder().setMetricRegistry(new MetricRegistry()).build();
+  }
+}
diff --git a/kite-morphlines/kite-morphlines-misc/src/test/resources/test-morphlines/openLocalFile.conf b/kite-morphlines/kite-morphlines-misc/src/test/resources/test-morphlines/openLocalFile.conf
new file mode 100644
index 0000000000..54bd88a02e
--- /dev/null
+++ b/kite-morphlines/kite-morphlines-misc/src/test/resources/test-morphlines/openLocalFile.conf
@@ -0,0 +1,29 @@
+# Copyright 2013 Cloudera Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+morphlines : [
+  {
+    id : morphline1
+    importCommands : ["org.kitesdk.**"] # presumably makes commands in org.kitesdk packages resolvable by name - confirm
+
+    commands : [
+      {
+        openLocalFile {} # no options set, so no defaultPath prefix is applied to incoming paths
+      }
+      {
+        readClob { charset : UTF-8 } # NOTE(review): presumably reads the opened stream into the message field - confirm
+      }
+    ]
+  }
+]