From 4d8c993a4b7615664eb5200c25da6e332657ec8b Mon Sep 17 00:00:00 2001
From: Gabor Somogyi
Date: Mon, 20 Jan 2025 16:48:52 +0100
Subject: [PATCH] [FLINK-36929][table] Add SQL connector for keyed savepoint
 data

---
 .../flink-state-processing-api/pom.xml        |  16 +-
 .../flink/state/table/KeyedStateReader.java   | 376 ++++++++++++++++++
 .../table/SavepointConnectorOptions.java      | 139 +++++++
 .../table/SavepointConnectorOptionsUtil.java  |  49 +++
 .../SavepointDataStreamScanProvider.java      | 139 +++++++
 .../table/SavepointDynamicTableSource.java    |  85 ++++
 .../SavepointDynamicTableSourceFactory.java   | 355 +++++++++++++++++
 .../table/StateValueColumnConfiguration.java  |  79 ++++
 .../org.apache.flink.table.factories.Factory  |  16 +
 .../state/writer/job/schema/PojoData.java     |  33 ++
 .../SavepointDynamicTableSourceTest.java      | 172 ++++++++
 .../resources/table-state-nulls/_metadata     | Bin 0 -> 24017 bytes
 .../src/test/resources/table-state/_metadata  | Bin 0 -> 22390 bytes
 13 files changed, 1458 insertions(+), 1 deletion(-)
 create mode 100644 flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/KeyedStateReader.java
 create mode 100644 flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointConnectorOptions.java
 create mode 100644 flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointConnectorOptionsUtil.java
 create mode 100644 flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointDataStreamScanProvider.java
 create mode 100644 flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointDynamicTableSource.java
 create mode 100644 flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointDynamicTableSourceFactory.java
 create mode 100644 flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/StateValueColumnConfiguration.java
 create mode 100644 flink-libraries/flink-state-processing-api/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
 create mode 100644 flink-libraries/flink-state-processing-api/src/test/java/com/example/state/writer/job/schema/PojoData.java
 create mode 100644 flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/SavepointDynamicTableSourceTest.java
 create mode 100644 flink-libraries/flink-state-processing-api/src/test/resources/table-state-nulls/_metadata
 create mode 100644 flink-libraries/flink-state-processing-api/src/test/resources/table-state/_metadata

diff --git a/flink-libraries/flink-state-processing-api/pom.xml b/flink-libraries/flink-state-processing-api/pom.xml
index 8e087770421fe..691c30a9ef43a 100644
--- a/flink-libraries/flink-state-processing-api/pom.xml
+++ b/flink-libraries/flink-state-processing-api/pom.xml
@@ -53,7 +53,14 @@ under the License.
 			<version>${project.version}</version>
 			<scope>provided</scope>
 		</dependency>
-
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-runtime</artifactId>
+			<version>${project.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
@@ -78,6 +85,13 @@ under the License.
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
+
 		<dependency>
 			<groupId>org.apache.flink</groupId>
 			<artifactId>flink-runtime</artifactId>
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/KeyedStateReader.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/KeyedStateReader.java
new file mode 100644
index 0000000000000..182794485121e
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/KeyedStateReader.java
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.table;
+
+import org.apache.flink.api.common.functions.OpenContext;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.State;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
+import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
+import org.apache.flink.table.data.GenericMapData;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Collector;
+
+import org.apache.flink.shaded.guava32.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava32.com.google.common.cache.CacheBuilder;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.StreamSupport;
+
+/** Keyed state reader function for value, list and map state types. */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class KeyedStateReader extends KeyedStateReaderFunction<Object, RowData> {
+    private static final long CACHE_MAX_SIZE = 64000L;
+
+    private final Tuple2<Integer, List<StateValueColumnConfiguration>> keyValueProjections;
+    private final RowType rowType;
+    private final Map<Integer, State> states = new HashMap<>();
+    private final Cache<Tuple2<Class, String>, Field> classFieldCache;
+    private final Cache<Tuple2<Class, String>, Method> classMethodCache;
+
+    public KeyedStateReader(
+            final RowType rowType,
+            final Tuple2<Integer, List<StateValueColumnConfiguration>> keyValueProjections) {
+        this.keyValueProjections = keyValueProjections;
+        this.rowType = rowType;
+        this.classMethodCache = CacheBuilder.newBuilder().maximumSize(CACHE_MAX_SIZE).build();
+        this.classFieldCache = CacheBuilder.newBuilder().maximumSize(CACHE_MAX_SIZE).build();
+    }
+
+    @Override
+    public void open(OpenContext openContext) throws Exception {
+        for (StateValueColumnConfiguration columnConfig : keyValueProjections.f1) {
+            switch (columnConfig.getStateType()) {
+                case VALUE:
+                    states.put(
+                            columnConfig.getColumnIndex(),
+                            getRuntimeContext()
+                                    .getState(
+                                            (ValueStateDescriptor)
+                                                    columnConfig.getStateDescriptor()));
+                    break;
+
+                case LIST:
+                    states.put(
+                            columnConfig.getColumnIndex(),
+                            getRuntimeContext()
+                                    .getListState(
+                                            (ListStateDescriptor)
+                                                    columnConfig.getStateDescriptor()));
+                    break;
+
+                case MAP:
+                    states.put(
+                            columnConfig.getColumnIndex(),
+                            getRuntimeContext()
+                                    .getMapState(
+                                            (MapStateDescriptor)
+                                                    columnConfig.getStateDescriptor()));
+                    break;
+
+                default:
+                    throw new UnsupportedOperationException(
+                            "Unsupported state type: " + columnConfig.getStateType());
+            }
+        }
+    }
+
+    @Override
+    public void close() {
+        states.clear();
+    }
+
+    @Override
+    public void readKey(Object key, Context context, Collector<RowData> out) throws Exception {
+        GenericRowData row = new GenericRowData(RowKind.INSERT, 1 + keyValueProjections.f1.size());
+
+        List<RowType.RowField> fields = rowType.getFields();
+
+        // Fill column from key
+        int columnIndex = keyValueProjections.f0;
+        LogicalType keyLogicalType = fields.get(columnIndex).getType();
+        row.setField(columnIndex, getValue(keyLogicalType, key));
+
+        // Fill columns from values
+        for (StateValueColumnConfiguration columnConfig : keyValueProjections.f1) {
+            LogicalType valueLogicalType = fields.get(columnConfig.getColumnIndex()).getType();
+            switch (columnConfig.getStateType()) {
+                case VALUE:
+                    row.setField(
+                            columnConfig.getColumnIndex(),
+                            getValue(
+                                    valueLogicalType,
+                                    ((ValueState) states.get(columnConfig.getColumnIndex()))
+                                            .value()));
+                    break;
+
+                case LIST:
+                    row.setField(
+                            columnConfig.getColumnIndex(),
+                            getValue(
+                                    valueLogicalType,
+                                    ((ListState) states.get(columnConfig.getColumnIndex())).get()));
+                    break;
+
+                case MAP:
+                    row.setField(
+                            columnConfig.getColumnIndex(),
+                            getValue(
+                                    valueLogicalType,
+                                    ((MapState) states.get(columnConfig.getColumnIndex()))
+                                            .entries()));
+                    break;
+
+                default:
+                    throw new UnsupportedOperationException(
+                            "Unsupported state type: " + columnConfig.getStateType());
+            }
+        }
+
+        out.collect(row);
+    }
+
+    private Object getValue(LogicalType logicalType, Object object) {
+        if (object == null) {
+            return null;
+        }
+        switch (logicalType.getTypeRoot()) {
+            case CHAR: // String
+            case VARCHAR: // String
+                return StringData.fromString(object.toString());
+
+            case BOOLEAN: // Boolean
+                return object;
+
+            case BINARY: // byte[]
+            case VARBINARY: // ByteBuffer, byte[]
+                return convertToBytes(object);
+
+            case DECIMAL: // BigDecimal, ByteBuffer, byte[]
+                return convertToDecimal(object, logicalType);
+
+            case TINYINT: // Byte
+            case SMALLINT: // Short
+            case INTEGER: // Integer
+            case BIGINT: // Long
+            case FLOAT: // Float
+            case DOUBLE: // Double
+            case DATE: // Integer
+                return object;
+
+            case INTERVAL_YEAR_MONTH: // Long
+            case INTERVAL_DAY_TIME: // Long
+                return object;
+
+            case ARRAY:
+                return convertToArray(object, logicalType);
+
+            case MAP:
+                return convertToMap(object, logicalType);
+
+            case ROW:
+                return convertToRow(object, logicalType);
+
+            case NULL:
+                return null;
+
+            case MULTISET:
+            case TIME_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+            case DISTINCT_TYPE:
+            case STRUCTURED_TYPE:
+            case RAW:
+            case SYMBOL:
+            case UNRESOLVED:
+            default:
+                throw new UnsupportedOperationException("Unsupported type: " + logicalType);
+        }
+    }
+
+    private Object getObjectField(Object object, RowType.RowField rowField) {
+        String rowFieldName = rowField.getName();
+
+        Class objectClass = object.getClass();
+        Object objectField;
+        try {
+            Field field =
+                    classFieldCache.get(
+                            Tuple2.of(objectClass, rowFieldName),
+                            () -> objectClass.getField(rowFieldName));
+            objectField = field.get(object);
+        } catch (ExecutionException e1) {
+            Method method = getMethod(objectClass, rowFieldName);
+            try {
+                objectField = method.invoke(object);
+            } catch (IllegalAccessException | InvocationTargetException e2) {
+                throw new RuntimeException(e2);
+            }
+        } catch (IllegalAccessException e) {
+            throw new UnsupportedOperationException(
+                    "Cannot access field by either public member or getter function: "
+                            + rowField.getName());
+        }
+
+        return objectField;
+    }
+
+    private Method getMethod(Class objectClass, String rowFieldName) {
+        String upperRowFieldName =
+                rowFieldName.substring(0, 1).toUpperCase() + rowFieldName.substring(1);
+        try {
+            String methodName = "get" + upperRowFieldName;
+            return classMethodCache.get(
+                    Tuple2.of(objectClass, methodName), () -> objectClass.getMethod(methodName));
+        } catch (ExecutionException e1) {
+            try {
+                String methodName = "is" + upperRowFieldName;
+                return classMethodCache.get(
+                        Tuple2.of(objectClass, methodName),
+                        () -> objectClass.getMethod(methodName));
+            } catch (ExecutionException e2) {
+                throw new RuntimeException(e2);
+            }
+        }
+    }
+
+    private static DecimalData convertToDecimal(Object object, LogicalType logicalType) {
+        DecimalType decimalType = (DecimalType) logicalType;
+
+        final int precision = decimalType.getPrecision();
+        final int scale = decimalType.getScale();
+        if (object instanceof BigDecimal) {
+            return DecimalData.fromBigDecimal((BigDecimal) object, precision, scale);
+        } else if (object instanceof ByteBuffer) {
+            ByteBuffer byteBuffer = (ByteBuffer) object;
+            final byte[] bytes = new byte[byteBuffer.remaining()];
+            byteBuffer.get(bytes);
+            return DecimalData.fromUnscaledBytes(bytes, precision, scale);
+        } else if (object instanceof byte[]) {
+            final byte[] bytes = (byte[]) object;
+            return DecimalData.fromUnscaledBytes(bytes, precision, scale);
+        } else {
+            throw new UnsupportedOperationException(
+                    "Decimal conversion supports only BigDecimal, ByteBuffer and byte[] but received: "
+                            + object.getClass().getName());
+        }
+    }
+
+    private byte[] convertToBytes(Object object) {
+        if (object instanceof ByteBuffer) {
+            ByteBuffer byteBuffer = (ByteBuffer) object;
+            byte[] bytes = new byte[byteBuffer.remaining()];
+            byteBuffer.get(bytes);
+            return bytes;
+        } else if (object instanceof byte[]) {
+            return (byte[]) object;
+        } else {
+            throw new UnsupportedOperationException(
+                    "Byte array conversion supports only ByteBuffer and byte[] but received: "
+                            + object.getClass().getName());
+        }
+    }
+
+    private GenericArrayData convertToArray(Object object, LogicalType logicalType) {
+        LogicalType elementLogicalType = ((ArrayType) logicalType).getElementType();
+
+        if (object instanceof Iterable) {
+            Iterable iterable = (Iterable) object;
+            return new GenericArrayData(
+                    StreamSupport.stream(iterable.spliterator(), false)
+                            .map(v -> getValue(elementLogicalType, v))
+                            .toArray());
+        } else {
+            throw new UnsupportedOperationException(
+                    "Array conversion supports only Iterable but received: "
+                            + object.getClass().getName());
+        }
+    }
+
+    private GenericMapData convertToMap(Object object, LogicalType logicalType) {
+        MapType mapType = (MapType) logicalType;
+        LogicalType keyLogicalType = mapType.getKeyType();
+        LogicalType valueLogicalType = mapType.getValueType();
+
+        if (object instanceof Iterable) {
+            Iterable iterable = (Iterable) object;
+            Iterator iterator = iterable.iterator();
+            Map<Object, Object> result = new HashMap<>();
+            boolean typeChecked = false;
+            while (iterator.hasNext()) {
+                Object e = iterator.next();
+                // The boolean check here is for performance tuning because instanceof is slow, and
+                // it's enough to check the type only once.
+                if (!typeChecked && !(e instanceof Map.Entry)) {
+                    throw new UnsupportedOperationException(
+                            "Map conversion supports only Iterable<Map.Entry> but received: "
+                                    + object.getClass().getName());
+                } else {
+                    typeChecked = true;
+                }
+                Map.Entry entry = (Map.Entry) e;
+                result.put(
+                        getValue(keyLogicalType, entry.getKey()),
+                        getValue(valueLogicalType, entry.getValue()));
+            }
+            return new GenericMapData(result);
+        } else {
+            throw new UnsupportedOperationException(
+                    "Map conversion supports only Iterable<Map.Entry> but received: "
+                            + object.getClass().getName());
+        }
+    }
+
+    private GenericRowData convertToRow(Object object, LogicalType logicalType) {
+        RowType rowType = (RowType) logicalType;
+        GenericRowData result = new GenericRowData(RowKind.INSERT, rowType.getFieldCount());
+        List<RowType.RowField> fields = rowType.getFields();
+        for (int i = 0; i < rowType.getFieldCount(); i++) {
+            RowType.RowField subRowField = fields.get(i);
+            Object subObject = getObjectField(object, subRowField);
+            result.setField(i, getValue(subRowField.getType(), subObject));
+        }
+        return result;
+    }
+}
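Note on the ROW conversion above: convertToRow() resolves every field of a ROW<...> column against the state value's class, first as a public field of the same name and then as a get*/is* accessor, with both lookups cached. A minimal sketch of a class that satisfies this contract follows; the class and field names are illustrative and not part of this patch:

    // Hypothetical state value class: each ROW field must be reachable either
    // as a public member or through a JavaBeans-style getter, otherwise
    // getObjectField() throws UnsupportedOperationException.
    public class SensorState {
        public long eventCount; // resolved directly as public field "eventCount"

        private boolean active;

        public boolean isActive() { // resolved as "is" + capitalized field name
            return active;
        }
    }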
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointConnectorOptions.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointConnectorOptions.java
new file mode 100644
index 0000000000000..1c596ad4b49a4
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointConnectorOptions.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.description.Description;
+import org.apache.flink.configuration.description.TextElement;
+
+import static org.apache.flink.configuration.description.TextElement.code;
+
+/** Options for the state connector. */
+@PublicEvolving
+public class SavepointConnectorOptions {
+
+    public static final String FIELDS = "fields";
+    public static final String STATE_NAME = "state-name";
+    public static final String STATE_TYPE = "state-type";
+    public static final String MAP_KEY_FORMAT = "map-key-format";
+    public static final String VALUE_FORMAT = "value-format";
+
+    /** Value state types. */
+    public enum StateType {
+        VALUE,
+        LIST,
+        MAP
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Common options
+    // --------------------------------------------------------------------------------------------
+
+    public static final ConfigOption<String> STATE_BACKEND_TYPE =
+            ConfigOptions.key("state.backend.type")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text("The state backend to be used to read state.")
+                                    .linebreak()
+                                    .text(
+                                            "The implementation can be specified either via their shortcut "
+                                                    + "name, or via the class name of a %s. "
+                                                    + "If a factory is specified it is instantiated via its "
+                                                    + "zero argument constructor and its %s "
+                                                    + "method is called.",
+                                            TextElement.code("StateBackendFactory"),
+                                            TextElement.code(
+                                                    "StateBackendFactory#createFromConfig(ReadableConfig, ClassLoader)"))
+                                    .linebreak()
+                                    .text(
+                                            "Recognized shortcut names are 'hashmap', 'rocksdb' and 'forst'.")
+                                    .build());
+
+    public static final ConfigOption<String> STATE_PATH =
+            ConfigOptions.key("state.path")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Defines the state path which must be used for state reading.");
+
+    public static final ConfigOption<String> OPERATOR_UID =
+            ConfigOptions.key("operator.uid")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Defines the operator UID which must be used for state reading (can't be used together with UID hash).");
+
+    public static final ConfigOption<String> OPERATOR_UID_HASH =
+            ConfigOptions.key("operator.uid.hash")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Defines the operator UID hash which must be used for state reading (can't be used together with UID).");
+
+    // --------------------------------------------------------------------------------------------
+    // Value options
+    // --------------------------------------------------------------------------------------------
+
+    /** Placeholder {@link ConfigOption}. Not used for retrieving values. */
+    public static final ConfigOption<String> STATE_NAME_PLACEHOLDER =
+            ConfigOptions.key(String.format("%s.#.%s", FIELDS, STATE_NAME))
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Defines the state name which must be used for state reading.");
+
+    /** Placeholder {@link ConfigOption}. Not used for retrieving values. */
+    public static final ConfigOption<StateType> STATE_TYPE_PLACEHOLDER =
+            ConfigOptions.key(String.format("%s.#.%s", FIELDS, STATE_TYPE))
+                    .enumType(StateType.class)
+                    .noDefaultValue()
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Defines the state type which must be used for state reading, including %s, %s and %s. "
                                                    + "When it's not provided then it's inferred from the SQL type (ARRAY=list, MAP=map, all others=value).",
+                                            code(StateType.VALUE.toString()),
+                                            code(StateType.LIST.toString()),
+                                            code(StateType.MAP.toString()))
+                                    .build());
+
+    /** Placeholder {@link ConfigOption}. Not used for retrieving values. */
+    public static final ConfigOption<String> MAP_KEY_FORMAT_PLACEHOLDER =
+            ConfigOptions.key(String.format("%s.#.%s", FIELDS, MAP_KEY_FORMAT))
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Defines the format class scheme for decoding map value key data. "
+                                    + "When it's not provided then it's inferred from the SQL type (only primitive types supported).");
+
+    /** Placeholder {@link ConfigOption}. Not used for retrieving values. */
+    public static final ConfigOption<String> VALUE_FORMAT_PLACEHOLDER =
+            ConfigOptions.key(String.format("%s.#.%s", FIELDS, VALUE_FORMAT))
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Defines the format class scheme for decoding value data. "
+                                    + "When it's not provided then it's inferred from the SQL type (only primitive types supported).");
+
+    private SavepointConnectorOptions() {}
+}
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointConnectorOptionsUtil.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointConnectorOptionsUtil.java
new file mode 100644
index 0000000000000..bdaba67411b79
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointConnectorOptionsUtil.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.table;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.state.api.OperatorIdentifier;
+
+import java.util.Optional;
+
+import static org.apache.flink.state.table.SavepointConnectorOptions.OPERATOR_UID;
+import static org.apache.flink.state.table.SavepointConnectorOptions.OPERATOR_UID_HASH;
+
+/** Utilities for {@link SavepointConnectorOptions}. */
+@PublicEvolving
+public class SavepointConnectorOptionsUtil {
+
+    public static OperatorIdentifier getOperatorIdentifier(ReadableConfig options) {
+        final Optional<String> operatorUid = options.getOptional(OPERATOR_UID);
+        final Optional<String> operatorUidHash = options.getOptional(OPERATOR_UID_HASH);
+
+        if (operatorUid.isPresent() == operatorUidHash.isPresent()) {
+            throw new IllegalArgumentException(
+                    "Exactly one of operator uid and operator uid hash must be specified.");
+        }
+
+        return operatorUid
+                .map(OperatorIdentifier::forUid)
+                .orElseGet(() -> OperatorIdentifier.forUidHash(operatorUidHash.get()));
+    }
+
+    private SavepointConnectorOptionsUtil() {}
+}
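The check above enforces that exactly one of operator.uid and operator.uid.hash is set: both present or both absent are rejected. A short sketch of the two outcomes; the uid and hash values are illustrative:

    // Hypothetical illustration of the mutual exclusion enforced by
    // getOperatorIdentifier(): a single identifier option is accepted, while
    // setting both (or neither) raises IllegalArgumentException.
    Configuration conf = new Configuration();
    conf.setString("operator.uid", "my-operator-uid");
    OperatorIdentifier id = SavepointConnectorOptionsUtil.getOperatorIdentifier(conf); // OK

    conf.setString("operator.uid.hash", "0123456789abcdef0123456789abcdef");
    SavepointConnectorOptionsUtil.getOperatorIdentifier(conf); // throws IllegalArgumentException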
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointDataStreamScanProvider.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointDataStreamScanProvider.java
new file mode 100644
index 0000000000000..e71bff21329a4
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointDataStreamScanProvider.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.table;
+
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.StateBackendOptions;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StateBackendLoader;
+import org.apache.flink.state.api.OperatorIdentifier;
+import org.apache.flink.state.api.SavepointReader;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.connector.ProviderContext;
+import org.apache.flink.table.connector.source.DataStreamScanProvider;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+
+import javax.naming.ConfigurationException;
+
+import java.util.List;
+
+/** State data stream scan provider. */
+public class SavepointDataStreamScanProvider implements DataStreamScanProvider {
+    private final String stateBackendType;
+    private final String statePath;
+    private final OperatorIdentifier operatorIdentifier;
+    private final String keyFormat;
+    private final Tuple2<Integer, List<StateValueColumnConfiguration>> keyValueProjections;
+    private final RowType rowType;
+
+    public SavepointDataStreamScanProvider(
+            final String stateBackendType,
+            final String statePath,
+            final OperatorIdentifier operatorIdentifier,
+            final String keyFormat,
+            final Tuple2<Integer, List<StateValueColumnConfiguration>> keyValueProjections,
+            RowType rowType) {
+        this.stateBackendType = stateBackendType;
+        this.statePath = statePath;
+        this.operatorIdentifier = operatorIdentifier;
+        this.keyFormat = keyFormat;
+        this.keyValueProjections = keyValueProjections;
+        this.rowType = rowType;
+    }
+
+    @Override
+    public boolean isBounded() {
+        return true;
+    }
+
+    @Override
+    @SuppressWarnings({"rawtypes", "unchecked"})
+    public DataStream<RowData> produceDataStream(
+            ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
+        try {
+            Configuration configuration = new Configuration();
+            configuration.set(StateBackendOptions.STATE_BACKEND, stateBackendType);
+            StateBackend stateBackend =
+                    StateBackendLoader.loadStateBackendFromConfig(
+                            configuration, getClass().getClassLoader(), null);
+
+            SavepointReader savepointReader =
+                    SavepointReader.read(execEnv, statePath, stateBackend);
+
+            // Get key type information
+            TypeInformation keyTypeInfo = TypeInformation.of(Class.forName(keyFormat));
+
+            // Get value state descriptors
+            for (StateValueColumnConfiguration columnConfig : keyValueProjections.f1) {
+                TypeInformation valueTypeInfo =
+                        TypeInformation.of(Class.forName(columnConfig.getValueFormat()));
+
+                switch (columnConfig.getStateType()) {
+                    case VALUE:
+                        columnConfig.setStateDescriptor(
+                                new ValueStateDescriptor<>(
+                                        columnConfig.getStateName(), valueTypeInfo));
+                        break;
+
+                    case LIST:
+                        columnConfig.setStateDescriptor(
+                                new ListStateDescriptor<>(
+                                        columnConfig.getStateName(), valueTypeInfo));
+                        break;
+
+                    case MAP:
+                        if (columnConfig.getMapKeyFormat() == null) {
+                            throw new ConfigurationException(
+                                    "Map key format is required for map state");
+                        }
+                        TypeInformation mapKeyTypeInfo =
+                                TypeInformation.of(Class.forName(columnConfig.getMapKeyFormat()));
+                        columnConfig.setStateDescriptor(
+                                new MapStateDescriptor<>(
+                                        columnConfig.getStateName(),
+                                        mapKeyTypeInfo,
+                                        valueTypeInfo));
+                        break;
+
+                    default:
+                        throw new UnsupportedOperationException(
+                                "Unsupported state type: " + columnConfig.getStateType());
+                }
+            }
+
+            TypeInformation<RowData> outTypeInfo = InternalTypeInfo.of(rowType);
+
+            return savepointReader.readKeyedState(
+                    operatorIdentifier,
+                    new KeyedStateReader(rowType, keyValueProjections),
+                    keyTypeInfo,
+                    outTypeInfo);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
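The provider above rebuilds the state descriptors from the configured format classes, so names and types must line up with what the job that produced the savepoint registered. A sketch of the matching writer side, assuming the PojoData column used in the tests; the descriptor shown is what the VALUE branch above would reconstruct, not code from this patch:

    // Hypothetical writer-side declaration that a 'fields.<col>.value-format'
    // of com.example.state.writer.job.schema.PojoData would read back:
    ValueStateDescriptor<PojoData> descriptor =
            new ValueStateDescriptor<>(
                    "KeyedPojoValue", TypeInformation.of(PojoData.class));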
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointDynamicTableSource.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointDynamicTableSource.java
new file mode 100644
index 0000000000000..e22b3ea9f5aba
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointDynamicTableSource.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.table;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.state.api.OperatorIdentifier;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.connector.source.ScanTableSource;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.List;
+
+/** State dynamic source. */
+public class SavepointDynamicTableSource implements ScanTableSource {
+    private final String stateBackendType;
+    private final String statePath;
+    private final OperatorIdentifier operatorIdentifier;
+    private final String keyFormat;
+    private final Tuple2<Integer, List<StateValueColumnConfiguration>> keyValueProjections;
+    private final RowType rowType;
+
+    public SavepointDynamicTableSource(
+            final String stateBackendType,
+            final String statePath,
+            final OperatorIdentifier operatorIdentifier,
+            final String keyFormat,
+            final Tuple2<Integer, List<StateValueColumnConfiguration>> keyValueProjections,
+            RowType rowType) {
+        this.stateBackendType = stateBackendType;
+        this.statePath = statePath;
+        this.operatorIdentifier = operatorIdentifier;
+        this.keyValueProjections = keyValueProjections;
+        this.keyFormat = keyFormat;
+        this.rowType = rowType;
+    }
+
+    @Override
+    public DynamicTableSource copy() {
+        return new SavepointDynamicTableSource(
+                stateBackendType,
+                statePath,
+                operatorIdentifier,
+                keyFormat,
+                keyValueProjections,
+                rowType);
+    }
+
+    @Override
+    public String asSummaryString() {
+        return "State Table Source";
+    }
+
+    @Override
+    public ChangelogMode getChangelogMode() {
+        return ChangelogMode.insertOnly();
+    }
+
+    @Override
+    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
+        return new SavepointDataStreamScanProvider(
+                stateBackendType,
+                statePath,
+                operatorIdentifier,
+                keyFormat,
+                keyValueProjections,
+                rowType);
+    }
+}
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointDynamicTableSourceFactory.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointDynamicTableSourceFactory.java
new file mode 100644
index 0000000000000..8cdbb1a2c99dc
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/SavepointDynamicTableSourceFactory.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.table;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.state.api.OperatorIdentifier;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.factories.DynamicTableSourceFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.math.BigDecimal;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.state.table.SavepointConnectorOptions.FIELDS;
+import static org.apache.flink.state.table.SavepointConnectorOptions.MAP_KEY_FORMAT;
+import static org.apache.flink.state.table.SavepointConnectorOptions.MAP_KEY_FORMAT_PLACEHOLDER;
+import static org.apache.flink.state.table.SavepointConnectorOptions.OPERATOR_UID;
+import static org.apache.flink.state.table.SavepointConnectorOptions.OPERATOR_UID_HASH;
+import static org.apache.flink.state.table.SavepointConnectorOptions.STATE_BACKEND_TYPE;
+import static org.apache.flink.state.table.SavepointConnectorOptions.STATE_NAME;
+import static org.apache.flink.state.table.SavepointConnectorOptions.STATE_NAME_PLACEHOLDER;
+import static org.apache.flink.state.table.SavepointConnectorOptions.STATE_PATH;
+import static org.apache.flink.state.table.SavepointConnectorOptions.STATE_TYPE;
+import static org.apache.flink.state.table.SavepointConnectorOptions.STATE_TYPE_PLACEHOLDER;
+import static org.apache.flink.state.table.SavepointConnectorOptions.VALUE_FORMAT;
+import static org.apache.flink.state.table.SavepointConnectorOptions.VALUE_FORMAT_PLACEHOLDER;
+import static org.apache.flink.state.table.SavepointConnectorOptionsUtil.getOperatorIdentifier;
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+
+/** Dynamic source factory for {@link SavepointDynamicTableSource}. */
+public class SavepointDynamicTableSourceFactory implements DynamicTableSourceFactory {
+    @Override
+    public DynamicTableSource createDynamicTableSource(Context context) {
+        Configuration options = new Configuration();
+        context.getCatalogTable().getOptions().forEach(options::setString);
+
+        final String stateBackendType = options.get(STATE_BACKEND_TYPE);
+        final String statePath = options.get(STATE_PATH);
+        final OperatorIdentifier operatorIdentifier = getOperatorIdentifier(options);
+
+        final Tuple2<Integer, int[]> keyValueProjections =
+                createKeyValueProjections(context.getCatalogTable());
+
+        LogicalType logicalType = context.getPhysicalRowDataType().getLogicalType();
+        Preconditions.checkArgument(logicalType.is(LogicalTypeRoot.ROW), "Row data type expected.");
+        RowType rowType = (RowType) logicalType;
+
+        Set<ConfigOption<?>> requiredOptions = new HashSet<>(requiredOptions());
+        Set<ConfigOption<?>> optionalOptions = new HashSet<>(optionalOptions());
+
+        RowType.RowField keyRowField = rowType.getFields().get(keyValueProjections.f0);
+        ConfigOption<String> keyFormatOption =
+                key(String.format("%s.%s.%s", FIELDS, keyRowField.getName(), VALUE_FORMAT))
+                        .stringType()
+                        .noDefaultValue();
+        optionalOptions.add(keyFormatOption);
+        final String keyFormat =
+                options.getOptional(keyFormatOption)
+                        .orElseGet(
+                                () ->
+                                        inferStateValueFormat(
+                                                keyRowField.getName(), keyRowField.getType()));
+
+        final Tuple2<Integer, List<StateValueColumnConfiguration>> keyValueConfigProjections =
+                Tuple2.of(
+                        keyValueProjections.f0,
+                        Arrays.stream(keyValueProjections.f1)
+                                .mapToObj(
+                                        columnIndex -> {
+                                            RowType.RowField valueRowField =
+                                                    rowType.getFields().get(columnIndex);
+
+                                            ConfigOption<String> stateNameOption =
+                                                    key(String.format(
+                                                                    "%s.%s.%s",
+                                                                    FIELDS,
+                                                                    valueRowField.getName(),
+                                                                    STATE_NAME))
+                                                            .stringType()
+                                                            .noDefaultValue();
+                                            optionalOptions.add(stateNameOption);
+
+                                            ConfigOption<SavepointConnectorOptions.StateType>
+                                                    stateTypeOption =
+                                                            key(String.format(
+                                                                            "%s.%s.%s",
+                                                                            FIELDS,
+                                                                            valueRowField.getName(),
+                                                                            STATE_TYPE))
+                                                                    .enumType(
+                                                                            SavepointConnectorOptions
+                                                                                    .StateType.class)
+                                                                    .noDefaultValue();
+                                            optionalOptions.add(stateTypeOption);
+
+                                            ConfigOption<String> mapKeyFormatOption =
+                                                    key(String.format(
+                                                                    "%s.%s.%s",
+                                                                    FIELDS,
+                                                                    valueRowField.getName(),
+                                                                    MAP_KEY_FORMAT))
+                                                            .stringType()
+                                                            .noDefaultValue();
+                                            optionalOptions.add(mapKeyFormatOption);
+
+                                            ConfigOption<String> valueFormatOption =
+                                                    key(String.format(
+                                                                    "%s.%s.%s",
+                                                                    FIELDS,
+                                                                    valueRowField.getName(),
+                                                                    VALUE_FORMAT))
+                                                            .stringType()
+                                                            .noDefaultValue();
+                                            optionalOptions.add(valueFormatOption);
+
+                                            LogicalType valueLogicalType = valueRowField.getType();
+                                            return new StateValueColumnConfiguration(
+                                                    columnIndex,
+                                                    options.getOptional(stateNameOption)
+                                                            .orElse(valueRowField.getName()),
+                                                    options.getOptional(stateTypeOption)
+                                                            .orElseGet(
+                                                                    () ->
+                                                                            inferStateType(
+                                                                                    valueLogicalType)),
+                                                    options.getOptional(mapKeyFormatOption)
+                                                            .orElseGet(
+                                                                    () ->
+                                                                            inferStateMapKeyFormat(
+                                                                                    valueRowField
+                                                                                            .getName(),
+                                                                                    valueLogicalType)),
+                                                    options.getOptional(valueFormatOption)
+                                                            .orElseGet(
+                                                                    () ->
+                                                                            inferStateValueFormat(
+                                                                                    valueRowField
+                                                                                            .getName(),
+                                                                                    valueLogicalType)));
+                                        })
+                                .collect(Collectors.toList()));
+        FactoryUtil.validateFactoryOptions(requiredOptions, optionalOptions, options);
+
+        Set<String> consumedOptionKeys = new HashSet<>();
+        consumedOptionKeys.add(CONNECTOR.key());
+        requiredOptions.stream().map(ConfigOption::key).forEach(consumedOptionKeys::add);
+        optionalOptions.stream().map(ConfigOption::key).forEach(consumedOptionKeys::add);
+        FactoryUtil.validateUnconsumedKeys(
+                factoryIdentifier(), options.keySet(), consumedOptionKeys);
+
+        return new SavepointDynamicTableSource(
+                stateBackendType,
+                statePath,
+                operatorIdentifier,
+                keyFormat,
+                keyValueConfigProjections,
+                rowType);
+    }
+
+    private Tuple2<Integer, int[]> createKeyValueProjections(ResolvedCatalogTable catalogTable) {
+        ResolvedSchema schema = catalogTable.getResolvedSchema();
+        if (schema.getPrimaryKey().isEmpty()) {
+            throw new ValidationException("Could not find the primary key in the table schema.");
+        }
+
+        List<String> keyFields = schema.getPrimaryKey().get().getColumns();
+        if (keyFields.size() != 1) {
+            throw new ValidationException(
+                    "Exactly one primary key column must be defined in the table schema.");
+        }
+
+        DataType physicalDataType = schema.toPhysicalRowDataType();
+        int keyProjection = createKeyFormatProjection(physicalDataType, keyFields.get(0));
+        int[] valueProjection = createValueFormatProjection(physicalDataType, keyProjection);
+
+        return Tuple2.of(keyProjection, valueProjection);
+    }
+
+    private int createKeyFormatProjection(DataType physicalDataType, String keyField) {
+        final LogicalType physicalType = physicalDataType.getLogicalType();
+        Preconditions.checkArgument(
+                physicalType.is(LogicalTypeRoot.ROW), "Row data type expected.");
+        final List<String> physicalFields = LogicalTypeChecks.getFieldNames(physicalType);
+        return physicalFields.indexOf(keyField);
+    }
+
+    private int[] createValueFormatProjection(DataType physicalDataType, int keyProjection) {
+        final LogicalType physicalType = physicalDataType.getLogicalType();
+        Preconditions.checkArgument(
+                physicalType.is(LogicalTypeRoot.ROW), "Row data type expected.");
+        final int physicalFieldCount = LogicalTypeChecks.getFieldCount(physicalType);
+        final IntStream physicalFields = IntStream.range(0, physicalFieldCount);
+
+        return physicalFields.filter(pos -> keyProjection != pos).toArray();
+    }
+
+    private SavepointConnectorOptions.StateType inferStateType(LogicalType logicalType) {
+        switch (logicalType.getTypeRoot()) {
+            case ARRAY:
+                return SavepointConnectorOptions.StateType.LIST;
+
+            case MAP:
+                return SavepointConnectorOptions.StateType.MAP;
+
+            default:
+                return SavepointConnectorOptions.StateType.VALUE;
+        }
+    }
+
+    @Nullable
+    private String inferStateMapKeyFormat(String columnName, LogicalType logicalType) {
+        return logicalType.is(LogicalTypeRoot.MAP)
+                ? inferStateValueFormat(columnName, ((MapType) logicalType).getKeyType())
+                : null;
+    }
+
+    private String inferStateValueFormat(String columnName, LogicalType logicalType) {
+        switch (logicalType.getTypeRoot()) {
+            case CHAR:
+            case VARCHAR:
+                return String.class.getName();
+
+            case BOOLEAN:
+                return Boolean.class.getName();
+
+            case BINARY:
+            case VARBINARY:
+                return byte[].class.getName();
+
+            case DECIMAL:
+                return BigDecimal.class.getName();
+
+            case TINYINT:
+                return Byte.class.getName();
+
+            case SMALLINT:
+                return Short.class.getName();
+
+            case INTEGER:
+                return Integer.class.getName();
+
+            case BIGINT:
+                return Long.class.getName();
+
+            case FLOAT:
+                return Float.class.getName();
+
+            case DOUBLE:
+                return Double.class.getName();
+
+            case DATE:
+                return Integer.class.getName();
+
+            case INTERVAL_YEAR_MONTH:
+            case INTERVAL_DAY_TIME:
+                return Long.class.getName();
+
+            case ARRAY:
+                return inferStateValueFormat(
+                        columnName, ((ArrayType) logicalType).getElementType());
+
+            case MAP:
+                return inferStateValueFormat(columnName, ((MapType) logicalType).getValueType());
+
+            case NULL:
+                return null;
+
+            case ROW:
+            case MULTISET:
+            case TIME_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+            case DISTINCT_TYPE:
+            case STRUCTURED_TYPE:
+            case RAW:
+            case SYMBOL:
+            case UNRESOLVED:
+            default:
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "Unable to infer state format for SQL type: %s in column: %s. "
+                                        + "Please override the type with the following config parameter: %s.%s.%s",
+                                logicalType, columnName, FIELDS, columnName, VALUE_FORMAT));
+        }
+    }
+
+    @Override
+    public String factoryIdentifier() {
+        return "savepoint";
+    }
+
+    @Override
+    public Set<ConfigOption<?>> requiredOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+        options.add(STATE_BACKEND_TYPE);
+        options.add(STATE_PATH);
+        return options;
+    }
+
+    @Override
+    public Set<ConfigOption<?>> optionalOptions() {
+        final Set<ConfigOption<?>> options = new HashSet<>();
+
+        // Either UID or hash
+        options.add(OPERATOR_UID);
+        options.add(OPERATOR_UID_HASH);
+
+        // Multiple values can be read so registering placeholders
+        options.add(STATE_NAME_PLACEHOLDER);
+        options.add(STATE_TYPE_PLACEHOLDER);
+        options.add(MAP_KEY_FORMAT_PLACEHOLDER);
+        options.add(VALUE_FORMAT_PLACEHOLDER);
+
+        return options;
+    }
+}
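When the per-field options are omitted, the factory above falls back to inference: the state name defaults to the column name, inferStateType() maps ARRAY to LIST, MAP to MAP and everything else to VALUE, and inferStateValueFormat() picks the boxed Java class for the SQL type. A hypothetical DDL relying purely on inference; the path and uid hash are placeholders:

    CREATE TABLE inferred_state (
        k BIGINT,                   -- key column, taken from the primary key
        cnt BIGINT,                 -- ValueState, value-format java.lang.Long
        history ARRAY<BIGINT>,      -- ListState, value-format java.lang.Long
        lookup MAP<STRING, DOUBLE>, -- MapState, key java.lang.String, value java.lang.Double
        PRIMARY KEY (k) NOT ENFORCED
    ) WITH (
        'connector' = 'savepoint',
        'state.backend.type' = 'rocksdb',
        'state.path' = '/path/to/savepoint',
        'operator.uid.hash' = '...'
    )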
diff --git a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/StateValueColumnConfiguration.java b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/StateValueColumnConfiguration.java
new file mode 100644
index 0000000000000..b572f1a564a3e
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/table/StateValueColumnConfiguration.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.table;
+
+import org.apache.flink.api.common.state.StateDescriptor;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+
+/** Configuration for SQL state columns. */
+@SuppressWarnings("rawtypes")
+public class StateValueColumnConfiguration implements Serializable {
+    private final int columnIndex;
+    private final String stateName;
+    private final SavepointConnectorOptions.StateType stateType;
+    @Nullable private final String mapKeyFormat;
+    @Nullable private final String valueFormat;
+    @Nullable private StateDescriptor stateDescriptor;
+
+    public StateValueColumnConfiguration(
+            int columnIndex,
+            final String stateName,
+            final SavepointConnectorOptions.StateType stateType,
+            @Nullable final String mapKeyFormat,
+            @Nullable final String valueFormat) {
+        this.columnIndex = columnIndex;
+        this.stateName = stateName;
+        this.stateType = stateType;
+        this.mapKeyFormat = mapKeyFormat;
+        this.valueFormat = valueFormat;
+    }
+
+    public int getColumnIndex() {
+        return columnIndex;
+    }
+
+    public String getStateName() {
+        return stateName;
+    }
+
+    public SavepointConnectorOptions.StateType getStateType() {
+        return stateType;
+    }
+
+    @Nullable
+    public String getMapKeyFormat() {
+        return mapKeyFormat;
+    }
+
+    @Nullable
+    public String getValueFormat() {
+        return valueFormat;
+    }
+
+    public void setStateDescriptor(StateDescriptor stateDescriptor) {
+        this.stateDescriptor = stateDescriptor;
+    }
+
+    @Nullable
+    public StateDescriptor getStateDescriptor() {
+        return stateDescriptor;
+    }
+}
diff --git a/flink-libraries/flink-state-processing-api/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-libraries/flink-state-processing-api/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
new file mode 100644
index 0000000000000..730e50e185e9a
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.state.table.SavepointDynamicTableSourceFactory
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/com/example/state/writer/job/schema/PojoData.java b/flink-libraries/flink-state-processing-api/src/test/java/com/example/state/writer/job/schema/PojoData.java
new file mode 100644
index 0000000000000..74cc20d07bd41
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/test/java/com/example/state/writer/job/schema/PojoData.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.example.state.writer.job.schema;
+
+/** Example POJO data to deserialize state. */
+public class PojoData {
+    private Long privateLong;
+    public Long publicLong;
+
+    public Long getPrivateLong() {
+        return privateLong;
+    }
+
+    public void setPrivateLong(Long privateLong) {
+        this.privateLong = privateLong;
+    }
+}
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/SavepointDynamicTableSourceTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/SavepointDynamicTableSourceTest.java
new file mode 100644
index 0000000000000..c31efeafbebe1
--- /dev/null
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/table/SavepointDynamicTableSourceTest.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.state.table;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.LongStream;
+
+import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for the state SQL reader. */
+public class SavepointDynamicTableSourceTest {
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testReadKeyedState() throws Exception {
+        Configuration config = new Configuration();
+        config.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        final String sql =
+                "CREATE TABLE state_table (\n"
+                        + " k bigint,\n"
+                        + " KeyedPrimitiveValue bigint,\n"
+                        + " KeyedPojoValue ROW<privateLong bigint, publicLong bigint>,\n"
+                        + " KeyedPrimitiveValueList ARRAY<bigint>,\n"
+                        + " KeyedPrimitiveValueMap MAP<bigint, bigint>,\n"
+                        + " PRIMARY KEY (k) NOT ENFORCED\n"
+                        + ")\n"
+                        + "with (\n"
+                        + " 'connector' = 'savepoint',\n"
+                        + " 'state.backend.type' = 'hashmap',\n"
+                        + " 'state.path' = 'src/test/resources/table-state',\n"
+                        + " 'operator.uid' = 'keyed-state-process-uid',\n"
+                        + " 'fields.KeyedPojoValue.value-format' = 'com.example.state.writer.job.schema.PojoData'\n"
+                        + ")";
+        tEnv.executeSql(sql);
+        Table table = tEnv.sqlQuery("SELECT * FROM state_table");
+        List<Row> result = tEnv.toDataStream(table).executeAndCollect(100);
+
+        assertThat(result.size()).isEqualTo(10);
+
+        // Check key
+        List<Long> keys =
+                result.stream().map(r -> (Long) r.getField("k")).collect(Collectors.toList());
+        List<Long> expectedKeys = LongStream.range(0L, 10L).boxed().collect(Collectors.toList());
+        assertThat(keys).containsExactlyInAnyOrderElementsOf(expectedKeys);
+
+        // Check primitive value state
+        Set<Long> primitiveValues =
+                result.stream()
+                        .map(r -> (Long) r.getField("KeyedPrimitiveValue"))
+                        .collect(Collectors.toSet());
+        assertThat(primitiveValues.size()).isEqualTo(1);
+        assertThat(primitiveValues.iterator().next()).isEqualTo(1L);
+
+        // Check pojo value state
+        Set<Row> pojoValues =
+                result.stream()
+                        .map(r -> (Row) r.getField("KeyedPojoValue"))
+                        .collect(Collectors.toSet());
+        assertThat(pojoValues.size()).isEqualTo(1);
+        Row pojoData = pojoValues.iterator().next();
+        assertThat(pojoData.getField("publicLong")).isEqualTo(1L);
+        assertThat(pojoData.getField("privateLong")).isEqualTo(1L);
+
+        // Check list state
+        Set<Tuple2<Long, Long[]>> listValues =
+                result.stream()
+                        .map(
+                                r ->
+                                        Tuple2.of(
+                                                (Long) r.getField("k"),
+                                                (Long[]) r.getField("KeyedPrimitiveValueList")))
+                        .flatMap(l -> Set.of(l).stream())
+                        .collect(Collectors.toSet());
+        assertThat(listValues.size()).isEqualTo(10);
+        for (Tuple2<Long, Long[]> tuple2 : listValues) {
+            assertThat(tuple2.f0).isEqualTo(tuple2.f1[0]);
+        }
+
+        // Check map state
+        Set<Tuple2<Long, Map<Long, Long>>> mapValues =
+                result.stream()
+                        .map(
+                                r ->
+                                        Tuple2.of(
+                                                (Long) r.getField("k"),
+                                                (Map<Long, Long>)
+                                                        r.getField("KeyedPrimitiveValueMap")))
+                        .flatMap(l -> Set.of(l).stream())
+                        .collect(Collectors.toSet());
+        assertThat(mapValues.size()).isEqualTo(10);
+        for (Tuple2<Long, Map<Long, Long>> tuple2 : mapValues) {
+            assertThat(tuple2.f1.size()).isEqualTo(1);
+            assertThat(tuple2.f0).isEqualTo(tuple2.f1.get(tuple2.f0));
+        }
+    }
+
+    @Test
+    @SuppressWarnings("DataFlowIssue")
+    public void testReadKeyedStateWithNullValues() throws Exception {
+        Configuration config = new Configuration();
+        config.set(RUNTIME_MODE, RuntimeExecutionMode.BATCH);
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
+        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+        final String sql =
+                "CREATE TABLE state_table (\n"
+                        + " k bigint,\n"
+                        + " total ROW<privateLong bigint, publicLong bigint>,\n"
+                        + " PRIMARY KEY (k) NOT ENFORCED\n"
+                        + ")\n"
+                        + "with (\n"
+                        + " 'connector' = 'savepoint',\n"
+                        + " 'state.backend.type' = 'hashmap',\n"
+                        + " 'state.path' = 
'src/test/resources/table-state-nulls',\n" + + " 'operator.uid' = 'keyed-state-process-uid-null',\n" + + " 'fields.total.value-format' = 'com.example.state.writer.job.schema.PojoData'\n" + + ")"; + tEnv.executeSql(sql); + Table table = tEnv.sqlQuery("SELECT * FROM state_table"); + List result = tEnv.toDataStream(table).executeAndCollect(100); + assertThat(result.size()).isEqualTo(5); + + List keys = + result.stream().map(row -> (Long) row.getField("k")).collect(Collectors.toList()); + assertThat(keys).containsExactlyInAnyOrder(1L, 2L, 3L, 4L, 5L); + + // Check pojo value state + Map pojoValues = + result.stream() + .collect( + Collectors.toMap( + v -> (Long) v.getField("k"), + v -> (Row) v.getField("total"))); + assertThat(pojoValues.get(1L)).isEqualTo(Row.of(1L, 1L)); + assertThat(pojoValues.get(2L)).isEqualTo(Row.of(null, null)); + assertThat(pojoValues.get(3L)).isEqualTo(Row.of(null, null)); + assertThat(pojoValues.get(4L)).isEqualTo(Row.of(4L, 4L)); + assertThat(pojoValues.get(5L)).isEqualTo(Row.of(5L, 5L)); + } +} diff --git a/flink-libraries/flink-state-processing-api/src/test/resources/table-state-nulls/_metadata b/flink-libraries/flink-state-processing-api/src/test/resources/table-state-nulls/_metadata new file mode 100644 index 0000000000000000000000000000000000000000..4ac68dbf7bfd4bf493ee572779ae5436f2b59f97 GIT binary patch literal 24017 zcmeHPYiu0F9pAHKC-LLr*v^ATLI@!|_ik@@?`}8Y5j!zp@~~q&hVY1)y`6O~w(pL; zvqOM}G#}_orBYN7LV`-E%7?0=RaE%Y(kf~rRY z-)ERWoG5T3-Tn5r^PAcK&dz&gxBszmRZ)~C_7g7hr-_v>Ffz=35v3=SpNi2Z$W&pH z#)>6UqOpU1wnY8dfxHu&B1Lu>p57T8kNDZ~aq17zVkz&_`||k-7R?ff`_|yNdx*Nz z8R`xcs85(`n8&JIJ3$XoR}JD)3w}OBi$!%h>#Diw$;qR!$kF%SxwhEd{6eS69;%)K7n%Z%yo7uWT_dkdE`e*D3@_67d_;N9 z%TCfw@jXRWLB$I`$x&L`?~^H7+#i1+?@z=BhKJ*%NL6n#o10K4vraKi3apxg6G2TB zFZ!8yiB)ECpBD4eKC6}DCE_qqSW)rHi3RiAWHMo1nQ; zAVg6bUt`8Ka|0Wc!Mr~nBL$M#Ph-LL8Og?2T|1S}#Y%??bh?zCEXEvWUu;J{H$F`L zESbzcN&OA8>2Qt|iu>~=7HcD`3Y$v#5`jcuv}1eU-o9;P!y`Q-ePbhg2m6$!(ViW9 z`T`p|g2S=lzM=k}9sLjW4FzX=Yg%@I%#bR({OMdNJ4IuI`2+ciX;ro~D7SxZ!%<{6 zK1G7?+enEe&Z6_DUb?doSkPMVvoowo;5W+a;msEq7Y;wVK&jnQn06+!8DZw83p5i2 zUnnapDa(?woXU#Qy{l(u-|%41);^pAM(5-J@wkIL9~;eQ-CevKvFuALPk+Na=c{r> zBQrr!TD}tivxZ|=1F(>CvgotVf^#kJf>+QQSA+rOUJ)Vu4+w(`G&V%a9-o3o(7~z1 zQ10(Ei5S8w@}kLHRSZG+mBT^N-?YwUd3BuT)UYa0I%jjl`?qax z{`;p-z0Sh(D33&xb!M7SYMX{_x+zNZ6xH3NX*$%?h>>R11(S8KAHO^P3sQ%nNS_El z5mAIRTDWQ78 zcGA?c^(56&Zqi}m3005Vwx056%2KUN(pAlbtEraf+N$9uZO1lD+sH6;BTLy8S%o%_ zQQ?iO8yM^x>KPdrf?8->|M19AfA5}={()U%I|sJ)DILRuJNie)${xISVpUho+R2}j zH|MW6iSLs`vED-^I_PJovZd?{?ct}Yvh3%1&dIme1a`3;t3(p)kh{fqe-^rsR{aMz ze+i$MmT_FyOJ-DJ8jfn}3Dy|7T7o?>Jz=Gj3F0PF-{dEzZwVjH;%CAKCMpK;$gY}X zt&KYEdg{S!X}=l{Y}EaH{IX_VFJ9ntfBm@ivpatH-@m{0eCPK&R`Z%DDDXUDQFO&c zD#)`3cB@4#7z z%lF)|CjZ@Y&))Ru_8pJGG$`t)W2Cf(s0u36lyG69EFn z6of$Dba5xJC8SD#Z9uk!*a${7b^>1yr~~!_@*XsEjSip>cogs$;BmkcfF}V@1D*vu zAL610z)ryRfI46=;1Hk#=mQ=FJO+3i@C4vVz|(+d0ndln)BxBCxE@dk>;)VGbO3$8 zqkzW%j{}|nJPCLj@GRi@5StqSI|0`N>VUm~Lx2vT4|o*t7~pZh6M!cHPXnF>Jb!Vj z7Ryd+Y0D-m(QP(7OlC~hHhWZc48urz8P*S^Y1Wr$U{?*Y??Md^897#rc2t8}<#^{LuF3-mU40%PcP&dAiBbv3dl9bNST&bR#Uuk?Ongo_1n2U&pKBO#<%=YKA9@TCU;!TJ* zBfc2%C5SIYyan-A#M=;WM?8x7GQ>L&UygVu;%qchefvr`;wuneiTEnSS0jEE;#VVn z4dT}#z6SBNh+l{JI>fI>d_CefAie?d8xh}#_)Uo4jQA~x--`Hch~JL5ig*n1IN}=O z3B+~84a7~vlZdAfPa|$2ZX>=4@y&?ef%u(>--Y560EIE>nhP) zsuClmSxJKsqVb86vg4Wml8zG+PDV?4Y@DcNGL(%ITxM#5jkc3Xhd7pJTEx_tN$)ee zg2uc)PLPVUm~Lx2vT4|o*t7~pZh6M!cHPXnF> zJRjnfK2ESbFP%zSX*I!CjZ~B9y6UjWJJocP)UfT0=Xj((PGHZvK28YN48OUfE?(!( z?)UZ}-i!EF#J3^dhxk2+-;4Nti1#CYKjPaF-+}l}#CIV+fcPNdyAdBkd>HW&#P=XR 
literal 0
HcmV?d00001

diff --git a/flink-libraries/flink-state-processing-api/src/test/resources/table-state/_metadata b/flink-libraries/flink-state-processing-api/src/test/resources/table-state/_metadata
new file mode 100644
index 0000000000000000000000000000000000000000..dc9d5acbbd2ec17b819a7c2f96ab4a258d9f8303
GIT binary patch
literal 22390
[22390-byte binary savepoint metadata; base85-encoded patch data not reproduced here]
literal 0
HcmV?d00001
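
Note for reviewers: the DDL exercised by the tests above is also the user-facing surface of this change, so a minimal standalone sketch of how the connector could be queried against a real savepoint may help review. It assumes only the option keys shown in the tests ('connector' = 'savepoint', 'state.backend.type', 'state.path', 'operator.uid', and the optional 'fields.<column>.value-format'); the savepoint path, operator uid, table name, and state column name below are placeholders, not values taken from this patch.

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    /** Minimal sketch of querying keyed savepoint state via the new 'savepoint' connector. */
    public class SavepointQuerySketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

            // Declare one column per keyed state registered under the given operator uid;
            // the primary-key column carries the partition key. Path and uid are hypothetical.
            tEnv.executeSql(
                    "CREATE TABLE checkout_state (\n"
                            + "  k BIGINT,\n"
                            + "  CartTotal BIGINT,\n"
                            + "  PRIMARY KEY (k) NOT ENFORCED\n"
                            + ") WITH (\n"
                            + "  'connector' = 'savepoint',\n"
                            + "  'state.backend.type' = 'hashmap',\n"
                            + "  'state.path' = '/tmp/savepoints/savepoint-abc123',\n"
                            + "  'operator.uid' = 'checkout-operator-uid'\n"
                            + ")");

            // The source is bounded, so the query terminates once all keys have been read.
            tEnv.executeSql("SELECT * FROM checkout_state").print();
        }
    }

Because the scan provider exposes the reader as a bounded source (the tests run under RuntimeExecutionMode.BATCH), such a table should compose with ordinary batch SQL, e.g. joins or aggregations over the restored state.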