Dsop is a dynamic streaming operator placement framework for multi-query workloads in edge-cloud environments.
- Java 11
- Apache Flink 1.80 (modified version)
- Ray 2.40.0
Standard x86_64 setup: We use a modified version of Flink 1.80 with the ability to collect metrics at custom intervals. You should compile and deploy this modified version of Flink on every machine in your cluster.
Raspberry Pi / ARM deployment: For deploying on Raspberry Pi clusters, see DEPLOY_RASPBERRYPI.md for detailed instructions on:
- ARM-compatible dependencies (without CUDA)
- Resource-constrained configurations
- Raspberry Pi specific setup scripts
Flink Execution Plan Parser: For parsing and analyzing Flink execution plans via Python, see:
- `scripts/rl/flink_plan_parser.py` - Core parser module
- `scripts/rl/flink_metrics_collector.py` - Metrics collector
- `scripts/rl/example_flink_parser.py` - Usage examples
GNN-based Execution Plan Representation: For learning graph representations of Flink execution plans using Graph Neural Networks, see:
- `scripts/rl/flink_execution_graph.py` - Graph construction from execution plans
- `scripts/rl/gnn_execution_planner.py` - GNN models for representation learning
- `scripts/rl/example_gnn_usage.py` - Complete examples
- `scripts/rl/README_GNN.md` - Detailed documentation
The system contains two main components: the Flink partitioner written in Java, and the reinforcement learning agent implemented with Ray RLlib.
- For the Java code, point `flink.source.path` in `pom.xml` to our modified Flink, then compile with `mvn package`.
- For the RL agent, install the required Python packages from `scripts/rl/requirements.txt`.
The parameters are configured in `src/main/resources/params.yaml` and `scripts/rl/configurations.py`, respectively. Example YAML and Python configuration files are provided in `params/` and `scripts/rl/configuration_pool/`.
You can run the following baselines for comparison with DsopEdge by setting the baseline option to one of these values:
- FlatVec: `FlatVec`
- Amnis: `Amnis`
- PredictivePPM: `PredictivePPM`
- Costream: `Costream`
DsopEdge processes streaming queries using a key-splitting pipeline architecture. A query in DsopEdge consists of:
- Source: Input data stream (e.g., socket source, file source)
- Partitioner: Routes records to downstream operators based on keys
- Combiner: Aggregates records within sliding windows to produce partial results
- Reducer: Merges partial results from combiners to produce final results
- Sink: Output destination for final results
Source → Partitioner → Combiner → Reducer → Sink
The system distinguishes between three types of keys:
- True Hot Keys: Keys that are routed by advanced partitioners and go through reducers
- Non-True Hot Keys: Keys that are hashed to respective reducers
- Non-Hot Keys: Keys that bypass the reducer pipeline via side output
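For intuition, here is a minimal sketch of how a partitioner might route these three key types (all names here are illustrative, not the actual DsopEdge API):

```java
import java.util.Set;

/** Hypothetical sketch of per-key routing; not the actual DsopEdge classes. */
class KeyRouter {
    static final int SIDE_OUTPUT = -1;   // marker: bypass the reducer pipeline
    final Set<Integer> trueHotKeys;      // routed by the advanced partitioner
    final Set<Integer> hotKeys;          // superset that includes true hot keys
    final int numWorkers;

    KeyRouter(Set<Integer> trueHotKeys, Set<Integer> hotKeys, int numWorkers) {
        this.trueHotKeys = trueHotKeys;
        this.hotKeys = hotKeys;
        this.numWorkers = numWorkers;
    }

    int route(int key) {
        if (trueHotKeys.contains(key)) {
            return chooseWorker(key);                // advanced routing, goes through reducers
        } else if (hotKeys.contains(key)) {
            return Math.floorMod(key, numWorkers);   // hashed to its respective reducer
        }
        return SIDE_OUTPUT;                          // non-hot: side output, no reducer
    }

    int chooseWorker(int key) {
        // Placeholder for the partitioner's learned or heuristic decision.
        return Math.floorMod(key * 31, numWorkers);
    }
}
```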
Queries use sliding windows for aggregation:
- Window Size: Total duration of the window (e.g., 5000ms)
- Window Slide: How frequently windows are created (e.g., 1000ms)
- Metric Window: Separate window for collecting performance metrics
Example configuration:

```yaml
windowSize: 5000   # 5 seconds
windowSlide: 1000  # 1 second slide
```

The Partitioner is the first operator in the pipeline. It routes incoming records to downstream operators based on their keys.
Base Class: `Partitioner<K, R extends Record<K>>`
Key Responsibilities:
- Receives records from the source
- Determines which worker/partition should process each record
- Outputs tuples of `(partitionId, record)`
- Tracks routing decisions and metrics
Supported Partitioner Types:
| Partitioner Name | Description | Use Case |
|---|---|---|
| `hash` | Simple hash-based partitioning | Baseline comparison |
| `cam` | Cardinality-Aware Matching partitioner | Load balancing |
| `dagreedy` | DAGreedy partitioner | Greedy optimization |
| `dalton-original` | Original Dalton partitioner | Q-learning based |
| `dalton-metrics` | Dalton with system metrics | Metric-aware routing |
| `dalton-offline` | Dalton for offline data collection | Training data generation |
| `saspartitioner` | SaSPartitioner (learning-based) | RL-based dynamic routing |
| `saspartitioner-adaptive` | Adaptive SaSPartitioner | Multi-distribution adaptation |
| `drifting-key` | Handles key distribution shifts | Dynamic workloads |
Example Usage:

```java
Partitioner<Integer, WordCountRecord> partitioner =
    new HashPartitioner<>(
        metricCollectorPort,
        numWorkers,
        metricWindowNumSlides,
        numMetricsPerWorker,
        episodeLength,
        initialPerf
    );
```

The Combiner aggregates records within sliding windows to produce partial results. It operates on partitioned data streams.
Base Class: `Combiner<K, R extends Record<K>, P extends PartialResult<K>>`
Key Responsibilities:
- Receives partitioned records: `(partitionId, record)`
- Aggregates records by key within sliding windows
- Produces partial results: `Map<K, PartialResult>`
- Tracks metrics per route version
- Handles hot/non-hot key classification
Key Methods:
- `createAccumulator()`: Creates the initial accumulator state
- `add(value, accumulator)`: Adds a record to the accumulator
- `getResult(accumulator)`: Returns the final partial result
- `merge(a, b)`: Merges two accumulators
- `getMetrics(ioMetrics)`: Extracts metrics from the I/O metric group
- `reportMetrics(...)`: Reports metrics for monitoring
Example Implementation (WordCountCombiner):

```java
public class WordCountCombiner extends Combiner<Integer, WordCountRecord, WordCountState> {
    @Override
    public Map<Integer, WordCountState> add(
            Tuple2<Integer, WordCountRecord> value,
            Map<Integer, WordCountState> accumulator) {
        WordCountRecord record = value.f1;
        updateMetricsIfNecessary(record, subtaskIndex);
        int key = record.getKey();
        // Process words and update state
        // ...
        return accumulator;
    }

    @Override
    protected double[] getMetrics(ManualView ioMetrics) {
        return new double[] {ioMetrics.getBusyTimeMsPerSecond()};
    }
}
```

Configuration:
- `combinerParallelism`: Number of parallel combiner instances
- `combinerWorkloadRatio`: Workload multiplier for stress testing
- Window configuration: Uses `SlidingEventTimeWindows`
The Reducer merges partial results from multiple combiners to produce final results.
Base Class: `Reducer<K, P extends PartialResult<K>, F extends FinalResult<K>>`
Key Responsibilities:
- Receives partial results from combiners
- Merges partial results by key
- Produces final results
- Updates metrics at regular intervals
Key Methods:
- `createAccumulator()`: Creates the initial accumulator
- `add(value, accumulator)`: Adds a partial result
- `getResult(accumulator)`: Returns the final result
- `merge(a, b)`: Merges two accumulators
- `updateMetricsIfNecessary()`: Updates metrics periodically
Example Implementation (WordCountReducer):

```java
public class WordCountReducer extends Reducer<Integer, WordCountState, WordCountState> {
    @Override
    public WordCountState add(WordCountState value, WordCountState accumulator) {
        updateMetricsIfNecessary();
        if (accumulator != null) {
            accumulator.merge(value);
        } else {
            accumulator = value;
        }
        return accumulator;
    }
}
```

Configuration:
- `reducerParallelism`: Number of parallel reducer instances
- Uses `TumblingEventTimeWindows` for final aggregation
`UniformKeyExtractor`: Extracts keys for combiner partitioning
- Distributes keys uniformly across combiner instances
- Used in the combiner stage

`UniformReduceKeyExtractor`: Extracts keys for reducer partitioning
- Distributes keys based on their hotness and worker allocation
- Used in the reducer stage for distribution-aware routing
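As an illustration of the first extractor's role, here is a minimal sketch of what a uniform extractor could look like as a Flink `KeySelector` (the class name and constructor are illustrative; the actual implementation in the codebase may differ):

```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;

/** Sketch: spreads (partitionId, record) tuples over combiner subtasks. */
class UniformKeySelector<R> implements KeySelector<Tuple2<Integer, R>, Integer> {
    private final int combinerParallelism;

    UniformKeySelector(int combinerParallelism) {
        this.combinerParallelism = combinerParallelism;
    }

    @Override
    public Integer getKey(Tuple2<Integer, R> value) {
        // The partitioner already chose a target partition (value.f0);
        // map it onto the available combiner subtasks.
        return Math.floorMod(value.f0, combinerParallelism);
    }
}
```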
- Source Stage:

```java
SingleOutputStreamOperator<WordCountRecord> source = env
    .addSource(sourceFunction, "SocketSource")
    .assignTimestampsAndWatermarks(...);
```

- Partitioning Stage:

```java
SingleOutputStreamOperator<Tuple2<Integer, R>> partitioned = source
    .process(partitioner)
    .name("partitioner");
```

- Combining Stage:

```java
SingleOutputStreamOperator<P> hotPartialResult = partitioned
    .keyBy(new UniformKeyExtractor<>(combinerParallelism))
    .window(SlidingEventTimeWindows.of(windowSize, windowSlide))
    .aggregate(combiner, combinerProcessWindowFunction)
    .setParallelism(combinerParallelism);
```

- Reducing Stage:

```java
SingleOutputStreamOperator<F> hotResult = hotPartialResult
    .keyBy(reduceKeyExtractor)
    .window(TumblingEventTimeWindows.of(windowSlide))
    .aggregate(reducer)
    .setParallelism(reducerParallelism);
```

- Sink Stage:

```java
result.addSink(KeySplittingPipeline.getSink(params, discard))
    .setParallelism(numWorkers);
```
| Parameter | Description | Example |
|---|---|---|
| `partitionerParallelism` | Number of partitioner instances | 4 |
| `combinerParallelism` | Number of combiner instances | 4 |
| `reducerParallelism` | Number of reducer instances | 4 |
| `combinerWorkloadRatio` | Workload multiplier | 1 |
| `windowSize` | Window size in milliseconds | 5000 |
| `windowSlide` | Window slide in milliseconds | 1000 |
| Parameter | Description | Example |
|---|---|---|
| `numKeys` | Total number of unique keys | 100 |
| `numHotKeys` | Number of hot keys | 10 |
| `numTrueHotKeys` | Number of true hot keys | 10 |
| `zipf` | Zipf distribution parameter | 1.5 |
| `dataset` | Dataset type (`random`, `file`) | `random` |
| Parameter | Description | Example |
|---|---|---|
| `rayServerHost` | Ray server hostname | localhost |
| `rayServerPort` | Ray server port | 49985 |
| `metricCollectorPort` | Metrics collector port | 49986 |
| `episodeLength` | RL episode length | 100 |
| `numMetricsPerWorker` | Metrics per worker | 3 |
| `metricWindowSizeMillis` | Metric window size (ms) | 10000 |
| `metricWindowSlideMillis` | Metric window slide (ms) | 1000 |
| `initialPerf` | Initial performance estimate | 0.5 |
Dalton Partitioner:
- `alpha`: Load balance preference (0.0-1.0)
- `a`: Q-learning learning rate
- `epsilon`: Exploration probability
- `temperature`: Action selection temperature
- `historySize`: Key frequency history size
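To illustrate how `a` and `epsilon` interact, here is a hedged sketch of epsilon-greedy selection over per-worker Q-values, in the style of Dalton-like partitioners (illustrative only, not the actual Dalton code; `temperature`-based softmax selection is omitted):

```java
import java.util.Random;

/** Illustrative epsilon-greedy selection over per-worker Q-values. */
class QSelector {
    final double epsilon;   // exploration probability
    final double a;         // Q-learning learning rate
    final Random rng = new Random();

    QSelector(double epsilon, double a) {
        this.epsilon = epsilon;
        this.a = a;
    }

    int selectWorker(double[] qValues) {
        if (rng.nextDouble() < epsilon) {
            return rng.nextInt(qValues.length);          // explore
        }
        int best = 0;
        for (int w = 1; w < qValues.length; w++) {
            if (qValues[w] > qValues[best]) best = w;    // exploit
        }
        return best;
    }

    void update(double[] qValues, int worker, double reward) {
        // Standard Q-learning step: move the estimate toward the observed reward.
        qValues[worker] += a * (reward - qValues[worker]);
    }
}
```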
SaSPartitioner:
- `maskIndexes`: Key-to-worker mapping masks
- `numHotKeys`: Number of hot keys to route
- `maskSpreadKeys`: Keys without partition masks
- Extend `Combiner<K, R, P>`:

```java
public class CustomCombiner extends Combiner<Integer, CustomRecord, CustomState> {
    @Override
    public Map<Integer, CustomState> createAccumulator() {
        return new HashMap<>();
    }

    @Override
    public Map<Integer, CustomState> add(
            Tuple2<Integer, CustomRecord> value,
            Map<Integer, CustomState> accumulator) {
        // Update metrics
        updateMetricsIfNecessary(value.f1, subtaskIndex);
        // Process record
        CustomRecord record = value.f1;
        int key = record.getKey();
        // ... aggregation logic
        return accumulator;
    }

    @Override
    protected double[] getMetrics(ManualView ioMetrics) {
        // Return metrics array
        return new double[] {ioMetrics.getBusyTimeMsPerSecond()};
    }

    @Override
    protected void reportMetrics(
            long routeVersion, int subtaskIndex,
            double[] metrics, long routeDuration) {
        // Log or send metrics
    }
}
```

- Extend `Reducer<K, P, F>`:
```java
public class CustomReducer extends Reducer<Integer, CustomState, CustomState> {
    public CustomReducer(long metricUpdateInterval) {
        super(metricUpdateInterval);
    }

    @Override
    public CustomState createAccumulator() {
        return null;
    }

    @Override
    public CustomState add(CustomState value, CustomState accumulator) {
        updateMetricsIfNecessary();
        if (accumulator != null) {
            accumulator.merge(value);
        } else {
            accumulator = value;
        }
        return accumulator;
    }

    @Override
    protected void reportMetrics(int subtask, double[] metrics, long ts) {
        // Log metrics
    }
}
```

```java
// Setup pipeline
KeySplittingPipeline<Integer, WordCountRecord, WordCountState, WordCountState> pipeline =
    new KeySplittingPipeline<>(
        params,
        partitioner,
        new WordCountCombiner(workloadRatio),
        new WordCountReducer(params.getWindowSize()),
        reduceKeySelector,
        new TypeHint<Integer>() {},
        new TypeHint<WordCountState>() {},
        new TypeHint<WordCountState>() {},
        false // trackLatency
    );

// Execute
DataStream<?> result = pipeline.transform(source);
result.addSink(KeySplittingPipeline.getSink(params, true));
```

A percentile query follows a similar structure but uses `TDigestCombiner` and `TDigestReducer` for percentile calculations.
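For intuition on why t-digests suit the combiner/reducer split, here is a sketch using the com.tdunning t-digest library (an assumption; check `TDigestCombiner` for the digest implementation actually used):

```java
import com.tdunning.math.stats.TDigest;

public class TDigestSketch {
    public static void main(String[] args) {
        // Combiner side: each combiner builds a partial digest over its window.
        TDigest partial = TDigest.createMergingDigest(100);  // 100 = compression
        partial.add(42.0);
        partial.add(7.5);

        // Reducer side: partial digests merge losslessly into one final digest,
        // which is what makes t-digest a good fit for this pipeline shape.
        TDigest merged = TDigest.createMergingDigest(100);
        merged.add(partial);
        System.out.println(merged.quantile(0.99));  // estimated 99th percentile
    }
}
```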
- Parallelism Configuration:
  - Set parallelism based on available resources
  - Balance between partitioner, combiner, and reducer parallelism
  - Consider data skew when setting reducer parallelism
- Window Configuration:
  - Window size should be larger than the slide for overlapping windows
  - Metric windows should align with route update intervals
  - Consider latency requirements when setting window sizes
- Hot Key Management:
  - Use `numHotKeys` to control routing complexity
  - Configure `maskIndexes` for distribution-aware partitioners
  - Monitor key distribution shifts
- Metrics Collection:
  - Configure `metricWindowSizeMillis` and `metricWindowSlideMillis`
  - Ensure `numMetricsPerWorker` matches your metric extraction
  - Use `routeUpdateIntervalMillis` to control update frequency
- Performance Tuning:
  - Start with baseline partitioners (`hash`) for comparison
  - Use `saspartitioner` for dynamic workloads
  - Enable latency tracking for performance analysis
  - Adjust `combinerWorkloadRatio` for stress testing
Common Issues:
- Metric Window Alignment: Ensure `metricWindowSizeMillis` is a multiple of `metricWindowSlideMillis`
- Key Distribution: Monitor key skew and adjust `numHotKeys` accordingly
- Memory Issues: Reduce parallelism or window sizes in resource-constrained environments
- Connection Issues: Verify that the Ray server and metric collector ports are accessible
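For the alignment issue above, a minimal sanity check along these lines can fail fast at startup (a sketch; the class and method names are illustrative):

```java
/** Sketch: fail fast when metric windows cannot tile evenly. */
final class MetricWindowCheck {
    static void validate(long metricWindowSizeMillis, long metricWindowSlideMillis) {
        if (metricWindowSizeMillis % metricWindowSlideMillis != 0) {
            throw new IllegalArgumentException(
                "metricWindowSizeMillis (" + metricWindowSizeMillis
                    + ") must be a multiple of metricWindowSlideMillis ("
                    + metricWindowSlideMillis + ")");
        }
    }
}
```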
For more details on specific components, see: