Skip to content

Commit

Permalink
[FLINK-37021][state/forst] Fix incorrect local path when reusing and …
Browse files Browse the repository at this point in the history
…creating file
  • Loading branch information
yinhan.yh committed Jan 22, 2025
1 parent dccb782 commit b62907e
Show file tree
Hide file tree
Showing 16 changed files with 134 additions and 321 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,8 @@
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.memory.OpaqueMemoryResource;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.state.forst.datatransfer.DataTransferStrategy;
import org.apache.flink.state.forst.fs.ForStFlinkFileSystem;
import org.apache.flink.state.forst.fs.StringifiedForStFileSystem;
import org.apache.flink.state.forst.fs.filemapping.FileOwnershipDecider;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
Expand Down Expand Up @@ -109,12 +107,6 @@ public final class ForStResourceContainer implements AutoCloseable {
/** The ForSt file system. Null when remote dir is not set. */
@Nullable private ForStFlinkFileSystem forStFileSystem;

private final RecoveryClaimMode claimMode;

private final FileOwnershipDecider fileOwnershipDecider;

private DataTransferStrategy dataTransferStrategy;

/**
* The shared resource among ForSt instances. This resource is not part of the 'handlesToClose',
* because the handles to close are closed quietly, whereas for this one, we want exceptions to
Expand Down Expand Up @@ -196,8 +188,6 @@ public ForStResourceContainer(
this.remoteBasePath = remoteBasePath;
this.remoteForStPath =
remoteBasePath != null ? new Path(remoteBasePath, DB_DIR_STRING) : null;
this.claimMode = claimMode;
this.fileOwnershipDecider = new FileOwnershipDecider(claimMode);

this.enableStatistics = enableStatistics;
this.handlesToClose = new ArrayList<>();
Expand Down Expand Up @@ -397,7 +387,6 @@ public void prepareDirectories() throws Exception {
ForStFlinkFileSystem.get(
remoteForStPath.toUri(),
localForStPath,
fileOwnershipDecider,
ForStFlinkFileSystem.getFileBasedCache(
cacheBasePath, cacheCapacity, cacheReservedSize, metricGroup));
} else {
Expand All @@ -409,10 +398,6 @@ public void prepareDirectories() throws Exception {
return forStFileSystem;
}

public DataTransferStrategy getDataTransferStrategy() {
return dataTransferStrategy;
}

private static void prepareDirectories(Path basePath, Path dbPath) throws IOException {
FileSystem fileSystem = basePath.getFileSystem();
if (fileSystem.exists(basePath)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,8 @@
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.state.forst.fs.ForStFlinkFileSystem;
import org.apache.flink.state.forst.fs.filemapping.FileBackedMappingEntrySource;
import org.apache.flink.state.forst.fs.filemapping.HandleBackedMappingEntrySource;
import org.apache.flink.state.forst.fs.filemapping.FileOwnership;
import org.apache.flink.state.forst.fs.filemapping.MappingEntry;
import org.apache.flink.state.forst.fs.filemapping.MappingEntrySource;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;

Expand Down Expand Up @@ -78,7 +76,6 @@ public HandleAndLocalPath transferToCheckpoint(
}

return copyFileToCheckpoint(
dbFileSystem,
dbFilePath,
maxTransferBytes,
checkpointStreamFactory,
Expand All @@ -92,97 +89,85 @@ public void transferFromCheckpoint(
StreamStateHandle sourceHandle, Path targetPath, CloseableRegistry closeableRegistry)
throws IOException {
LOG.trace("Copy file from checkpoint: {}, {}, {}", sourceHandle, targetPath, dbFileSystem);
copyFileFromCheckpoint(dbFileSystem, sourceHandle, targetPath, closeableRegistry);
copyFileFromCheckpoint(sourceHandle, targetPath, closeableRegistry);
}

@Override
public String toString() {
return "CopyDataTransferStrategy{" + ", dbFileSystem=" + dbFileSystem + '}';
}

private static HandleAndLocalPath copyFileToCheckpoint(
FileSystem dbFileSystem,
Path filePath,
private HandleAndLocalPath copyFileToCheckpoint(
Path dbFilePath,
long maxTransferBytes,
CheckpointStreamFactory checkpointStreamFactory,
CheckpointedStateScope stateScope,
CloseableRegistry closeableRegistry,
CloseableRegistry tmpResourcesRegistry)
throws IOException {
StreamStateHandle handleByDuplicating =
duplicateFileToCheckpoint(
dbFileSystem, filePath, checkpointStreamFactory, stateScope);
if (handleByDuplicating != null) {
LOG.trace("Duplicate file to checkpoint: {} {}", filePath, handleByDuplicating);
return HandleAndLocalPath.of(handleByDuplicating, filePath.getName());

// Get State handle for the DB file
StreamStateHandle sourceStateHandle;
if (dbFileSystem instanceof ForStFlinkFileSystem) {
// Obtain the state handle stored in MappingEntry
// or Construct a FileStateHandle base on the source file
MappingEntry mappingEntry =
((ForStFlinkFileSystem) dbFileSystem).getMappingEntry(dbFilePath);
Preconditions.checkNotNull(mappingEntry, "dbFile not found: " + dbFilePath);
sourceStateHandle = mappingEntry.getSource().toStateHandle();
if (mappingEntry.getFileOwnership() == FileOwnership.NOT_OWNED) {
// The file is already owned by JM, simply return the state handle
return HandleAndLocalPath.of(sourceStateHandle, dbFilePath.getName());
}
} else {
// Construct a FileStateHandle base on the DB file
FileSystem sourceFileSystem = dbFilePath.getFileSystem();
long fileLength = sourceFileSystem.getFileStatus(dbFilePath).getLen();
sourceStateHandle = new FileStateHandle(dbFilePath, fileLength);
}

// Try path-copying first. If failed, fallback to bytes-copying
StreamStateHandle targetStateHandle =
tryPathCopyingToCheckpoint(sourceStateHandle, checkpointStreamFactory, stateScope);
if (targetStateHandle != null) {
LOG.trace("Path-copy file to checkpoint: {} {}", dbFilePath, targetStateHandle);
} else {
targetStateHandle =
bytesCopyingToCheckpoint(
dbFilePath,
maxTransferBytes,
checkpointStreamFactory,
stateScope,
closeableRegistry,
tmpResourcesRegistry);
LOG.trace("Bytes-copy file to checkpoint: {}, {}", dbFilePath, targetStateHandle);
}

HandleAndLocalPath handleAndLocalPath =
HandleAndLocalPath.of(
writeFileToCheckpoint(
dbFileSystem,
filePath,
maxTransferBytes,
checkpointStreamFactory,
stateScope,
closeableRegistry,
tmpResourcesRegistry),
filePath.getName());
LOG.trace("Write file to checkpoint: {}, {}", filePath, handleAndLocalPath.getHandle());
return handleAndLocalPath;
return HandleAndLocalPath.of(targetStateHandle, dbFilePath.getName());
}

/**
* Duplicate file to checkpoint storage by calling {@link CheckpointStreamFactory#duplicate} if
* possible.
*/
private static @Nullable StreamStateHandle duplicateFileToCheckpoint(
FileSystem dbFileSystem,
Path filePath,
private @Nullable StreamStateHandle tryPathCopyingToCheckpoint(
@Nonnull StreamStateHandle sourceHandle,
CheckpointStreamFactory checkpointStreamFactory,
CheckpointedStateScope stateScope)
throws IOException {

StreamStateHandle stateHandle = getStateHandle(dbFileSystem, filePath);

if (!checkpointStreamFactory.canFastDuplicate(stateHandle, stateScope)) {
if (!checkpointStreamFactory.canFastDuplicate(sourceHandle, stateScope)) {
return null;
}

List<StreamStateHandle> result =
checkpointStreamFactory.duplicate(
Collections.singletonList(stateHandle), stateScope);
Collections.singletonList(sourceHandle), stateScope);
return result.get(0);
}

private static StreamStateHandle getStateHandle(FileSystem dbFileSystem, Path filePath)
throws IOException {
Path sourceFilePath = filePath;
if (dbFileSystem instanceof ForStFlinkFileSystem) {
MappingEntry mappingEntry =
((ForStFlinkFileSystem) dbFileSystem).getMappingEntry(filePath);
Preconditions.checkNotNull(
mappingEntry, "File mapping entry not found for %s", filePath);

MappingEntrySource source = mappingEntry.getSource();
if (source instanceof HandleBackedMappingEntrySource) {
// return the state handle stored in MappingEntry
return ((HandleBackedMappingEntrySource) source).getStateHandle();
} else {
// use file path stored in MappingEntry
sourceFilePath = ((FileBackedMappingEntrySource) source).getFilePath();
}
}

// construct a FileStateHandle base on source file
FileSystem sourceFileSystem = sourceFilePath.getFileSystem();
long fileLength = sourceFileSystem.getFileStatus(sourceFilePath).getLen();
return new FileStateHandle(sourceFilePath, fileLength);
}

/** Write file to checkpoint storage through {@link CheckpointStateOutputStream}. */
private static @Nullable StreamStateHandle writeFileToCheckpoint(
FileSystem dbFileSystem,
private @Nullable StreamStateHandle bytesCopyingToCheckpoint(
Path filePath,
long maxTransferBytes,
CheckpointStreamFactory checkpointStreamFactory,
Expand Down Expand Up @@ -236,11 +221,8 @@ private static StreamStateHandle getStateHandle(FileSystem dbFileSystem, Path fi
}
}

private static void copyFileFromCheckpoint(
FileSystem dbFileSystem,
StreamStateHandle sourceHandle,
Path targetPath,
CloseableRegistry closeableRegistry)
private void copyFileFromCheckpoint(
StreamStateHandle sourceHandle, Path targetPath, CloseableRegistry closeableRegistry)
throws IOException {

if (closeableRegistry.isClosed()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@
import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation;
import org.apache.flink.state.forst.StateHandleTransferSpec;
import org.apache.flink.state.forst.fs.ForStFlinkFileSystem;
import org.apache.flink.state.forst.fs.filemapping.FileOwnershipDecider;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -45,20 +45,25 @@ public class DataTransferStrategyBuilder {
private static final Logger LOG = LoggerFactory.getLogger(DataTransferStrategyBuilder.class);

public static DataTransferStrategy buildForSnapshot(
SnapshotType.SharingFilesStrategy sharingFilesStrategy,
@Nullable ForStFlinkFileSystem forStFlinkFileSystem,
@Nullable CheckpointStreamFactory checkpointStreamFactory) {
return buildForSnapshot(
sharingFilesStrategy,
forStFlinkFileSystem,
isDbPathUnderCheckpointPathForSnapshot(
forStFlinkFileSystem, checkpointStreamFactory));
}

@VisibleForTesting
static DataTransferStrategy buildForSnapshot(
SnapshotType.SharingFilesStrategy sharingFilesStrategy,
@Nullable ForStFlinkFileSystem forStFlinkFileSystem,
boolean isDbPathUnderCheckpointPathForSnapshot) {
DataTransferStrategy strategy;
if (forStFlinkFileSystem == null || isDbPathUnderCheckpointPathForSnapshot) {
if (sharingFilesStrategy != SnapshotType.SharingFilesStrategy.FORWARD_BACKWARD
|| forStFlinkFileSystem == null
|| !isDbPathUnderCheckpointPathForSnapshot) {
strategy =
forStFlinkFileSystem == null
? new CopyDataTransferStrategy()
Expand All @@ -67,9 +72,7 @@ static DataTransferStrategy buildForSnapshot(
return strategy;
}

strategy =
new ReusableDataTransferStrategy(
FileOwnershipDecider.getDefault(), forStFlinkFileSystem);
strategy = new ReusableDataTransferStrategy(forStFlinkFileSystem);
LOG.info("Build DataTransferStrategy for Snapshot: {}", strategy);
return strategy;
}
Expand All @@ -80,17 +83,17 @@ private static boolean isDbPathUnderCheckpointPathForSnapshot(
if (forStFlinkFileSystem == null) {
return false;
}
// For checkpoint other that FsCheckpointStorageAccess, we treat it as 'DB path not under
// For checkpoint other than FsCheckpointStorageAccess, we treat it as 'DB path not under
// checkpoint path', since we cannot reuse checkpoint files in such case.
// todo: Support enabling 'cp file reuse' with FsMergingCheckpointStorageAccess
if (checkpointStreamFactory == null
|| checkpointStreamFactory.getClass() != FsCheckpointStreamFactory.class) {
|| checkpointStreamFactory.getClass() != FsCheckpointStorageLocation.class) {
return false;
}

// get checkpoint shared path
FsCheckpointStreamFactory fsCheckpointStreamFactory =
(FsCheckpointStreamFactory) checkpointStreamFactory;
FsCheckpointStorageLocation fsCheckpointStreamFactory =
(FsCheckpointStorageLocation) checkpointStreamFactory;
Path cpSharedPath = fsCheckpointStreamFactory.getTargetPath(CheckpointedStateScope.SHARED);
FileSystem cpSharedFs;
try {
Expand All @@ -101,13 +104,13 @@ private static boolean isDbPathUnderCheckpointPathForSnapshot(
}

// check if dbRemotePath is under cpSharedPath
if (!cpSharedFs.equals(forStFlinkFileSystem.getDelegateFS())) {
if (!cpSharedFs.getUri().equals(forStFlinkFileSystem.getDelegateFS().getUri())) {
// different file system
return false;
} else {
// same file system
String dbRemotePathStr = forStFlinkFileSystem.getRemoteBase();
String cpSharedPathStr = cpSharedPath.getPath();
String cpSharedPathStr = cpSharedPath.toString();
return cpSharedPathStr.equals(dbRemotePathStr)
|| dbRemotePathStr.startsWith(cpSharedPathStr);
}
Expand All @@ -121,18 +124,22 @@ public static DataTransferStrategy buildForRestore(
FileSystem cpSharedFs = getSharedStateFileSystem(specs);
if (forStFlinkFileSystem == null
|| cpSharedFs == null
|| forStFlinkFileSystem.equals(cpSharedFs)) {
|| !forStFlinkFileSystem.getUri().equals(cpSharedFs.getUri())
|| recoveryClaimMode != RecoveryClaimMode.CLAIM) {
strategy =
forStFlinkFileSystem == null
? new CopyDataTransferStrategy()
: new CopyDataTransferStrategy(forStFlinkFileSystem);
LOG.info("Build DataTransferStrategy for Restore: {}", strategy);
LOG.info(
"Build DataTransferStrategy for Restore: {}, forStFlinkFileSystem: {}, cpSharedFs:{}, recoveryClaimMode:{}",
strategy,
forStFlinkFileSystem,
cpSharedFs,
recoveryClaimMode);
return strategy;
}

strategy =
new ReusableDataTransferStrategy(
new FileOwnershipDecider(recoveryClaimMode), forStFlinkFileSystem);
strategy = new ReusableDataTransferStrategy(forStFlinkFileSystem);
LOG.info("Build DataTransferStrategy for Restore: {}", strategy);
return strategy;
}
Expand Down
Loading

0 comments on commit b62907e

Please sign in to comment.