[FLINK-36905] Update Chinese doc on serialization to reflect the latest changes in Flink 2.0
X-czh committed Jan 20, 2025
1 parent 4a2535c commit 824bd40
Showing 3 changed files with 139 additions and 222 deletions.
@@ -42,8 +42,6 @@ to specify the state's name, as well as information about the type of the state.
It is also possible to completely bypass this and let Flink use your own custom serializer to serialize managed states,
simply by directly instantiating the `StateDescriptor` with your own `TypeSerializer` implementation:

-{{< tabs "ee215ff6-2e21-4a40-a1b4-7f114560546f" >}}
-{{< tab "Java" >}}
```java
public class CustomTypeSerializer extends TypeSerializer<Tuple2<String, Integer>> {...};

@@ -54,20 +52,6 @@ ListStateDescriptor<Tuple2<String, Integer>> descriptor =

checkpointedState = getRuntimeContext().getListState(descriptor);
```
-{{< /tab >}}
-{{< tab "Scala" >}}
-```scala
-class CustomTypeSerializer extends TypeSerializer[(String, Integer)] {...}
-
-val descriptor = new ListStateDescriptor[(String, Integer)](
-    "state-name",
-    new CustomTypeSerializer)
-
-checkpointedState = getRuntimeContext.getListState(descriptor)
-```
-{{< /tab >}}
-{{< /tabs >}}
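The `{...}` in the Java snippet above elides the serializer body. For orientation, here is a minimal sketch of what a complete `CustomTypeSerializer` could look like; it assumes a fixed `(String, Integer)` encoding, and it uses `SimpleTypeSerializerSnapshot` for the snapshot, which is one option for serializers that carry no configuration:

```java
import java.io.IOException;

import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

public class CustomTypeSerializer extends TypeSerializer<Tuple2<String, Integer>> {

    @Override
    public boolean isImmutableType() {
        return false; // Tuple2 instances are mutable
    }

    @Override
    public TypeSerializer<Tuple2<String, Integer>> duplicate() {
        return this; // stateless, so the same instance can be shared
    }

    @Override
    public Tuple2<String, Integer> createInstance() {
        return new Tuple2<>("", 0);
    }

    @Override
    public Tuple2<String, Integer> copy(Tuple2<String, Integer> from) {
        return new Tuple2<>(from.f0, from.f1);
    }

    @Override
    public Tuple2<String, Integer> copy(
            Tuple2<String, Integer> from, Tuple2<String, Integer> reuse) {
        reuse.f0 = from.f0;
        reuse.f1 = from.f1;
        return reuse;
    }

    @Override
    public int getLength() {
        return -1; // variable-length records (the String part varies)
    }

    @Override
    public void serialize(Tuple2<String, Integer> record, DataOutputView target) throws IOException {
        target.writeUTF(record.f0);
        target.writeInt(record.f1);
    }

    @Override
    public Tuple2<String, Integer> deserialize(DataInputView source) throws IOException {
        return new Tuple2<>(source.readUTF(), source.readInt());
    }

    @Override
    public Tuple2<String, Integer> deserialize(
            Tuple2<String, Integer> reuse, DataInputView source) throws IOException {
        reuse.f0 = source.readUTF();
        reuse.f1 = source.readInt();
        return reuse;
    }

    @Override
    public void copy(DataInputView source, DataOutputView target) throws IOException {
        target.writeUTF(source.readUTF());
        target.writeInt(source.readInt());
    }

    @Override
    public boolean equals(Object obj) {
        return obj instanceof CustomTypeSerializer;
    }

    @Override
    public int hashCode() {
        return CustomTypeSerializer.class.hashCode();
    }

    @Override
    public TypeSerializerSnapshot<Tuple2<String, Integer>> snapshotConfiguration() {
        return new CustomTypeSerializerSnapshot();
    }

    // Snapshot for a parameterless serializer; SimpleTypeSerializerSnapshot
    // handles the versioning for serializers that carry no configuration.
    public static final class CustomTypeSerializerSnapshot
            extends SimpleTypeSerializerSnapshot<Tuple2<String, Integer>> {
        public CustomTypeSerializerSnapshot() {
            super(CustomTypeSerializer::new);
        }
    }
}
```

Note that returning `this` from `duplicate()` is only safe because this serializer is stateless; a stateful serializer must return a deep copy there.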

## State serializers and schema evolution

@@ -151,7 +135,7 @@ To wrap up, this section concludes how Flink, or more specifically the state bac…
abstractions. The interaction is slightly different depending on the state backend, but this is orthogonal
to the implementation of state serializers and their serializer snapshots.

-#### Off-heap state backends (e.g. `RocksDBStateBackend`)
+#### Off-heap state backends (e.g. `EmbeddedRocksDBStateBackend`)

1. **Register new state with a state serializer that has schema _A_**
- the registered `TypeSerializer` for the state is used to read / write state on every state access.
@@ -172,7 +156,7 @@ to the implementation of state serializers and their serializer snapshots.
of the accessed state is migrated all together before processing continues, conceptually as in the sketch after this list.
- If the resolution signals incompatibility, then the state access fails with an exception.
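
To make the migration step concrete, the following is a hedged sketch (a hypothetical helper, not Flink's actual internal code) of what the backend conceptually does per state value, using only the public `TypeSerializerSnapshot#restoreSerializer()` and the serializer's read/write methods:

```java
import java.io.IOException;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

public final class StateMigrationSketch {

    /** Rewrites one state value from the previous schema to the new one. */
    public static <T> void migrateValue(
            TypeSerializerSnapshot<T> previousSnapshot, // restored from the savepoint
            TypeSerializer<T> newSerializer,            // the newly registered serializer
            DataInputView in,                           // state bytes in the previous schema
            DataOutputView out) throws IOException {
        // the snapshot can always produce a serializer that reads the old bytes
        TypeSerializer<T> previousSerializer = previousSnapshot.restoreSerializer();
        T value = previousSerializer.deserialize(in);
        newSerializer.serialize(value, out);
    }
}
```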

-#### Heap state backends (e.g. `MemoryStateBackend`, `FsStateBackend`)
+#### Heap state backends (e.g. `HashMapStateBackend`)

1. **Register new state with a state serializer that has schema _A_**
- the registered `TypeSerializer` is maintained by the state backend.
@@ -284,6 +268,7 @@ public class MapSerializerSnapshot<K, V> extends CompositeTypeSerializerSnapshot
}
```


When implementing a new serializer snapshot as a subclass of `CompositeTypeSerializerSnapshot`,
the following three methods must be implemented:
* `#getCurrentOuterSnapshotVersion()`: This method defines the version of
@@ -311,6 +296,7 @@ has outer snapshot information, then all three methods must be implemented.
Below is an example of how the `CompositeTypeSerializerSnapshot` is used for composite serializer snapshots
that do have outer snapshot information, using Flink's `GenericArraySerializer` as an example:


```java
public final class GenericArraySerializerSnapshot<C> extends CompositeTypeSerializerSnapshot<C[], GenericArraySerializer> {

@@ -365,6 +351,7 @@ public final class GenericArraySerializerSnapshot<C> extends CompositeTypeSerializerSnapshot
}
```


There are two important things to notice in the above code snippet. First of all, since this
`CompositeTypeSerializerSnapshot` implementation has outer snapshot information that is written as part of the snapshot,
the outer snapshot version, as defined by `getCurrentOuterSnapshotVersion()`, must be upticked whenever the
@@ -449,18 +436,18 @@ migrate from the old abstractions. The steps to do this are as follows:

This section is a guide for migrating from the serializer snapshot method that existed before Flink 1.19.

Before Flink 1.19, when using a customized serializer to process data, the schema compatibility in the old serializer
(which may live in a Flink library) had to meet all future needs;
otherwise, `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer<T> newSerializer)` of the old serializer had to be modified.
There was no way to specify compatibility with the old serializer from within the new serializer, which also left schema evolution
unsupported in some scenarios.

So from Flink 1.19, the direction of resolving schema compatibility has been reversed. The old method
`TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` has been removed and needs to be replaced with
`TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot oldSerializerSnapshot)`.
To make this transition, follow these steps:

1. Implement the `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot oldSerializerSnapshot)` method, whose logic
   should be the same as the original `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)`; see the sketch after this list.
2. Remove the old method `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)`.
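
Under the assumption of a hypothetical `MyType` and its snapshot class (all other `TypeSerializerSnapshot` methods elided), step 1 could look roughly like this:

```java
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;

public class MyTypeSerializerSnapshot implements TypeSerializerSnapshot<MyType> {

    // ... other TypeSerializerSnapshot methods elided ...

    // Flink 1.19+: the snapshot of the NEW serializer inspects the OLD snapshot,
    // reversing the direction of the removed
    // resolveSchemaCompatibility(TypeSerializer newSerializer) method.
    @Override
    public TypeSerializerSchemaCompatibility<MyType> resolveSchemaCompatibility(
            TypeSerializerSnapshot<MyType> oldSerializerSnapshot) {
        // port the compatibility checks of the old method to here
        if (oldSerializerSnapshot instanceof MyTypeSerializerSnapshot) {
            return TypeSerializerSchemaCompatibility.compatibleAsIs();
        }
        return TypeSerializerSchemaCompatibility.incompatible();
    }
}
```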

@@ -29,34 +29,37 @@ under the License.

If you use a user-defined type in your Flink program that cannot be serialized by Flink's type serializers, Flink falls back to the generic Kryo serializer.
You can register your own serializer, or a serialization system such as Google Protobuf or Apache Thrift, with Kryo.
-To do so, register the class type and the serializer via the `ExecutionConfig` of your Flink program.
+To do so, register the class type and the serializer via the [pipeline.serialization-config]({{< ref "docs/deployment/config#pipeline-serialization-config" >}})
+configuration option of your Flink program:

-```java
-final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-// register the class of the serializer as a serializer for a type
-env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, MyCustomSerializer.class);
-
-// register an instance of the serializer as a serializer for a type
-MySerializer mySerializer = new MySerializer();
-env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, mySerializer);
-```
+```yaml
+pipeline.serialization-config:
+- org.example.MyCustomType: {type: kryo, kryo-type: registered, class: org.example.MyCustomSerializer}
+```

+You can also set it in code:

+```java
+Configuration config = new Configuration();
+
+// register the class of the serializer as serializer for a type
+config.set(PipelineOptions.SERIALIZATION_CONFIG,
+    List.of("org.example.MyCustomType: {type: kryo, kryo-type: registered, class: org.example.MyCustomSerializer}"));
+
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+```

Make sure your custom serializer extends Kryo's Serializer class.
For Google Protobuf or Apache Thrift, this has already been done for you:

-```java
-final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-// register the Google Protobuf serializer with Kryo
-env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, ProtobufSerializer.class);
-
-// register the Apache Thrift serializer as the standard serializer
-// TBaseSerializer needs to be initialized as a default Kryo serializer
-env.getConfig().addDefaultKryoSerializer(MyCustomType.class, TBaseSerializer.class);
-```
+```yaml
+pipeline.serialization-config:
+# register the Google Protobuf serializer with Kryo
+- org.example.MyCustomProtobufType: {type: kryo, kryo-type: registered, class: com.twitter.chill.protobuf.ProtobufSerializer}
+# register the serializer included with Apache Thrift as the standard serializer
+# TBaseSerializer states it should be initialized as a default Kryo serializer
+- org.example.MyCustomThriftType: {type: kryo, kryo-type: default, class: com.twitter.chill.thrift.TBaseSerializer}
+```
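As a point of reference, a minimal sketch of such a custom Kryo serializer is shown below; `MyCustomType` with a single `name` field is a hypothetical example, and the exact `read` signature may vary slightly with the Kryo version bundled by Flink:

```java
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

// Hypothetical serializer for org.example.MyCustomType; it extends Kryo's
// Serializer class and can be registered via pipeline.serialization-config as above.
public class MyCustomSerializer extends Serializer<MyCustomType> {

    @Override
    public void write(Kryo kryo, Output output, MyCustomType object) {
        output.writeString(object.getName()); // assumes MyCustomType exposes a single name field
    }

    @Override
    public MyCustomType read(Kryo kryo, Input input, Class<MyCustomType> type) {
        return new MyCustomType(input.readString());
    }
}
```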

For the above examples to work, you need to include the necessary dependencies in your Maven project file (pom.xml).
For Apache Thrift, add the following dependency: