Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Command openLocalFile #441

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions kite-morphlines/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
/bin/
/target/
.class
1 change: 1 addition & 0 deletions kite-morphlines/kite-morphlines-misc/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/target/
9 changes: 9 additions & 0 deletions kite-morphlines/kite-morphlines-misc/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Kite - Morphlines Miscellaneous

This module contains Morphline commands for reading files from local system.

{
openLocalFile{}
}


67 changes: 67 additions & 0 deletions kite-morphlines/kite-morphlines-misc/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- Copyright 2013 Cloudera Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.kitesdk</groupId>
<artifactId>kite-morphlines</artifactId>
<version>1.1.1-SNAPSHOT</version>
</parent>

<artifactId>kite-morphlines-misc</artifactId>
<name>Kite Morphlines Miscellaneous</name>

<dependencies>
<dependency>
<groupId>org.kitesdk</groupId>
<artifactId>kite-morphlines-core</artifactId>
</dependency>

<dependency>
<groupId>org.kitesdk</groupId>
<artifactId>kite-morphlines-core</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency> <!-- see https://github.com/typesafehub/config -->
<groupId>com.typesafe</groupId>
<artifactId>config</artifactId>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-io</artifactId>
<version>1.3.2</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>

<dependency> <!-- see http://www.slf4j.org -->
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Copyright 2013 Cloudera Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.kitesdk.morphlines.misc;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.Collections;
import java.util.zip.GZIPInputStream;

import org.kitesdk.morphline.api.Command;
import org.kitesdk.morphline.api.CommandBuilder;
import org.kitesdk.morphline.api.MorphlineContext;
import org.kitesdk.morphline.api.MorphlineRuntimeException;
import org.kitesdk.morphline.api.Record;
import org.kitesdk.morphline.base.AbstractCommand;
import org.kitesdk.morphline.base.Fields;
import org.kitesdk.morphline.shaded.com.google.common.io.Closeables;
import org.kitesdk.morphline.stdio.AbstractParser;

import com.typesafe.config.Config;

/**
* Opens an Local file for read and return a corresponding InputStream.
*/
public final class OpenLocalFileBuilder implements CommandBuilder {

public OpenLocalFileBuilder() {
// TODO Auto-generated constructor stub
}

@Override
public Collection<String> getNames() {
return Collections.singletonList("openLocalFile");
}

@Override
public Command build(Config config, Command parent, Command child, MorphlineContext context) {
return new OpenLocalFile(this, config, parent, child, context);
}



///////////////////////////////////////////////////////////////////////////////
// Nested classes:
///////////////////////////////////////////////////////////////////////////////
private static final class OpenLocalFile extends AbstractCommand {
private String deafultPath = "";
public static String PATH_SEPERATOR ="/";
public OpenLocalFile(CommandBuilder builder, Config config, Command parent, Command child, MorphlineContext context) {
super(builder, config, parent, child, context);

String defaultFileSystemUri = getConfigs().getString(config, "defaultPath", null);
if (defaultFileSystemUri != null) {
deafultPath = defaultFileSystemUri;
}

validateArguments();
}

@Override
protected boolean doProcess(Record record) {
for (Object body : record.get(Fields.ATTACHMENT_BODY)) {
Record outputRecord = record.copy();
AbstractParser.removeAttachments(outputRecord);
String pathString = body.toString();
File file = new File(getFullPath(pathString));

InputStream in = null;
try {
try {
if (file.isFile() && file.exists()) {

in = new FileInputStream(file);
if (pathString.endsWith(".gz")) {
in = new GZIPInputStream(in, 64 * 1024);
}
in = new BufferedInputStream(in);
outputRecord.put(Fields.ATTACHMENT_BODY, in);
} else {
throw new MorphlineRuntimeException("Unable to read File [" + file.getAbsolutePath() + "]");
}
} catch (IOException e) {
throw new MorphlineRuntimeException(e);
}

// pass record to next command in chain:
if (!getChild().process(outputRecord)) {
return false;
}
} finally {
Closeables.closeQuietly(in);
}
}
return true;
}


private String getFullPath(String path){
if(deafultPath != null && deafultPath.trim().length()>0){
return deafultPath + ((deafultPath.endsWith(PATH_SEPERATOR)?"":PATH_SEPERATOR)) + path;
}
return path;
}
}


}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package org.kitesdk.morphlines.misc;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.GZIPOutputStream;

import org.apache.commons.io.IOUtils;
import org.junit.Assert;
import org.junit.Before;
import org.kitesdk.morphline.api.Collector;
import org.kitesdk.morphline.api.Command;
import org.kitesdk.morphline.api.MorphlineContext;
import org.kitesdk.morphline.api.Record;
import org.kitesdk.morphline.base.Fields;

import com.codahale.metrics.MetricRegistry;
import com.google.common.base.Charsets;
import com.google.common.io.Files;


public class OpenFileMorphlineTest {

private String testDirectory;
private Collector collector;

private static final String RESOURCES_DIR = "target/test-classes";

@Before
public void setUp() throws IOException {
testDirectory = Files.createTempDir().getAbsolutePath();
collector = new Collector();
}

@org.junit.Test
public void testBasic() throws IOException {
String msg = "hello world";

// setup: copy a file to HDFS to prepare inputFile
File inputFile = new File(testDirectory + "foo.txt.gz");

OutputStream out = new FileOutputStream(inputFile);
out = new GZIPOutputStream(out);
IOUtils.copy(new ByteArrayInputStream(msg.getBytes(Charsets.UTF_8)), out);
out.flush();
out.close();
Assert.assertTrue(inputFile.exists());

Command morphline = createMorphline("test-morphlines/openLocalFile");
Record record = new Record();
record.put(Fields.ATTACHMENT_BODY, inputFile.toString());
Assert.assertTrue(morphline.process(record));
Record expected = new Record();
expected.put(Fields.MESSAGE, msg);
Assert.assertEquals(expected, collector.getFirstRecord());
}

private Command createMorphline(String file) {
return new org.kitesdk.morphline.base.Compiler().compile(new File(RESOURCES_DIR + "/" + file + ".conf"), null, createMorphlineContext(), collector);
}

private MorphlineContext createMorphlineContext() {
return new MorphlineContext.Builder().setMetricRegistry(new MetricRegistry()).build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Copyright 2013 Cloudera Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

morphlines : [
{
id : morphline1
importCommands : ["org.kitesdk.**"]

commands : [
{
openLocalFile {}
}
{
readClob { charset : UTF-8 }
}
]
}
]