Skip to content

Commit

Permalink
Merge pull request #136 from arenadata/feature/ADH-5330-full-copy-append
Browse files Browse the repository at this point in the history
[ADH-5330] Support full file re-copy instead of append for schemes not supporting appends
  • Loading branch information
iamlapa authored Nov 26, 2024
2 parents 6df567a + 3fae225 commit 6a0df3e
Show file tree
Hide file tree
Showing 14 changed files with 313 additions and 45 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@
<scala.version>2.11.12</scala.version>
<scala.binary.version>2.11</scala.binary.version>
<mockito.version>2.28.2</mockito.version>
<testcontainers.version>1.19.1</testcontainers.version>
<testcontainers.version>1.20.4</testcontainers.version>
<bouncycastle.version>1.68</bouncycastle.version>
<protobuf.version>3.7.1</protobuf.version>
<dbunitVersion>2.5.3</dbunitVersion>
Expand Down
11 changes: 4 additions & 7 deletions smart-common/src/main/java/org/smartdata/utils/PathUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

public class PathUtil {
private static final String DIR_SEP = "/";
private static final String[] GLOBS = new String[] {
private static final String[] GLOBS = new String[]{
"*", "?"
};

Expand Down Expand Up @@ -100,14 +100,11 @@ public static Optional<String> getScheme(Path path) {
.map(URI::getScheme);
}

public static FileSystem getRemoteFileSystem(String path) throws IOException {
return getRemoteFileSystem(new Path(path));
}

// todo we use default HDFS config, add mechanism to provide
// hdfs config paths for remote clusters
public static FileSystem getRemoteFileSystem(Path path) throws IOException {
return path.getFileSystem(toRemoteClusterConfig(new Configuration()));
public static FileSystem getRemoteFileSystem(
Path path, Configuration configuration) throws IOException {
return path.getFileSystem(toRemoteClusterConfig(configuration));
}

public static String getRawPath(Path path) {
Expand Down
16 changes: 16 additions & 0 deletions smart-frontend/app/yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4351,6 +4351,13 @@ __metadata:
languageName: node
linkType: hard

"antlr4@npm:4.13.2":
version: 4.13.2
resolution: "antlr4@npm:4.13.2"
checksum: 10c0/dfe7dcb24fe99ce103e1eac3ce30558a3df713cf5deeacd7e30686f5cca8206864e49b254af3bd5eff5904d31e5cd7902a7468c13dcb619da10caf3f703d03b4
languageName: node
linkType: hard

"anymatch@npm:~3.1.2":
version: 3.1.3
resolution: "anymatch@npm:3.1.3"
Expand Down Expand Up @@ -8729,6 +8736,13 @@ __metadata:
languageName: node
linkType: hard

"monaco-editor@npm:0.52.0":
version: 0.52.0
resolution: "monaco-editor@npm:0.52.0"
checksum: 10c0/962c75272568eb1556835f0097da2c7768bffe1116b252bb8b3aefd34f31e2dc46baa27e3911c0a6617bf556e6d6802e6e9f0f5548edd832c33872a8a0c146d3
languageName: node
linkType: hard

"ms@npm:2.0.0":
version: 2.0.0
resolution: "ms@npm:2.0.0"
Expand Down Expand Up @@ -10548,6 +10562,7 @@ __metadata:
"@typescript-eslint/eslint-plugin": "npm:7.2.0"
"@typescript-eslint/parser": "npm:7.2.0"
"@vitejs/plugin-react": "npm:4.2.1"
antlr4: "npm:4.13.2"
axios: "npm:1.7.2"
classnames: "npm:2.5.1"
date-fns: "npm:3.6.0"
Expand All @@ -10565,6 +10580,7 @@ __metadata:
husky: "npm:9.0.11"
js-base64: "npm:3.7.7"
lint-staged: "npm:15.2.2"
monaco-editor: "npm:0.52.0"
prettier: "npm:3.2.5"
qs: "npm:6.12.1"
react: "npm:18.2.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.smartdata.conf.SmartConfKeys;
import org.smartdata.model.CmdletDescriptor;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Optional;

Expand Down Expand Up @@ -101,7 +100,7 @@ protected Path getPathArg(String key) {

protected FileSystem getFileSystemFor(Path path) throws IOException {
return isAbsoluteRemotePath(path)
? getRemoteFileSystem(path)
? getRemoteFileSystem(path, getConf())
: localFileSystem;
}

Expand All @@ -112,7 +111,7 @@ protected boolean isArgPresent(String key) {
protected Optional<FileStatus> getFileStatus(FileSystem fileSystem, Path path) throws IOException {
try {
return Optional.of(fileSystem.getFileStatus(path));
} catch (FileNotFoundException e) {
} catch (IOException e) {
return Optional.empty();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@
* A MiniCluster for action test.
*/
public abstract class MiniClusterHarness {
public static int DEFAULT_BLOCK_SIZE = 50;
public static final int DEFAULT_BLOCK_SIZE = 50;

protected MiniDFSCluster cluster;
protected DistributedFileSystem dfs;
protected DFSClient dfsClient;
Expand All @@ -65,7 +66,7 @@ public void init() throws Exception {
smartContext = new SmartContext(conf);
}

static void initConf(Configuration conf) {
protected void initConf(Configuration conf) {
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE);
conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
Expand Down
7 changes: 7 additions & 0 deletions smart-hadoop-support/smart-hadoop/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,13 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>minio</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.smartdata</groupId>
<artifactId>smart-metastore</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,15 @@
+ CopyFileAction.LENGTH + " $length "
+ CopyFileAction.BUF_SIZE + " $size "
+ CopyFileAction.PRESERVE + " $attributes"
+ CopyFileAction.FORCE
)
public class CopyFileAction extends CopyPreservedAttributesAction {
public static final String BUF_SIZE = "-bufSize";
public static final String DEST_PATH = "-dest";
public static final String OFFSET_INDEX = "-offset";
public static final String LENGTH = "-length";
public static final String COPY_CONTENT = "-copyContent";
public static final String FORCE = "-force";
public static final Set<PreserveAttribute> DEFAULT_PRESERVE_ATTRIBUTES
= Sets.newHashSet(OWNER, GROUP, PERMISSIONS);

Expand All @@ -68,6 +70,7 @@ public class CopyFileAction extends CopyPreservedAttributesAction {
private long length;
private int bufferSize;
private boolean copyContent;
private boolean fullCopyAppend;

private Set<PreserveAttribute> preserveAttributes;

Expand All @@ -79,6 +82,7 @@ public CopyFileAction() {
this.length = 0;
this.bufferSize = 64 * 1024;
this.copyContent = true;
this.fullCopyAppend = false;
}

@Override
Expand All @@ -98,6 +102,7 @@ public void init(Map<String, String> args) {
if (args.containsKey(COPY_CONTENT)) {
copyContent = Boolean.parseBoolean(args.get(COPY_CONTENT));
}
fullCopyAppend = args.containsKey(FORCE);
}

@Override
Expand Down Expand Up @@ -141,6 +146,25 @@ private void copyWithOffset(
FileSystem srcFileSystem,
FileSystem destFileSystem,
int bufferSize, long offset, long length) throws IOException {
try {
copyWithOffsetInternal(srcFileSystem, destFileSystem, bufferSize, offset, length);
} catch (UnsupportedOperationException unsupportedOperationException) {
if (fullCopyAppend && offset != 0) {
appendLog(
String.format("Seems like target FS doesn't support appends. "
+ "Trying to copy the entire source file of size %s", srcFileStatus.getLen()));

copySingleFile(srcFileSystem, destFileSystem);
return;
}
throw unsupportedOperationException;
}
}

private void copyWithOffsetInternal(
FileSystem srcFileSystem,
FileSystem destFileSystem,
int bufferSize, long offset, long length) throws IOException {
appendLog(
String.format("Copy with offset %s and length %s", offset, length));

Expand All @@ -156,8 +180,6 @@ private void copyWithOffset(
}
}

// TODO add action option to fully re-copy the file in case if fs
// doesn't support appends
private OutputStream getOutputStream(
FileSystem fileSystem, long offset) throws IOException {
Optional<FileStatus> destFileStatus = getFileStatus(fileSystem, destPath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ protected void execute() throws Exception {
Path targetPath = getTargetFile();
if (isRemoteMode()) {
preRemoteExecute();
execute(getRemoteFileSystem(targetPath));
execute(getRemoteFileSystem(targetPath, getConf()));
} else {
preLocalExecute();
execute(localFileSystem);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public ChecksumFileEqualityStrategy(Configuration config) {

private FileSystem getFileSystem(Path path, Configuration conf) throws IOException {
return isAbsoluteRemotePath(path)
? getRemoteFileSystem(path)
? getRemoteFileSystem(path, conf)
: FileSystem.get(HadoopUtil.getNameNodeUri(conf), conf);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ public ScheduleResult onSchedule(CmdletInfo cmdletInfo, ActionInfo actionInfo,
if (preserveAttributes != null) {
action.getArgs().put(CopyFileAction.PRESERVE, preserveAttributes);
}
action.getArgs().put(CopyFileAction.FORCE, "");
if (rateLimiter != null) {
String strLen = getLength(fileDiff);
if (strLen != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.smartdata.hdfs;

import org.testcontainers.containers.MinIOContainer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class SmartMinIOContainer extends MinIOContainer {
private static final int MINIO_UI_PORT = 9001;

private List<String> buckets;

public SmartMinIOContainer(String dockerImageName) {
super(dockerImageName);
this.buckets = new ArrayList<>();
}

public SmartMinIOContainer withBuckets(String... buckets) {
this.buckets = Arrays.asList(buckets);
return this;
}

@Override
public void configure() {
super.configure();

if (buckets.isEmpty()) {
return;
}

withCreateContainerCmdModifier(cmd -> {
cmd.withEntrypoint("sh");
cmd.withCmd("-c", createCmd());
});
}

@Override
public SmartMinIOContainer withPassword(String password) {
return (SmartMinIOContainer) super.withPassword(password);
}

@Override
public SmartMinIOContainer withUserName(String userName) {
return (SmartMinIOContainer) super.withUserName(userName);
}

private String createBucketDirs() {
return buckets.stream()
.map(bucket -> "/tmp/buckets/" + bucket)
.collect(Collectors.joining(" "));
}

private String createCmd() {
return "mkdir -p " + createBucketDirs()
+ " && /usr/bin/docker-entrypoint.sh minio server "
+ "--console-address :" + MINIO_UI_PORT
+ " /tmp/buckets";
}
}
Loading

0 comments on commit 6a0df3e

Please sign in to comment.