From e4229b7661e0e23216c651a44833806d90a40aff Mon Sep 17 00:00:00 2001 From: fredia Date: Wed, 15 Jan 2025 14:57:00 +0800 Subject: [PATCH] [FLINK-37158][tests] Introduce ForSt to existing ITCases --- .../OneInputTransformation.java | 11 ++ .../sync/ForStSyncKeyedStateBackend.java | 3 +- .../checkpointing/AutoRescalingITCase.java | 4 +- .../EventTimeWindowCheckpointingITCase.java | 34 +++++ .../KeyedStateCheckpointingITCase.java | 18 +++ .../RescaleCheckpointManuallyITCase.java | 118 +++++++++++++++- .../ResumeCheckpointManuallyITCase.java | 129 +++++++++++++----- ...napshotFileMergingCompatibilityITCase.java | 10 +- 8 files changed, 283 insertions(+), 44 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java index e8a116b3f9bb97..83970919cfe3d2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/transformations/OneInputTransformation.java @@ -27,6 +27,7 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; import org.apache.flink.streaming.api.operators.StreamOperatorFactory; +import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessingOperator; import org.apache.flink.shaded.guava32.com.google.common.collect.Lists; @@ -185,6 +186,16 @@ public final void setChainingStrategy(ChainingStrategy strategy) { operatorFactory.setChainingStrategy(strategy); } + @Override + public void enableAsyncState() { + OneInputStreamOperator operator = + (OneInputStreamOperator) + ((SimpleOperatorFactory) operatorFactory).getOperator(); + if (!(operator instanceof AsyncStateProcessingOperator)) { + super.enableAsyncState(); + } + } + public boolean isOutputOnlyAfterEndOfStream() { return operatorFactory.getOperatorAttributes().isOutputOnlyAfterEndOfStream(); } diff --git a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java index 3159dfb45765c2..9e9c4280c0b753 100644 --- a/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java +++ b/flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/sync/ForStSyncKeyedStateBackend.java @@ -602,7 +602,8 @@ public RunnableFuture> snapshot( @Nonnull @Override public SavepointResources savepoint() throws Exception { - throw new UnsupportedOperationException("This method is not supported."); + throw new UnsupportedOperationException( + "Canonical savepoints are not supported by ForSt State Backend."); } @Override diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java index 374e53963c9519..42eb9992f89001 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AutoRescalingITCase.java @@ -116,7 +116,9 @@ public static Collection data() { new Object[][] { {"rocksdb", false}, {"rocksdb", true}, - {"hashmap", false} + 
{"hashmap", false}, + {"forst", false}, + {"forst", true} }); } diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java index d3369ff7598c97..ca07e3a6ab2786 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java @@ -35,6 +35,8 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.runtime.testutils.ZooKeeperTestUtils; +import org.apache.flink.state.forst.ForStOptions; +import org.apache.flink.state.forst.ForStStateBackend; import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend; import org.apache.flink.state.rocksdb.RocksDBOptions; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -110,6 +112,7 @@ enum StateBackendEnum { ROCKSDB_FULL, ROCKSDB_INCREMENTAL, ROCKSDB_INCREMENTAL_ZK, + FORST_INCREMENTAL } @Parameterized.Parameters(name = "statebackend type ={0}") @@ -191,6 +194,14 @@ private Configuration getConfiguration() throws Exception { setupRocksDB(config, 16, true); break; } + case FORST_INCREMENTAL: + { + config.set( + ForStOptions.TIMER_SERVICE_FACTORY, + ForStStateBackend.PriorityQueueStateType.ForStDB); + setupForSt(config, 16); + break; + } default: throw new IllegalStateException("No backend selected."); } @@ -229,6 +240,29 @@ private void setupRocksDB( config.set(RocksDBOptions.LOCAL_DIRECTORIES, rocksDb); } + private void setupForSt(Configuration config, int fileSizeThreshold) throws IOException { + // Configure the managed memory size as 64MB per slot for rocksDB state backend. 
+ config.set( + TaskManagerOptions.MANAGED_MEMORY_SIZE, + MemorySize.ofMebiBytes(PARALLELISM / NUM_OF_TASK_MANAGERS * 64)); + + final String forstdb = tempFolder.newFolder().getAbsolutePath(); + final File backups = tempFolder.newFolder().getAbsoluteFile(); + // we use the fs backend with small threshold here to test the behaviour with file + // references, not self contained byte handles + config.set(StateBackendOptions.STATE_BACKEND, "forst"); + config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true); + config.set( + CheckpointingOptions.CHECKPOINTS_DIRECTORY, + Path.fromLocalFile(backups).toUri().toString()); + if (fileSizeThreshold != -1) { + config.set( + CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, + MemorySize.parse(fileSizeThreshold + "b")); + } + config.set(ForStOptions.LOCAL_DIRECTORIES, forstdb); + } + protected Configuration createClusterConfig() throws IOException { TemporaryFolder temporaryFolder = new TemporaryFolder(); temporaryFolder.create(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java index 150096ab43aac2..be9ba1eeac8c56 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/KeyedStateCheckpointingITCase.java @@ -25,10 +25,13 @@ import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.CheckpointingOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MemorySize; +import org.apache.flink.configuration.StateBackendOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.state.forst.ForStOptions; import org.apache.flink.state.rocksdb.RocksDBOptions; import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.datastream.DataStream; @@ -143,6 +146,21 @@ public void testWithRocksDbBackendIncremental() throws Exception { testProgramWithBackend(env); } + @Test + public void testWithForStBackendIncremental() throws Exception { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.configure( + new Configuration() + .set(StateBackendOptions.STATE_BACKEND, "forst") + .set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true) + .set( + ForStOptions.LOCAL_DIRECTORIES, + tmpFolder.newFolder().getAbsolutePath())); + CheckpointStorageUtils.configureFileSystemCheckpointStorage( + env, tmpFolder.newFolder().toURI().toString()); + testProgramWithBackend(env); + } + // ------------------------------------------------------------------------ protected void testProgramWithBackend(StreamExecutionEnvironment env) throws Exception { diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java index a4f05362348684..f811c26740464a 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescaleCheckpointManuallyITCase.java @@ -19,9 +19,12 @@ package 
org.apache.flink.test.checkpointing; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.functions.OpenContext; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; +import org.apache.flink.api.common.state.v2.StateFuture; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.api.connector.sink2.SinkWriter; import org.apache.flink.api.connector.sink2.WriterInitContext; @@ -40,8 +43,10 @@ import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.legacy.RichParallelSourceFunction; +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.test.util.MiniClusterWithClientResource; import org.apache.flink.testutils.junit.SharedObjects; import org.apache.flink.testutils.junit.SharedReference; @@ -56,8 +61,12 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.Optional; @@ -74,6 +83,7 @@ * NotifyingDefiniteKeySource, SubtaskIndexFlatMapper and CollectionSink refer to RescalingITCase, * because the static fields in these classes can not be shared. 
*/ +@RunWith(Parameterized.class) public class RescaleCheckpointManuallyITCase extends TestLogger { private static final int NUM_TASK_MANAGERS = 2; @@ -84,10 +94,24 @@ public class RescaleCheckpointManuallyITCase extends TestLogger { @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder(); + @Parameterized.Parameter(0) + public String statebackendType; + + @Parameterized.Parameter(1) + public boolean enableAsyncState; + + @Parameterized.Parameters(name = "statebackend type ={0}, enableAsyncState={1}") + public static Collection parameter() { + return Arrays.asList( + new Object[][] { + {"forst", true}, {"forst", false}, {"rocksdb", true}, {"rocksdb", false} + }); + } + @Before public void setup() throws Exception { Configuration config = new Configuration(); - config.set(StateBackendOptions.STATE_BACKEND, "rocksdb"); + config.set(StateBackendOptions.STATE_BACKEND, statebackendType); config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true); cluster = @@ -263,7 +287,7 @@ private JobGraph createJobGraphWithKeyedState( SharedReference jobID = sharedObjects.add(new JobID()); SharedReference miniClusterRef = sharedObjects.add(miniCluster); - DataStream input = + KeyedStream input = env.addSource( new NotifyingDefiniteKeySource( numberKeys, numberElements, failAfterEmission) { @@ -300,10 +324,18 @@ public Integer getKey(Integer value) { return value; } }); - DataStream> result = - input.flatMap(new SubtaskIndexFlatMapper(numberElementsExpect)); + if (enableAsyncState) { + input.enableAsyncState(); + DataStream> result = + input.flatMap(new AsyncSubtaskIndexFlatMapper(numberElementsExpect)); - result.sinkTo(new CollectionSink<>()); + result.sinkTo(new CollectionSink<>()); + } else { + DataStream> result = + input.flatMap(new SubtaskIndexFlatMapper(numberElementsExpect)); + + result.sinkTo(new CollectionSink<>()); + } return env.getStreamGraph().getJobGraph(env.getClass().getClassLoader(), jobID.get()); } @@ -349,8 +381,9 @@ public void run(SourceContext ctx) throws Exception { } else { boolean newCheckpoint = false; long waited = 0L; + running = false; // maximum wait 5min - while (!newCheckpoint && waited < 30000L) { + while (!newCheckpoint && waited < 300000L) { synchronized (ctx.getCheckpointLock()) { newCheckpoint = waitCheckpointCompleted(); } @@ -423,6 +456,79 @@ public void initializeState(FunctionInitializationContext context) throws Except } } + private static class AsyncSubtaskIndexFlatMapper + extends RichFlatMapFunction> + implements CheckpointedFunction { + + private static final long serialVersionUID = 1L; + + private transient org.apache.flink.api.common.state.v2.ValueState counter; + private transient org.apache.flink.api.common.state.v2.ValueState sum; + + private final int numberElements; + + public AsyncSubtaskIndexFlatMapper(int numberElements) { + this.numberElements = numberElements; + } + + @Override + public void flatMap(Integer value, Collector> out) + throws Exception { + StateFuture counterFuture = + counter.asyncValue() + .thenCompose( + (Integer c) -> { + int updated = c == null ? 1 : c + 1; + return counter.asyncUpdate(updated) + .thenApply(nothing -> updated); + }); + StateFuture sumFuture = + sum.asyncValue() + .thenCompose( + (Integer s) -> { + int updated = s == null ? 
value : s + value;
+ return sum.asyncUpdate(updated)
+ .thenApply(nothing -> updated);
+ });
+
+ counterFuture.thenCombine(
+ sumFuture,
+ (c, s) -> {
+ if (c == numberElements) {
+ out.collect(
+ Tuple2.of(
+ getRuntimeContext()
+ .getTaskInfo()
+ .getIndexOfThisSubtask(),
+ s));
+ }
+ return null;
+ });
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext context) throws Exception {
+ // all managed, nothing to do.
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext context) throws Exception {}
+
+ @Override
+ public void open(OpenContext openContext) throws Exception {
+ counter =
+ ((StreamingRuntimeContext) getRuntimeContext())
+ .getValueState(
+ new org.apache.flink.api.common.state.v2.ValueStateDescriptor<>(
+ "counter", BasicTypeInfo.INT_TYPE_INFO));
+ sum =
+ ((StreamingRuntimeContext) getRuntimeContext())
+ .getValueState(
+ new org.apache.flink.api.common.state.v2.ValueStateDescriptor<>(
+ "sum", BasicTypeInfo.INT_TYPE_INFO));
+ }
+ }
+
 private static class CollectionSink implements Sink {

 private static final ConcurrentHashMap> writers =

diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
index 1f847ec887b8d5..98d4205daac5c7 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
@@ -37,6 +37,7 @@
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
@@ -82,9 +83,17 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {

 @Parameterized.Parameter public RecoveryClaimMode recoveryClaimMode;

- @Parameterized.Parameters(name = "RecoveryClaimMode = {0}")
- public static Object[] parameters() {
- return RecoveryClaimMode.values();
+ @Parameterized.Parameter(1)
+ public boolean enableAsync;
+
+ @Parameterized.Parameters(name = "RecoveryClaimMode = {0}, enableAsync = {1}")
+ public static Object[][] parameters() {
+ return new Object[][] {
+ {RecoveryClaimMode.CLAIM, false},
+ {RecoveryClaimMode.CLAIM, true},
+ {RecoveryClaimMode.NO_CLAIM, false},
+ {RecoveryClaimMode.NO_CLAIM, true}
+ };
 }

 @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -97,7 +106,8 @@ public void testExternalizedIncrementalRocksDBCheckpointsStandalone() throws Exc
 null,
 createRocksDBStateBackend(checkpointDir, true),
 false,
- recoveryClaimMode);
+ recoveryClaimMode,
+ enableAsync);
 }

 @Test
@@ -108,7 +118,8 @@ public void testExternalizedFullRocksDBCheckpointsStandalone() throws Exception
 null,
 createRocksDBStateBackend(checkpointDir, false),
 false,
- recoveryClaimMode);
+ recoveryClaimMode,
+ enableAsync);
 }

 @Test
@@ -120,7 +131,8 @@ public void testExternalizedIncrementalRocksDBCheckpointsWithLocalRecoveryStanda
 null,
 createRocksDBStateBackend(checkpointDir, true),
 true,
- recoveryClaimMode);
+ recoveryClaimMode,
+ enableAsync);
 }

 @Test
@@ -132,21 +144,32 @@ public void 
testExternalizedFullRocksDBCheckpointsWithLocalRecoveryStandalone() null, createRocksDBStateBackend(checkpointDir, false), true, - recoveryClaimMode); + recoveryClaimMode, + enableAsync); } @Test public void testExternalizedFSCheckpointsStandalone() throws Exception { final File checkpointDir = temporaryFolder.newFolder(); testExternalizedCheckpoints( - checkpointDir, null, createFsStateBackend(checkpointDir), false, recoveryClaimMode); + checkpointDir, + null, + createFsStateBackend(checkpointDir), + false, + recoveryClaimMode, + enableAsync); } @Test public void testExternalizedFSCheckpointsWithLocalRecoveryStandalone() throws Exception { final File checkpointDir = temporaryFolder.newFolder(); testExternalizedCheckpoints( - checkpointDir, null, createFsStateBackend(checkpointDir), true, recoveryClaimMode); + checkpointDir, + null, + createFsStateBackend(checkpointDir), + true, + recoveryClaimMode, + enableAsync); } @Test @@ -158,7 +181,8 @@ public void testExternalizedIncrementalRocksDBCheckpointsZookeeper() throws Exce zkServer.getConnectString(), createRocksDBStateBackend(checkpointDir, true), false, - recoveryClaimMode); + recoveryClaimMode, + enableAsync); } } @@ -171,7 +195,8 @@ public void testExternalizedFullRocksDBCheckpointsZookeeper() throws Exception { zkServer.getConnectString(), createRocksDBStateBackend(checkpointDir, false), false, - recoveryClaimMode); + recoveryClaimMode, + enableAsync); } } @@ -185,7 +210,8 @@ public void testExternalizedIncrementalRocksDBCheckpointsWithLocalRecoveryZookee zkServer.getConnectString(), createRocksDBStateBackend(checkpointDir, true), true, - recoveryClaimMode); + recoveryClaimMode, + enableAsync); } } @@ -199,7 +225,8 @@ public void testExternalizedFullRocksDBCheckpointsWithLocalRecoveryZookeeper() zkServer.getConnectString(), createRocksDBStateBackend(checkpointDir, false), true, - recoveryClaimMode); + recoveryClaimMode, + enableAsync); } } @@ -212,7 +239,8 @@ public void testExternalizedFSCheckpointsZookeeper() throws Exception { zkServer.getConnectString(), createFsStateBackend(checkpointDir), false, - recoveryClaimMode); + recoveryClaimMode, + enableAsync); } } @@ -225,7 +253,8 @@ public void testExternalizedFSCheckpointsWithLocalRecoveryZookeeper() throws Exc zkServer.getConnectString(), createFsStateBackend(checkpointDir), true, - recoveryClaimMode); + recoveryClaimMode, + enableAsync); } } @@ -241,7 +270,8 @@ public void testExternalizedSwitchRocksDBCheckpointsStandalone() throws Exceptio newStateBackendConfig, previousStateBackendConfig, false, - recoveryClaimMode); + recoveryClaimMode, + enableAsync); } @Test @@ -257,7 +287,8 @@ public void testExternalizedSwitchRocksDBCheckpointsWithLocalRecoveryStandalone( newStateBackendConfig, previousStateBackendConfig, true, - recoveryClaimMode); + recoveryClaimMode, + enableAsync); } @Test @@ -274,7 +305,8 @@ public void testExternalizedSwitchRocksDBCheckpointsZookeeper() throws Exception newStateBackendConfig, previousStateBackendConfig, false, - recoveryClaimMode); + recoveryClaimMode, + enableAsync); } } @@ -293,7 +325,8 @@ public void testExternalizedSwitchRocksDBCheckpointsWithLocalRecoveryZookeeper() newStateBackendConfig, previousStateBackendConfig, true, - recoveryClaimMode); + recoveryClaimMode, + enableAsync); } } @@ -320,7 +353,8 @@ private static void testExternalizedCheckpoints( String zooKeeperQuorum, Configuration configuration, boolean localRecovery, - RecoveryClaimMode recoveryClaimMode) + RecoveryClaimMode recoveryClaimMode, + boolean enableAsync) throws Exception { 
testExternalizedCheckpoints( checkpointDir, @@ -329,7 +363,8 @@ private static void testExternalizedCheckpoints( configuration, configuration, localRecovery, - recoveryClaimMode); + recoveryClaimMode, + enableAsync); } private static void testExternalizedCheckpoints( @@ -339,7 +374,8 @@ private static void testExternalizedCheckpoints( Configuration config2, Configuration config3, boolean localRecovery, - RecoveryClaimMode recoveryClaimMode) + RecoveryClaimMode recoveryClaimMode, + boolean enableAsync) throws Exception { final Configuration config = new Configuration(); @@ -378,12 +414,17 @@ private static void testExternalizedCheckpoints( try { // main test sequence: start job -> eCP -> restore job -> eCP -> restore job String firstExternalCheckpoint = - runJobAndGetExternalizedCheckpoint(config1, null, cluster, recoveryClaimMode); + runJobAndGetExternalizedCheckpoint( + config1, null, cluster, recoveryClaimMode, enableAsync); assertNotNull(firstExternalCheckpoint); String secondExternalCheckpoint = runJobAndGetExternalizedCheckpoint( - config2, firstExternalCheckpoint, cluster, recoveryClaimMode); + config2, + firstExternalCheckpoint, + cluster, + recoveryClaimMode, + enableAsync); assertNotNull(secondExternalCheckpoint); String thirdExternalCheckpoint = @@ -396,7 +437,8 @@ private static void testExternalizedCheckpoints( ? secondExternalCheckpoint : firstExternalCheckpoint, cluster, - recoveryClaimMode); + recoveryClaimMode, + enableAsync); assertNotNull(thirdExternalCheckpoint); } finally { cluster.after(); @@ -407,11 +449,18 @@ private static String runJobAndGetExternalizedCheckpoint( Configuration configuration, @Nullable String externalCheckpoint, MiniClusterWithClientResource cluster, - RecoveryClaimMode recoveryClaimMode) + RecoveryClaimMode recoveryClaimMode, + boolean enableAsync) throws Exception { // complete at least two checkpoints so that the initial checkpoint can be subsumed return runJobAndGetExternalizedCheckpoint( - externalCheckpoint, cluster, recoveryClaimMode, configuration, 2, true); + externalCheckpoint, + cluster, + recoveryClaimMode, + configuration, + 2, + true, + enableAsync); } static String runJobAndGetExternalizedCheckpoint( @@ -420,10 +469,16 @@ static String runJobAndGetExternalizedCheckpoint( RecoveryClaimMode recoveryClaimMode, Configuration jobConfig, int consecutiveCheckpoints, - boolean retainCheckpoints) + boolean retainCheckpoints, + boolean enableAsync) throws Exception { JobGraph initialJobGraph = - getJobGraph(externalCheckpoint, recoveryClaimMode, jobConfig, retainCheckpoints); + getJobGraph( + externalCheckpoint, + recoveryClaimMode, + jobConfig, + retainCheckpoints, + enableAsync); NotifyingInfiniteTupleSource.countDownLatch = new CountDownLatch(PARALLELISM); cluster.getClusterClient().submitJob(initialJobGraph).get(); @@ -447,7 +502,8 @@ private static JobGraph getJobGraph( @Nullable String externalCheckpoint, RecoveryClaimMode recoveryClaimMode, Configuration jobConfig, - boolean retainCheckpoints) { + boolean retainCheckpoints, + boolean enableAsync) { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(jobConfig); @@ -460,9 +516,16 @@ private static JobGraph getJobGraph( : ExternalizedCheckpointRetention.DELETE_ON_CANCELLATION); RestartStrategyUtils.configureNoRestartStrategy(env); - env.addSource(new NotifyingInfiniteTupleSource(10_000)) - .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create()) - .keyBy(x -> x.f0) + KeyedStream, String> keyedStream = + env.addSource(new 
NotifyingInfiniteTupleSource(10_000)) + .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create()) + .keyBy(x -> x.f0); + + if (enableAsync) { + keyedStream.enableAsyncState(); + } + + keyedStream .window(TumblingEventTimeWindows.of(Duration.ofSeconds(3))) .reduce((value1, value2) -> Tuple2.of(value1.f0, value1.f1 + value2.f1)) .filter(value -> value.f0.startsWith("Tuple 0")); diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java index 171204888c9455..e4f0c6ec0d7423 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java @@ -130,7 +130,8 @@ private void testSwitchingFileMerging( recoveryClaimMode, config, consecutiveCheckpoint, - true); + true, + false); assertThat(firstCheckpoint).isNotNull(); firstMetadata = TestUtils.loadCheckpointMetadata(firstCheckpoint); verifyStateHandleType(firstMetadata, firstFileMergingSwitch); @@ -157,7 +158,8 @@ private void testSwitchingFileMerging( recoveryClaimMode, config, consecutiveCheckpoint, - true); + true, + false); assertThat(secondCheckpoint).isNotNull(); secondMetadata = TestUtils.loadCheckpointMetadata(secondCheckpoint); verifyStateHandleType(secondMetadata, secondFileMergingSwitch); @@ -189,7 +191,8 @@ private void testSwitchingFileMerging( recoveryClaimMode, config, consecutiveCheckpoint, - true); + true, + false); assertThat(thirdCheckpoint).isNotNull(); thirdMetadata = TestUtils.loadCheckpointMetadata(thirdCheckpoint); verifyStateHandleType(thirdMetadata, secondFileMergingSwitch); @@ -221,6 +224,7 @@ private void testSwitchingFileMerging( recoveryClaimMode, config, consecutiveCheckpoint, + false, false); assertThat(fourthCheckpoint).isNotNull(); verifyCheckpointExistOrWaitDeleted(
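
Reviewer note (illustrative, not part of the diff): the pieces these tests now exercise, the "forst" value for StateBackendOptions.STATE_BACKEND, KeyedStream#enableAsyncState(), and state.v2 ValueState accessed through StateFuture callbacks, can be reduced to the self-contained sketch below. The class names, job body, and checkpoint interval are invented for illustration, and checkpoint-storage wiring is omitted; only the option keys and API calls are the ones used in the patch above.

import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.v2.ValueState;
import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.util.Collector;

public class ForStAsyncStateSmokeJob {

    /** Per-key counter on async (state.v2) value state, mirroring AsyncSubtaskIndexFlatMapper. */
    static class CountingFlatMap extends RichFlatMapFunction<Integer, Integer> {
        private transient ValueState<Integer> count;

        @Override
        public void open(OpenContext openContext) {
            // Same access pattern as the patch: a v2 descriptor obtained via StreamingRuntimeContext.
            count =
                    ((StreamingRuntimeContext) getRuntimeContext())
                            .getValueState(
                                    new ValueStateDescriptor<>(
                                            "count", BasicTypeInfo.INT_TYPE_INFO));
        }

        @Override
        public void flatMap(Integer value, Collector<Integer> out) {
            // Asynchronous read-modify-write: the callbacks run when the backend I/O completes.
            count.asyncValue()
                    .thenCompose(c -> count.asyncUpdate(c == null ? 1 : c + 1))
                    .thenAccept(ignored -> out.collect(value));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        config.set(StateBackendOptions.STATE_BACKEND, "forst"); // as in the ITCases above
        config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(config);
        env.enableCheckpointing(1_000L); // arbitrary interval for the sketch

        KeyedStream<Integer, Integer> keyed = env.fromData(1, 2, 2, 3).keyBy(v -> v % 2);
        // Opt the keyed pipeline into async state, as the tests do when the
        // enableAsyncState/enableAsync parameter is true.
        keyed.enableAsyncState();
        keyed.flatMap(new CountingFlatMap()).print();

        env.execute("forst-async-state-smoke");
    }
}

As with AsyncSubtaskIndexFlatMapper above, output is emitted from the StateFuture callback rather than synchronously, which is the access pattern the async ForSt state path is designed to exploit.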