Skip to content

Commit

Permalink
[FLINK-37140][runtime] Create async timer service when restore async …
Browse files Browse the repository at this point in the history
…operator (#25993)
  • Loading branch information
fredia authored Jan 17, 2025
1 parent 6b4385e commit 33de4ea
Show file tree
Hide file tree
Showing 10 changed files with 211 additions and 133 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.operators.InternalTimerServiceAsyncImpl;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
Expand Down Expand Up @@ -90,9 +90,7 @@ public abstract class AbstractAsyncStateStreamOperator<OUT> extends AbstractStre

/** Initialize necessary state components for {@link AbstractStreamOperator}. */
@Override
public void initializeState(StreamTaskStateInitializer streamTaskStateManager)
throws Exception {
super.initializeState(streamTaskStateManager);
public final void beforeInitializeStateHandler() {
KeyedStateStore stateStore = stateHandler.getKeyedStateStore().orElse(null);
if (stateStore instanceof DefaultKeyedStateStore) {
((DefaultKeyedStateStore) stateStore).setSupportKeyedStateApiSetV2();
Expand Down Expand Up @@ -293,12 +291,11 @@ public <K, N> InternalTimerService<N> getInternalTimerService(
checkState(keySerializer != null, "Timers can only be used on keyed operators.");
// A {@link RecordContext} will be set as the current processing context to preserve record
// order when the given {@link Triggerable} is invoked.
return keyedTimeServiceHandler.getAsyncInternalTimerService(
name,
keySerializer,
namespaceSerializer,
triggerable,
(AsyncExecutionController<K>) asyncExecutionController);
InternalTimerService<N> service =
keyedTimeServiceHandler.getInternalTimerService(
name, keySerializer, namespaceSerializer, triggerable);
((InternalTimerServiceAsyncImpl<K, N>) service).setup(asyncExecutionController);
return service;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.InternalTimerServiceAsyncImpl;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.AsyncStateProcessing;
Expand Down Expand Up @@ -92,9 +92,7 @@ public AbstractAsyncStateStreamOperatorV2(

/** Initialize necessary state components for {@link AbstractStreamOperatorV2}. */
@Override
public final void initializeState(StreamTaskStateInitializer streamTaskStateManager)
throws Exception {
super.initializeState(streamTaskStateManager);
public final void beforeInitializeStateHandler() {
KeyedStateStore stateStore = stateHandler.getKeyedStateStore().orElse(null);
if (stateStore instanceof DefaultKeyedStateStore) {
((DefaultKeyedStateStore) stateStore).setSupportKeyedStateApiSetV2();
Expand Down Expand Up @@ -258,12 +256,11 @@ public <K, N> InternalTimerService<N> getInternalTimerService(
checkState(keySerializer != null, "Timers can only be used on keyed operators.");
// A {@link RecordContext} will be set as the current processing context to preserve record
// order when the given {@link Triggerable} is invoked.
return keyedTimeServiceHandler.getAsyncInternalTimerService(
name,
keySerializer,
namespaceSerializer,
triggerable,
(AsyncExecutionController<K>) asyncExecutionController);
InternalTimerService<N> service =
keyedTimeServiceHandler.getInternalTimerService(
name, keySerializer, namespaceSerializer, triggerable);
((InternalTimerServiceAsyncImpl<K, N>) service).setup(asyncExecutionController);
return service;
}

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,8 +262,11 @@ public OperatorMetricGroup getMetricGroup() {
return metrics;
}

/** Initialize necessary state components before initializing state components. */
protected void beforeInitializeStateHandler() {}

@Override
public void initializeState(StreamTaskStateInitializer streamTaskStateManager)
public final void initializeState(StreamTaskStateInitializer streamTaskStateManager)
throws Exception {

final TypeSerializer<?> keySerializer =
Expand Down Expand Up @@ -296,6 +299,8 @@ public void initializeState(StreamTaskStateInitializer streamTaskStateManager)
isAsyncStateProcessingEnabled()
? context.asyncInternalTimerServiceManager()
: context.internalTimerServiceManager();

beforeInitializeStateHandler();
stateHandler.initializeOperatorState(this);
runtimeContext.setKeyedStateStore(stateHandler.getKeyedStateStore().orElse(null));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,11 @@ public OperatorMetricGroup getMetricGroup() {
return metrics;
}

/** Initialize necessary state components before initializing state components. */
protected void beforeInitializeStateHandler() {}

@Override
public void initializeState(StreamTaskStateInitializer streamTaskStateManager)
public final void initializeState(StreamTaskStateInitializer streamTaskStateManager)
throws Exception {
final TypeSerializer<?> keySerializer =
config.getStateKeySerializer(getUserCodeClassloader());
Expand All @@ -225,6 +228,8 @@ public void initializeState(StreamTaskStateInitializer streamTaskStateManager)
isAsyncStateProcessingEnabled()
? context.asyncInternalTimerServiceManager()
: context.internalTimerServiceManager();

beforeInitializeStateHandler();
stateHandler.initializeOperatorState(this);

if (useSplittableTimers()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
Expand Down Expand Up @@ -65,21 +64,6 @@ <N> InternalTimerService<N> getInternalTimerService(
TypeSerializer<N> namespaceSerializer,
Triggerable<K, N> triggerable);

/**
* Creates an {@link InternalTimerServiceAsyncImpl} for handling a group of timers identified by
* the given {@code name}. The timers are scoped to a key and namespace. Mainly used by async
* operators.
*
* <p>Some essential order preservation will be added when the given {@link Triggerable} is
* invoked.
*/
<N> InternalTimerService<N> getAsyncInternalTimerService(
String name,
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
Triggerable<K, N> triggerable,
AsyncExecutionController<K> asyncExecutionController);

/**
* Advances the Watermark of all managed {@link InternalTimerService timer services},
* potentially firing event time timers.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
Expand All @@ -39,6 +40,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
Expand Down Expand Up @@ -76,6 +79,8 @@ public class InternalTimeServiceManagerImpl<K> implements InternalTimeServiceMan

private final Map<String, InternalTimerServiceImpl<K, ?>> timerServices;

@Nullable AsyncExecutionController<K> asyncExecutionController;

private InternalTimeServiceManagerImpl(
TaskIOMetricGroup taskIOMetricGroup,
KeyGroupRange localKeyGroupRange,
Expand Down Expand Up @@ -162,65 +167,31 @@ <N> InternalTimerServiceImpl<K, N> registerOrGetTimerService(
InternalTimerServiceImpl<K, N> timerService =
(InternalTimerServiceImpl<K, N>) timerServices.get(name);
if (timerService == null) {

timerService =
new InternalTimerServiceImpl<>(
taskIOMetricGroup,
localKeyGroupRange,
keyContext,
processingTimeService,
createTimerPriorityQueue(
PROCESSING_TIMER_PREFIX + name, timerSerializer),
createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer),
cancellationContext);

timerServices.put(name, timerService);
}
return timerService;
}

@Override
public <N> InternalTimerService<N> getAsyncInternalTimerService(
String name,
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
Triggerable<K, N> triggerable,
AsyncExecutionController<K> asyncExecutionController) {
checkNotNull(keySerializer, "Timers can only be used on keyed operators.");

// the following casting is to overcome type restrictions.
TimerSerializer<K, N> timerSerializer =
new TimerSerializer<>(keySerializer, namespaceSerializer);

InternalTimerServiceAsyncImpl<K, N> timerService =
registerOrGetAsyncTimerService(name, timerSerializer, asyncExecutionController);

timerService.startTimerService(
timerSerializer.getKeySerializer(),
timerSerializer.getNamespaceSerializer(),
triggerable);

return timerService;
}

<N> InternalTimerServiceAsyncImpl<K, N> registerOrGetAsyncTimerService(
String name,
TimerSerializer<K, N> timerSerializer,
AsyncExecutionController<K> asyncExecutionController) {
InternalTimerServiceAsyncImpl<K, N> timerService =
(InternalTimerServiceAsyncImpl<K, N>) timerServices.get(name);
if (timerService == null) {
timerService =
new InternalTimerServiceAsyncImpl<>(
taskIOMetricGroup,
localKeyGroupRange,
keyContext,
processingTimeService,
createTimerPriorityQueue(
PROCESSING_TIMER_PREFIX + name, timerSerializer),
createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, timerSerializer),
cancellationContext,
asyncExecutionController);
if (priorityQueueSetFactory instanceof AsyncKeyedStateBackend) {
timerService =
new InternalTimerServiceAsyncImpl<>(
taskIOMetricGroup,
localKeyGroupRange,
keyContext,
processingTimeService,
createTimerPriorityQueue(
PROCESSING_TIMER_PREFIX + name, timerSerializer),
createTimerPriorityQueue(
EVENT_TIMER_PREFIX + name, timerSerializer),
cancellationContext);
} else {
timerService =
new InternalTimerServiceImpl<>(
taskIOMetricGroup,
localKeyGroupRange,
keyContext,
processingTimeService,
createTimerPriorityQueue(
PROCESSING_TIMER_PREFIX + name, timerSerializer),
createTimerPriorityQueue(
EVENT_TIMER_PREFIX + name, timerSerializer),
cancellationContext);
}

timerServices.put(name, timerService);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.BiConsumerWithException;
import org.apache.flink.util.function.ThrowingRunnable;

Expand Down Expand Up @@ -54,8 +55,7 @@ public class InternalTimerServiceAsyncImpl<K, N> extends InternalTimerServiceImp
ProcessingTimeService processingTimeService,
KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> processingTimeTimersQueue,
KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>> eventTimeTimersQueue,
StreamTaskCancellationContext cancellationContext,
AsyncExecutionController<K> asyncExecutionController) {
StreamTaskCancellationContext cancellationContext) {
super(
taskIOMetricGroup,
localKeyGroupRange,
Expand All @@ -64,11 +64,17 @@ public class InternalTimerServiceAsyncImpl<K, N> extends InternalTimerServiceImp
processingTimeTimersQueue,
eventTimeTimersQueue,
cancellationContext);
this.asyncExecutionController = asyncExecutionController;
}

public void setup(AsyncExecutionController<K> asyncExecutionController) {
if (asyncExecutionController != null) {
this.asyncExecutionController = asyncExecutionController;
}
}

@Override
void onProcessingTime(long time) throws Exception {
Preconditions.checkNotNull(asyncExecutionController);
// null out the timer in case the Triggerable calls registerProcessingTimeTimer()
// inside the callback.
nextTimer = null;
Expand Down Expand Up @@ -99,6 +105,7 @@ void onProcessingTime(long time) throws Exception {
*/
@Override
public void advanceWatermark(long time) throws Exception {
Preconditions.checkNotNull(asyncExecutionController);
currentWatermark = time;
InternalTimer<K, N> timer;
while ((timer = eventTimeTimersQueue.peek()) != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.flink.streaming.api.operators.sorted.state;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
Expand Down Expand Up @@ -74,17 +73,6 @@ public <N> InternalTimerService<N> getInternalTimerService(
return timerService;
}

@Override
public <N> InternalTimerService<N> getAsyncInternalTimerService(
String name,
TypeSerializer<K> keySerializer,
TypeSerializer<N> namespaceSerializer,
Triggerable<K, N> triggerable,
AsyncExecutionController<K> asyncExecutionController) {
throw new UnsupportedOperationException(
"Async timer service is not supported in BATCH execution mode.");
}

@Override
public void advanceWatermark(Watermark watermark) {
if (watermark.getTimestamp() == Long.MAX_VALUE) {
Expand Down
Loading

0 comments on commit 33de4ea

Please sign in to comment.