Skip to content

Commit

Permalink
Merge pull request #138 from arenadata/bugfix/ADH-5385-append-lease-p…
Browse files Browse the repository at this point in the history
…roblem

[ADH-5385] Wait for file truncation completion in copy action
  • Loading branch information
iamlapa authored Dec 10, 2024
2 parents 0a00278 + b97a828 commit d55f863
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 46 deletions.
1 change: 1 addition & 0 deletions conf/smart-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -759,4 +759,5 @@
Whether to enable SQL queries statistics export.
</description>
</property>

</configuration>
80 changes: 40 additions & 40 deletions docs/supported-actions.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ public class SmartConfKeys {
"smart.action.client.cache.ttl";
public static final String SMART_ACTION_CLIENT_CACHE_TTL_DEFAULT = "10m";

public static final String ACCESS_EVENT_SOURCE_KEY = "smart.data.file.event.source";
public static final String ACCESS_EVENT_SOURCE_KEY = "smart.data.file.event.source";
public static final String ACCESS_EVENT_SOURCE_DEFAULT =
SmartServerAccessEventSource.class.getName();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@
+ CopyFileAction.OFFSET_INDEX + " $offset "
+ CopyFileAction.LENGTH + " $length "
+ CopyFileAction.BUF_SIZE + " $size "
+ CopyFileAction.PRESERVE + " $attributes"
+ CopyFileAction.PRESERVE + " $attributes "
+ CopyFileAction.TRUNCATE_WAIT_MS + " $truncateWaitMs "
+ CopyFileAction.FORCE
)
public class CopyFileAction extends CopyPreservedAttributesAction {
Expand All @@ -61,6 +62,8 @@ public class CopyFileAction extends CopyPreservedAttributesAction {
public static final String LENGTH = "-length";
public static final String COPY_CONTENT = "-copyContent";
public static final String FORCE = "-force";
public static final String TRUNCATE_WAIT_MS = "-truncateWaitMs";
public static final long SMART_ACTION_COPY_TRUNCATE_WAIT_MS_DEFAULT = 100L;
public static final Set<PreserveAttribute> DEFAULT_PRESERVE_ATTRIBUTES
= Sets.newHashSet(OWNER, GROUP, PERMISSIONS);

Expand All @@ -71,6 +74,7 @@ public class CopyFileAction extends CopyPreservedAttributesAction {
private int bufferSize;
private boolean copyContent;
private boolean fullCopyAppend;
private long truncateWaitMs;

private Set<PreserveAttribute> preserveAttributes;

Expand All @@ -83,6 +87,7 @@ public CopyFileAction() {
this.bufferSize = 64 * 1024;
this.copyContent = true;
this.fullCopyAppend = false;
this.truncateWaitMs = SMART_ACTION_COPY_TRUNCATE_WAIT_MS_DEFAULT;
}

@Override
Expand All @@ -102,6 +107,11 @@ public void init(Map<String, String> args) {
if (args.containsKey(COPY_CONTENT)) {
copyContent = Boolean.parseBoolean(args.get(COPY_CONTENT));
}

truncateWaitMs = Optional.ofNullable(args.get(TRUNCATE_WAIT_MS))
.map(Long::parseLong)
.orElse(SMART_ACTION_COPY_TRUNCATE_WAIT_MS_DEFAULT);

fullCopyAppend = args.containsKey(FORCE);
}

Expand Down Expand Up @@ -191,8 +201,10 @@ private OutputStream getOutputStream(
}

if (destFileStatus.get().getLen() != offset) {
appendLog("Truncating existing file " + destPath + " to the new length " + offset);
fileSystem.truncate(destPath, offset);
appendLog(String.format(
"Truncating existing file %s with length %d to the new length %d",
destPath, destFileStatus.get().getLen(), offset));
truncateFile(fileSystem, destPath, offset);
}

appendLog("Appending to existing file " + destPath);
Expand Down Expand Up @@ -220,4 +232,25 @@ private short getReplication(Short defaultReplication) {
? srcFileStatus.getReplication()
: defaultReplication;
}

@SuppressWarnings("BusyWait")
private void truncateFile(FileSystem fileSystem, Path path, long newLength) throws IOException {
if (fileSystem.truncate(path, newLength)) {
// return if the file is already truncated
return;
}

appendLog("Waiting for the file " + path + " to be truncated");
try {
// wait for the truncation of the blocks on datanodes
while (fileSystem.getFileStatus(path).getLen() != newLength) {
// HDFS doesn't provide any built-in mechanism for notification of truncation completion,
// so we can only use busy waiting for the file length to become expected,
// like in HDFS CLI client's truncate operation, when it's called with -w flag
Thread.sleep(truncateWaitMs);
}
} catch (Exception exception) {
throw new IOException("Error waiting for truncation completion for " + path, exception);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,7 @@ public ScheduleResult onSchedule(CmdletInfo cmdletInfo, ActionInfo actionInfo,
if (preserveAttributes != null) {
action.getArgs().put(CopyFileAction.PRESERVE, preserveAttributes);
}
action.getArgs().put(CopyFileAction.FORCE, "");
if (rateLimiter != null) {
if (rateLimiter != null) {
String strLen = getLength(fileDiff);
if (strLen != null) {
int appendLen = (int) (Long.parseLong(strLen) >> 20);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,19 @@ public void testRetryUnsuccessfulAppend() throws Exception {
}
}

@Test
public void testAsyncTruncateBeforeAppend() throws Exception {
Path srcPath = new Path("/tmp/testFile.txt");
Path destPath = anotherClusterPath("/tmp", "testFileNew.txt");

DFSTestUtil.writeFile(dfs, srcPath, "Lorem ipsum dolor");
DFSTestUtil.writeFile(anotherDfs, destPath, "Lorem ipsum brrrrrrr;");

copyFile(srcPath, destPath, 8, 11);

assertFileContent(destPath, "Lorem ipsum dolor");
}

@Setter
private static class FailingDfsClient extends SmartDFSClient {
private boolean shouldFail;
Expand Down
4 changes: 4 additions & 0 deletions supports/tools/docker/multihost/conf/hdfs-site.xml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@
<name>dfs.block.access.token.enable</name>
<value>true</value>
</property>
<property>
<name>dfs.client.block.write.replace-datanode-on-failure.policy</name>
<value>NEVER</value>
</property>

<!-- Set privileged ports -->
<property>
Expand Down
1 change: 1 addition & 0 deletions supports/tools/docker/multihost/conf/smart-default.xml
Original file line number Diff line number Diff line change
Expand Up @@ -684,4 +684,5 @@
Whether to enable SQL queries statistics export.
</description>
</property>

</configuration>
4 changes: 4 additions & 0 deletions supports/tools/docker/singlehost/conf/hdfs-site.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,8 @@
<name>dfs.blocksize</name>
<value>1048576</value>
</property>
<property>
<name>dfs.client.block.write.replace-datanode-on-failure.policy</name>
<value>NEVER</value>
</property>
</configuration>

0 comments on commit d55f863

Please sign in to comment.