Skip to content

Commit

Permalink
[FLINK-3154][runtime] Upgrade from Kryo v2 + Chill 0.7.6 to Kryo v5 w…
Browse files Browse the repository at this point in the history
…ith backward compatibility for existing savepoints and checkpoints.
  • Loading branch information
Kurt Ostfeld committed May 26, 2023
1 parent ac6aedb commit de7dffe
Show file tree
Hide file tree
Showing 101 changed files with 7,979 additions and 172 deletions.
12 changes: 12 additions & 0 deletions docs/layouts/shortcodes/generated/pipeline_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@
<td>List&lt;String&gt;</td>
<td>Semicolon separated list of pairs of class names and Kryo serializers class names to be used as Kryo default serializers<br /><br />Example:<br /><code class="highlighter-rouge">class:org.example.ExampleClass,serializer:org.example.ExampleSerializer1; class:org.example.ExampleClass2,serializer:org.example.ExampleSerializer2</code></td>
</tr>
<tr>
<td><h5>pipeline.default-kryo5-serializers</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>List&lt;String&gt;</td>
<td>Semicolon separated list of pairs of class names and Kryo serializers class names to be used as Kryo 5 default serializers<br /><br />Example:<br /><code class="highlighter-rouge">class:org.example.ExampleClass,serializer:org.example.ExampleSerializer1; class:org.example.ExampleClass2,serializer:org.example.ExampleSerializer2</code></td>
</tr>
<tr>
<td><h5>pipeline.force-avro</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down Expand Up @@ -116,6 +122,12 @@
<td>List&lt;String&gt;</td>
<td>Semicolon separated list of types to be registered with the serialization stack. If the type is eventually serialized as a POJO, then the type is registered with the POJO serializer. If the type ends up being serialized with Kryo, then it will be registered at Kryo to make sure that only tags are written.</td>
</tr>
<tr>
<td><h5>pipeline.registered-kryo5-types</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>List&lt;String&gt;</td>
<td>Semicolon separated list of types to be registered with the serialization stack. If the type is eventually serialized as a POJO, then the type is registered with the POJO serializer. If the type ends up being serialized with Kryo, then it will be registered at Kryo to make sure that only tags are written.</td>
</tr>
<tr>
<td><h5>pipeline.registered-pojo-types</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
8 changes: 4 additions & 4 deletions docs/layouts/shortcodes/generated/rest_v1_dispatcher.html
Original file line number Diff line number Diff line change
Expand Up @@ -1508,7 +1508,7 @@
},
"checkpoint_type" : {
"type" : "string",
"enum" : [ "CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
"enum" : [ "CHECKPOINT", "UNALIGNED_CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
},
"checkpointed_size" : {
"type" : "integer"
Expand Down Expand Up @@ -1573,7 +1573,7 @@
},
"checkpoint_type" : {
"type" : "string",
"enum" : [ "CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
"enum" : [ "CHECKPOINT", "UNALIGNED_CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
},
"checkpointed_size" : {
"type" : "integer"
Expand Down Expand Up @@ -1675,7 +1675,7 @@
},
"checkpoint_type" : {
"type" : "string",
"enum" : [ "CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
"enum" : [ "CHECKPOINT", "UNALIGNED_CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
},
"checkpointed_size" : {
"type" : "integer"
Expand Down Expand Up @@ -2028,7 +2028,7 @@
},
"checkpoint_type" : {
"type" : "string",
"enum" : [ "CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
"enum" : [ "CHECKPOINT", "UNALIGNED_CHECKPOINT", "SAVEPOINT", "SYNC_SAVEPOINT" ]
},
"checkpointed_size" : {
"type" : "integer"
Expand Down
1 change: 1 addition & 0 deletions docs/static/generated/rest_v1_dispatcher.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2729,6 +2729,7 @@ components:
type: string
enum:
- CHECKPOINT
- UNALIGNED_CHECKPOINT
- SAVEPOINT
- SYNC_SAVEPOINT
RestoreMode:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
org.apache.flink.api.common.ExecutionConfig.configure(org.apache.flink.configuration.ReadableConfig, java.lang.ClassLoader): Argument leaf type org.apache.flink.configuration.ReadableConfig does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.ExecutionConfig.getClosureCleanerLevel(): Returned leaf type org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.ExecutionConfig.getDefaultKryoSerializers(): Returned leaf type org.apache.flink.api.common.ExecutionConfig$SerializableSerializer does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.ExecutionConfig.getGlobalJobParameters(): Returned leaf type org.apache.flink.api.common.ExecutionConfig$GlobalJobParameters does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.ExecutionConfig.getRegisteredTypesWithKryoSerializers(): Returned leaf type org.apache.flink.api.common.ExecutionConfig$SerializableSerializer does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.ExecutionConfig.setClosureCleanerLevel(org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel): Argument leaf type org.apache.flink.api.common.ExecutionConfig$ClosureCleanerLevel does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.ExecutionConfig.setGlobalJobParameters(org.apache.flink.api.common.ExecutionConfig$GlobalJobParameters): Argument leaf type org.apache.flink.api.common.ExecutionConfig$GlobalJobParameters does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
org.apache.flink.api.common.cache.DistributedCache.parseCachedFilesFromString(java.util.List): Returned leaf type org.apache.flink.api.common.cache.DistributedCache$DistributedCacheEntry does not satisfy: reside outside of package 'org.apache.flink..' or reside in any package ['..shaded..'] or annotated with @Public or annotated with @Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo5.KryoSerializer;
import org.apache.flink.connector.file.table.BinPacking;
import org.apache.flink.connector.file.table.stream.TaskTracker;
import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CompactionUnit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo5.KryoSerializer;
import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.table.stream.PartitionCommitInfo;
import org.apache.flink.connector.file.table.stream.compact.CompactMessages.CompactionUnit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo5.KryoSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
Expand Down
6 changes: 6 additions & 0 deletions flink-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ under the License.
<!-- managed version -->
</dependency>

<dependency>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo5</artifactId>
<!-- managed version -->
</dependency>

<!-- The common collections are needed for some hash tables used in the collection execution -->
<dependency>
<groupId>commons-collections</groupId>
Expand Down
Loading

0 comments on commit de7dffe

Please sign in to comment.