[FLINK-37162] Add sinks.md to describe Flink's Data Sink API. #26013

Merged · 5 commits · Feb 25, 2025 · Changes shown from 2 commits
136 changes: 136 additions & 0 deletions docs/content.zh/docs/dev/datastream/sinks.md
@@ -0,0 +1,136 @@
---
title: "Data Sinks"
weight: 12
type: docs
aliases:
- /dev/stream/sinks.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Data Sinks

This page describes Flink's Data Sink API and the concepts and architecture behind it.
**Read this if you are interested in how data sinks in Flink work, or if you want to implement a new Data Sink.**

If you are looking for pre-defined sink connectors, please check the [Connector Docs]({{< ref "docs/connectors/datastream/overview" >}}).

## The Data Sink API
This section describes the major interfaces of the new Sink API introduced in [FLIP-191](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction), and provides tips for developers working on sinks.

### Sink
The {{< gh_link file="flink-core/src/main/java/org/apache/flink/api/connector/sink2/Sink.java" name="Sink" >}} API is a factory-style interface that creates the [SinkWriter](#sinkwriter) to write the data.

Sink implementations must be serializable, because Sink instances are serialized and uploaded to the Flink cluster at runtime.
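
As an illustration, a minimal `Sink` can be little more than a factory method. The sketch below uses hypothetical names (`ConsoleSink`, and a `ConsoleSinkWriter` sketched under [SinkWriter](#sinkwriter) below) and assumes the `WriterInitContext` parameter of recent Flink releases; older releases pass a `Sink.InitContext` instead.

```java
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;

/** A sketch of a minimal Sink: it only acts as a factory for its writer. */
public class ConsoleSink implements Sink<String> {

    @Override
    public SinkWriter<String> createWriter(WriterInitContext context) {
        // Called on each parallel subtask on the TaskManagers, not on the client;
        // this is why the Sink must be serializable while the writer need not be.
        return new ConsoleSinkWriter();
    }
}
```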

#### Use the Sink
We can add a `Sink` to a `DataStream` by calling the `DataStream.sinkTo(Sink)` method. For example:

{{< tabs "bde5ff60-4e61-4633-a6dc-50413cfd7b45" >}}
{{< tab "Java" >}}
```java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Source mySource = new MySource(...);

DataStream<Integer> stream = env.fromSource(
        mySource,
        WatermarkStrategy.noWatermarks(),
        "MySourceName");

Sink mySink = new MySink(...);

stream.sinkTo(mySink);
...
```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
val env = StreamExecutionEnvironment.getExecutionEnvironment()

val mySource = new MySource(...)

val stream = env.fromSource(
    mySource,
    WatermarkStrategy.noWatermarks(),
    "MySourceName")

val mySink = new MySink(...)

stream.sinkTo(mySink)
...
```
{{< /tab >}}
{{< tab "Python" >}}
```python
env = StreamExecutionEnvironment.get_execution_environment()

my_source = ...

stream = env.from_source(
    my_source,
    WatermarkStrategy.no_watermarks(),
    "my_source_name")

my_sink = ...

stream.sink_to(my_sink)
```
{{< /tab >}}
{{< /tabs >}}

----

### SinkWriter

The core {{< gh_link file="flink-core/src/main/java/org/apache/flink/api/connector/sink2/SinkWriter.java" name="SinkWriter" >}} API is responsible for writing data to the downstream system.

The `SinkWriter` API has only three methods:
- `write(InputT element, Context context)`: Adds an element to the writer.
- `flush(boolean endOfInput)`: Called on checkpoint or end of input, causing the writer to flush all pending data for at-least-once delivery.

  > **Review comment (Contributor):** nit: better English might be "Called on checkpoint or end of the input; setting this flag causes the writer to flush all pending data for at-least-once." I am unsure about this comment and what the writer is supposed to do with the flag set or unset; I assume the writer needs to check for at-least-once. For exactly-once, how does flush work?

- `writeWatermark(Watermark watermark)`: Adds a watermark to the writer.

Please check the Javadoc of the class for more details.
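
To make the lifecycle concrete, here is a sketch of the hypothetical `ConsoleSinkWriter` created by the `ConsoleSink` above. It buffers records in memory and emits them on `flush`; a production writer would talk to an external system and handle errors.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.sink2.SinkWriter;

/** Sketch: buffers records in memory and prints them when Flink asks for a flush. */
class ConsoleSinkWriter implements SinkWriter<String> {

    private final List<String> buffer = new ArrayList<>();

    @Override
    public void write(String element, Context context) {
        buffer.add(element); // stage the record until the next flush
    }

    @Override
    public void flush(boolean endOfInput) throws IOException {
        // Invoked on every checkpoint and once more when the input ends; emitting
        // all pending data here is what gives the sink at-least-once semantics.
        buffer.forEach(System.out::println);
        buffer.clear();
    }

    @Override
    public void writeWatermark(Watermark watermark) {
        // Optional hook (a no-op by default): forward the watermark if the
        // external system can make use of it.
    }

    @Override
    public void close() {
        buffer.clear();
    }
}
```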

## Advanced Sink API

### SupportsWriterState

The {{< gh_link file="flink-core/src/main/java/org/apache/flink/api/connector/sink2/SupportsWriterState.java" name="SupportsWriterState" >}} interface indicates that the sink supports writer state, which means the sink can be recovered after a failure.

The `SupportsWriterState` interface requires the `SinkWriter` to implement the {{< gh_link file="flink-core/src/main/java/org/apache/flink/api/connector/sink2/StatefulSinkWriter.java" name="StatefulSinkWriter" >}} interface.
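
As a sketch (illustrative names; check the signatures against your Flink version), a stateful variant of the buffering writer could snapshot its unflushed records:

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.flink.api.connector.sink2.StatefulSinkWriter;

/** Sketch: the writer state is simply the list of records not yet flushed. */
class BufferingSinkWriter implements StatefulSinkWriter<String, String> {

    private final List<String> buffer = new ArrayList<>();

    BufferingSinkWriter(Collection<String> recoveredState) {
        buffer.addAll(recoveredState); // resume with what was buffered before the failure
    }

    @Override
    public List<String> snapshotState(long checkpointId) {
        // Stored in the checkpoint; handed back via SupportsWriterState#restoreWriter.
        return new ArrayList<>(buffer);
    }

    @Override
    public void write(String element, Context context) {
        buffer.add(element);
    }

    @Override
    public void flush(boolean endOfInput) {
        buffer.clear(); // a real writer would push the records downstream first
    }

    @Override
    public void close() {}
}
```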

### SupportsCommitter

The {{< gh_link file="flink-core/src/main/java/org/apache/flink/api/connector/sink2/SupportsCommitter.java" name="SupportsCommitter" >}} interface is used to indicate that the sink supports exactly-once semantics using a two-phase commit protocol.

> **Review comment (Contributor):** It would be useful to point to how Flink implements two-phase commit in the Flink docs. Maybe this is another PR.

The `Sink` consists of a `CommittingSinkWriter` that performs the pre-commit and a `Committer` that actually commits the data. To facilitate this separation, the `CommittingSinkWriter` creates committables on checkpoint or end of input and sends them to the `Committer`.

The `Sink` needs to be serializable. All configuration should be validated eagerly. The respective sink writers and committers are transient and will only be created in the subtasks on the TaskManagers.
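
The sketch below shows the shape of that split, using illustrative names; consult the `Committer` Javadoc of your Flink version for the exact `CommitRequest` callbacks.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.CommittingSinkWriter;

/** Sketch: stages records per checkpoint and emits one committable batch. */
class TransactionalWriter implements CommittingSinkWriter<String, List<String>> {

    private final List<String> staged = new ArrayList<>();

    @Override
    public void write(String element, Context context) {
        staged.add(element); // pre-commit phase: write to a staging area
    }

    @Override
    public Collection<List<String>> prepareCommit() {
        // Called on checkpoint / end of input: hand the pending batch to the Committer.
        List<String> committable = new ArrayList<>(staged);
        staged.clear();
        return List.of(committable);
    }

    @Override
    public void flush(boolean endOfInput) {}

    @Override
    public void close() {}
}

/** Sketch: finalizes each staged batch once the checkpoint has completed. */
class BatchCommitter implements Committer<List<String>> {

    @Override
    public void commit(Collection<CommitRequest<List<String>>> requests) {
        for (CommitRequest<List<String>> request : requests) {
            request.getCommittable().forEach(System.out::println); // the actual commit
        }
    }

    @Override
    public void close() {}
}
```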

### SupportsPreWriteTopology

Allows expert users to implement a custom topology before the `SinkWriter`, as in the sketch below.
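
For example, a sink could re-partition records before they reach the writers. This sketch assumes `SupportsPreWriteTopology` from the flink-streaming-java module and reuses the hypothetical writer from above; the pre- and post-commit hooks below follow the same pattern for their respective stages.

```java
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.streaming.api.connector.sink2.SupportsPreWriteTopology;
import org.apache.flink.streaming.api.datastream.DataStream;

/** Sketch: groups records by a key so that each writer sees one partition. */
public class PartitionedConsoleSink
        implements Sink<String>, SupportsPreWriteTopology<String> {

    @Override
    public DataStream<String> addPreWriteTopology(DataStream<String> input) {
        // Everything added here runs between the upstream operators and the writers.
        return input.keyBy(value -> value.length());
    }

    @Override
    public SinkWriter<String> createWriter(WriterInitContext context) {
        return new ConsoleSinkWriter(); // the writer sketched earlier
    }
}
```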

### SupportsPreCommitTopology

Allows expert users to implement a custom topology after the `SinkWriter` and before the `Committer`.

### SupportsPostCommitTopology

Allows expert users to implement a custom topology after the `Committer`.
136 changes: 136 additions & 0 deletions docs/content/docs/dev/datastream/sinks.md

(The English copy of the page is identical in content to the `docs/content.zh` file above.)
4 changes: 0 additions & 4 deletions docs/content/docs/dev/datastream/sources.md
@@ -369,10 +369,6 @@ The `SourceReader` implementations can also implement their own threading model

*Event Time* assignment and *Watermark Generation* happen as part of the data sources. The event streams leaving the Source Readers have event timestamps and (during streaming execution) contain watermarks. See [Timely Stream Processing]({{< ref "docs/concepts/time" >}}) for an introduction to Event Time and Watermarks.

{{< hint warning >}}
Applications based on the legacy {{< gh_link file="flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java" name="SourceFunction" >}} typically generate timestamps and watermarks in a separate later step via `stream.assignTimestampsAndWatermarks(WatermarkStrategy)`. This function should not be used with the new sources, because timestamps will be already assigned, and it will override the previous split-aware watermarks.
{{< /hint >}}

#### API

The `WatermarkStrategy` is passed to the Source during creation in the DataStream API and creates both the {{< gh_link file="flink-core/src/main/java/org/apache/flink/api/common/eventtime/TimestampAssigner.java" name="TimestampAssigner" >}} and {{< gh_link file="flink-core/src/main/java/org/apache/flink/api/common/eventtime/WatermarkGenerator.java" name="WatermarkGenerator" >}}.