diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java index a381d573c41d5..3ab136d451f42 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesRuntimeFunctions.java @@ -36,6 +36,7 @@ import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.apache.flink.table.connector.RuntimeConverter; import org.apache.flink.table.connector.sink.DynamicTableSink.DataStructureConverter; import org.apache.flink.table.connector.source.LookupTableSource; import org.apache.flink.table.data.GenericRowData; @@ -631,11 +632,11 @@ protected TestValuesLookupFunction( public void open(FunctionContext context) throws Exception { RESOURCE_COUNTER.incrementAndGet(); isOpenCalled = true; + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); if (projectable) { - projection = - generatedProjection.newInstance( - Thread.currentThread().getContextClassLoader()); + projection = generatedProjection.newInstance(classLoader); } + converter.open(RuntimeConverter.Context.create(classLoader)); rowSerializer = InternalSerializers.create(producedRowType); indexDataByKey(); } @@ -725,11 +726,11 @@ protected AsyncTestValueLookupFunction( @Override public void open(FunctionContext context) throws Exception { RESOURCE_COUNTER.incrementAndGet(); + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); if (projectable) { - projection = - generatedProjection.newInstance( - Thread.currentThread().getContextClassLoader()); + projection = generatedProjection.newInstance(classLoader); } + converter.open(RuntimeConverter.Context.create(classLoader)); rowSerializer = InternalSerializers.create(producedRowType); isOpenCalled = true; // generate unordered result for async lookup