[Improve][Flink] Optimization of Kafka Source Busy Problem #8374

Draft · wants to merge 4 commits into dev
InputStatus.java (new file, org.apache.seatunnel.api.source)
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api.source;

/**
* An {@code InputStatus} indicates the availability of data from an asynchronous input. When asking
* an asynchronous input to produce data, it returns this status to indicate how to proceed.
*
* <p>When the input returns {@link InputStatus#NOTHING_AVAILABLE} it means that no data is
* available at this time, but more will (most likely) be available in the future. The asynchronous
* input will typically offer to register a <i>Notifier</i> or to obtain a <i>Future</i> that will
* signal the availability of new data.
*
* <p>When the input returns {@link InputStatus#MORE_AVAILABLE}, it can be immediately asked again
* to produce more data. That way, readers of the asynchronous input can bypass subscribing to a
* Notifier or a Future for efficiency.
*
* <p>When the input returns {@link InputStatus#END_OF_INPUT}, then no data will be available again
* from this input. It has reached the end of its bounded data.
*/
public enum InputStatus {

/**
* Indicator that more data is available and the input can be called immediately again to
* produce more data.
*/
MORE_AVAILABLE,

/**
* Indicator that no data is currently available, but more data will become available in the
* future.
*/
NOTHING_AVAILABLE,

/** Indicator that the input has reached the end of data. */
END_OF_INPUT
}
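For orientation, a minimal sketch of the kind of polling loop this enum is meant to drive: instead of repeatedly re-invoking a reader that has nothing to emit (the busy behaviour this PR targets), the caller parks on the reader's availability future until new data arrives. The loop and the reader/output variables are illustrative assumptions, not part of this change.

// Hypothetical driver loop; only pollNextV2() and isAvailable() come from this PR.
InputStatus status = InputStatus.MORE_AVAILABLE;
while (status != InputStatus.END_OF_INPUT) {
    status = reader.pollNextV2(output);
    if (status == InputStatus.NOTHING_AVAILABLE) {
        // Wait for the reader to signal availability instead of spinning on pollNextV2().
        reader.isAvailable().get();
    }
}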
SourceReader.java (org.apache.seatunnel.api.source)
@@ -23,6 +23,7 @@

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
* The {@link SourceReader} is used to generate source record, and it will be running at worker.
@@ -51,6 +52,17 @@ public interface SourceReader<T, SplitT extends SourceSplit>
*/
void pollNext(Collector<T> output) throws Exception;

/**
* Generate the next batch of records and report whether more data is available.
*
* @param output output collector.
* @return the {@link InputStatus} after this poll, or {@code null} if the reader only
*     implements the legacy {@link #pollNext(Collector)} contract.
* @throws Exception if error occurs.
*/
default InputStatus pollNextV2(Collector<T> output) throws Exception {
return null;
}

/**
* Get the current split checkpoint state by checkpointId.
*
@@ -62,6 +74,9 @@
*/
List<SplitT> snapshotState(long checkpointId) throws Exception;

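/**
* Returns a future that the caller can wait on before polling again; a completed future means
* the reader can be polled immediately. The default implementation returns an already-completed
* future, i.e. the reader is always treated as available.
*/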
default CompletableFuture<Void> isAvailable() {
return CompletableFuture.completedFuture(null);
}

/**
* Add the split checkpoint state to reader.
*
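To make the new contract concrete, here is a rough sketch of how a connector reader might implement the two hooks over an internal buffer. The buffer and the availability field are assumptions for illustration; only the pollNextV2/isAvailable signatures come from this diff, and the remaining SourceReader methods are omitted.

// Illustrative fragment only, not a complete SourceReader implementation.
private final Queue<T> buffer = new ConcurrentLinkedQueue<>();
private volatile CompletableFuture<Void> availability = CompletableFuture.completedFuture(null);

@Override
public InputStatus pollNextV2(Collector<T> output) throws Exception {
    T record = buffer.poll();
    if (record != null) {
        output.collect(record);
        return InputStatus.MORE_AVAILABLE;
    }
    // Nothing buffered: arm a fresh future so the caller can wait on isAvailable()
    // instead of calling pollNextV2() again in a tight loop.
    // (A real implementation must also handle the race between polling and arming the future.)
    availability = new CompletableFuture<>();
    return InputStatus.NOTHING_AVAILABLE;
}

@Override
public CompletableFuture<Void> isAvailable() {
    return availability;
}

// A fetch thread would complete the future after enqueuing new records:
//   buffer.add(record); availability.complete(null);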
IncrementalSource.java (org.apache.seatunnel.connectors.cdc.base.source)
@@ -57,6 +57,7 @@
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordEmitter;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderOptions;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.seatunnel.format.compatible.debezium.json.CompatibleDebeziumJsonDeserializationSchema;

import io.debezium.relational.TableId;
@@ -69,8 +70,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -158,8 +157,8 @@ public SourceReader<T, SourceSplitBase> createReader(SourceReader.Context reader
throws Exception {
// create source config for the given subtask (e.g. unique server id)
C sourceConfig = configFactory.create(readerContext.getIndexOfSubtask());
BlockingQueue<RecordsWithSplitIds<SourceRecords>> elementsQueue =
new LinkedBlockingQueue<>(2);
FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>> elementsQueue =
new FutureCompletingBlockingQueue<>(2);

SchemaChangeResolver schemaChangeResolver = deserializationSchema.getSchemaChangeResolver();
Supplier<IncrementalSourceSplitReader<C>> splitReaderSupplier =
IncrementalSourceReader.java (org.apache.seatunnel.connectors.cdc.base.source.reader)
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.cdc.base.source.reader;

import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.InputStatus;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
@@ -39,6 +40,7 @@
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.SingleThreadMultiplexSourceReaderBase;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderOptions;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.synchronization.FutureCompletingBlockingQueue;

import lombok.extern.slf4j.Slf4j;

@@ -47,7 +49,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -79,7 +80,7 @@ public class IncrementalSourceReader

public IncrementalSourceReader(
DataSourceDialect<C> dataSourceDialect,
BlockingQueue<RecordsWithSplitIds<SourceRecords>> elementsQueue,
FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>> elementsQueue,
Supplier<IncrementalSourceSplitReader<C>> splitReaderSupplier,
RecordEmitter<SourceRecords, T, SourceSplitStateBase> recordEmitter,
SourceReaderOptions options,
@@ -116,10 +117,19 @@ public void pollNext(Collector<T> output) throws Exception {
log.info("Reader {} send NoMoreElement event", context.getIndexOfSubtask());
context.signalNoMoreElement();
} else {
super.pollNext(output);
super.pollNextV2(output);
}
}

@Override
public InputStatus pollNextV2(Collector<T> output) throws Exception {
// TODO: This still delegates to the legacy pollNext(); switch to the new InputStatus-based
// interface to take full advantage of the FutureCompletingBlockingQueue in the future
this.pollNext(output);
// Return null because the legacy pollNext() path does not report an InputStatus
return null;
}

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
dataSourceDialect.commitChangeLogOffset(snapshotChangeLogOffset);
SingleThreadMultiplexSourceReaderBase.java (org.apache.seatunnel.connectors.seatunnel.common.source.reader)
@@ -21,9 +21,8 @@
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.synchronization.FutureCompletingBlockingQueue;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;

/**
@@ -45,15 +44,15 @@ public SingleThreadMultiplexSourceReaderBase(
SourceReaderOptions options,
SourceReader.Context context) {
this(
new ArrayBlockingQueue<>(options.getElementQueueCapacity()),
new FutureCompletingBlockingQueue<>(options.getElementQueueCapacity()),
splitReaderSupplier,
recordEmitter,
options,
context);
}

public SingleThreadMultiplexSourceReaderBase(
BlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
RecordEmitter<E, T, SplitStateT> recordEmitter,
SourceReaderOptions options,
@@ -67,7 +66,7 @@ public SingleThreadMultiplexSourceReaderBase(
}

public SingleThreadMultiplexSourceReaderBase(
BlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
RecordEmitter<E, T, SplitStateT> recordEmitter,
SourceReaderOptions options,
SourceReaderBase.java (org.apache.seatunnel.connectors.seatunnel.common.source.reader)
@@ -19,12 +19,14 @@

import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.InputStatus;
import org.apache.seatunnel.api.source.SourceEvent;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.fetcher.SplitFetcherManager;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.synchronization.FutureCompletingBlockingQueue;

import lombok.Getter;
import lombok.RequiredArgsConstructor;
@@ -35,7 +37,6 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

@@ -55,20 +56,20 @@
@Slf4j
public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
implements SourceReader<T, SplitT> {
private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
public final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
private final ConcurrentMap<String, SplitContext<T, SplitStateT>> splitStates;
protected final RecordEmitter<E, T, SplitStateT> recordEmitter;
protected final SplitFetcherManager<E, SplitT> splitFetcherManager;
protected final SourceReaderOptions options;
protected final SourceReader.Context context;

private RecordsWithSplitIds<E> currentFetch;
public RecordsWithSplitIds<E> currentFetch;
protected SplitContext<T, SplitStateT> currentSplitContext;
private Collector<T> currentSplitOutput;
@Getter private volatile boolean noMoreSplitsAssignment;

public SourceReaderBase(
BlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
SplitFetcherManager<E, SplitT> splitFetcherManager,
RecordEmitter<E, T, SplitStateT> recordEmitter,
SourceReaderOptions options,
@@ -86,8 +87,13 @@ public void open() {
log.info("Open Source Reader.");
}

private InputStatus trace(InputStatus status) {
log.trace("Source reader status: {}", status);
return status;
}

@Override
public void pollNext(Collector<T> output) throws Exception {
public InputStatus pollNextV2(Collector<T> output) throws Exception {
RecordsWithSplitIds<E> recordsWithSplitId = this.currentFetch;
if (recordsWithSplitId == null) {
recordsWithSplitId = getNextFetch(output);
@@ -100,18 +106,38 @@ && isNoMoreElement()) {
"Reader {} into idle state, send NoMoreElement event",
context.getIndexOfSubtask());
}
return;
return trace(finishedOrAvailableLater());
}
}

E record = recordsWithSplitId.nextRecordFromSplit();
if (record != null) {
synchronized (output.getCheckpointLock()) {
recordEmitter.emitRecord(record, currentSplitOutput, currentSplitContext.state);
while (true) {
E record = recordsWithSplitId.nextRecordFromSplit();
if (record != null) {
synchronized (output.getCheckpointLock()) {
recordEmitter.emitRecord(record, currentSplitOutput, currentSplitContext.state);
}
log.trace("Emitted record: {}", record);
return trace(InputStatus.MORE_AVAILABLE);
} else if (!moveToNextSplit(recordsWithSplitId, output)) {
return pollNextV2(output);
}
log.trace("Emitted record: {}", record);
} else if (!moveToNextSplit(recordsWithSplitId, output)) {
pollNext(output);
}
}

private InputStatus finishedOrAvailableLater() {
final boolean allFetchersHaveShutdown = splitFetcherManager.maybeShutdownFinishedFetchers();
if (!(noMoreSplitsAssignment && allFetchersHaveShutdown)) {
return InputStatus.NOTHING_AVAILABLE;
}
if (elementsQueue.isEmpty()) {
// We may reach here because of exceptional split fetcher, check it.
splitFetcherManager.checkErrors();
return InputStatus.END_OF_INPUT;
} else {
// We can reach this case if we just processed all data from the queue and finished a split,
// and concurrently the fetcher finished another split, whose data is then in the queue.
return InputStatus.MORE_AVAILABLE;
}
}

AddSplitsTask.java
@@ -35,11 +35,12 @@ class AddSplitsTask<SplitT extends SourceSplit> implements SplitFetcherTask {
private final Map<String, SplitT> assignedSplits;

@Override
public void run() {
public boolean run() {
for (SplitT s : splitsToAdd) {
assignedSplits.put(s.splitId(), s);
}
splitReader.handleSplitsChanges(new SplitsAddition<>(splitsToAdd));
return true;
}

@Override
FetchTask.java
@@ -20,6 +20,7 @@
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.RecordsWithSplitIds;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.splitreader.SplitReader;
import org.apache.seatunnel.connectors.seatunnel.common.source.reader.synchronization.FutureCompletingBlockingQueue;

import lombok.AccessLevel;
import lombok.Getter;
Expand All @@ -28,8 +29,6 @@

import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

@Slf4j
@@ -38,7 +37,7 @@ class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask {
private static final int OFFER_TIMEOUT_MILLIS = 10000;

private final SplitReader<E, SplitT> splitReader;
private final BlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
private final Consumer<Collection<String>> splitFinishedCallback;
private final int fetcherIndex;

@@ -48,20 +47,22 @@ class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask {
private volatile RecordsWithSplitIds<E> lastRecords;

@Override
public void run() throws IOException {
public boolean run() throws IOException {
try {
if (!isWakeup() && lastRecords == null) {
lastRecords = splitReader.fetch();
log.debug("Fetch records from split fetcher {}", fetcherIndex);
}

if (!isWakeup()) {
if (elementsQueue.offer(lastRecords, OFFER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
// The order matters here. We must first put the last records into the queue.
// This ensures the handling of the fetched records is atomic with respect to wakeup.
if (elementsQueue.put(fetcherIndex, lastRecords)) {
if (!lastRecords.finishedSplits().isEmpty()) {
// The callback does not throw InterruptedException.
splitFinishedCallback.accept(lastRecords.finishedSplits());
}
lastRecords = null;
log.debug("Enqueued records from split fetcher {}", fetcherIndex);
} else {
log.debug(
"Enqueuing timed out in split fetcher {}, queue is blocked",
Expand All @@ -77,6 +78,7 @@ public void run() throws IOException {
wakeup = false;
}
}
return true;
}

@Override
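For context on the FetchTask change: the timed offer() on a plain BlockingQueue is replaced by put() on the FutureCompletingBlockingQueue, which couples enqueueing with availability signalling, and a false return now means the fetcher was woken up rather than that the 10-second offer timed out. A rough sketch of the intended interaction, assuming the ported queue keeps the semantics of Flink's FutureCompletingBlockingQueue (getAvailabilityFuture() does not appear in this diff and is an assumption):

// Producer side (split fetcher thread): put() blocks while the queue is full and
// completes the queue's availability future once the element is enqueued.
boolean enqueued = elementsQueue.put(fetcherIndex, lastRecords); // false => woken up, not timed out

// Consumer side (reader thread): park on the future instead of re-checking an empty queue.
// getAvailabilityFuture() is assumed from the Flink original; it is not shown in this diff.
if (elementsQueue.isEmpty()) {
    elementsQueue.getAvailabilityFuture().thenRun(() -> { /* schedule the next poll */ });
}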