Skip to content

Commit ef09e17

Browse files
kurtostfeldgyfora
authored andcommitted
[FLINK-3154][API] Upgrade from Kryo 2.x to Kryo 5.x. Removed twitter chill companion library which is abandoned.
1 parent d5ffb88 commit ef09e17

File tree

46 files changed

+2274
-425
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+2274
-425
lines changed

flink-architecture-tests/flink-architecture-tests-production/archunit-violations/7602816f-5c01-4b7a-9e3e-235dfedec245

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,6 @@ org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer does not satisfy
106106
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializerSnapshot does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
107107
org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroKryoSerializerClass does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
108108
org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$DummyAvroRegisteredClass does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
109-
org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$SpecificInstanceCollectionSerializer does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
110-
org.apache.flink.api.java.typeutils.runtime.kryo.Serializers$SpecificInstanceCollectionSerializerForArrayList does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
111109
org.apache.flink.runtime.io.network.api.CancelCheckpointMarker does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
112110
org.apache.flink.runtime.io.network.api.CheckpointBarrier does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
113111
org.apache.flink.runtime.io.network.api.EndOfData does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated

flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableComparator.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.flink.util.InstantiationUtil;
2828

2929
import com.esotericsoftware.kryo.Kryo;
30+
import com.esotericsoftware.kryo.util.DefaultInstantiatorStrategy;
3031
import org.apache.hadoop.io.Writable;
3132
import org.objenesis.strategy.StdInstantiatorStrategy;
3233

@@ -174,12 +175,10 @@ private void checkKryoInitialized() {
174175
if (this.kryo == null) {
175176
this.kryo = new Kryo();
176177

177-
Kryo.DefaultInstantiatorStrategy instantiatorStrategy =
178-
new Kryo.DefaultInstantiatorStrategy();
178+
DefaultInstantiatorStrategy instantiatorStrategy = new DefaultInstantiatorStrategy();
179179
instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
180180
kryo.setInstantiatorStrategy(instantiatorStrategy);
181181

182-
this.kryo.setAsmEnabled(true);
183182
this.kryo.register(type);
184183
}
185184
}

flink-connectors/flink-hadoop-compatibility/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.flink.util.InstantiationUtil;
2828

2929
import com.esotericsoftware.kryo.Kryo;
30+
import com.esotericsoftware.kryo.util.DefaultInstantiatorStrategy;
3031
import org.apache.hadoop.io.NullWritable;
3132
import org.apache.hadoop.io.Writable;
3233
import org.objenesis.strategy.StdInstantiatorStrategy;
@@ -126,12 +127,10 @@ private void checkKryoInitialized() {
126127
if (this.kryo == null) {
127128
this.kryo = new Kryo();
128129

129-
Kryo.DefaultInstantiatorStrategy instantiatorStrategy =
130-
new Kryo.DefaultInstantiatorStrategy();
130+
DefaultInstantiatorStrategy instantiatorStrategy = new DefaultInstantiatorStrategy();
131131
instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
132132
kryo.setInstantiatorStrategy(instantiatorStrategy);
133133

134-
this.kryo.setAsmEnabled(true);
135134
this.kryo.register(typeClass);
136135
}
137136
}

flink-core/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ under the License.
112112

113113
<!-- for the fallback generic serializer -->
114114
<dependency>
115-
<groupId>com.esotericsoftware.kryo</groupId>
115+
<groupId>com.esotericsoftware</groupId>
116116
<artifactId>kryo</artifactId>
117117
<!-- managed version -->
118118
</dependency>

flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoRegistration.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424

2525
import com.esotericsoftware.kryo.Kryo;
2626
import com.esotericsoftware.kryo.Serializer;
27-
import com.esotericsoftware.kryo.factories.ReflectionSerializerFactory;
27+
import com.esotericsoftware.kryo.SerializerFactory.ReflectionSerializerFactory;
2828

2929
import javax.annotation.Nullable;
3030

@@ -118,7 +118,7 @@ public Serializer<?> getSerializer(Kryo kryo) {
118118
case UNSPECIFIED:
119119
return null;
120120
case CLASS:
121-
return ReflectionSerializerFactory.makeSerializer(
121+
return ReflectionSerializerFactory.newSerializer(
122122
kryo, serializerClass, registeredClass);
123123
case INSTANCE:
124124
return serializableSerializerInstance.getSerializer();

flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/NoFetchingInput.java

Lines changed: 38 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
import com.esotericsoftware.kryo.KryoException;
2424
import com.esotericsoftware.kryo.io.Input;
25+
import com.esotericsoftware.kryo.io.KryoBufferUnderflowException;
2526

2627
import java.io.EOFException;
2728
import java.io.IOException;
@@ -33,11 +34,6 @@ public NoFetchingInput(InputStream inputStream) {
3334
super(inputStream, 8);
3435
}
3536

36-
@Override
37-
public boolean eof() {
38-
throw new UnsupportedOperationException("NoFetchingInput does not support EOF.");
39-
}
40-
4137
@Override
4238
public int read() throws KryoException {
4339
require(1);
@@ -60,33 +56,56 @@ public boolean canReadLong() throws KryoException {
6056
* bytes. Thus, it will only load the data which is required and never prefetch data.
6157
*
6258
* @param required the number of bytes being available in the buffer
63-
* @return the number of bytes remaining, which is equal to required
59+
* @return The number of bytes remaining in the buffer, which will be at least <code>required
60+
* </code> bytes.
6461
* @throws KryoException
6562
*/
6663
@Override
6764
protected int require(int required) throws KryoException {
65+
// The main change between this and Kryo 5 Input.require is this will never read more bytes
66+
// than required.
67+
// There are also formatting changes to be compliant with the Flink project styling rules.
68+
int remaining = limit - position;
69+
if (remaining >= required) {
70+
return remaining;
71+
}
6872
if (required > capacity) {
6973
throw new KryoException(
70-
"Buffer too small: capacity: " + capacity + ", " + "required: " + required);
74+
"Buffer too small: capacity: " + capacity + ", required: " + required);
7175
}
7276

73-
position = 0;
74-
int bytesRead = 0;
7577
int count;
76-
while (true) {
77-
count = fill(buffer, bytesRead, required - bytesRead);
78-
78+
// Try to fill the buffer.
79+
if (remaining > 0) {
80+
// Logical change 1 (from Kryo Input.require): "capacity - limit" -> "required - limit"
81+
count = fill(buffer, limit, required - limit);
7982
if (count == -1) {
80-
throw new KryoException(new EOFException("No more bytes left."));
83+
throw new KryoBufferUnderflowException("Buffer underflow.");
8184
}
82-
83-
bytesRead += count;
84-
if (bytesRead == required) {
85-
break;
85+
remaining += count;
86+
if (remaining >= required) {
87+
limit += count;
88+
return remaining;
8689
}
8790
}
88-
limit = required;
89-
return required;
91+
92+
// Was not enough, compact and try again.
93+
System.arraycopy(buffer, position, buffer, 0, remaining);
94+
total += position;
95+
position = 0;
96+
97+
do {
98+
// Logical change 2 (from Kryo Input.require): "capacity - remaining" -> "required -
99+
// remaining"
100+
count = fill(buffer, remaining, required - remaining);
101+
if (count == -1) {
102+
throw new KryoBufferUnderflowException("Buffer underflow.");
103+
}
104+
remaining += count;
105+
} while (remaining < required);
106+
107+
limit = remaining;
108+
return remaining;
90109
}
91110

92111
@Override

flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueComparator.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.flink.util.InstantiationUtil;
2929

3030
import com.esotericsoftware.kryo.Kryo;
31+
import com.esotericsoftware.kryo.util.DefaultInstantiatorStrategy;
3132
import org.objenesis.strategy.StdInstantiatorStrategy;
3233

3334
import java.io.IOException;
@@ -142,12 +143,10 @@ private void checkKryoInitialized() {
142143
if (this.kryo == null) {
143144
this.kryo = new Kryo();
144145

145-
Kryo.DefaultInstantiatorStrategy instantiatorStrategy =
146-
new Kryo.DefaultInstantiatorStrategy();
146+
DefaultInstantiatorStrategy instantiatorStrategy = new DefaultInstantiatorStrategy();
147147
instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
148148
kryo.setInstantiatorStrategy(instantiatorStrategy);
149149

150-
this.kryo.setAsmEnabled(true);
151150
this.kryo.register(type);
152151
}
153152
}

flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.flink.util.InstantiationUtil;
3030

3131
import com.esotericsoftware.kryo.Kryo;
32+
import com.esotericsoftware.kryo.util.DefaultInstantiatorStrategy;
3233
import org.objenesis.strategy.StdInstantiatorStrategy;
3334

3435
import java.io.IOException;
@@ -138,12 +139,11 @@ private void checkKryoInitialized() {
138139
if (this.kryo == null) {
139140
this.kryo = new Kryo();
140141

141-
Kryo.DefaultInstantiatorStrategy instantiatorStrategy =
142-
new Kryo.DefaultInstantiatorStrategy();
143-
instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
144-
kryo.setInstantiatorStrategy(instantiatorStrategy);
142+
DefaultInstantiatorStrategy initStrategy = new DefaultInstantiatorStrategy();
143+
initStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
144+
kryo.setInstantiatorStrategy(initStrategy);
145145

146-
this.kryo.setAsmEnabled(true);
146+
// this.kryo.setAsmEnabled(true);
147147

148148
KryoUtils.applyRegistrations(
149149
this.kryo, kryoRegistrations.values(), this.kryo.getNextRegistrationId());

flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoSerializer.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@
4040
import com.esotericsoftware.kryo.KryoException;
4141
import com.esotericsoftware.kryo.Serializer;
4242
import com.esotericsoftware.kryo.io.Input;
43+
import com.esotericsoftware.kryo.io.KryoBufferUnderflowException;
4344
import com.esotericsoftware.kryo.io.Output;
45+
import com.esotericsoftware.kryo.util.DefaultInstantiatorStrategy;
4446
import org.apache.commons.lang3.exception.CloneFailedException;
4547
import org.objenesis.strategy.StdInstantiatorStrategy;
4648
import org.slf4j.Logger;
@@ -344,9 +346,9 @@ public void serialize(T record, DataOutputView target) throws IOException {
344346
kryo.writeClassAndObject(output, record);
345347
output.flush();
346348
} catch (KryoException ke) {
347-
// make sure that the Kryo output buffer is cleared in case that we can recover from
349+
// make sure that the Kryo output buffer is reset in case that we can recover from
348350
// the exception (e.g. EOFException which denotes buffer full)
349-
output.clear();
351+
output.reset();
350352

351353
Throwable cause = ke.getCause();
352354
if (cause instanceof EOFException) {
@@ -380,6 +382,9 @@ public T deserialize(DataInputView source) throws IOException {
380382

381383
try {
382384
return (T) kryo.readClassAndObject(input);
385+
} catch (KryoBufferUnderflowException ke) {
386+
// 2023-04-26: Existing Flink code expects a java.io.EOFException in this scenario
387+
throw new EOFException(ke.getMessage());
383388
} catch (KryoException ke) {
384389
Throwable cause = ke.getCause();
385390

@@ -488,7 +493,7 @@ private Kryo getKryoInstance() {
488493
LOG.info("Kryo serializer scala extensions are not available.");
489494
}
490495

491-
Kryo.DefaultInstantiatorStrategy initStrategy = new Kryo.DefaultInstantiatorStrategy();
496+
DefaultInstantiatorStrategy initStrategy = new DefaultInstantiatorStrategy();
492497
initStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
493498

494499
Kryo kryo = new Kryo();

flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/kryo/Serializers.java

Lines changed: 1 addition & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -32,16 +32,13 @@
3232
import com.esotericsoftware.kryo.Serializer;
3333
import com.esotericsoftware.kryo.io.Input;
3434
import com.esotericsoftware.kryo.io.Output;
35-
import com.esotericsoftware.kryo.serializers.CollectionSerializer;
3635

37-
import java.io.Serializable;
3836
import java.lang.reflect.Field;
3937
import java.lang.reflect.GenericArrayType;
4038
import java.lang.reflect.Modifier;
4139
import java.lang.reflect.ParameterizedType;
4240
import java.lang.reflect.Type;
4341
import java.util.ArrayList;
44-
import java.util.Collection;
4542
import java.util.List;
4643
import java.util.Set;
4744

@@ -171,50 +168,8 @@ public void write(Kryo kryo, Output output, Object o) {
171168
}
172169

173170
@Override
174-
public T read(Kryo kryo, Input input, Class<T> aClass) {
171+
public T read(Kryo kryo, Input input, Class<? extends T> aClass) {
175172
throw new UnsupportedOperationException("Could not find required Avro dependency.");
176173
}
177174
}
178-
179-
// --------------------------------------------------------------------------------------------
180-
// Custom Serializers
181-
// --------------------------------------------------------------------------------------------
182-
183-
/** Special serializer for Java's {@link ArrayList} used for Avro's GenericData.Array. */
184-
@SuppressWarnings("rawtypes")
185-
public static class SpecificInstanceCollectionSerializerForArrayList
186-
extends SpecificInstanceCollectionSerializer<ArrayList> {
187-
private static final long serialVersionUID = 1L;
188-
189-
public SpecificInstanceCollectionSerializerForArrayList() {
190-
super(ArrayList.class);
191-
}
192-
}
193-
194-
/**
195-
* Special serializer for Java collections enforcing certain instance types. Avro is serializing
196-
* collections with an "GenericData.Array" type. Kryo is not able to handle this type, so we use
197-
* ArrayLists.
198-
*/
199-
@SuppressWarnings("rawtypes")
200-
public static class SpecificInstanceCollectionSerializer<T extends Collection>
201-
extends CollectionSerializer implements Serializable {
202-
private static final long serialVersionUID = 1L;
203-
204-
private Class<T> type;
205-
206-
public SpecificInstanceCollectionSerializer(Class<T> type) {
207-
this.type = type;
208-
}
209-
210-
@Override
211-
protected Collection create(Kryo kryo, Input input, Class<Collection> type) {
212-
return kryo.newInstance(this.type);
213-
}
214-
215-
@Override
216-
protected Collection createCopy(Kryo kryo, Collection original) {
217-
return kryo.newInstance(this.type);
218-
}
219-
}
220175
}

0 commit comments

Comments
 (0)