diff --git a/docs/content.zh/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md b/docs/content.zh/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md index f8e4840d28a0e..7238c60ecca1e 100644 --- a/docs/content.zh/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md +++ b/docs/content.zh/docs/dev/datastream/fault-tolerance/serialization/custom_serialization.md @@ -1,9 +1,10 @@ --- -title: "Custom State Serialization" -weight: 8 +title: "自定义序列化器" +weight: 3 type: docs aliases: - /zh/dev/stream/state/custom_serialization.html + - /zh/docs/dev/datastream/fault-tolerance/custom_serialization/ --- -# Custom Serialization for Managed State +# 自定义序列化器 -This page is targeted as a guideline for users who require the use of custom serialization for their state, covering -how to provide a custom state serializer as well as guidelines and best practices for implementing serializers that allow -state schema evolution. +本页面为需要自定义序列化器的用户提供指导,包括如何自定义序列化器,以及实现支持状态格式演进(state schema evolution)的序列化器的最佳实践。 -If you're simply using Flink's own serializers, this page is irrelevant and can be ignored. +如果你仅使用 Flink 自带的序列化器,可以忽略本节内容。 -## Using custom state serializers +## 使用自定义序列化器 -When registering a managed operator or keyed state, a `StateDescriptor` is required -to specify the state's name, as well as information about the type of the state. The type information is used by Flink's -[type serialization framework]({{< ref "docs/dev/datastream/fault-tolerance/serialization/types_serialization" >}}) to create appropriate serializers for the state. 
+注册 Managed keyed/operator state 的时候,需要提供一个包含 state 名字以及其 state 类型信息的 `StateDescriptor`。 +Flink 的 [type serialization framework]({{< ref "docs/dev/datastream/fault-tolerance/serialization/types_serialization" >}}) 会使用类型信息创建对应的序列化器。 -It is also possible to completely bypass this and let Flink use your own custom serializer to serialize managed states, -simply by directly instantiating the `StateDescriptor` with your own `TypeSerializer` implementation: +当然也可以通过传递自定义的 `TypeSerializer` 给 `StateDescriptor`,让 Flink 使用自定义序列化器序列化 managed state: {{< tabs "ee215ff6-2e21-4a40-a1b4-7f114560546f" >}} {{< tab "Java" >}} @@ -69,30 +66,22 @@ checkpointedState = getRuntimeContext.getListState(descriptor) {{< /tab >}} {{< /tabs >}} -## State serializers and schema evolution +## 状态序列化器及格式演进 -This section explains the user-facing abstractions related to state serialization and schema evolution, and necessary -internal details about how Flink interacts with these abstractions. +本节介绍了 state 序列化以及格式演进相关的面向用户的抽象,以及 Flink 如何与这些抽象交互的一些内部细节。 -When restoring from savepoints, Flink allows changing the serializers used to read and write previously registered state, -so that users are not locked in to any specific serialization schema. When state is restored, a new serializer will be -registered for the state (i.e., the serializer that comes with the `StateDescriptor` used to access the state in the -restored job). This new serializer may have a different schema than that of the previous serializer. Therefore, when -implementing state serializers, besides the basic logic of reading / writing data, another important thing to keep in -mind is how the serialization schema can be changed in the future. 
+从 savepoint 恢复时,Flink 允许更改 state 的序列化器,从而支持格式演进。state 恢复之后,将使用新的序列化器进行 state 注册(即恢复后作业中 `StateDescriptor` 指定的序列化器), +新的序列化器可能和之前的序列化器拥有不同的格式。因此,实现 state 序列化器的时候,除了正确处理读写数据的基本逻辑外,另外一个需要重点考虑的是未来如何支持 state 的格式演进。 -When speaking of *schema*, in this context the term is interchangeable between referring to the *data model* of a state -type and the *serialized binary format* of a state type. The schema, generally speaking, can change for a few cases: +这里说的 *格式*,既可能指 state 的 *数据模型*,也可能指 *序列化之后的二进制格式*。一般来说,格式在如下情况下会发生改变: - 1. Data schema of the state type has evolved, i.e. adding or removing a field from a POJO that is used as state. - 2. Generally speaking, after a change to the data schema, the serialization format of the serializer will need to be upgraded. - 3. Configuration of the serializer has changed. - -In order for the new execution to have information about the *written schema* of state and detect whether or not the -schema has changed, upon taking a savepoint of an operator's state, a *snapshot* of the state serializer needs to be -written along with the state bytes. This is abstracted a `TypeSerializerSnapshot`, explained in the next subsection. + 1. state 的数据格式发生变化,比如 POJO 类中增加或删除字段。 + 2. 一般来说,数据格式变化之后,序列化器的序列化格式需要进行升级。 + 3. 序列化器的配置发生了变化。 + +为了能在新作业中获取到之前 state 的写入格式,并检测到格式是否发生变化,在生成 savepoint 的时候,会把 state 序列化器的快照也一并写出。这个快照被抽象为 `TypeSerializerSnapshot`,在下一节中详细描述。 -### The `TypeSerializerSnapshot` abstraction +### `TypeSerializerSnapshot` 相关抽象 ```java public interface TypeSerializerSnapshot<T> { @@ -112,109 +101,72 @@ public abstract class TypeSerializer { } ``` -A serializer's `TypeSerializerSnapshot` is a point-in-time information that serves as the single source of truth about -the state serializer's write schema, as well as any additional information mandatory to restore a serializer that -would be identical to the given point-in-time. 
The logic about what should be written and read at restore time -as the serializer snapshot is defined in the `writeSnapshot` and `readSnapshot` methods. - -Note that the snapshot's own write schema may also need to change over time (e.g. when you wish to add more information -about the serializer to the snapshot). To facilitate this, snapshots are versioned, with the current version -number defined in the `getCurrentVersion` method. On restore, when the serializer snapshot is read from savepoints, -the version of the schema in which the snapshot was written in will be provided to the `readSnapshot` method so that -the read implementation can handle different versions. - -At restore time, the logic that detects whether or not the new serializer's schema has changed should be implemented in -the `resolveSchemaCompatibility` method. When previous registered state is registered again with new serializers in the -restored execution of an operator, the old serializer snapshot is provided to the new serializer's snapshot via this method. -This method returns a `TypeSerializerSchemaCompatibility` representing the result of the compatibility resolution, -which can be one of the following: - - 1. **`TypeSerializerSchemaCompatibility.compatibleAsIs()`**: this result signals that the new serializer is compatible, - meaning that the new serializer has identical schema with the previous serializer. It is possible that the new - serializer has been reconfigured in the `resolveSchemaCompatibility` method so that it is compatible. - 2. **`TypeSerializerSchemaCompatibility.compatibleAfterMigration()`**: this result signals that the new serializer has a - different serialization schema, and it is possible to migrate from the old schema by using the previous serializer - (which recognizes the old schema) to read bytes into state objects, and then rewriting the object back to bytes with - the new serializer (which recognizes the new schema). - 3. 
**`TypeSerializerSchemaCompatibility.incompatible()`**: this result signals that the new serializer has a - different serialization schema, but it is not possible to migrate from the old schema. - -The last bit of detail is how the previous serializer is obtained in the case that migration is required. -Another important role of a serializer's `TypeSerializerSnapshot` is that it serves as a factory to restore -the previous serializer. More specifically, the `TypeSerializerSnapshot` should implement the `restoreSerializer` method -to instantiate a serializer instance that recognizes the previous serializer's schema and configuration, and can therefore -safely read data written by the previous serializer. - -### How Flink interacts with the `TypeSerializer` and `TypeSerializerSnapshot` abstractions - -To wrap up, this section concludes how Flink, or more specifically the state backends, interact with the -abstractions. The interaction is slightly different depending on the state backend, but this is orthogonal -to the implementation of state serializers and their serializer snapshots. +序列化器的 `TypeSerializerSnapshot` 是某一时间点的信息,是 state 序列化器写入格式的唯一事实来源,同时包含恢复出与该时间点相同的序列化器所需要的其他附加信息。序列化器的快照读写逻辑在 `writeSnapshot` 以及 `readSnapshot` 中进行实现。 + +需要注意的是快照本身的格式可能也需要随时间发生变化(比如,往快照中增加更多序列化器的信息)。为此,快照携带版本号,当前版本号由 `getCurrentVersion` 方法定义。在恢复的时候,从 savepoint 读取到快照后,写入该快照时所用的版本号会传给 `readSnapshot`,以便读取逻辑处理不同的版本。 + +在恢复时,检测序列化器格式是否发生变化的逻辑应该在 `resolveSchemaCompatibility` 中实现。当之前注册的 state 在恢复后的作业中以新的序列化器重新注册时,旧序列化器的快照会通过该方法传给新序列化器的快照。该方法返回一个 `TypeSerializerSchemaCompatibility` 表示兼容性的结果,该结果有如下几种: + 1. **`TypeSerializerSchemaCompatibility.compatibleAsIs()`**: 该结果表明新的序列化器是兼容的,意味着新序列化器和之前的序列化器拥有相同的格式。也可能是在 `resolveSchemaCompatibility` 中对新序列化器进行重新配置后达到的。 + 2. **`TypeSerializerSchemaCompatibility.compatibleAfterMigration()`**: 该结果表示新旧序列化器的格式不同,不过可以使用之前的序列化器反序列化,然后再用新序列化器进行序列化,从而进行迁移。 + 3. 
**`TypeSerializerSchemaCompatibility.incompatible()`**: 该结果表示新旧序列化器的格式不同,且无法进行迁移。 + +最后一点细节在于需要进行 state 迁移时如何获取之前的序列化器。`TypeSerializerSnapshot` 的另一个重要作用是可以构造之前的序列化器。更具体地说,`TypeSerializerSnapshot` 应该实现 `restoreSerializer` 方法,该方法返回一个可以安全读取之前序列化器写出数据的序列化器。 + +### Flink 如何与 `TypeSerializer` 及 `TypeSerializerSnapshot` 交互 + +总结一下,本节阐述 Flink,或者更具体地说状态后端,如何与这些抽象交互。根据状态后端的不同,交互方式略有不同,但这与序列化器以及序列化器快照的实现是正交的。 #### Off-heap state backends (e.g. `RocksDBStateBackend`) - 1. **Register new state with a state serializer that has schema _A_** - - the registered `TypeSerializer` for the state is used to read / write state on every state access. - - State is written in schema *A*. + 1. **以拥有格式 _A_ 的序列化器注册一个新的 state** + - state 的每次访问(读/写)都使用注册的 `TypeSerializer`。 + - state 以格式 *A* 进行序列化。 2. **Take a savepoint** - - The serializer snapshot is extracted via the `TypeSerializer#snapshotConfiguration` method. - - The serializer snapshot is written to the savepoint, as well as the already-serialized state bytes (with schema *A*). - 3. **Restored execution re-accesses restored state bytes with new state serializer that has schema _B_** - - The previous state serializer's snapshot is restored. - - State bytes are not deserialized on restore, only loaded back to the state backends (therefore, still in schema *A*). - - Upon receiving the new serializer, the previous serializer's snapshot is provided to the new serializer's snapshot via the - `TypeSerializer#resolveSchemaCompatibility` to check for schema compatibility. - 4. **Migrate state bytes in backend from schema _A_ to schema _B_** - - If the compatibility resolution reflects that the schema has changed and migration is possible, schema migration is - performed. 
The previous state serializer which recognizes schema _A_ will be obtained from the serializer snapshot, via - `TypeSerializerSnapshot#restoreSerializer()`, and is used to deserialize state bytes to objects, which in turn - are re-written again with the new serializer, which recognizes schema _B_ to complete the migration. All entries - of the accessed state is migrated all-together before processing continues. - If the resolution signals incompatibility, then the state access fails with an exception. + - 通过 `TypeSerializer#snapshotConfiguration` 获取序列化器快照。 + - 序列化器快照和序列化后的 state 数据(格式 *A*)一起写到 savepoint。 + 3. **以一个拥有格式 _B_ 的新序列化器恢复 state 数据** + - 恢复之前序列化器的快照。 + - state 的数据不会真正被反序列化,仅加载到状态后端(这个时候仍然以格式 *A* 的形式存在)。 + - 接收到新序列化器之后,会通过 `TypeSerializer#resolveSchemaCompatibility` 检查格式的兼容性。 + 4. **在状态后端中从格式 _A_ 迁移到格式 _B_** + - 如果格式发生了变化且能够进行迁移,则会迁移。通过 `TypeSerializerSnapshot#restoreSerializer()` 获取之前格式 _A_ 的序列化器,用于反序列化 state 数据,然后使用格式 _B_ 的序列化器序列化,从而完成迁移。在继续处理数据之前,被访问 state 的所有条目会一次性完成迁移。 + - 如果格式不兼容,则访问 state 时会抛异常。 #### Heap state backends (e.g. `MemoryStateBackend`, `FsStateBackend`) - 1. **Register new state with a state serializer that has schema _A_** - - the registered `TypeSerializer` is maintained by the state backend. - 2. **Take a savepoint, serializing all state with schema _A_** - - The serializer snapshot is extracted via the `TypeSerializer#snapshotConfiguration` method. - - The serializer snapshot is written to the savepoint. - - State objects are now serialized to the savepoint, written in schema _A_. - 3. **On restore, deserialize state into objects in heap** - - The previous state serializer's snapshot is restored. - - The previous serializer, which recognizes schema _A_, is obtained from the serializer snapshot, via - `TypeSerializerSnapshot#restoreSerializer()`, and is used to deserialize state bytes to objects. - From now on, all of the state is already deserialized. - 4. 
**Restored execution re-accesses previous state with new state serializer that has schema _B_** - - Upon receiving the new serializer, the previous serializer's snapshot is provided to the new serializer's snapshot via the - `TypeSerializer#resolveSchemaCompatibility` to check for schema compatibility. - - If the compatibility check signals that migration is required, nothing happens in this case since for - heap backends, all state is already deserialized into objects. - - If the resolution signals incompatibility, then the state access fails with an exception. - 5. **Take another savepoint, serializing all state with schema _B_** - - Same as step 2., but now state bytes are all in schema _B_. - -## Predefined convenient `TypeSerializerSnapshot` classes - -Flink provides two abstract base `TypeSerializerSnapshot` classes that can be used for typical scenarios: -`SimpleTypeSerializerSnapshot` and `CompositeTypeSerializerSnapshot`. - -Serializers that provide these predefined snapshots as their serializer snapshot must always have their own, independent -subclass implementation. This corresponds to the best practice of not sharing snapshot classes -across different serializers, which is more thoroughly explained in the next section. - -### Implementing a `SimpleTypeSerializerSnapshot` - -The `SimpleTypeSerializerSnapshot` is intended for serializers that do not have any state or configuration, -essentially meaning that the serialization schema of the serializer is solely defined by the serializer's class. - -There will only be 2 possible results of the compatibility resolution when using the `SimpleTypeSerializerSnapshot` -as your serializer's snapshot class: - - - `TypeSerializerSchemaCompatibility.compatibleAsIs()`, if the new serializer class remains identical, or - - `TypeSerializerSchemaCompatibility.incompatible()`, if the new serializer class is different then the previous one. 
- -Below is an example of how the `SimpleTypeSerializerSnapshot` is used, using Flink's `IntSerializer` as an example: + 1. **以拥有格式 _A_ 的序列化器注册一个新的 state** + - 注册的 `TypeSerializer` 由状态后端维护。 + 2. **做 savepoint 时,以格式 _A_ 序列化所有的 state** + - 通过 `TypeSerializer#snapshotConfiguration` 获取序列化器快照。 + - 将序列化器快照写到 savepoint。 + - 以格式 _A_ 将 state 序列化到 savepoint 中。 + 3. **恢复时,将 state 反序列化到堆上的对象** + - 恢复之前的序列化器快照。 + - 通过 `TypeSerializerSnapshot#restoreSerializer()` 获取之前的格式 _A_ 的序列化器,并用于反序列化 state。 + - 至此,所有的 state 都被反序列化完成。 + 4. **恢复后以新格式 _B_ 的序列化器访问 state** + - 接收到新的序列化器后,通过 `TypeSerializer#resolveSchemaCompatibility` 检查格式的兼容性。 + - 如果兼容性表明需要迁移,则不需要做任何事情,因为所有的 state 都已经被反序列化成对象。 + - 如果兼容性表明不兼容,则访问 state 时会抛出异常。 + 5. **再次执行 savepoint 时,以格式 _B_ 序列化所有的 state** + - 和第二步一样,不过使用格式 _B_。 + +## 内置的 `TypeSerializerSnapshot` 类 + +Flink 提供了 `TypeSerializerSnapshot` 在两个典型场景下的抽象类 `SimpleTypeSerializerSnapshot` 和 `CompositeTypeSerializerSnapshot`。 + +使用这些预定义快照类的序列化器必须各自提供独立的子类实现。这对应于不同序列化器之间不共享快照类的最佳实践,这一点在下一节有更详细的描述。 + +### 实现 `SimpleTypeSerializerSnapshot` + +`SimpleTypeSerializerSnapshot` 用于那些没有任何状态或者配置的序列化器,基本意味着序列化器的格式完全由序列化器的类所决定。 + +使用 `SimpleTypeSerializerSnapshot` 的情况下,格式兼容性的结果只有两种: + + - `TypeSerializerSchemaCompatibility.compatibleAsIs()`:新的序列化器类与之前的相同,或者 + - `TypeSerializerSchemaCompatibility.incompatible()`:新的序列化器类与之前的不同。 + +下面以 Flink 的 `IntSerializer` 为例,介绍如何使用 `SimpleTypeSerializerSnapshot`: ```java public class IntSerializerSnapshot extends SimpleTypeSerializerSnapshot<Integer> { @@ -224,33 +176,21 @@ public class IntSerializerSnapshot extends SimpleTypeSerializerSnapshot } ``` -The `IntSerializer` has no state or configurations. Serialization format is solely defined by the serializer -class itself, and can only be read by another `IntSerializer`. Therefore, it suits the use case of the -`SimpleTypeSerializerSnapshot`. 
+`IntSerializer` 没有任何状态或配置。序列化器的格式完全由序列化器类所决定,只能被另外一个 `IntSerializer` 所读取。因此很适合 `SimpleTypeSerializerSnapshot`。 -The base super constructor of the `SimpleTypeSerializerSnapshot` expects a `Supplier` of instances -of the corresponding serializer, regardless of whether the snapshot is currently being restored or being written during -snapshots. That supplier is used to create the restore serializer, as well as type checks to verify that the -new serializer is of the same expected serializer class. +无论是在恢复快照还是在写快照时,`SimpleTypeSerializerSnapshot` 的构造函数都需要一个提供对应序列化器实例的 `Supplier`。该 supplier 用于创建恢复用的序列化器,并用于类型检查,以确认新的序列化器是预期的序列化器类。 -### Implementing a `CompositeTypeSerializerSnapshot` +### 实现 `CompositeTypeSerializerSnapshot` -The `CompositeTypeSerializerSnapshot` is intended for serializers that rely on multiple nested serializers for serialization. +`CompositeTypeSerializerSnapshot` 主要用于那些依赖多个嵌套序列化器的组合序列化器。 -Before further explanation, we call the serializer, which relies on multiple nested serializer(s), as the "outer" serializer in this context. -Examples for this could be `MapSerializer`, `ListSerializer`, `GenericArraySerializer`, etc. -Consider the `MapSerializer`, for example - the key and value serializers would be the nested serializers, -while `MapSerializer` itself is the "outer" serializer. +在进一步解释之前,我们将依赖多个嵌套序列化器的序列化器称为 "外部" 序列化器。比如 `MapSerializer`、`ListSerializer` 以及 `GenericArraySerializer` 等。以 `MapSerializer` 为例,键和值的序列化器都是嵌套序列化器,而 `MapSerializer` 本身就是 "外部" 序列化器。 -In this case, the snapshot of the outer serializer should also contain snapshots of the nested serializers, so that -the compatibility of the nested serializers can be independently checked. When resolving the compatibility of the -outer serializer, the compatibility of each nested serializer needs to be considered. 
+这种情况下,外部序列化器的快照还应该包含嵌套序列化器的快照,从而可以单独检查嵌套序列化器的兼容性。在解析外部序列化器的兼容性时,需要考虑每个嵌套序列化器的兼容性。 -`CompositeTypeSerializerSnapshot` is provided to assist in the implementation of snapshots for these kind of -composite serializers. It deals with reading and writing the nested serializer snapshots, as well as resolving -the final compatibility result taking into account the compatibility of all nested serializers. +`CompositeTypeSerializerSnapshot` 用于实现这类复合序列化器的快照。它负责嵌套序列化器快照的读写,并综合所有嵌套序列化器的兼容性来得出最终的兼容性结果。 -Below is an example of how the `CompositeTypeSerializerSnapshot` is used, using Flink's `MapSerializer` as an example: +下面以 `MapSerializer` 为例来阐述如何使用 `CompositeTypeSerializerSnapshot`。 ```java public class MapSerializerSnapshot<K, V> extends CompositeTypeSerializerSnapshot<Map<K, V>, MapSerializer<K, V>> { @@ -284,32 +224,21 @@ public class MapSerializerSnapshot extends CompositeTypeSerializerSnapshot } ``` -When implementing a new serializer snapshot as a subclass of `CompositeTypeSerializerSnapshot`, -the following three methods must be implemented: - * `#getCurrentOuterSnapshotVersion()`: This method defines the version of - the current outer serializer snapshot's serialized binary format. - * `#getNestedSerializers(TypeSerializer)`: Given the outer serializer, returns its nested serializers. - * `#createOuterSerializerWithNestedSerializers(TypeSerializer[])`: - Given the nested serializers, create an instance of the outer serializer. - -The above example is a `CompositeTypeSerializerSnapshot` where there are no extra information to be snapshotted -apart from the nested serializers' snapshots. Therefore, its outer snapshot version can be expected to never -require an uptick. Some other serializers, however, contains some additional static configuration -that needs to be persisted along with the nested component serializer. An example for this would be Flink's -`GenericArraySerializer`, which contains as configuration the class of the array element type, besides -the nested element serializer. 
- -In these cases, an additional three methods need to be implemented on the `CompositeTypeSerializerSnapshot`: - * `#writeOuterSnapshot(DataOutputView)`: defines how the outer snapshot information is written. - * `#readOuterSnapshot(int, DataInputView, ClassLoader)`: defines how the outer snapshot information is read. - * `#resolveOuterSchemaCompatibility(TypeSerializerSnapshot)`: checks the compatibility based on the outer snapshot information. - -By default, the `CompositeTypeSerializerSnapshot` assumes that there isn't any outer snapshot information to -read / write, and therefore have empty default implementations for the above methods. If the subclass -has outer snapshot information, then all three methods must be implemented. - -Below is an example of how the `CompositeTypeSerializerSnapshot` is used for composite serializer snapshots -that do have outer snapshot information, using Flink's `GenericArraySerializer` as an example: +当作为 `CompositeTypeSerializerSnapshot` 的子类实现新序列化器快照时,必须实现如下三个方法: + * `#getCurrentOuterSnapshotVersion()`: 返回外部序列化器序列化快照的当前版本 + * `#getNestedSerializers(TypeSerializer)`: 返回外部序列化器的嵌套序列化器 + * `#createOuterSerializerWithNestedSerializers(TypeSerializer[])`: 创建一个包含给定嵌套序列化器的外部序列化器 + +上述示例是一个除嵌套序列化器快照外,没有其他额外信息的 `CompositeTypeSerializerSnapshot` 例子。因此,外部序列化器永远不会有升级需求。而其他一些序列化器,则包含了一些需要和嵌套序列化器快照一起被持久化的静态配置,Flink 的 `GenericArraySerializer` 是其中一个例子,除嵌套序列化器的信息外,还包含了数组元素类型的相关信息。 + +这种情况下,还需要实现其他三个方法: + * `#writeOuterSnapshot(DataOutputView)`: 定义如何持久化外部序列化器快照信息 + * `#readOuterSnapshot(int, DataInputView, ClassLoader)`: 定义如何读取外部序列化器快照信息 + * `#resolveOuterSchemaCompatibility(TypeSerializerSnapshot)`: 根据外部序列化器快照的相关信息检查兼容性 + +默认情况下,`CompositeTypeSerializerSnapshot` 假定外部序列化器没有任何快照信息需要读写,因此上述三个方法均是空实现。如果子类有相应需求,则上述三个方法均需实现。 + +下面以 `GenericArraySerializer` 为例,介绍拥有外部快照信息的序列化器如何使用 `CompositeTypeSerializerSnapshot`。 ```java public final class GenericArraySerializerSnapshot extends CompositeTypeSerializerSnapshot { @@ -365,103 +294,56 @@ public final class 
GenericArraySerializerSnapshot extends CompositeTypeSerial } ``` -There are two important things to notice in the above code snippet. First of all, since this -`CompositeTypeSerializerSnapshot` implementation has outer snapshot information that is written as part of the snapshot, -the outer snapshot version, as defined by `getCurrentOuterSnapshotVersion()`, must be upticked whenever the -serialization format of the outer snapshot information changes. - -Second of all, notice how we avoid using Java serialization when writing the component class, by only writing -the classname and dynamically loading it when reading back the snapshot. Avoiding Java serialization for writing -contents of serializer snapshots is in general a good practice to follow. More details about this is covered in the -next section. +上述代码片段中,有两个重要的事情需要注意。首先,外部序列化器的额外信息会被持久化到快照中,因此当外部序列化器的额外信息被修改后,`getCurrentOuterSnapshotVersion()` 的返回值也需要随之升级。 -## Implementation notes and best practices +另外就是,注意我们如何避免使用 Java 的序列化器,仅持久化类名,并在恢复时进行动态加载。避免使用 Java 的序列化器通常是一个很好的做法,有关这一点的详细描述将在下一节介绍。 -#### 1. Flink restores serializer snapshots by instantiating them with their classname +## 实现细节及最佳实践 -A serializer's snapshot, being the single source of truth for how a registered state was serialized, serves as an -entry point to reading state in savepoints. In order to be able to restore and access previous state, the previous state -serializer's snapshot must be able to be restored. +#### 1. Flink 通过类名实例化序列化器的快照 -Flink restores serializer snapshots by first instantiating the `TypeSerializerSnapshot` with its classname (written -along with the snapshot bytes). 
Therefore, to avoid being subject to unintended classname changes or instantiation -failures, `TypeSerializerSnapshot` classes should: +序列化器的快照是已注册 state 如何被序列化的唯一事实来源,也是读取 savepoint 中 state 的入口。为了能够恢复并访问之前的 state,之前序列化器的快照也必须能够被恢复。 - - avoid being implemented as anonymous classes or nested classes, - - have a public, nullary constructor for instantiation +Flink 会使用类名实例化具体的 `TypeSerializerSnapshot`(类名会和快照一起写出)。因此,为了防止无意的类名改动或者实例化 `TypeSerializerSnapshot` 失败,需要保证: + - 不能以匿名类或者内部类的形式实现 + - 有一个无参的 public 构造函数 -#### 2. Avoid sharing the same `TypeSerializerSnapshot` class across different serializers +#### 2. 避免在多个序列化器之间共用一个 `TypeSerializerSnapshot` -Since schema compatibility checks goes through the serializer snapshots, having multiple serializers returning -the same `TypeSerializerSnapshot` class as their snapshot would complicate the implementation for the -`TypeSerializerSnapshot#resolveSchemaCompatibility` and `TypeSerializerSnapshot#restoreSerializer()` method. +由于格式兼容检查是通过序列化器快照进行的,因此多个序列化器返回同一个 `TypeSerializerSnapshot` 会导致 `TypeSerializerSnapshot#resolveSchemaCompatibility` 和 `TypeSerializerSnapshot#restoreSerializer()` 的实现变复杂。 -This would also be a bad separation of concerns; a single serializer's serialization schema, -configuration, as well as how to restore it, should be consolidated in its own dedicated `TypeSerializerSnapshot` class. +共用 `TypeSerializerSnapshot` 从职责划分来说也是不好的选择,每个序列化器的格式、配置信息以及如何恢复等都应该放到它自己专属的 `TypeSerializerSnapshot` 中。 -#### 3. Avoid using Java serialization for serializer snapshot content +#### 3. 避免使用 Java 序列化写入序列化器快照的内容 -Java serialization should not be used at all when writing contents of a persisted serializer snapshot. -Take for example, a serializer which needs to persist a class of its target type as part of its snapshot. -Information about the class should be persisted by writing the class name, instead of directly serializing the class -using Java. When reading the snapshot, the class name is read, and used to dynamically load the class via the name. 
+写入持久化的序列化器快照内容时,完全不应使用 Java 序列化。举例来说,如果序列化器需要把目标类型的类作为快照的一部分持久化,应该写入类名,而不是直接用 Java 序列化该类。读取快照时,先读出类名,再通过类名动态加载该类。 -This practice ensures that serializer snapshots can always be safely read. In the above example, if the type class -was persisted using Java serialization, the snapshot may no longer be readable once the class implementation has changed -and is no longer binary compatible according to Java serialization specifics. +这种方式确保了序列化器快照始终可以被安全地读取。在上面的例子中,如果类型类是通过 Java 序列化持久化的,那么一旦该类的实现发生变化,并且按 Java 序列化的规则不再二进制兼容,快照就可能无法读取。 -## Migrating from deprecated serializer snapshot APIs before Flink 1.7 +## 从 Flink 1.7 之前已弃用的序列化器快照 API 迁移 -This section is a guide for API migration from serializers and serializer snapshots that existed before Flink 1.7. +本节将介绍如何从 Flink 1.7 之前的序列化器以及序列化器快照 API 进行迁移。 -Before Flink 1.7, serializer snapshots were implemented as a `TypeSerializerConfigSnapshot` (which is now deprecated, -and will eventually be removed in the future to be fully replaced by the new `TypeSerializerSnapshot` interface). -Moreover, the responsibility of serializer schema compatibility checks lived within the `TypeSerializer`, -implemented in the `TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)` method. +在 Flink 1.7 之前,序列化器快照是以 `TypeSerializerConfigSnapshot` 的形式实现的(现在已经过时,会在未来被 `TypeSerializerSnapshot` 完全取代并删除)。此外,序列化器格式的兼容性检查由 `TypeSerializer` 负责,在 `TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)` 中执行。 -Another major difference between the new and old abstractions is that the deprecated `TypeSerializerConfigSnapshot` -did not have the capability of instantiating the previous serializer. Therefore, in the case where your serializer -still returns a subclass of `TypeSerializerConfigSnapshot` as its snapshot, the serializer instance itself will always -be written to savepoints using Java serialization so that the previous serializer may be available at restore time. 
-This is very undesirable, since whether or not restoring the job will be successful is susceptible to availability -of the previous serializer's class, or in general, whether or not the serializer instance can be read back at restore -time using Java serialization. This means that you be limited to the same serializer for your state, -and could be problematic once you want to upgrade serializer classes or perform schema migration. +新旧实现的另一个主要区别是,`TypeSerializerConfigSnapshot` 无法实例化之前的序列化器。因此,如果序列化器仍然返回 `TypeSerializerConfigSnapshot` 的子类作为快照,序列化器对象本身总是会通过 Java 序列化写到 savepoint 中,以便恢复时可以得到之前的序列化器。这是非常不可取的,因为作业是否能够恢复成功,取决于之前序列化器的类是否可用,或者更一般地说,取决于恢复时能否通过 Java 序列化读回序列化器对象。这意味着 state 只能一直使用同一个序列化器,一旦需要升级序列化器类或者进行格式迁移就可能出问题。 -To be future-proof and have flexibility to migrate your state serializers and schema, it is highly recommended to -migrate from the old abstractions. The steps to do this is as follows: +为了将来能够灵活地迁移 state 序列化器和格式,强烈建议迁移使用新 API,具体操作步骤如下所示: - 1. Implement a new subclass of `TypeSerializerSnapshot`. This will be the new snapshot for your serializer. - 2. Return the new `TypeSerializerSnapshot` as the serializer snapshot for your serializer in the - `TypeSerializer#snapshotConfiguration()` method. - 3. Restore the job from the savepoint that existed before Flink 1.7, and then take a savepoint again. - Note that at this step, the old `TypeSerializerConfigSnapshot` of the serializer must still exist in the classpath, - and the implementation for the `TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)` method must not be - removed. The purpose of this process is to replace the `TypeSerializerConfigSnapshot` written in old savepoints - with the newly implemented `TypeSerializerSnapshot` for the serializer. - 4. Once you have a savepoint taken with Flink 1.7, the savepoint will contain `TypeSerializerSnapshot` as the - state serializer snapshot, and the serializer instance will no longer be written in the savepoint. 
- At this point, it is now safe to remove all implementations of the old abstraction (remove the old - `TypeSerializerConfigSnapshot` implementation as will as the - `TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)` from the serializer). + 1. 实现 `TypeSerializerSnapshot` 的一个子类作为序列化器的快照类。 + 2. 在 `TypeSerializer#snapshotConfiguration()` 方法中返回新的 `TypeSerializerSnapshot` 作为序列化器的快照。 + 3. 从 Flink 1.7 之前的旧作业的 savepoint 恢复,然后触发一个新的 savepoint。注意:这一步中旧的 `TypeSerializerConfigSnapshot` 必须存在于 classpath 中,并且 `TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)` 不能被删除。这个操作是希望将旧 savepoint 中的 `TypeSerializerConfigSnapshot` 替换成新实现的 `TypeSerializerSnapshot`。 + 4. 使用 Flink 1.7 生成 savepoint 后,savepoint 将包含 `TypeSerializerSnapshot` 作为状态序列化器快照,序列化器实例将不再写入 savepoint。此时可以安全地删除 `TypeSerializerConfigSnapshot` 的子类以及 `TypeSerializer#ensureCompatibility(TypeSerializerConfigSnapshot)` 实现。 -## Migrating from deprecated `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` before Flink 1.19 +## 从 Flink 1.19 之前已弃用的 `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` 迁移 -This section is a guide for a method migration from the serializer snapshots that existed before Flink 1.19. +本节是关于在 Flink 1.19 之前存在的序列化器快照方法的迁移指南。 -Before Flink 1.19, when using a customized serializer to process data, the schema compatibility in the old serializer -(maybe in Flink library) has to meet the future need. -Or else TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer) of the old serializer has to be modified. -There are no ways to specify the compatibility with the old serializer in the new serializer, which also makes scheme evolution -not supported in some scenarios. 
+在 Flink 1.19 之前,当使用自定义序列化器处理数据时,旧序列化器(可能位于 Flink 库中)的格式兼容性逻辑必须能够满足未来的需求。否则,就需要修改旧序列化器的 `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` 方法。在新序列化器中无法指定与旧序列化器的兼容性,这也导致在某些场景下不支持格式演进。 -So from Flink 1.19, the direction of resolving schema compatibility has been reversed. The old method -`TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` is now removed and needs to be replaced with -`TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot oldSerializerSnapshot)`. -To make this transition, follow these steps: +因此,从 Flink 1.19 开始,解析格式兼容性的方向被反转了。旧方法 `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` 已被移除,需要用 `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot oldSerializerSnapshot)` 替换。要进行此迁移,请按照以下步骤操作: -1. Implement the `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot oldSerializerSnapshot)` whose logic - should be same as the original `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)`. -2. Remove the old method `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)`. + 1. 实现 `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializerSnapshot oldSerializerSnapshot)`,其逻辑应与原始的 `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)` 相同。 + 2. 移除旧方法 `TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer newSerializer)`。 {{< top >}}