[FLINK-37158][tests] Introduce ForSt to existing ITCases
fredia committed Jan 17, 2025
1 parent b3e7f9b commit e4229b7
Showing 8 changed files with 283 additions and 44 deletions.
@@ -27,6 +27,7 @@
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessingOperator;

import org.apache.flink.shaded.guava32.com.google.common.collect.Lists;

@@ -185,6 +186,16 @@ public final void setChainingStrategy(ChainingStrategy strategy) {
operatorFactory.setChainingStrategy(strategy);
}

@Override
public void enableAsyncState() {
OneInputStreamOperator<IN, OUT> operator =
(OneInputStreamOperator<IN, OUT>)
((SimpleOperatorFactory<OUT>) operatorFactory).getOperator();
if (!(operator instanceof AsyncStateProcessingOperator)) {
super.enableAsyncState();
}
}
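    // Note on this override: enableAsyncState() acts as a validation hook here. If the
    // wrapped operator already implements AsyncStateProcessingOperator the call is a
    // no-op; otherwise it falls through to the base implementation, which is expected
    // to reject operators that cannot process state asynchronously. A hedged usage
    // sketch, with KeyedStream#enableAsyncState() taken from this commit's own test
    // code and `source`/`expected` as hypothetical placeholders:
    //
    //     KeyedStream<Integer, Integer> keyed = source.keyBy(v -> v);
    //     keyed.enableAsyncState(); // validated per transformation, as above
    //     keyed.flatMap(new AsyncSubtaskIndexFlatMapper(expected)).sinkTo(new CollectionSink<>());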

public boolean isOutputOnlyAfterEndOfStream() {
return operatorFactory.getOperatorAttributes().isOutputOnlyAfterEndOfStream();
}
@@ -602,7 +602,8 @@ public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
@Nonnull
@Override
public SavepointResources<K> savepoint() throws Exception {
throw new UnsupportedOperationException("This method is not supported.");
throw new UnsupportedOperationException(
"Canonical savepoints are not supported by ForSt State Backend.");
}
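    // Note: ForSt refuses canonical savepoints, and the reworded message above now
    // names the backend that rejected the call. If ForSt follows the RocksDB
    // precedent (an assumption, not confirmed by this diff), a native-format
    // savepoint would be the alternative, e.g. via a ClusterClient:
    //
    //     CompletableFuture<String> savepointPath =
    //             client.triggerSavepoint(jobId, targetDir, SavepointFormatType.NATIVE);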

@Override
@@ -116,7 +116,9 @@ public static Collection<Object[]> data() {
new Object[][] {
{"rocksdb", false},
{"rocksdb", true},
{"hashmap", false}
{"hashmap", false},
{"forst", false},
{"forst", true}
});
}

@@ -35,6 +35,8 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.state.forst.ForStOptions;
import org.apache.flink.state.forst.ForStStateBackend;
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.rocksdb.RocksDBOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -110,6 +112,7 @@ enum StateBackendEnum {
ROCKSDB_FULL,
ROCKSDB_INCREMENTAL,
ROCKSDB_INCREMENTAL_ZK,
FORST_INCREMENTAL
}

@Parameterized.Parameters(name = "statebackend type ={0}")
@@ -191,6 +194,14 @@ private Configuration getConfiguration() throws Exception {
setupRocksDB(config, 16, true);
break;
}
case FORST_INCREMENTAL:
{
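                    // Keep timers in ForStDB rather than on the heap, mirroring the
                    // RocksDB timer-service factory option; the 16 below is the
                    // small-file threshold, in bytes, handed to setupForSt.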
config.set(
ForStOptions.TIMER_SERVICE_FACTORY,
ForStStateBackend.PriorityQueueStateType.ForStDB);
setupForSt(config, 16);
break;
}
default:
throw new IllegalStateException("No backend selected.");
}
@@ -229,6 +240,29 @@ private void setupRocksDB(
config.set(RocksDBOptions.LOCAL_DIRECTORIES, rocksDb);
}

private void setupForSt(Configuration config, int fileSizeThreshold) throws IOException {
// Configure the managed memory size as 64MB per slot for the ForSt state backend.
config.set(
TaskManagerOptions.MANAGED_MEMORY_SIZE,
MemorySize.ofMebiBytes(PARALLELISM / NUM_OF_TASK_MANAGERS * 64));

final String forstdb = tempFolder.newFolder().getAbsolutePath();
final File backups = tempFolder.newFolder().getAbsoluteFile();
// we use filesystem checkpoint storage with a small file threshold here to test the
// behaviour with file references, not self-contained byte handles
config.set(StateBackendOptions.STATE_BACKEND, "forst");
config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
config.set(
CheckpointingOptions.CHECKPOINTS_DIRECTORY,
Path.fromLocalFile(backups).toUri().toString());
if (fileSizeThreshold != -1) {
config.set(
CheckpointingOptions.FS_SMALL_FILE_THRESHOLD,
MemorySize.parse(fileSizeThreshold + "b"));
}
config.set(ForStOptions.LOCAL_DIRECTORIES, forstdb);
}
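    // The 16-byte FS_SMALL_FILE_THRESHOLD above is deliberately tiny: any state file
    // larger than that is written to checkpoint storage and referenced by handle, so
    // the test exercises file references instead of inline byte handles. A minimal
    // standalone sketch of the same setup (option names from this commit, values
    // illustrative):
    //
    //     Configuration config = new Configuration();
    //     config.set(StateBackendOptions.STATE_BACKEND, "forst");
    //     config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
    //     config.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.parse("16b"));
    //     config.set(ForStOptions.LOCAL_DIRECTORIES, tempFolder.newFolder().getAbsolutePath());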

protected Configuration createClusterConfig() throws IOException {
TemporaryFolder temporaryFolder = new TemporaryFolder();
temporaryFolder.create();
@@ -25,10 +25,13 @@
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.state.forst.ForStOptions;
import org.apache.flink.state.rocksdb.RocksDBOptions;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -143,6 +146,21 @@ public void testWithRocksDbBackendIncremental() throws Exception {
testProgramWithBackend(env);
}

@Test
public void testWithForStBackendIncremental() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.configure(
new Configuration()
.set(StateBackendOptions.STATE_BACKEND, "forst")
.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true)
.set(
ForStOptions.LOCAL_DIRECTORIES,
tmpFolder.newFolder().getAbsolutePath()));
CheckpointStorageUtils.configureFileSystemCheckpointStorage(
env, tmpFolder.newFolder().toURI().toString());
testProgramWithBackend(env);
}

// ------------------------------------------------------------------------

protected void testProgramWithBackend(StreamExecutionEnvironment env) throws Exception {
@@ -19,9 +19,12 @@
package org.apache.flink.test.checkpointing;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.state.v2.StateFuture;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
@@ -40,8 +43,10 @@
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.legacy.RichParallelSourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.junit.SharedObjects;
import org.apache.flink.testutils.junit.SharedReference;
@@ -56,8 +61,12 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Optional;
@@ -74,6 +83,7 @@
* NotifyingDefiniteKeySource, SubtaskIndexFlatMapper and CollectionSink are duplicated from
* RescalingITCase, because the static fields in those classes cannot be shared.
*/
@RunWith(Parameterized.class)
public class RescaleCheckpointManuallyITCase extends TestLogger {

private static final int NUM_TASK_MANAGERS = 2;
@@ -84,10 +94,24 @@ public class RescaleCheckpointManuallyITCase extends TestLogger {

@ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();

@Parameterized.Parameter(0)
public String statebackendType;

@Parameterized.Parameter(1)
public boolean enableAsyncState;

@Parameterized.Parameters(name = "statebackend type ={0}, enableAsyncState={1}")
public static Collection<Object[]> parameter() {
return Arrays.asList(
new Object[][] {
{"forst", true}, {"forst", false}, {"rocksdb", true}, {"rocksdb", false}
});
}

@Before
public void setup() throws Exception {
Configuration config = new Configuration();
config.set(StateBackendOptions.STATE_BACKEND, "rocksdb");
config.set(StateBackendOptions.STATE_BACKEND, statebackendType);
config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);

cluster =
@@ -263,7 +287,7 @@ private JobGraph createJobGraphWithKeyedState(

SharedReference<JobID> jobID = sharedObjects.add(new JobID());
SharedReference<MiniCluster> miniClusterRef = sharedObjects.add(miniCluster);
-        DataStream<Integer> input =
+        KeyedStream<Integer, Integer> input =
env.addSource(
new NotifyingDefiniteKeySource(
numberKeys, numberElements, failAfterEmission) {
@@ -300,10 +324,18 @@ public Integer getKey(Integer value) {
return value;
}
});
-        DataStream<Tuple2<Integer, Integer>> result =
-                input.flatMap(new SubtaskIndexFlatMapper(numberElementsExpect));
-
-        result.sinkTo(new CollectionSink<>());
+        if (enableAsyncState) {
+            input.enableAsyncState();
+            DataStream<Tuple2<Integer, Integer>> result =
+                    input.flatMap(new AsyncSubtaskIndexFlatMapper(numberElementsExpect));
+
+            result.sinkTo(new CollectionSink<>());
+        } else {
+            DataStream<Tuple2<Integer, Integer>> result =
+                    input.flatMap(new SubtaskIndexFlatMapper(numberElementsExpect));
+
+            result.sinkTo(new CollectionSink<>());
+        }

return env.getStreamGraph().getJobGraph(env.getClass().getClassLoader(), jobID.get());
}
@@ -349,8 +381,9 @@ public void run(SourceContext<Integer> ctx) throws Exception {
} else {
boolean newCheckpoint = false;
long waited = 0L;
                running = false;
+                // maximum wait 5min
-                while (!newCheckpoint && waited < 30000L) {
+                while (!newCheckpoint && waited < 300000L) {
synchronized (ctx.getCheckpointLock()) {
newCheckpoint = waitCheckpointCompleted();
}
@@ -423,6 +456,79 @@ public void initializeState(FunctionInitializationContext context) throws Exception {
}
}

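    /**
     * Async-state counterpart of {@link SubtaskIndexFlatMapper}: the same per-subtask
     * counter and sum, but read and written through the State V2 StateFuture API.
     */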
private static class AsyncSubtaskIndexFlatMapper
extends RichFlatMapFunction<Integer, Tuple2<Integer, Integer>>
implements CheckpointedFunction {

private static final long serialVersionUID = 1L;

private transient org.apache.flink.api.common.state.v2.ValueState<Integer> counter;
private transient org.apache.flink.api.common.state.v2.ValueState<Integer> sum;

private final int numberElements;

public AsyncSubtaskIndexFlatMapper(int numberElements) {
this.numberElements = numberElements;
}

@Override
public void flatMap(Integer value, Collector<Tuple2<Integer, Integer>> out)
throws Exception {
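            // Read-modify-write on V2 state: asyncValue() yields a StateFuture, the
            // write is chained via thenCompose, and thenApply forwards the new value.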
StateFuture<Integer> counterFuture =
counter.asyncValue()
.thenCompose(
(Integer c) -> {
int updated = c == null ? 1 : c + 1;
return counter.asyncUpdate(updated)
.thenApply(nothing -> updated);
});
StateFuture<Integer> sumFuture =
sum.asyncValue()
.thenCompose(
(Integer s) -> {
int updated = s == null ? value : s + value;
return sum.asyncUpdate(updated)
.thenApply(nothing -> updated);
});

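            // Join both futures; once this subtask has seen every expected element,
            // emit (subtaskIndex, sum). The combined result itself is unused.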
counterFuture.thenCombine(
sumFuture,
(c, s) -> {
if (c == numberElements) {
out.collect(
Tuple2.of(
getRuntimeContext()
.getTaskInfo()
.getIndexOfThisSubtask(),
s));
}
return null;
});
}

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// all managed, nothing to do.
}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {}

@Override
public void open(OpenContext openContext) throws Exception {
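            // Fetch State V2 handles; the fully-qualified v2 descriptors avoid a
            // clash with the classic ValueState/ValueStateDescriptor imports above.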
counter =
((StreamingRuntimeContext) getRuntimeContext())
.getValueState(
new org.apache.flink.api.common.state.v2.ValueStateDescriptor<>(
"counter", BasicTypeInfo.INT_TYPE_INFO));
sum =
((StreamingRuntimeContext) getRuntimeContext())
.getValueState(
new org.apache.flink.api.common.state.v2.ValueStateDescriptor<>(
"sum", BasicTypeInfo.INT_TYPE_INFO));
}
}

private static class CollectionSink<IN> implements Sink<IN> {

private static final ConcurrentHashMap<JobID, CollectionSinkWriter<?>> writers =