From d9f1df9577aec40ee2dc53c3fd6f1514c8d14d6d Mon Sep 17 00:00:00 2001 From: Hao Li <1127478+lihaosky@users.noreply.github.com> Date: Wed, 4 Feb 2026 17:40:53 -0800 Subject: [PATCH 1/9] [FLINK-39029] move catalog store factory builder to new class --- .../service/context/SessionContext.java | 5 +- .../internal/StreamTableEnvironmentImpl.java | 27 ++-- .../flink/table/api/EnvironmentSettings.java | 5 +- .../api/internal/TableEnvironmentImpl.java | 27 ++-- .../flink/table/factories/ApiFactoryUtil.java | 77 ++++++++++ .../table/factories/TableFactoryUtil.java | 50 ------- .../table/factories/ApiFactoryUtilTest.java | 136 ++++++++++++++++++ .../internal/StreamTableEnvironmentImpl.scala | 23 +-- 8 files changed, 264 insertions(+), 86 deletions(-) create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ApiFactoryUtil.java create mode 100644 flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ApiFactoryUtilTest.java diff --git a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java index 12bfc2d8e5a87..b5463d44267fe 100644 --- a/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java +++ b/flink-table/flink-sql-gateway/src/main/java/org/apache/flink/table/gateway/service/context/SessionContext.java @@ -31,6 +31,7 @@ import org.apache.flink.table.catalog.CatalogStoreHolder; import org.apache.flink.table.catalog.FunctionCatalog; import org.apache.flink.table.catalog.GenericInMemoryCatalog; +import org.apache.flink.table.factories.ApiFactoryUtil; import org.apache.flink.table.factories.CatalogStoreFactory; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.factories.TableFactoryUtil; @@ -390,9 +391,9 @@ private static CatalogManager buildCatalogManager( SessionEnvironment environment) { CatalogStoreFactory catalogStoreFactory = - TableFactoryUtil.findAndCreateCatalogStoreFactory(configuration, userClassLoader); + ApiFactoryUtil.findAndCreateCatalogStoreFactory(configuration, userClassLoader); CatalogStoreFactory.Context catalogStoreFactoryContext = - TableFactoryUtil.buildCatalogStoreFactoryContext(configuration, userClassLoader); + ApiFactoryUtil.buildCatalogStoreFactoryContext(configuration, userClassLoader); catalogStoreFactory.open(catalogStoreFactoryContext); CatalogStoreHolder catalogStore = CatalogStoreHolder.newBuilder() diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java index 2bba1d76d696c..1ba1f34e6d856 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java @@ -46,6 +46,7 @@ import org.apache.flink.table.delegation.Executor; import org.apache.flink.table.delegation.Planner; import org.apache.flink.table.expressions.Expression; +import org.apache.flink.table.factories.ApiFactoryUtil; import org.apache.flink.table.factories.CatalogStoreFactory; import org.apache.flink.table.factories.PlannerFactoryUtil; import org.apache.flink.table.factories.TableFactoryUtil; @@ -112,17 +113,21 @@ public static StreamTableEnvironment create( new ResourceManager(settings.getConfiguration(), userClassLoader); final ModuleManager moduleManager = new ModuleManager(); - final CatalogStoreFactory catalogStoreFactory = - TableFactoryUtil.findAndCreateCatalogStoreFactory( - settings.getConfiguration(), userClassLoader); - final CatalogStoreFactory.Context catalogStoreFactoryContext = - TableFactoryUtil.buildCatalogStoreFactoryContext( - settings.getConfiguration(), userClassLoader); - catalogStoreFactory.open(catalogStoreFactoryContext); - final CatalogStore catalogStore = - settings.getCatalogStore() != null - ? settings.getCatalogStore() - : catalogStoreFactory.createCatalogStore(); + final CatalogStore catalogStore; + final CatalogStoreFactory catalogStoreFactory; + if (settings.getCatalogStore().isPresent()) { + catalogStore = settings.getCatalogStore().get(); + catalogStoreFactory = null; + } else { + catalogStoreFactory = + ApiFactoryUtil.findAndCreateCatalogStoreFactory( + settings.getConfiguration(), userClassLoader); + final CatalogStoreFactory.Context catalogStoreFactoryContext = + ApiFactoryUtil.buildCatalogStoreFactoryContext( + settings.getConfiguration(), userClassLoader); + catalogStoreFactory.open(catalogStoreFactoryContext); + catalogStore = catalogStoreFactory.createCatalogStore(); + } final CatalogManager catalogManager = CatalogManager.newBuilder() diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java index 67fcfcec46309..e08704a9b99b0 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java @@ -144,9 +144,8 @@ public ClassLoader getUserClassLoader() { } @Internal - @Nullable - public CatalogStore getCatalogStore() { - return catalogStore; + public Optional getCatalogStore() { + return Optional.ofNullable(catalogStore); } @Internal diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 6a40fa9c2384d..4a24c5ae963de 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -83,6 +83,7 @@ import org.apache.flink.table.expressions.ModelReferenceExpression; import org.apache.flink.table.expressions.TableReferenceExpression; import org.apache.flink.table.expressions.utils.ApiExpressionDefaultVisitor; +import org.apache.flink.table.factories.ApiFactoryUtil; import org.apache.flink.table.factories.CatalogStoreFactory; import org.apache.flink.table.factories.FactoryUtil; import org.apache.flink.table.factories.PlannerFactoryUtil; @@ -251,17 +252,21 @@ public static TableEnvironmentImpl create(EnvironmentSettings settings) { userClassLoader, ExecutorFactory.class, ExecutorFactory.DEFAULT_IDENTIFIER); final Executor executor = executorFactory.create(settings.getConfiguration()); - final CatalogStoreFactory catalogStoreFactory = - TableFactoryUtil.findAndCreateCatalogStoreFactory( - settings.getConfiguration(), userClassLoader); - final CatalogStoreFactory.Context context = - TableFactoryUtil.buildCatalogStoreFactoryContext( - settings.getConfiguration(), userClassLoader); - catalogStoreFactory.open(context); - final CatalogStore catalogStore = - settings.getCatalogStore() != null - ? settings.getCatalogStore() - : catalogStoreFactory.createCatalogStore(); + final CatalogStore catalogStore; + final CatalogStoreFactory catalogStoreFactory; + if (settings.getCatalogStore().isPresent()) { + catalogStore = settings.getCatalogStore().get(); + catalogStoreFactory = null; + } else { + catalogStoreFactory = + ApiFactoryUtil.findAndCreateCatalogStoreFactory( + settings.getConfiguration(), userClassLoader); + final CatalogStoreFactory.Context context = + ApiFactoryUtil.buildCatalogStoreFactoryContext( + settings.getConfiguration(), userClassLoader); + catalogStoreFactory.open(context); + catalogStore = catalogStoreFactory.createCatalogStore(); + } // use configuration to init table config final TableConfig tableConfig = TableConfig.getDefault(); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ApiFactoryUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ApiFactoryUtil.java new file mode 100644 index 0000000000000..2f4b36b2bd612 --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ApiFactoryUtil.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.DelegatingConfiguration; +import org.apache.flink.table.catalog.CommonCatalogOptions; + +import java.util.Map; + +/** Utility for dealing with catalog store factories. */ +@Internal +public class ApiFactoryUtil { + + /** + * Finds and creates a {@link CatalogStoreFactory} using the provided {@link Configuration} and + * user classloader. + * + *

The configuration format should be as follows: + * + *

{@code
+     * table.catalog-store.kind: {identifier}
+     * table.catalog-store.{identifier}.{param1}: xxx
+     * table.catalog-store.{identifier}.{param2}: xxx
+     * }
+ */ + public static CatalogStoreFactory findAndCreateCatalogStoreFactory( + Configuration configuration, ClassLoader classLoader) { + String identifier = configuration.get(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND); + + CatalogStoreFactory catalogStoreFactory = + FactoryUtil.discoverFactory(classLoader, CatalogStoreFactory.class, identifier); + + return catalogStoreFactory; + } + + /** + * Build a {@link CatalogStoreFactory.Context} for opening the {@link CatalogStoreFactory}. + * + *

The configuration format should be as follows: + * + *

{@code
+     * table.catalog-store.kind: {identifier}
+     * table.catalog-store.{identifier}.{param1}: xxx
+     * table.catalog-store.{identifier}.{param2}: xxx
+     * }
+ */ + public static CatalogStoreFactory.Context buildCatalogStoreFactoryContext( + Configuration configuration, ClassLoader classLoader) { + String identifier = configuration.get(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND); + String catalogStoreOptionPrefix = + CommonCatalogOptions.TABLE_CATALOG_STORE_OPTION_PREFIX + identifier + "."; + Map options = + new DelegatingConfiguration(configuration, catalogStoreOptionPrefix).toMap(); + CatalogStoreFactory.Context context = + new FactoryUtil.DefaultCatalogStoreContext(options, configuration, classLoader); + + return context; + } +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java index ad681b5439441..ba10e4de707b7 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java @@ -19,14 +19,11 @@ package org.apache.flink.table.factories; import org.apache.flink.annotation.Internal; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.DelegatingConfiguration; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.api.TableException; import org.apache.flink.table.api.config.TableConfigOptions; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogTable; -import org.apache.flink.table.catalog.CommonCatalogOptions; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.catalog.ResolvedCatalogTable; import org.apache.flink.table.catalog.listener.CatalogModificationListener; @@ -42,7 +39,6 @@ import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; /** Utility for dealing with {@link TableFactory} using the {@link TableFactoryService}. */ @@ -174,50 +170,4 @@ public ClassLoader getUserClassLoader() { })) .collect(Collectors.toList()); } - - /** - * Finds and creates a {@link CatalogStoreFactory} using the provided {@link Configuration} and - * user classloader. - * - *

The configuration format should be as follows: - * - *

{@code
-     * table.catalog-store.kind: {identifier}
-     * table.catalog-store.{identifier}.{param1}: xxx
-     * table.catalog-store.{identifier}.{param2}: xxx
-     * }
- */ - public static CatalogStoreFactory findAndCreateCatalogStoreFactory( - Configuration configuration, ClassLoader classLoader) { - String identifier = configuration.get(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND); - - CatalogStoreFactory catalogStoreFactory = - FactoryUtil.discoverFactory(classLoader, CatalogStoreFactory.class, identifier); - - return catalogStoreFactory; - } - - /** - * Build a {@link CatalogStoreFactory.Context} for opening the {@link CatalogStoreFactory}. - * - *

The configuration format should be as follows: - * - *

{@code
-     * table.catalog-store.kind: {identifier}
-     * table.catalog-store.{identifier}.{param1}: xxx
-     * table.catalog-store.{identifier}.{param2}: xxx
-     * }
- */ - public static CatalogStoreFactory.Context buildCatalogStoreFactoryContext( - Configuration configuration, ClassLoader classLoader) { - String identifier = configuration.get(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND); - String catalogStoreOptionPrefix = - CommonCatalogOptions.TABLE_CATALOG_STORE_OPTION_PREFIX + identifier + "."; - Map options = - new DelegatingConfiguration(configuration, catalogStoreOptionPrefix).toMap(); - CatalogStoreFactory.Context context = - new FactoryUtil.DefaultCatalogStoreContext(options, configuration, classLoader); - - return context; - } } diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ApiFactoryUtilTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ApiFactoryUtilTest.java new file mode 100644 index 0000000000000..c8e1908291b85 --- /dev/null +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ApiFactoryUtilTest.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.catalog.FileCatalogStoreFactory; +import org.apache.flink.table.catalog.GenericInMemoryCatalogStoreFactory; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.File; +import java.util.Map; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link ApiFactoryUtil}. */ +class ApiFactoryUtilTest { + + @ParameterizedTest(name = "kind={0}, expectedFactory={1}") + @MethodSource("catalogStoreFactoryTestParameters") + void testFindAndCreateCatalogStoreFactory(String kind, Class expectedFactoryClass) { + Configuration configuration = new Configuration(); + if (kind != null) { + configuration.set(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND, kind); + } + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + + CatalogStoreFactory factory = + ApiFactoryUtil.findAndCreateCatalogStoreFactory(configuration, classLoader); + + assertThat(factory).isInstanceOf(expectedFactoryClass); + } + + @Test + void testBuildCatalogStoreFactoryContext(@TempDir File tempFolder) { + Configuration configuration = new Configuration(); + configuration.set(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND, "file"); + configuration.setString("table.catalog-store.file.path", tempFolder.getAbsolutePath()); + configuration.setString("table.catalog-store.file.option1", "value1"); + configuration.setString("table.catalog-store.file.option2", "value2"); + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + + CatalogStoreFactory.Context context = + ApiFactoryUtil.buildCatalogStoreFactoryContext(configuration, classLoader); + + assertThat(context).isNotNull(); + assertThat(context.getOptions()) + .containsExactlyInAnyOrderEntriesOf( + Map.of( + "path", tempFolder.getAbsolutePath(), + "option1", "value1", + "option2", "value2")); + assertThat(context.getConfiguration()).isEqualTo(configuration); + assertThat(context.getClassLoader()).isEqualTo(classLoader); + } + + @Test + void testBuildCatalogStoreFactoryContextWithGenericInMemory() { + Configuration configuration = new Configuration(); + configuration.set(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND, "generic_in_memory"); + configuration.setString("table.catalog-store.generic_in_memory.option1", "value1"); + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + + CatalogStoreFactory.Context context = + ApiFactoryUtil.buildCatalogStoreFactoryContext(configuration, classLoader); + + assertThat(context).isNotNull(); + assertThat(context.getOptions()) + .containsExactlyInAnyOrderEntriesOf(Map.of("option1", "value1")); + assertThat(context.getConfiguration()).isEqualTo(configuration); + assertThat(context.getClassLoader()).isEqualTo(classLoader); + } + + @Test + void testBuildCatalogStoreFactoryContextWithoutOptions() { + Configuration configuration = new Configuration(); + configuration.set(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND, "generic_in_memory"); + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + + CatalogStoreFactory.Context context = + ApiFactoryUtil.buildCatalogStoreFactoryContext(configuration, classLoader); + + assertThat(context).isNotNull(); + assertThat(context.getOptions()).isEmpty(); + assertThat(context.getConfiguration()).isEqualTo(configuration); + assertThat(context.getClassLoader()).isEqualTo(classLoader); + } + + @Test + void testBuildCatalogStoreFactoryContextOnlyExtractsRelevantOptions() { + Configuration configuration = new Configuration(); + configuration.set(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND, "file"); + configuration.setString("table.catalog-store.file.path", "/test/path"); + configuration.setString("table.catalog-store.file.option1", "value1"); + configuration.setString("table.catalog-store.other.irrelevant", "should-not-appear"); + configuration.setString("other.config.key", "should-not-appear"); + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + + CatalogStoreFactory.Context context = + ApiFactoryUtil.buildCatalogStoreFactoryContext(configuration, classLoader); + + assertThat(context).isNotNull(); + assertThat(context.getOptions()) + .containsExactlyInAnyOrderEntriesOf( + Map.of("path", "/test/path", "option1", "value1")); + } + + private static Stream catalogStoreFactoryTestParameters() { + return Stream.of( + Arguments.of("generic_in_memory", GenericInMemoryCatalogStoreFactory.class), + Arguments.of("file", FileCatalogStoreFactory.class), + Arguments.of(null, GenericInMemoryCatalogStoreFactory.class)); + } +} diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala index 7e86c5bf256cd..c8cc4e99463d2 100644 --- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala +++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala @@ -28,7 +28,7 @@ import org.apache.flink.table.catalog._ import org.apache.flink.table.connector.ChangelogMode import org.apache.flink.table.delegation.{Executor, Planner} import org.apache.flink.table.expressions.Expression -import org.apache.flink.table.factories.{PlannerFactoryUtil, TableFactoryUtil} +import org.apache.flink.table.factories.{ApiFactoryUtil, PlannerFactoryUtil, TableFactoryUtil} import org.apache.flink.table.functions.{AggregateFunction, TableAggregateFunction, TableFunction, UserDefinedFunctionHelper} import org.apache.flink.table.legacy.sources.TableSource import org.apache.flink.table.module.ModuleManager @@ -250,14 +250,19 @@ object StreamTableEnvironmentImpl { val resourceManager = new ResourceManager(settings.getConfiguration, userClassLoader) val moduleManager = new ModuleManager - val catalogStoreFactory = - TableFactoryUtil.findAndCreateCatalogStoreFactory(settings.getConfiguration, userClassLoader) - val catalogStoreFactoryContext = - TableFactoryUtil.buildCatalogStoreFactoryContext(settings.getConfiguration, userClassLoader) - catalogStoreFactory.open(catalogStoreFactoryContext) - val catalogStore = - if (settings.getCatalogStore != null) settings.getCatalogStore - else catalogStoreFactory.createCatalogStore() + val (catalogStore, catalogStoreFactory) = + if (settings.getCatalogStore.isPresent) { + (settings.getCatalogStore.get(), null) + } else { + val factory = + ApiFactoryUtil.findAndCreateCatalogStoreFactory( + settings.getConfiguration, + userClassLoader) + val factoryContext = + ApiFactoryUtil.buildCatalogStoreFactoryContext(settings.getConfiguration, userClassLoader) + factory.open(factoryContext) + (factory.createCatalogStore(), factory) + } val catalogManager = CatalogManager.newBuilder .classLoader(userClassLoader) From 22e04c5f8f4c651bc5d794f942810cac439e8f27 Mon Sep 17 00:00:00 2001 From: Hao Li <1127478+lihaosky@users.noreply.github.com> Date: Thu, 8 Jan 2026 15:07:22 -0800 Subject: [PATCH 2/9] [FLINK-38263][table] add secret store related interfaces --- .../flink/table/api/EnvironmentSettings.java | 31 ++++++++- .../api/internal/TableEnvironmentImpl.java | 15 +++++ .../table/factories/TableFactoryUtil.java | 36 +++++++++++ .../flink/table/factories/FactoryUtil.java | 34 ++++++++++ .../table/secret/CommonSecretOptions.java | 41 ++++++++++++ .../table/secret/ReadableSecretStore.java | 45 +++++++++++++ .../flink/table/secret/SecretStore.java | 33 ++++++++++ .../table/secret/SecretStoreFactory.java | 60 +++++++++++++++++ .../table/secret/WritableSecretStore.java | 64 +++++++++++++++++++ .../secret/exceptions/SecretException.java | 61 ++++++++++++++++++ .../exceptions/SecretNotFoundException.java | 53 +++++++++++++++ 11 files changed, 471 insertions(+), 2 deletions(-) create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/CommonSecretOptions.java create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/ReadableSecretStore.java create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/SecretStore.java create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/SecretStoreFactory.java create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/WritableSecretStore.java create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/exceptions/SecretException.java create mode 100644 flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/exceptions/SecretNotFoundException.java diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java index e08704a9b99b0..bb6e588d92282 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java @@ -25,6 +25,7 @@ import org.apache.flink.table.catalog.CatalogStore; import org.apache.flink.table.expressions.SqlFactory; import org.apache.flink.table.functions.UserDefinedFunction; +import org.apache.flink.table.secret.SecretStore; import javax.annotation.Nullable; @@ -66,16 +67,19 @@ public class EnvironmentSettings { private final @Nullable CatalogStore catalogStore; private final @Nullable SqlFactory sqlFactory; + private final @Nullable SecretStore secretStore; private EnvironmentSettings( Configuration configuration, ClassLoader classLoader, CatalogStore catalogStore, - SqlFactory sqlFactory) { + SqlFactory sqlFactory, + SecretStore secretStore) { this.configuration = configuration; this.classLoader = classLoader; this.catalogStore = catalogStore; this.sqlFactory = sqlFactory; + this.secretStore = secretStore; } /** @@ -153,6 +157,12 @@ public Optional getSqlFactory() { return Optional.ofNullable(sqlFactory); } + @Internal + @Nullable + public SecretStore getSecretStore() { + return secretStore; + } + /** A builder for {@link EnvironmentSettings}. */ @PublicEvolving public static class Builder { @@ -162,6 +172,7 @@ public static class Builder { private @Nullable CatalogStore catalogStore; private @Nullable SqlFactory sqlFactory; + private @Nullable SecretStore secretStore; public Builder() {} @@ -250,12 +261,28 @@ public Builder withSqlFactory(SqlFactory sqlFactory) { return this; } + /** + * Specifies the {@link SecretStore} to be used for managing secrets in the {@link + * TableEnvironment}. + * + *

The secret store allows for secure storage and retrieval of sensitive configuration + * data such as credentials, tokens, and passwords. + * + * @param secretStore the secret store instance to use + * @return this builder + */ + public Builder withSecretStore(SecretStore secretStore) { + this.secretStore = secretStore; + return this; + } + /** Returns an immutable instance of {@link EnvironmentSettings}. */ public EnvironmentSettings build() { if (classLoader == null) { classLoader = Thread.currentThread().getContextClassLoader(); } - return new EnvironmentSettings(configuration, classLoader, catalogStore, sqlFactory); + return new EnvironmentSettings( + configuration, classLoader, catalogStore, sqlFactory, secretStore); } } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 4a24c5ae963de..8d031e2a74a1d 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -120,6 +120,8 @@ import org.apache.flink.table.resource.ResourceManager; import org.apache.flink.table.resource.ResourceType; import org.apache.flink.table.resource.ResourceUri; +import org.apache.flink.table.secret.SecretStore; +import org.apache.flink.table.secret.SecretStoreFactory; import org.apache.flink.table.types.AbstractDataType; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.utils.DataTypeUtils; @@ -268,6 +270,19 @@ public static TableEnvironmentImpl create(EnvironmentSettings settings) { catalogStore = catalogStoreFactory.createCatalogStore(); } + final SecretStoreFactory secretStoreFactory = + TableFactoryUtil.findAndCreateSecretStoreFactory( + settings.getConfiguration(), userClassLoader); + final SecretStoreFactory.Context secretStoreContext = + TableFactoryUtil.buildSecretStoreFactoryContext( + settings.getConfiguration(), userClassLoader); + secretStoreFactory.open(secretStoreContext); + // TODO: pass secret store to catalog manager for encryption/decryption + final SecretStore secretStore = + settings.getSecretStore() != null + ? settings.getSecretStore() + : secretStoreFactory.createSecretStore(); + // use configuration to init table config final TableConfig tableConfig = TableConfig.getDefault(); tableConfig.setRootConfiguration(executor.getConfiguration()); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java index ba10e4de707b7..da425bf4daaad 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java @@ -36,6 +36,8 @@ import org.apache.flink.table.legacy.factories.TableSourceFactory; import org.apache.flink.table.legacy.sinks.TableSink; import org.apache.flink.table.legacy.sources.TableSource; +import org.apache.flink.table.secret.CommonSecretOptions; +import org.apache.flink.table.secret.SecretStoreFactory; import java.util.Collections; import java.util.List; @@ -170,4 +172,38 @@ public ClassLoader getUserClassLoader() { })) .collect(Collectors.toList()); } + + public static SecretStoreFactory findAndCreateSecretStoreFactory( + Configuration configuration, ClassLoader classLoader) { + String identifier = configuration.get(CommonSecretOptions.TABLE_SECRET_STORE_KIND); + + SecretStoreFactory secretStoreFactory = + FactoryUtil.discoverFactory(classLoader, SecretStoreFactory.class, identifier); + + return secretStoreFactory; + } + + /** + * Build a {@link SecretStoreFactory.Context} for opening the {@link SecretStoreFactory}. + * + *

The configuration format should be as follows: + * + *

{@code
+     * table.secret-store.kind: {identifier}
+     * table.secret-store.{identifier}.{param1}: xxx
+     * table.secret-store.{identifier}.{param2}: xxx
+     * }
+ */ + public static SecretStoreFactory.Context buildSecretStoreFactoryContext( + Configuration configuration, ClassLoader classLoader) { + String identifier = configuration.get(CommonSecretOptions.TABLE_SECRET_STORE_KIND); + String secretStoreOptionPrefix = + CommonSecretOptions.TABLE_SECRET_STORE_OPTION_PREFIX + identifier + "."; + Map options = + new DelegatingConfiguration(configuration, secretStoreOptionPrefix).toMap(); + SecretStoreFactory.Context context = + new FactoryUtil.DefaultSecretStoreContext(options, configuration, classLoader); + + return context; + } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java index 691397c08d4bd..dc0c942cdf20e 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java @@ -42,6 +42,7 @@ import org.apache.flink.table.legacy.factories.TableFactory; import org.apache.flink.table.ml.ModelProvider; import org.apache.flink.table.module.Module; +import org.apache.flink.table.secret.SecretStoreFactory; import org.apache.flink.table.utils.EncodingUtils; import org.apache.flink.table.watermark.WatermarkEmitStrategy; import org.apache.flink.util.Preconditions; @@ -1411,6 +1412,39 @@ public ClassLoader getClassLoader() { } } + public static class DefaultSecretStoreContext implements SecretStoreFactory.Context { + + private Map options; + + private ReadableConfig configuration; + + private ClassLoader classLoader; + + public DefaultSecretStoreContext( + Map options, + ReadableConfig configuration, + ClassLoader classLoader) { + this.options = options; + this.configuration = configuration; + this.classLoader = classLoader; + } + + @Override + public Map getOptions() { + return options; + } + + @Override + public ReadableConfig getConfiguration() { + return configuration; + } + + @Override + public ClassLoader getClassLoader() { + return classLoader; + } + } + /** Default implementation of {@link ModuleFactory.Context}. */ @Internal public static class DefaultModuleContext implements ModuleFactory.Context { diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/CommonSecretOptions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/CommonSecretOptions.java new file mode 100644 index 0000000000000..bbff5921c76aa --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/CommonSecretOptions.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.secret; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +/** A collection of {@link ConfigOption} which are used for secret store configuration. */ +@Internal +public class CommonSecretOptions { + + public static final String DEFAULT_SECRET_STORE_KIND = "default_in_memory"; + public static final ConfigOption TABLE_SECRET_STORE_KIND = + ConfigOptions.key("table.secret-store.kind") + .stringType() + .defaultValue(DEFAULT_SECRET_STORE_KIND) + .withDescription( + "The kind of secret store to be used. Out of the box, 'default_in_memory' option is supported. " + + "Implementations can provide custom secret stores for different backends " + + "(e.g., cloud-specific secret managers)."); + + /** Used to filter the specific options for secret store. */ + public static final String TABLE_SECRET_STORE_OPTION_PREFIX = "table.secret-store."; +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/ReadableSecretStore.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/ReadableSecretStore.java new file mode 100644 index 0000000000000..70dd48e21f979 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/ReadableSecretStore.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.secret; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.secret.exceptions.SecretNotFoundException; + +import java.util.Map; + +/** + * Interface for retrieving secrets from a secret store. + * + *

This interface enables read-only access to stored secrets, allowing applications to retrieve + * sensitive configuration data such as credentials, API tokens, and passwords. + * + *

Implementations of this interface should ensure secure retrieval and handling of secret data. + */ +@PublicEvolving +public interface ReadableSecretStore extends SecretStore { + + /** + * Retrieves a secret from the store by its identifier. + * + * @param secretId the unique identifier of the secret to retrieve + * @return a map containing the secret data as key-value pairs + * @throws SecretNotFoundException if the secret with the given identifier does not exist + */ + Map getSecret(String secretId) throws SecretNotFoundException; +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/SecretStore.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/SecretStore.java new file mode 100644 index 0000000000000..89af1d469e1b8 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/SecretStore.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.secret; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * Base marker interface for secret store implementations. + * + *

This interface serves as the common base for both {@link ReadableSecretStore} and {@link + * WritableSecretStore}, allowing for flexible secret management implementations. + * + *

Secret stores are used to manage sensitive configuration data (credentials, tokens, passwords, + * etc.) in Flink SQL and Table API applications. + */ +@PublicEvolving +public interface SecretStore {} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/SecretStoreFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/SecretStoreFactory.java new file mode 100644 index 0000000000000..ceb4f17c93bea --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/SecretStoreFactory.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.secret; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.factories.Factory; +import org.apache.flink.table.secret.exceptions.SecretException; + +import java.util.Map; + +/** Factory for creating SecretStore instances. */ +@PublicEvolving +public interface SecretStoreFactory extends Factory { + + /** Creates a SecretStore instance. */ + SecretStore createSecretStore(); + + /** Initialize secret store. */ + void open(Context context) throws SecretException; + + /** Close secret store. */ + void close() throws CatalogException; + + interface Context { + /** + * Returns the options with which the secret store is created. + * + *

An implementation should perform validation of these options. + */ + Map getOptions(); + + /** Gives read-only access to the configuration of the current session. */ + ReadableConfig getConfiguration(); + + /** + * Returns the class loader of the current session. + * + *

The class loader is in particular useful for discovering further (nested) factories. + */ + ClassLoader getClassLoader(); + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/WritableSecretStore.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/WritableSecretStore.java new file mode 100644 index 0000000000000..db5037b7b2f53 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/WritableSecretStore.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.secret; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.table.secret.exceptions.SecretNotFoundException; + +import java.util.Map; + +/** + * Interface for storing, updating, and removing secrets in a secret store. + * + *

This interface provides write operations for managing secrets, including adding new secrets, + * updating existing ones, and removing secrets that are no longer needed. + * + *

Implementations should ensure that secret operations are performed securely and, where + * applicable, atomically. + */ +@PublicEvolving +public interface WritableSecretStore extends SecretStore { + + /** + * Stores a new secret in the secret store. + * + * @param secretData a map containing the secret data as key-value pairs to be stored + * @return a unique identifier for the stored secret + */ + String storeSecret(Map secretData); + + /** + * Removes a secret from the secret store. + * + * @param secretId the unique identifier of the secret to remove + */ + void removeSecret(String secretId); + + /** + * Atomically updates an existing secret with new data. + * + *

This operation replaces the entire secret data with the provided new data. + * + * @param secretId the unique identifier of the secret to update + * @param newSecretData a map containing the new secret data as key-value pairs + * @throws SecretNotFoundException if the secret with the given identifier does not exist + */ + void updateSecret(String secretId, Map newSecretData) + throws SecretNotFoundException; +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/exceptions/SecretException.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/exceptions/SecretException.java new file mode 100644 index 0000000000000..8304e41ac963c --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/exceptions/SecretException.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.secret.exceptions; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * Base exception for all secret-related errors in the secret store. + * + *

This exception serves as the parent class for all secret store related exceptions, providing a + * common type for handling errors that occur during secret management operations. + */ +@PublicEvolving +public class SecretException extends RuntimeException { + + private static final long serialVersionUID = 1L; + + /** + * Constructs a new SecretException with the specified detail message. + * + * @param message the detail message explaining the reason for the exception + */ + public SecretException(String message) { + super(message); + } + + /** + * Constructs a new SecretException with the specified detail message and cause. + * + * @param message the detail message explaining the reason for the exception + * @param cause the cause of the exception + */ + public SecretException(String message, Throwable cause) { + super(message, cause); + } + + /** + * Constructs a new SecretException with the specified cause. + * + * @param cause the cause of the exception + */ + public SecretException(Throwable cause) { + super(cause); + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/exceptions/SecretNotFoundException.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/exceptions/SecretNotFoundException.java new file mode 100644 index 0000000000000..a773ea7e1e685 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/exceptions/SecretNotFoundException.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.secret.exceptions; + +import org.apache.flink.annotation.PublicEvolving; + +/** + * Exception thrown when a requested secret cannot be found in the secret store. + * + *

This exception is typically thrown by {@link ReadableSecretStore#getSecret(String)} or {@link + * WritableSecretStore#updateSecret(String, java.util.Map)} when attempting to access or modify a + * secret that does not exist. + */ +@PublicEvolving +public class SecretNotFoundException extends Exception { + + private static final long serialVersionUID = 1L; + + /** + * Constructs a new SecretNotFoundException with the specified detail message. + * + * @param message the detail message explaining the reason for the exception + */ + public SecretNotFoundException(String message) { + super(message); + } + + /** + * Constructs a new SecretNotFoundException with the specified detail message and cause. + * + * @param message the detail message explaining the reason for the exception + * @param cause the cause of the exception + */ + public SecretNotFoundException(String message, Throwable cause) { + super(message, cause); + } +} From 2d15162d5a2cda082e304f392e35f6398895a205 Mon Sep 17 00:00:00 2001 From: Hao Li <1127478+lihaosky@users.noreply.github.com> Date: Fri, 16 Jan 2026 09:39:11 -0800 Subject: [PATCH 3/9] test --- .../secret/GenericInMemorySecretStore.java | 116 ++++++++++ .../GenericInMemorySecretStoreFactory.java | 77 +++++++ .../org.apache.flink.table.factories.Factory | 1 + ...GenericInMemorySecretStoreFactoryTest.java | 93 ++++++++ .../GenericInMemorySecretStoreTest.java | 202 ++++++++++++++++++ .../table/secret/CommonSecretOptions.java | 2 +- 6 files changed, 490 insertions(+), 1 deletion(-) create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/secret/GenericInMemorySecretStore.java create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/secret/GenericInMemorySecretStoreFactory.java create mode 100644 flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/secret/GenericInMemorySecretStoreFactoryTest.java create mode 100644 flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/secret/GenericInMemorySecretStoreTest.java diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/secret/GenericInMemorySecretStore.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/secret/GenericInMemorySecretStore.java new file mode 100644 index 0000000000000..baaa1b202abcd --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/secret/GenericInMemorySecretStore.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.secret; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.secret.exceptions.SecretNotFoundException; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * A generic in-memory implementation of both {@link ReadableSecretStore} and {@link + * WritableSecretStore}. + * + *

This implementation stores secrets in memory as plaintext JSON strings. It is suitable for + * testing and development purposes but should not be used in production environments as secrets are + * not encrypted. + * + *

All operations are thread-safe using concurrent data structures. + */ +@Internal +public class GenericInMemorySecretStore implements ReadableSecretStore, WritableSecretStore { + + private final Map secrets; + private final ObjectMapper objectMapper; + + public GenericInMemorySecretStore() { + this.secrets = new ConcurrentHashMap<>(); + this.objectMapper = new ObjectMapper(); + } + + @Override + public Map getSecret(String secretId) throws SecretNotFoundException { + checkNotNull(secretId, "Secret ID cannot be null"); + + String secretJson = secrets.get(secretId); + if (secretJson == null) { + throw new SecretNotFoundException( + String.format("Secret with ID '%s' not found", secretId)); + } + + try { + return objectMapper.readValue(secretJson, new TypeReference<>() {}); + } catch (JsonProcessingException e) { + throw new SecretNotFoundException( + String.format("Failed to deserialize secret with ID '%s'", secretId), e); + } + } + + @Override + public String storeSecret(Map secretData) { + checkNotNull(secretData, "Secret data cannot be null"); + + String secretId = UUID.randomUUID().toString(); + try { + String secretJson = objectMapper.writeValueAsString(secretData); + secrets.put(secretId, secretJson); + return secretId; + } catch (JsonProcessingException e) { + throw new RuntimeException("Failed to serialize secret data", e); + } + } + + @Override + public void removeSecret(String secretId) { + checkNotNull(secretId, "Secret ID cannot be null"); + secrets.remove(secretId); + } + + @Override + public void updateSecret(String secretId, Map newSecretData) + throws SecretNotFoundException { + checkNotNull(secretId, "Secret ID cannot be null"); + checkNotNull(newSecretData, "New secret data cannot be null"); + + if (!secrets.containsKey(secretId)) { + throw new SecretNotFoundException( + String.format("Secret with ID '%s' not found", secretId)); + } + + try { + String secretJson = objectMapper.writeValueAsString(newSecretData); + secrets.put(secretId, secretJson); + } catch (JsonProcessingException e) { + throw new RuntimeException("Failed to serialize secret data", e); + } + } + + /** Clears all secrets from the store (for testing purposes). */ + void clear() { + secrets.clear(); + } +} diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/secret/GenericInMemorySecretStoreFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/secret/GenericInMemorySecretStoreFactory.java new file mode 100644 index 0000000000000..9de1dbfeb034f --- /dev/null +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/secret/GenericInMemorySecretStoreFactory.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.secret; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.catalog.exceptions.CatalogException; +import org.apache.flink.table.secret.exceptions.SecretException; + +import java.util.Collections; +import java.util.Set; + +/** + * Factory for creating {@link GenericInMemorySecretStore} instances. + * + *

This factory creates in-memory secret stores that are suitable for testing and development + * purposes. Secrets are stored in plaintext JSON format in memory and are not persisted. + */ +@Internal +public class GenericInMemorySecretStoreFactory implements SecretStoreFactory { + + private GenericInMemorySecretStore secretStore; + public static final String IDENTIFIER = "generic_in_memory"; + + @Override + public String factoryIdentifier() { + return IDENTIFIER; + } + + @Override + public Set> requiredOptions() { + return Collections.emptySet(); + } + + @Override + public Set> optionalOptions() { + return Collections.emptySet(); + } + + @Override + public SecretStore createSecretStore() { + if (secretStore == null) { + throw new IllegalStateException( + "SecretStoreFactory must be opened before creating a SecretStore"); + } + return secretStore; + } + + @Override + public void open(Context context) throws SecretException { + this.secretStore = new GenericInMemorySecretStore(); + } + + @Override + public void close() throws CatalogException { + if (secretStore != null) { + secretStore.clear(); + secretStore = null; + } + } +} diff --git a/flink-table/flink-table-api-java/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-table/flink-table-api-java/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index d86ead55ad642..8191a71a678a2 100644 --- a/flink-table/flink-table-api-java/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/flink-table/flink-table-api-java/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -16,3 +16,4 @@ org.apache.flink.table.catalog.GenericInMemoryCatalogFactory org.apache.flink.table.catalog.GenericInMemoryCatalogStoreFactory org.apache.flink.table.catalog.FileCatalogStoreFactory +org.apache.flink.table.secret.GenericInMemorySecretStoreFactory diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/secret/GenericInMemorySecretStoreFactoryTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/secret/GenericInMemorySecretStoreFactoryTest.java new file mode 100644 index 0000000000000..bf3cb5db0d187 --- /dev/null +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/secret/GenericInMemorySecretStoreFactoryTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.secret; + +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.table.factories.FactoryUtil.DefaultSecretStoreContext; + +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; + +/** Test for {@link GenericInMemorySecretStoreFactory}. */ +public class GenericInMemorySecretStoreFactoryTest { + + @Test + void testSecretStoreInit() { + String factoryIdentifier = GenericInMemorySecretStoreFactory.IDENTIFIER; + Map options = new HashMap<>(); + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + final DefaultSecretStoreContext discoveryContext = + new DefaultSecretStoreContext(options, null, classLoader); + final SecretStoreFactory factory = + FactoryUtil.discoverFactory( + classLoader, SecretStoreFactory.class, factoryIdentifier); + factory.open(discoveryContext); + + SecretStore secretStore = factory.createSecretStore(); + assertThat(secretStore instanceof GenericInMemorySecretStore).isTrue(); + + factory.close(); + } + + @Test + void testCreateSecretStoreBeforeOpen() { + GenericInMemorySecretStoreFactory factory = new GenericInMemorySecretStoreFactory(); + + assertThatThrownBy(factory::createSecretStore) + .isInstanceOf(IllegalStateException.class) + .hasMessageContaining( + "SecretStoreFactory must be opened before creating a SecretStore"); + } + + @Test + void testFactoryIdentifier() { + GenericInMemorySecretStoreFactory factory = new GenericInMemorySecretStoreFactory(); + assertThat(factory.factoryIdentifier()).isEqualTo("generic_in_memory"); + } + + @Test + void testRequiredAndOptionalOptions() { + GenericInMemorySecretStoreFactory factory = new GenericInMemorySecretStoreFactory(); + assertThat(factory.requiredOptions().isEmpty()).isTrue(); + assertThat(factory.optionalOptions().isEmpty()).isTrue(); + } + + @Test + void testOpenAndClose() { + GenericInMemorySecretStoreFactory factory = new GenericInMemorySecretStoreFactory(); + Map options = new HashMap<>(); + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + DefaultSecretStoreContext context = + new DefaultSecretStoreContext(options, null, classLoader); + + factory.open(context); + SecretStore secretStore = factory.createSecretStore(); + assertThat(secretStore).isNotNull(); + + factory.close(); + + // After close, createSecretStore should fail + assertThatThrownBy(factory::createSecretStore).isInstanceOf(IllegalStateException.class); + } +} diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/secret/GenericInMemorySecretStoreTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/secret/GenericInMemorySecretStoreTest.java new file mode 100644 index 0000000000000..e25cb199f8294 --- /dev/null +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/secret/GenericInMemorySecretStoreTest.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.secret; + +import org.apache.flink.table.secret.exceptions.SecretNotFoundException; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; + +/** Test for {@link GenericInMemorySecretStore}. */ +public class GenericInMemorySecretStoreTest { + + private GenericInMemorySecretStore secretStore; + + @BeforeEach + void setUp() { + secretStore = new GenericInMemorySecretStore(); + } + + @Test + void testStoreAndGetSecret() throws SecretNotFoundException { + Map secretData = new HashMap<>(); + secretData.put("username", "testuser"); + secretData.put("password", "testpass"); + + String secretId = secretStore.storeSecret(secretData); + assertThat(secretId).isNotNull(); + + Map retrievedSecret = secretStore.getSecret(secretId); + assertThat(retrievedSecret).isNotNull(); + assertThat(retrievedSecret.get("username")).isEqualTo("testuser"); + assertThat(retrievedSecret.get("password")).isEqualTo("testpass"); + } + + @Test + void testGetNonExistentSecret() { + assertThatThrownBy(() -> secretStore.getSecret("non-existent-id")) + .isInstanceOf(SecretNotFoundException.class) + .hasMessageContaining("Secret with ID 'non-existent-id' not found"); + } + + @Test + void testGetSecretWithNullId() { + assertThatThrownBy(() -> secretStore.getSecret(null)) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("Secret ID cannot be null"); + } + + @Test + void testStoreSecretWithNullData() { + assertThatThrownBy(() -> secretStore.storeSecret(null)) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("Secret data cannot be null"); + } + + @Test + void testRemoveSecret() throws SecretNotFoundException { + Map secretData = new HashMap<>(); + secretData.put("key", "value"); + + String secretId = secretStore.storeSecret(secretData); + assertThat(secretStore.getSecret(secretId)).isNotNull(); + + secretStore.removeSecret(secretId); + assertThatThrownBy(() -> secretStore.getSecret(secretId)) + .isInstanceOf(SecretNotFoundException.class) + .hasMessageContaining("Secret with ID '" + secretId + "' not found"); + } + + @Test + void testRemoveSecretWithNullId() { + assertThatThrownBy(() -> secretStore.removeSecret(null)) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("Secret ID cannot be null"); + } + + @Test + void testRemoveNonExistentSecret() { + // Should not throw exception, just silently remove nothing + secretStore.removeSecret("non-existent-id"); + } + + @Test + void testUpdateSecret() throws SecretNotFoundException { + Map originalData = new HashMap<>(); + originalData.put("username", "olduser"); + originalData.put("password", "oldpass"); + + String secretId = secretStore.storeSecret(originalData); + + Map updatedData = new HashMap<>(); + updatedData.put("username", "newuser"); + updatedData.put("password", "newpass"); + + secretStore.updateSecret(secretId, updatedData); + + Map retrievedSecret = secretStore.getSecret(secretId); + assertThat(retrievedSecret.get("username")).isEqualTo("newuser"); + assertThat(retrievedSecret.get("password")).isEqualTo("newpass"); + } + + @Test + void testUpdateNonExistentSecret() { + Map secretData = new HashMap<>(); + secretData.put("key", "value"); + + assertThatThrownBy(() -> secretStore.updateSecret("non-existent-id", secretData)) + .isInstanceOf(SecretNotFoundException.class) + .hasMessageContaining("Secret with ID 'non-existent-id' not found"); + } + + @Test + void testUpdateSecretWithNullId() { + Map secretData = new HashMap<>(); + secretData.put("key", "value"); + + assertThatThrownBy(() -> secretStore.updateSecret(null, secretData)) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("Secret ID cannot be null"); + } + + @Test + void testUpdateSecretWithNullData() { + Map originalData = new HashMap<>(); + originalData.put("key", "value"); + String secretId = secretStore.storeSecret(originalData); + + assertThatThrownBy(() -> secretStore.updateSecret(secretId, null)) + .isInstanceOf(NullPointerException.class) + .hasMessageContaining("New secret data cannot be null"); + } + + @Test + void testClear() { + Map secretData1 = new HashMap<>(); + secretData1.put("key1", "value1"); + String secretId1 = secretStore.storeSecret(secretData1); + + Map secretData2 = new HashMap<>(); + secretData2.put("key2", "value2"); + String secretId2 = secretStore.storeSecret(secretData2); + + secretStore.clear(); + + assertThatThrownBy(() -> secretStore.getSecret(secretId1)) + .isInstanceOf(SecretNotFoundException.class); + assertThatThrownBy(() -> secretStore.getSecret(secretId2)) + .isInstanceOf(SecretNotFoundException.class); + } + + @Test + void testStoreEmptySecret() throws SecretNotFoundException { + Map emptyData = new HashMap<>(); + String secretId = secretStore.storeSecret(emptyData); + + Map retrievedSecret = secretStore.getSecret(secretId); + assertThat(retrievedSecret).isNotNull(); + assertThat(retrievedSecret.isEmpty()).isTrue(); + } + + @Test + void testStoreMultipleSecrets() throws SecretNotFoundException { + Map secret1 = new HashMap<>(); + secret1.put("user1", "pass1"); + + Map secret2 = new HashMap<>(); + secret2.put("user2", "pass2"); + + String secretId1 = secretStore.storeSecret(secret1); + String secretId2 = secretStore.storeSecret(secret2); + + assertThat(secretId1).isNotEqualTo(secretId2); + + Map retrieved1 = secretStore.getSecret(secretId1); + Map retrieved2 = secretStore.getSecret(secretId2); + + assertThat(retrieved1.get("user1")).isEqualTo("pass1"); + assertThat(retrieved2.get("user2")).isEqualTo("pass2"); + } +} diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/CommonSecretOptions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/CommonSecretOptions.java index bbff5921c76aa..36c087ec92dfb 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/CommonSecretOptions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/CommonSecretOptions.java @@ -26,7 +26,7 @@ @Internal public class CommonSecretOptions { - public static final String DEFAULT_SECRET_STORE_KIND = "default_in_memory"; + public static final String DEFAULT_SECRET_STORE_KIND = "generic_in_memory"; public static final ConfigOption TABLE_SECRET_STORE_KIND = ConfigOptions.key("table.secret-store.kind") .stringType() From 36f7d240f82f40756b80c4d151150f40e12a80aa Mon Sep 17 00:00:00 2001 From: Hao Li <1127478+lihaosky@users.noreply.github.com> Date: Fri, 16 Jan 2026 12:05:54 -0800 Subject: [PATCH 4/9] fix --- .../secret/GenericInMemorySecretStore.java | 13 +- .../GenericInMemorySecretStoreFactory.java | 18 +- .../table/factories/TableFactoryUtilTest.java | 222 ++++++++++++++++++ ...GenericInMemorySecretStoreFactoryTest.java | 29 --- .../table/secret/CommonSecretOptions.java | 2 +- 5 files changed, 232 insertions(+), 52 deletions(-) create mode 100644 flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/TableFactoryUtilTest.java diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/secret/GenericInMemorySecretStore.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/secret/GenericInMemorySecretStore.java index baaa1b202abcd..154ead1ead305 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/secret/GenericInMemorySecretStore.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/secret/GenericInMemorySecretStore.java @@ -19,15 +19,16 @@ package org.apache.flink.table.secret; import org.apache.flink.annotation.Internal; +import org.apache.flink.table.secret.exceptions.SecretException; import org.apache.flink.table.secret.exceptions.SecretNotFoundException; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import java.util.HashMap; import java.util.Map; import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -38,8 +39,6 @@ *

This implementation stores secrets in memory as plaintext JSON strings. It is suitable for * testing and development purposes but should not be used in production environments as secrets are * not encrypted. - * - *

All operations are thread-safe using concurrent data structures. */ @Internal public class GenericInMemorySecretStore implements ReadableSecretStore, WritableSecretStore { @@ -48,7 +47,7 @@ public class GenericInMemorySecretStore implements ReadableSecretStore, Writable private final ObjectMapper objectMapper; public GenericInMemorySecretStore() { - this.secrets = new ConcurrentHashMap<>(); + this.secrets = new HashMap<>(); this.objectMapper = new ObjectMapper(); } @@ -65,7 +64,7 @@ public Map getSecret(String secretId) throws SecretNotFoundExcep try { return objectMapper.readValue(secretJson, new TypeReference<>() {}); } catch (JsonProcessingException e) { - throw new SecretNotFoundException( + throw new SecretException( String.format("Failed to deserialize secret with ID '%s'", secretId), e); } } @@ -80,7 +79,7 @@ public String storeSecret(Map secretData) { secrets.put(secretId, secretJson); return secretId; } catch (JsonProcessingException e) { - throw new RuntimeException("Failed to serialize secret data", e); + throw new SecretException("Failed to serialize secret data", e); } } @@ -105,7 +104,7 @@ public void updateSecret(String secretId, Map newSecretData) String secretJson = objectMapper.writeValueAsString(newSecretData); secrets.put(secretId, secretJson); } catch (JsonProcessingException e) { - throw new RuntimeException("Failed to serialize secret data", e); + throw new SecretException("Failed to serialize secret data", e); } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/secret/GenericInMemorySecretStoreFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/secret/GenericInMemorySecretStoreFactory.java index 9de1dbfeb034f..17322e954b36a 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/secret/GenericInMemorySecretStoreFactory.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/secret/GenericInMemorySecretStoreFactory.java @@ -21,7 +21,6 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.table.catalog.exceptions.CatalogException; -import org.apache.flink.table.secret.exceptions.SecretException; import java.util.Collections; import java.util.Set; @@ -55,23 +54,12 @@ public Set> optionalOptions() { @Override public SecretStore createSecretStore() { - if (secretStore == null) { - throw new IllegalStateException( - "SecretStoreFactory must be opened before creating a SecretStore"); - } - return secretStore; + return new GenericInMemorySecretStore(); } @Override - public void open(Context context) throws SecretException { - this.secretStore = new GenericInMemorySecretStore(); - } + public void open(Context context) {} @Override - public void close() throws CatalogException { - if (secretStore != null) { - secretStore.clear(); - secretStore = null; - } - } + public void close() throws CatalogException {} } diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/TableFactoryUtilTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/TableFactoryUtilTest.java new file mode 100644 index 0000000000000..dbac58d7d0461 --- /dev/null +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/TableFactoryUtilTest.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.catalog.FileCatalogStoreFactory; +import org.apache.flink.table.catalog.GenericInMemoryCatalogStoreFactory; +import org.apache.flink.table.secret.CommonSecretOptions; +import org.apache.flink.table.secret.GenericInMemorySecretStoreFactory; +import org.apache.flink.table.secret.SecretStoreFactory; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link TableFactoryUtil}. */ +class TableFactoryUtilTest { + + @Test + void testFindAndCreateCatalogStoreFactoryWithGenericInMemory() { + Configuration configuration = new Configuration(); + configuration.set(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND, "generic_in_memory"); + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + + CatalogStoreFactory factory = + TableFactoryUtil.findAndCreateCatalogStoreFactory(configuration, classLoader); + + assertThat(factory).isInstanceOf(GenericInMemoryCatalogStoreFactory.class); + } + + @Test + void testFindAndCreateCatalogStoreFactoryWithFile() { + Configuration configuration = new Configuration(); + configuration.set(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND, "file"); + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + + CatalogStoreFactory factory = + TableFactoryUtil.findAndCreateCatalogStoreFactory(configuration, classLoader); + + assertThat(factory).isInstanceOf(FileCatalogStoreFactory.class); + } + + @Test + void testFindAndCreateCatalogStoreFactoryWithDefaultKind() { + Configuration configuration = new Configuration(); + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + + CatalogStoreFactory factory = + TableFactoryUtil.findAndCreateCatalogStoreFactory(configuration, classLoader); + + assertThat(factory).isInstanceOf(GenericInMemoryCatalogStoreFactory.class); + } + + @Test + void testBuildCatalogStoreFactoryContext(@TempDir File tempFolder) { + Configuration configuration = new Configuration(); + configuration.set(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND, "file"); + configuration.setString("table.catalog-store.file.path", tempFolder.getAbsolutePath()); + configuration.setString("table.catalog-store.file.option1", "value1"); + configuration.setString("table.catalog-store.file.option2", "value2"); + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + + CatalogStoreFactory.Context context = + TableFactoryUtil.buildCatalogStoreFactoryContext(configuration, classLoader); + + assertThat(context).isNotNull(); + assertThat(context.getOptions()).containsEntry("path", tempFolder.getAbsolutePath()); + assertThat(context.getOptions()).containsEntry("option1", "value1"); + assertThat(context.getOptions()).containsEntry("option2", "value2"); + assertThat(context.getConfiguration()).isEqualTo(configuration); + assertThat(context.getClassLoader()).isEqualTo(classLoader); + } + + @Test + void testBuildCatalogStoreFactoryContextWithGenericInMemory() { + Configuration configuration = new Configuration(); + configuration.set(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND, "generic_in_memory"); + configuration.setString("table.catalog-store.generic_in_memory.option1", "value1"); + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + + CatalogStoreFactory.Context context = + TableFactoryUtil.buildCatalogStoreFactoryContext(configuration, classLoader); + + assertThat(context).isNotNull(); + assertThat(context.getOptions()).containsEntry("option1", "value1"); + assertThat(context.getConfiguration()).isEqualTo(configuration); + assertThat(context.getClassLoader()).isEqualTo(classLoader); + } + + @Test + void testBuildCatalogStoreFactoryContextWithoutOptions() { + Configuration configuration = new Configuration(); + configuration.set(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND, "generic_in_memory"); + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + + CatalogStoreFactory.Context context = + TableFactoryUtil.buildCatalogStoreFactoryContext(configuration, classLoader); + + assertThat(context).isNotNull(); + assertThat(context.getOptions()).isEmpty(); + assertThat(context.getConfiguration()).isEqualTo(configuration); + assertThat(context.getClassLoader()).isEqualTo(classLoader); + } + + @Test + void testBuildCatalogStoreFactoryContextOnlyExtractsRelevantOptions() { + Configuration configuration = new Configuration(); + configuration.set(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND, "file"); + configuration.setString("table.catalog-store.file.path", "/test/path"); + configuration.setString("table.catalog-store.file.option1", "value1"); + configuration.setString("table.catalog-store.other.irrelevant", "should-not-appear"); + configuration.setString("other.config.key", "should-not-appear"); + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + + CatalogStoreFactory.Context context = + TableFactoryUtil.buildCatalogStoreFactoryContext(configuration, classLoader); + + assertThat(context).isNotNull(); + assertThat(context.getOptions()).containsEntry("path", "/test/path"); + assertThat(context.getOptions()).containsEntry("option1", "value1"); + assertThat(context.getOptions()).doesNotContainKey("irrelevant"); + assertThat(context.getOptions()).doesNotContainKey("other.config.key"); + assertThat(context.getOptions()).hasSize(2); + } + + @Test + void testFindAndCreateSecretStoreFactoryWithGenericInMemory() { + Configuration configuration = new Configuration(); + configuration.set(CommonSecretOptions.TABLE_SECRET_STORE_KIND, "generic_in_memory"); + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + + SecretStoreFactory factory = + TableFactoryUtil.findAndCreateSecretStoreFactory(configuration, classLoader); + + assertThat(factory).isInstanceOf(GenericInMemorySecretStoreFactory.class); + } + + @Test + void testFindAndCreateSecretStoreFactoryWithDefaultKind() { + Configuration configuration = new Configuration(); + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + + SecretStoreFactory factory = + TableFactoryUtil.findAndCreateSecretStoreFactory(configuration, classLoader); + + assertThat(factory).isInstanceOf(GenericInMemorySecretStoreFactory.class); + } + + @Test + void testBuildSecretStoreFactoryContext() { + Configuration configuration = new Configuration(); + configuration.set(CommonSecretOptions.TABLE_SECRET_STORE_KIND, "generic_in_memory"); + configuration.setString("table.secret-store.generic_in_memory.option1", "value1"); + configuration.setString("table.secret-store.generic_in_memory.option2", "value2"); + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + + SecretStoreFactory.Context context = + TableFactoryUtil.buildSecretStoreFactoryContext(configuration, classLoader); + + assertThat(context).isNotNull(); + assertThat(context.getOptions()).containsEntry("option1", "value1"); + assertThat(context.getOptions()).containsEntry("option2", "value2"); + assertThat(context.getConfiguration()).isEqualTo(configuration); + assertThat(context.getClassLoader()).isEqualTo(classLoader); + } + + @Test + void testBuildSecretStoreFactoryContextWithoutOptions() { + Configuration configuration = new Configuration(); + configuration.set(CommonSecretOptions.TABLE_SECRET_STORE_KIND, "generic_in_memory"); + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + + SecretStoreFactory.Context context = + TableFactoryUtil.buildSecretStoreFactoryContext(configuration, classLoader); + + assertThat(context).isNotNull(); + assertThat(context.getOptions()).isEmpty(); + assertThat(context.getConfiguration()).isEqualTo(configuration); + assertThat(context.getClassLoader()).isEqualTo(classLoader); + } + + @Test + void testBuildSecretStoreFactoryContextOnlyExtractsRelevantOptions() { + Configuration configuration = new Configuration(); + configuration.set(CommonSecretOptions.TABLE_SECRET_STORE_KIND, "generic_in_memory"); + configuration.setString("table.secret-store.generic_in_memory.option1", "value1"); + configuration.setString("table.secret-store.generic_in_memory.option2", "value2"); + configuration.setString("table.secret-store.other.irrelevant", "should-not-appear"); + configuration.setString("other.config.key", "should-not-appear"); + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + + SecretStoreFactory.Context context = + TableFactoryUtil.buildSecretStoreFactoryContext(configuration, classLoader); + + assertThat(context).isNotNull(); + assertThat(context.getOptions()).containsEntry("option1", "value1"); + assertThat(context.getOptions()).containsEntry("option2", "value2"); + assertThat(context.getOptions()).doesNotContainKey("irrelevant"); + assertThat(context.getOptions()).doesNotContainKey("other.config.key"); + assertThat(context.getOptions()).hasSize(2); + } +} diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/secret/GenericInMemorySecretStoreFactoryTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/secret/GenericInMemorySecretStoreFactoryTest.java index bf3cb5db0d187..c1e6923df324c 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/secret/GenericInMemorySecretStoreFactoryTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/secret/GenericInMemorySecretStoreFactoryTest.java @@ -27,7 +27,6 @@ import java.util.Map; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; -import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; /** Test for {@link GenericInMemorySecretStoreFactory}. */ public class GenericInMemorySecretStoreFactoryTest { @@ -50,16 +49,6 @@ void testSecretStoreInit() { factory.close(); } - @Test - void testCreateSecretStoreBeforeOpen() { - GenericInMemorySecretStoreFactory factory = new GenericInMemorySecretStoreFactory(); - - assertThatThrownBy(factory::createSecretStore) - .isInstanceOf(IllegalStateException.class) - .hasMessageContaining( - "SecretStoreFactory must be opened before creating a SecretStore"); - } - @Test void testFactoryIdentifier() { GenericInMemorySecretStoreFactory factory = new GenericInMemorySecretStoreFactory(); @@ -72,22 +61,4 @@ void testRequiredAndOptionalOptions() { assertThat(factory.requiredOptions().isEmpty()).isTrue(); assertThat(factory.optionalOptions().isEmpty()).isTrue(); } - - @Test - void testOpenAndClose() { - GenericInMemorySecretStoreFactory factory = new GenericInMemorySecretStoreFactory(); - Map options = new HashMap<>(); - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - DefaultSecretStoreContext context = - new DefaultSecretStoreContext(options, null, classLoader); - - factory.open(context); - SecretStore secretStore = factory.createSecretStore(); - assertThat(secretStore).isNotNull(); - - factory.close(); - - // After close, createSecretStore should fail - assertThatThrownBy(factory::createSecretStore).isInstanceOf(IllegalStateException.class); - } } diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/CommonSecretOptions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/CommonSecretOptions.java index 36c087ec92dfb..461909a6cd187 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/CommonSecretOptions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/CommonSecretOptions.java @@ -32,7 +32,7 @@ public class CommonSecretOptions { .stringType() .defaultValue(DEFAULT_SECRET_STORE_KIND) .withDescription( - "The kind of secret store to be used. Out of the box, 'default_in_memory' option is supported. " + "The kind of secret store to be used. Out of the box, 'generic_in_memory' option is supported. " + "Implementations can provide custom secret stores for different backends " + "(e.g., cloud-specific secret managers)."); From aa113ef3b3309e8a7b9dfe65b5ca6ec1cdff5b2b Mon Sep 17 00:00:00 2001 From: Hao Li <1127478+lihaosky@users.noreply.github.com> Date: Tue, 20 Jan 2026 12:33:58 -0800 Subject: [PATCH 5/9] fix --- .../table/tests/test_environment_settings_completeness.py | 2 +- .../main/java/org/apache/flink/table/factories/FactoryUtil.java | 2 ++ .../java/org/apache/flink/table/secret/SecretStoreFactory.java | 2 ++ 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/flink-python/pyflink/table/tests/test_environment_settings_completeness.py b/flink-python/pyflink/table/tests/test_environment_settings_completeness.py index 88da848c8ef21..d80553637dec1 100644 --- a/flink-python/pyflink/table/tests/test_environment_settings_completeness.py +++ b/flink-python/pyflink/table/tests/test_environment_settings_completeness.py @@ -38,7 +38,7 @@ def java_class(cls): def excluded_methods(cls): # internal interfaces, no need to expose to users. return {'getPlanner', 'getExecutor', 'getUserClassLoader', 'getCatalogStore', - 'toConfiguration', 'fromConfiguration', 'getSqlFactory'} + 'getSecretStore', 'toConfiguration', 'fromConfiguration', 'getSqlFactory'} class EnvironmentSettingsBuilderCompletenessTests(PythonAPICompletenessTestCase, PyFlinkTestCase): diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java index dc0c942cdf20e..594a7bf1d5d9b 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java @@ -1412,6 +1412,8 @@ public ClassLoader getClassLoader() { } } + /** Default implementation of {@link SecretStoreFactory.Context}. */ + @Internal public static class DefaultSecretStoreContext implements SecretStoreFactory.Context { private Map options; diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/SecretStoreFactory.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/SecretStoreFactory.java index ceb4f17c93bea..1fd40b8e78be0 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/SecretStoreFactory.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/SecretStoreFactory.java @@ -39,6 +39,8 @@ public interface SecretStoreFactory extends Factory { /** Close secret store. */ void close() throws CatalogException; + /** Context for creating a SecretStore. */ + @PublicEvolving interface Context { /** * Returns the options with which the secret store is created. From b7852e1e442200f9f1d9a8aca53eb4c94f6b4f27 Mon Sep 17 00:00:00 2001 From: Hao Li <1127478+lihaosky@users.noreply.github.com> Date: Tue, 20 Jan 2026 17:00:56 -0800 Subject: [PATCH 6/9] fix --- .../table/tests/test_environment_settings_completeness.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-python/pyflink/table/tests/test_environment_settings_completeness.py b/flink-python/pyflink/table/tests/test_environment_settings_completeness.py index d80553637dec1..5ce757f24721d 100644 --- a/flink-python/pyflink/table/tests/test_environment_settings_completeness.py +++ b/flink-python/pyflink/table/tests/test_environment_settings_completeness.py @@ -59,7 +59,7 @@ def java_class(cls): def excluded_methods(cls): # internal interfaces, no need to expose to users. # withSqlFactory - needs to be implemented - return {'withClassLoader', 'withCatalogStore', 'withSqlFactory'} + return {'withClassLoader', 'withCatalogStore', 'withSecretStore', 'withSqlFactory'} if __name__ == '__main__': import unittest From 3a80f99226e298161c30ed15a9e6271bf22a7b68 Mon Sep 17 00:00:00 2001 From: Hao Li <1127478+lihaosky@users.noreply.github.com> Date: Thu, 29 Jan 2026 11:35:52 -0800 Subject: [PATCH 7/9] comments --- .../api/internal/TableEnvironmentImpl.java | 2 +- .../secret/GenericInMemorySecretStore.java | 3 +- .../GenericInMemorySecretStoreFactory.java | 6 +- .../table/factories/TableFactoryUtilTest.java | 107 ++++++++---------- ...GenericInMemorySecretStoreFactoryTest.java | 23 ++-- .../GenericInMemorySecretStoreTest.java | 47 +++----- .../secret/exceptions/SecretException.java | 4 +- 7 files changed, 84 insertions(+), 108 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 8d031e2a74a1d..1b01429cf8278 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -277,7 +277,7 @@ public static TableEnvironmentImpl create(EnvironmentSettings settings) { TableFactoryUtil.buildSecretStoreFactoryContext( settings.getConfiguration(), userClassLoader); secretStoreFactory.open(secretStoreContext); - // TODO: pass secret store to catalog manager for encryption/decryption + // TODO (FLINK-38261): pass secret store to catalog manager for encryption/decryption final SecretStore secretStore = settings.getSecretStore() != null ? settings.getSecretStore() diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/secret/GenericInMemorySecretStore.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/secret/GenericInMemorySecretStore.java index 154ead1ead305..c054f5a070bf3 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/secret/GenericInMemorySecretStore.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/secret/GenericInMemorySecretStore.java @@ -43,12 +43,11 @@ @Internal public class GenericInMemorySecretStore implements ReadableSecretStore, WritableSecretStore { + private static final ObjectMapper objectMapper = new ObjectMapper(); private final Map secrets; - private final ObjectMapper objectMapper; public GenericInMemorySecretStore() { this.secrets = new HashMap<>(); - this.objectMapper = new ObjectMapper(); } @Override diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/secret/GenericInMemorySecretStoreFactory.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/secret/GenericInMemorySecretStoreFactory.java index 17322e954b36a..bbccfd4ec2f4c 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/secret/GenericInMemorySecretStoreFactory.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/secret/GenericInMemorySecretStoreFactory.java @@ -22,7 +22,6 @@ import org.apache.flink.configuration.ConfigOption; import org.apache.flink.table.catalog.exceptions.CatalogException; -import java.util.Collections; import java.util.Set; /** @@ -34,7 +33,6 @@ @Internal public class GenericInMemorySecretStoreFactory implements SecretStoreFactory { - private GenericInMemorySecretStore secretStore; public static final String IDENTIFIER = "generic_in_memory"; @Override @@ -44,12 +42,12 @@ public String factoryIdentifier() { @Override public Set> requiredOptions() { - return Collections.emptySet(); + return Set.of(); } @Override public Set> optionalOptions() { - return Collections.emptySet(); + return Set.of(); } @Override diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/TableFactoryUtilTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/TableFactoryUtilTest.java index dbac58d7d0461..0c4a4d7d8438d 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/TableFactoryUtilTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/TableFactoryUtilTest.java @@ -28,47 +28,32 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import java.io.File; +import java.util.Map; +import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link TableFactoryUtil}. */ class TableFactoryUtilTest { - @Test - void testFindAndCreateCatalogStoreFactoryWithGenericInMemory() { - Configuration configuration = new Configuration(); - configuration.set(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND, "generic_in_memory"); - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - - CatalogStoreFactory factory = - TableFactoryUtil.findAndCreateCatalogStoreFactory(configuration, classLoader); - - assertThat(factory).isInstanceOf(GenericInMemoryCatalogStoreFactory.class); - } - - @Test - void testFindAndCreateCatalogStoreFactoryWithFile() { - Configuration configuration = new Configuration(); - configuration.set(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND, "file"); - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - - CatalogStoreFactory factory = - TableFactoryUtil.findAndCreateCatalogStoreFactory(configuration, classLoader); - - assertThat(factory).isInstanceOf(FileCatalogStoreFactory.class); - } - - @Test - void testFindAndCreateCatalogStoreFactoryWithDefaultKind() { + @ParameterizedTest(name = "kind={0}, expectedFactory={1}") + @MethodSource("catalogStoreFactoryTestParameters") + void testFindAndCreateCatalogStoreFactory(String kind, Class expectedFactoryClass) { Configuration configuration = new Configuration(); + if (kind != null) { + configuration.set(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND, kind); + } ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); CatalogStoreFactory factory = TableFactoryUtil.findAndCreateCatalogStoreFactory(configuration, classLoader); - assertThat(factory).isInstanceOf(GenericInMemoryCatalogStoreFactory.class); + assertThat(factory).isInstanceOf(expectedFactoryClass); } @Test @@ -84,9 +69,12 @@ void testBuildCatalogStoreFactoryContext(@TempDir File tempFolder) { TableFactoryUtil.buildCatalogStoreFactoryContext(configuration, classLoader); assertThat(context).isNotNull(); - assertThat(context.getOptions()).containsEntry("path", tempFolder.getAbsolutePath()); - assertThat(context.getOptions()).containsEntry("option1", "value1"); - assertThat(context.getOptions()).containsEntry("option2", "value2"); + assertThat(context.getOptions()) + .containsExactlyInAnyOrderEntriesOf( + Map.of( + "path", tempFolder.getAbsolutePath(), + "option1", "value1", + "option2", "value2")); assertThat(context.getConfiguration()).isEqualTo(configuration); assertThat(context.getClassLoader()).isEqualTo(classLoader); } @@ -102,7 +90,8 @@ void testBuildCatalogStoreFactoryContextWithGenericInMemory() { TableFactoryUtil.buildCatalogStoreFactoryContext(configuration, classLoader); assertThat(context).isNotNull(); - assertThat(context.getOptions()).containsEntry("option1", "value1"); + assertThat(context.getOptions()) + .containsExactlyInAnyOrderEntriesOf(Map.of("option1", "value1")); assertThat(context.getConfiguration()).isEqualTo(configuration); assertThat(context.getClassLoader()).isEqualTo(classLoader); } @@ -136,34 +125,24 @@ void testBuildCatalogStoreFactoryContextOnlyExtractsRelevantOptions() { TableFactoryUtil.buildCatalogStoreFactoryContext(configuration, classLoader); assertThat(context).isNotNull(); - assertThat(context.getOptions()).containsEntry("path", "/test/path"); - assertThat(context.getOptions()).containsEntry("option1", "value1"); - assertThat(context.getOptions()).doesNotContainKey("irrelevant"); - assertThat(context.getOptions()).doesNotContainKey("other.config.key"); - assertThat(context.getOptions()).hasSize(2); - } - - @Test - void testFindAndCreateSecretStoreFactoryWithGenericInMemory() { - Configuration configuration = new Configuration(); - configuration.set(CommonSecretOptions.TABLE_SECRET_STORE_KIND, "generic_in_memory"); - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - - SecretStoreFactory factory = - TableFactoryUtil.findAndCreateSecretStoreFactory(configuration, classLoader); - - assertThat(factory).isInstanceOf(GenericInMemorySecretStoreFactory.class); + assertThat(context.getOptions()) + .containsExactlyInAnyOrderEntriesOf( + Map.of("path", "/test/path", "option1", "value1")); } - @Test - void testFindAndCreateSecretStoreFactoryWithDefaultKind() { + @ParameterizedTest(name = "kind={0}, expectedFactory={1}") + @MethodSource("secretStoreFactoryTestParameters") + void testFindAndCreateSecretStoreFactory(String kind, Class expectedFactoryClass) { Configuration configuration = new Configuration(); + if (kind != null) { + configuration.set(CommonSecretOptions.TABLE_SECRET_STORE_KIND, kind); + } ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); SecretStoreFactory factory = TableFactoryUtil.findAndCreateSecretStoreFactory(configuration, classLoader); - assertThat(factory).isInstanceOf(GenericInMemorySecretStoreFactory.class); + assertThat(factory).isInstanceOf(expectedFactoryClass); } @Test @@ -178,8 +157,9 @@ void testBuildSecretStoreFactoryContext() { TableFactoryUtil.buildSecretStoreFactoryContext(configuration, classLoader); assertThat(context).isNotNull(); - assertThat(context.getOptions()).containsEntry("option1", "value1"); - assertThat(context.getOptions()).containsEntry("option2", "value2"); + assertThat(context.getOptions()) + .containsExactlyInAnyOrderEntriesOf( + Map.of("option1", "value1", "option2", "value2")); assertThat(context.getConfiguration()).isEqualTo(configuration); assertThat(context.getClassLoader()).isEqualTo(classLoader); } @@ -213,10 +193,21 @@ void testBuildSecretStoreFactoryContextOnlyExtractsRelevantOptions() { TableFactoryUtil.buildSecretStoreFactoryContext(configuration, classLoader); assertThat(context).isNotNull(); - assertThat(context.getOptions()).containsEntry("option1", "value1"); - assertThat(context.getOptions()).containsEntry("option2", "value2"); - assertThat(context.getOptions()).doesNotContainKey("irrelevant"); - assertThat(context.getOptions()).doesNotContainKey("other.config.key"); - assertThat(context.getOptions()).hasSize(2); + assertThat(context.getOptions()) + .containsExactlyInAnyOrderEntriesOf( + Map.of("option1", "value1", "option2", "value2")); + } + + private static Stream catalogStoreFactoryTestParameters() { + return Stream.of( + Arguments.of("generic_in_memory", GenericInMemoryCatalogStoreFactory.class), + Arguments.of("file", FileCatalogStoreFactory.class), + Arguments.of(null, GenericInMemoryCatalogStoreFactory.class)); + } + + private static Stream secretStoreFactoryTestParameters() { + return Stream.of( + Arguments.of("generic_in_memory", GenericInMemorySecretStoreFactory.class), + Arguments.of(null, GenericInMemorySecretStoreFactory.class)); } } diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/secret/GenericInMemorySecretStoreFactoryTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/secret/GenericInMemorySecretStoreFactoryTest.java index c1e6923df324c..a935114330659 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/secret/GenericInMemorySecretStoreFactoryTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/secret/GenericInMemorySecretStoreFactoryTest.java @@ -26,10 +26,10 @@ import java.util.HashMap; import java.util.Map; -import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.assertj.core.api.Assertions.assertThat; /** Test for {@link GenericInMemorySecretStoreFactory}. */ -public class GenericInMemorySecretStoreFactoryTest { +class GenericInMemorySecretStoreFactoryTest { @Test void testSecretStoreInit() { @@ -41,12 +41,15 @@ void testSecretStoreInit() { final SecretStoreFactory factory = FactoryUtil.discoverFactory( classLoader, SecretStoreFactory.class, factoryIdentifier); - factory.open(discoveryContext); - - SecretStore secretStore = factory.createSecretStore(); - assertThat(secretStore instanceof GenericInMemorySecretStore).isTrue(); - - factory.close(); + try { + factory.open(discoveryContext); + SecretStore secretStore = factory.createSecretStore(); + assertThat(secretStore instanceof GenericInMemorySecretStore).isTrue(); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + factory.close(); + } } @Test @@ -58,7 +61,7 @@ void testFactoryIdentifier() { @Test void testRequiredAndOptionalOptions() { GenericInMemorySecretStoreFactory factory = new GenericInMemorySecretStoreFactory(); - assertThat(factory.requiredOptions().isEmpty()).isTrue(); - assertThat(factory.optionalOptions().isEmpty()).isTrue(); + assertThat(factory.requiredOptions()).isEmpty(); + assertThat(factory.optionalOptions()).isEmpty(); } } diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/secret/GenericInMemorySecretStoreTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/secret/GenericInMemorySecretStoreTest.java index e25cb199f8294..0e3f5f7b62324 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/secret/GenericInMemorySecretStoreTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/secret/GenericInMemorySecretStoreTest.java @@ -23,14 +23,13 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.util.HashMap; import java.util.Map; -import static org.assertj.core.api.AssertionsForClassTypes.assertThat; -import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Test for {@link GenericInMemorySecretStore}. */ -public class GenericInMemorySecretStoreTest { +class GenericInMemorySecretStoreTest { private GenericInMemorySecretStore secretStore; @@ -41,9 +40,7 @@ void setUp() { @Test void testStoreAndGetSecret() throws SecretNotFoundException { - Map secretData = new HashMap<>(); - secretData.put("username", "testuser"); - secretData.put("password", "testpass"); + Map secretData = Map.of("username", "testuser", "password", "testpass"); String secretId = secretStore.storeSecret(secretData); assertThat(secretId).isNotNull(); @@ -77,8 +74,7 @@ void testStoreSecretWithNullData() { @Test void testRemoveSecret() throws SecretNotFoundException { - Map secretData = new HashMap<>(); - secretData.put("key", "value"); + Map secretData = Map.of("key", "value"); String secretId = secretStore.storeSecret(secretData); assertThat(secretStore.getSecret(secretId)).isNotNull(); @@ -104,15 +100,11 @@ void testRemoveNonExistentSecret() { @Test void testUpdateSecret() throws SecretNotFoundException { - Map originalData = new HashMap<>(); - originalData.put("username", "olduser"); - originalData.put("password", "oldpass"); + Map originalData = Map.of("username", "olduser", "password", "oldpass"); String secretId = secretStore.storeSecret(originalData); - Map updatedData = new HashMap<>(); - updatedData.put("username", "newuser"); - updatedData.put("password", "newpass"); + Map updatedData = Map.of("username", "newuser", "password", "newpass"); secretStore.updateSecret(secretId, updatedData); @@ -123,8 +115,7 @@ void testUpdateSecret() throws SecretNotFoundException { @Test void testUpdateNonExistentSecret() { - Map secretData = new HashMap<>(); - secretData.put("key", "value"); + Map secretData = Map.of("key", "value"); assertThatThrownBy(() -> secretStore.updateSecret("non-existent-id", secretData)) .isInstanceOf(SecretNotFoundException.class) @@ -133,8 +124,7 @@ void testUpdateNonExistentSecret() { @Test void testUpdateSecretWithNullId() { - Map secretData = new HashMap<>(); - secretData.put("key", "value"); + Map secretData = Map.of("key", "value"); assertThatThrownBy(() -> secretStore.updateSecret(null, secretData)) .isInstanceOf(NullPointerException.class) @@ -143,8 +133,7 @@ void testUpdateSecretWithNullId() { @Test void testUpdateSecretWithNullData() { - Map originalData = new HashMap<>(); - originalData.put("key", "value"); + Map originalData = Map.of("key", "value"); String secretId = secretStore.storeSecret(originalData); assertThatThrownBy(() -> secretStore.updateSecret(secretId, null)) @@ -154,12 +143,10 @@ void testUpdateSecretWithNullData() { @Test void testClear() { - Map secretData1 = new HashMap<>(); - secretData1.put("key1", "value1"); + Map secretData1 = Map.of("key1", "value1"); String secretId1 = secretStore.storeSecret(secretData1); - Map secretData2 = new HashMap<>(); - secretData2.put("key2", "value2"); + Map secretData2 = Map.of("key2", "value2"); String secretId2 = secretStore.storeSecret(secretData2); secretStore.clear(); @@ -172,21 +159,19 @@ void testClear() { @Test void testStoreEmptySecret() throws SecretNotFoundException { - Map emptyData = new HashMap<>(); + Map emptyData = Map.of(); String secretId = secretStore.storeSecret(emptyData); Map retrievedSecret = secretStore.getSecret(secretId); assertThat(retrievedSecret).isNotNull(); - assertThat(retrievedSecret.isEmpty()).isTrue(); + assertThat(retrievedSecret).isEmpty(); } @Test void testStoreMultipleSecrets() throws SecretNotFoundException { - Map secret1 = new HashMap<>(); - secret1.put("user1", "pass1"); + Map secret1 = Map.of("user1", "pass1"); - Map secret2 = new HashMap<>(); - secret2.put("user2", "pass2"); + Map secret2 = Map.of("user2", "pass2"); String secretId1 = secretStore.storeSecret(secret1); String secretId2 = secretStore.storeSecret(secret2); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/exceptions/SecretException.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/exceptions/SecretException.java index 8304e41ac963c..b856938ba6a54 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/exceptions/SecretException.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/secret/exceptions/SecretException.java @@ -21,9 +21,9 @@ import org.apache.flink.annotation.PublicEvolving; /** - * Base exception for all secret-related errors in the secret store. + * Base exception for all secret-related errors. * - *

This exception serves as the parent class for all secret store related exceptions, providing a + *

This exception serves as the parent class for all secret related exceptions, providing a * common type for handling errors that occur during secret management operations. */ @PublicEvolving From 1796ddd64299c8ffc04e8ce86ad6be92dc3a72a8 Mon Sep 17 00:00:00 2001 From: Hao Li <1127478+lihaosky@users.noreply.github.com> Date: Mon, 2 Feb 2026 12:28:13 -0800 Subject: [PATCH 8/9] comments --- .../flink/table/secret/GenericInMemorySecretStoreTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/secret/GenericInMemorySecretStoreTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/secret/GenericInMemorySecretStoreTest.java index 0e3f5f7b62324..53674e1e5c03f 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/secret/GenericInMemorySecretStoreTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/secret/GenericInMemorySecretStoreTest.java @@ -27,6 +27,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; /** Test for {@link GenericInMemorySecretStore}. */ class GenericInMemorySecretStoreTest { @@ -95,7 +96,7 @@ void testRemoveSecretWithNullId() { @Test void testRemoveNonExistentSecret() { // Should not throw exception, just silently remove nothing - secretStore.removeSecret("non-existent-id"); + assertDoesNotThrow(() -> secretStore.removeSecret("non-existent-id")); } @Test From ef5948c981b7d09020076ada050f96a1444e69f9 Mon Sep 17 00:00:00 2001 From: Hao Li <1127478+lihaosky@users.noreply.github.com> Date: Thu, 5 Feb 2026 17:29:24 -0800 Subject: [PATCH 9/9] comments --- .gitignore | 1 + .../flink/table/api/EnvironmentSettings.java | 5 +- .../api/internal/TableEnvironmentImpl.java | 26 ++- .../flink/table/factories/ApiFactoryUtil.java | 36 +++ .../table/factories/TableFactoryUtil.java | 36 --- .../secret/GenericInMemorySecretStore.java | 38 +--- .../table/factories/ApiFactoryUtilTest.java | 77 +++++++ .../table/factories/TableFactoryUtilTest.java | 213 ------------------ 8 files changed, 140 insertions(+), 292 deletions(-) delete mode 100644 flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/TableFactoryUtilTest.java diff --git a/.gitignore b/.gitignore index ff2c78719c82c..7966b633e0e21 100644 --- a/.gitignore +++ b/.gitignore @@ -10,6 +10,7 @@ scalastyle-output.xml .metals .bloop .cursor +.claude .metadata .settings .project diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java index bb6e588d92282..92a375bf0bf8f 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/EnvironmentSettings.java @@ -158,9 +158,8 @@ public Optional getSqlFactory() { } @Internal - @Nullable - public SecretStore getSecretStore() { - return secretStore; + public Optional getSecretStore() { + return Optional.ofNullable(secretStore); } /** A builder for {@link EnvironmentSettings}. */ diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java index 1b01429cf8278..47c8b53dfd9fd 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java @@ -270,18 +270,22 @@ public static TableEnvironmentImpl create(EnvironmentSettings settings) { catalogStore = catalogStoreFactory.createCatalogStore(); } - final SecretStoreFactory secretStoreFactory = - TableFactoryUtil.findAndCreateSecretStoreFactory( - settings.getConfiguration(), userClassLoader); - final SecretStoreFactory.Context secretStoreContext = - TableFactoryUtil.buildSecretStoreFactoryContext( - settings.getConfiguration(), userClassLoader); - secretStoreFactory.open(secretStoreContext); + final SecretStore secretStore; + final SecretStoreFactory secretStoreFactory; + if (settings.getSecretStore().isPresent()) { + secretStore = settings.getSecretStore().get(); + secretStoreFactory = null; + } else { + secretStoreFactory = + ApiFactoryUtil.findAndCreateSecretStoreFactory( + settings.getConfiguration(), userClassLoader); + final SecretStoreFactory.Context secretStoreContext = + ApiFactoryUtil.buildSecretStoreFactoryContext( + settings.getConfiguration(), userClassLoader); + secretStoreFactory.open(secretStoreContext); + secretStore = secretStoreFactory.createSecretStore(); + } // TODO (FLINK-38261): pass secret store to catalog manager for encryption/decryption - final SecretStore secretStore = - settings.getSecretStore() != null - ? settings.getSecretStore() - : secretStoreFactory.createSecretStore(); // use configuration to init table config final TableConfig tableConfig = TableConfig.getDefault(); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ApiFactoryUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ApiFactoryUtil.java index 2f4b36b2bd612..92486d94f1342 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ApiFactoryUtil.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/ApiFactoryUtil.java @@ -22,6 +22,8 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DelegatingConfiguration; import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.secret.CommonSecretOptions; +import org.apache.flink.table.secret.SecretStoreFactory; import java.util.Map; @@ -74,4 +76,38 @@ public static CatalogStoreFactory.Context buildCatalogStoreFactoryContext( return context; } + + public static SecretStoreFactory findAndCreateSecretStoreFactory( + Configuration configuration, ClassLoader classLoader) { + String identifier = configuration.get(CommonSecretOptions.TABLE_SECRET_STORE_KIND); + + SecretStoreFactory secretStoreFactory = + FactoryUtil.discoverFactory(classLoader, SecretStoreFactory.class, identifier); + + return secretStoreFactory; + } + + /** + * Build a {@link SecretStoreFactory.Context} for opening the {@link SecretStoreFactory}. + * + *

The configuration format should be as follows: + * + *

{@code
+     * table.secret-store.kind: {identifier}
+     * table.secret-store.{identifier}.{param1}: xxx
+     * table.secret-store.{identifier}.{param2}: xxx
+     * }
+ */ + public static SecretStoreFactory.Context buildSecretStoreFactoryContext( + Configuration configuration, ClassLoader classLoader) { + String identifier = configuration.get(CommonSecretOptions.TABLE_SECRET_STORE_KIND); + String secretStoreOptionPrefix = + CommonSecretOptions.TABLE_SECRET_STORE_OPTION_PREFIX + identifier + "."; + Map options = + new DelegatingConfiguration(configuration, secretStoreOptionPrefix).toMap(); + SecretStoreFactory.Context context = + new FactoryUtil.DefaultSecretStoreContext(options, configuration, classLoader); + + return context; + } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java index da425bf4daaad..ba10e4de707b7 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/factories/TableFactoryUtil.java @@ -36,8 +36,6 @@ import org.apache.flink.table.legacy.factories.TableSourceFactory; import org.apache.flink.table.legacy.sinks.TableSink; import org.apache.flink.table.legacy.sources.TableSource; -import org.apache.flink.table.secret.CommonSecretOptions; -import org.apache.flink.table.secret.SecretStoreFactory; import java.util.Collections; import java.util.List; @@ -172,38 +170,4 @@ public ClassLoader getUserClassLoader() { })) .collect(Collectors.toList()); } - - public static SecretStoreFactory findAndCreateSecretStoreFactory( - Configuration configuration, ClassLoader classLoader) { - String identifier = configuration.get(CommonSecretOptions.TABLE_SECRET_STORE_KIND); - - SecretStoreFactory secretStoreFactory = - FactoryUtil.discoverFactory(classLoader, SecretStoreFactory.class, identifier); - - return secretStoreFactory; - } - - /** - * Build a {@link SecretStoreFactory.Context} for opening the {@link SecretStoreFactory}. - * - *

The configuration format should be as follows: - * - *

{@code
-     * table.secret-store.kind: {identifier}
-     * table.secret-store.{identifier}.{param1}: xxx
-     * table.secret-store.{identifier}.{param2}: xxx
-     * }
- */ - public static SecretStoreFactory.Context buildSecretStoreFactoryContext( - Configuration configuration, ClassLoader classLoader) { - String identifier = configuration.get(CommonSecretOptions.TABLE_SECRET_STORE_KIND); - String secretStoreOptionPrefix = - CommonSecretOptions.TABLE_SECRET_STORE_OPTION_PREFIX + identifier + "."; - Map options = - new DelegatingConfiguration(configuration, secretStoreOptionPrefix).toMap(); - SecretStoreFactory.Context context = - new FactoryUtil.DefaultSecretStoreContext(options, configuration, classLoader); - - return context; - } } diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/secret/GenericInMemorySecretStore.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/secret/GenericInMemorySecretStore.java index c054f5a070bf3..7554bb789e621 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/secret/GenericInMemorySecretStore.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/secret/GenericInMemorySecretStore.java @@ -19,13 +19,9 @@ package org.apache.flink.table.secret; import org.apache.flink.annotation.Internal; -import org.apache.flink.table.secret.exceptions.SecretException; import org.apache.flink.table.secret.exceptions.SecretNotFoundException; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference; -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; - +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -36,15 +32,14 @@ * A generic in-memory implementation of both {@link ReadableSecretStore} and {@link * WritableSecretStore}. * - *

This implementation stores secrets in memory as plaintext JSON strings. It is suitable for + *

This implementation stores secrets in memory as immutable Map objects. It is suitable for * testing and development purposes but should not be used in production environments as secrets are * not encrypted. */ @Internal public class GenericInMemorySecretStore implements ReadableSecretStore, WritableSecretStore { - private static final ObjectMapper objectMapper = new ObjectMapper(); - private final Map secrets; + private final Map> secrets; public GenericInMemorySecretStore() { this.secrets = new HashMap<>(); @@ -54,18 +49,13 @@ public GenericInMemorySecretStore() { public Map getSecret(String secretId) throws SecretNotFoundException { checkNotNull(secretId, "Secret ID cannot be null"); - String secretJson = secrets.get(secretId); - if (secretJson == null) { + Map secretData = secrets.get(secretId); + if (secretData == null) { throw new SecretNotFoundException( String.format("Secret with ID '%s' not found", secretId)); } - try { - return objectMapper.readValue(secretJson, new TypeReference<>() {}); - } catch (JsonProcessingException e) { - throw new SecretException( - String.format("Failed to deserialize secret with ID '%s'", secretId), e); - } + return secretData; } @Override @@ -73,13 +63,8 @@ public String storeSecret(Map secretData) { checkNotNull(secretData, "Secret data cannot be null"); String secretId = UUID.randomUUID().toString(); - try { - String secretJson = objectMapper.writeValueAsString(secretData); - secrets.put(secretId, secretJson); - return secretId; - } catch (JsonProcessingException e) { - throw new SecretException("Failed to serialize secret data", e); - } + secrets.put(secretId, Collections.unmodifiableMap(new HashMap<>(secretData))); + return secretId; } @Override @@ -99,12 +84,7 @@ public void updateSecret(String secretId, Map newSecretData) String.format("Secret with ID '%s' not found", secretId)); } - try { - String secretJson = objectMapper.writeValueAsString(newSecretData); - secrets.put(secretId, secretJson); - } catch (JsonProcessingException e) { - throw new SecretException("Failed to serialize secret data", e); - } + secrets.put(secretId, Collections.unmodifiableMap(new HashMap<>(newSecretData))); } /** Clears all secrets from the store (for testing purposes). */ diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ApiFactoryUtilTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ApiFactoryUtilTest.java index c8e1908291b85..e1ea22a6f5771 100644 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ApiFactoryUtilTest.java +++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/ApiFactoryUtilTest.java @@ -22,6 +22,9 @@ import org.apache.flink.table.catalog.CommonCatalogOptions; import org.apache.flink.table.catalog.FileCatalogStoreFactory; import org.apache.flink.table.catalog.GenericInMemoryCatalogStoreFactory; +import org.apache.flink.table.secret.CommonSecretOptions; +import org.apache.flink.table.secret.GenericInMemorySecretStoreFactory; +import org.apache.flink.table.secret.SecretStoreFactory; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -127,10 +130,84 @@ void testBuildCatalogStoreFactoryContextOnlyExtractsRelevantOptions() { Map.of("path", "/test/path", "option1", "value1")); } + @ParameterizedTest(name = "kind={0}, expectedFactory={1}") + @MethodSource("secretStoreFactoryTestParameters") + void testFindAndCreateSecretStoreFactory(String kind, Class expectedFactoryClass) { + Configuration configuration = new Configuration(); + if (kind != null) { + configuration.set(CommonSecretOptions.TABLE_SECRET_STORE_KIND, kind); + } + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + + SecretStoreFactory factory = + ApiFactoryUtil.findAndCreateSecretStoreFactory(configuration, classLoader); + + assertThat(factory).isInstanceOf(expectedFactoryClass); + } + + @Test + void testBuildSecretStoreFactoryContext() { + Configuration configuration = new Configuration(); + configuration.set(CommonSecretOptions.TABLE_SECRET_STORE_KIND, "generic_in_memory"); + configuration.setString("table.secret-store.generic_in_memory.option1", "value1"); + configuration.setString("table.secret-store.generic_in_memory.option2", "value2"); + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + + SecretStoreFactory.Context context = + ApiFactoryUtil.buildSecretStoreFactoryContext(configuration, classLoader); + + assertThat(context).isNotNull(); + assertThat(context.getOptions()) + .containsExactlyInAnyOrderEntriesOf( + Map.of("option1", "value1", "option2", "value2")); + assertThat(context.getConfiguration()).isEqualTo(configuration); + assertThat(context.getClassLoader()).isEqualTo(classLoader); + } + + @Test + void testBuildSecretStoreFactoryContextWithoutOptions() { + Configuration configuration = new Configuration(); + configuration.set(CommonSecretOptions.TABLE_SECRET_STORE_KIND, "generic_in_memory"); + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + + SecretStoreFactory.Context context = + ApiFactoryUtil.buildSecretStoreFactoryContext(configuration, classLoader); + + assertThat(context).isNotNull(); + assertThat(context.getOptions()).isEmpty(); + assertThat(context.getConfiguration()).isEqualTo(configuration); + assertThat(context.getClassLoader()).isEqualTo(classLoader); + } + + @Test + void testBuildSecretStoreFactoryContextOnlyExtractsRelevantOptions() { + Configuration configuration = new Configuration(); + configuration.set(CommonSecretOptions.TABLE_SECRET_STORE_KIND, "generic_in_memory"); + configuration.setString("table.secret-store.generic_in_memory.option1", "value1"); + configuration.setString("table.secret-store.generic_in_memory.option2", "value2"); + configuration.setString("table.secret-store.other.irrelevant", "should-not-appear"); + configuration.setString("other.config.key", "should-not-appear"); + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + + SecretStoreFactory.Context context = + ApiFactoryUtil.buildSecretStoreFactoryContext(configuration, classLoader); + + assertThat(context).isNotNull(); + assertThat(context.getOptions()) + .containsExactlyInAnyOrderEntriesOf( + Map.of("option1", "value1", "option2", "value2")); + } + private static Stream catalogStoreFactoryTestParameters() { return Stream.of( Arguments.of("generic_in_memory", GenericInMemoryCatalogStoreFactory.class), Arguments.of("file", FileCatalogStoreFactory.class), Arguments.of(null, GenericInMemoryCatalogStoreFactory.class)); } + + private static Stream secretStoreFactoryTestParameters() { + return Stream.of( + Arguments.of("generic_in_memory", GenericInMemorySecretStoreFactory.class), + Arguments.of(null, GenericInMemorySecretStoreFactory.class)); + } } diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/TableFactoryUtilTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/TableFactoryUtilTest.java deleted file mode 100644 index 0c4a4d7d8438d..0000000000000 --- a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/factories/TableFactoryUtilTest.java +++ /dev/null @@ -1,213 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.factories; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.table.catalog.CommonCatalogOptions; -import org.apache.flink.table.catalog.FileCatalogStoreFactory; -import org.apache.flink.table.catalog.GenericInMemoryCatalogStoreFactory; -import org.apache.flink.table.secret.CommonSecretOptions; -import org.apache.flink.table.secret.GenericInMemorySecretStoreFactory; -import org.apache.flink.table.secret.SecretStoreFactory; - -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.io.TempDir; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; - -import java.io.File; -import java.util.Map; -import java.util.stream.Stream; - -import static org.assertj.core.api.Assertions.assertThat; - -/** Tests for {@link TableFactoryUtil}. */ -class TableFactoryUtilTest { - - @ParameterizedTest(name = "kind={0}, expectedFactory={1}") - @MethodSource("catalogStoreFactoryTestParameters") - void testFindAndCreateCatalogStoreFactory(String kind, Class expectedFactoryClass) { - Configuration configuration = new Configuration(); - if (kind != null) { - configuration.set(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND, kind); - } - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - - CatalogStoreFactory factory = - TableFactoryUtil.findAndCreateCatalogStoreFactory(configuration, classLoader); - - assertThat(factory).isInstanceOf(expectedFactoryClass); - } - - @Test - void testBuildCatalogStoreFactoryContext(@TempDir File tempFolder) { - Configuration configuration = new Configuration(); - configuration.set(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND, "file"); - configuration.setString("table.catalog-store.file.path", tempFolder.getAbsolutePath()); - configuration.setString("table.catalog-store.file.option1", "value1"); - configuration.setString("table.catalog-store.file.option2", "value2"); - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - - CatalogStoreFactory.Context context = - TableFactoryUtil.buildCatalogStoreFactoryContext(configuration, classLoader); - - assertThat(context).isNotNull(); - assertThat(context.getOptions()) - .containsExactlyInAnyOrderEntriesOf( - Map.of( - "path", tempFolder.getAbsolutePath(), - "option1", "value1", - "option2", "value2")); - assertThat(context.getConfiguration()).isEqualTo(configuration); - assertThat(context.getClassLoader()).isEqualTo(classLoader); - } - - @Test - void testBuildCatalogStoreFactoryContextWithGenericInMemory() { - Configuration configuration = new Configuration(); - configuration.set(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND, "generic_in_memory"); - configuration.setString("table.catalog-store.generic_in_memory.option1", "value1"); - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - - CatalogStoreFactory.Context context = - TableFactoryUtil.buildCatalogStoreFactoryContext(configuration, classLoader); - - assertThat(context).isNotNull(); - assertThat(context.getOptions()) - .containsExactlyInAnyOrderEntriesOf(Map.of("option1", "value1")); - assertThat(context.getConfiguration()).isEqualTo(configuration); - assertThat(context.getClassLoader()).isEqualTo(classLoader); - } - - @Test - void testBuildCatalogStoreFactoryContextWithoutOptions() { - Configuration configuration = new Configuration(); - configuration.set(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND, "generic_in_memory"); - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - - CatalogStoreFactory.Context context = - TableFactoryUtil.buildCatalogStoreFactoryContext(configuration, classLoader); - - assertThat(context).isNotNull(); - assertThat(context.getOptions()).isEmpty(); - assertThat(context.getConfiguration()).isEqualTo(configuration); - assertThat(context.getClassLoader()).isEqualTo(classLoader); - } - - @Test - void testBuildCatalogStoreFactoryContextOnlyExtractsRelevantOptions() { - Configuration configuration = new Configuration(); - configuration.set(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND, "file"); - configuration.setString("table.catalog-store.file.path", "/test/path"); - configuration.setString("table.catalog-store.file.option1", "value1"); - configuration.setString("table.catalog-store.other.irrelevant", "should-not-appear"); - configuration.setString("other.config.key", "should-not-appear"); - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - - CatalogStoreFactory.Context context = - TableFactoryUtil.buildCatalogStoreFactoryContext(configuration, classLoader); - - assertThat(context).isNotNull(); - assertThat(context.getOptions()) - .containsExactlyInAnyOrderEntriesOf( - Map.of("path", "/test/path", "option1", "value1")); - } - - @ParameterizedTest(name = "kind={0}, expectedFactory={1}") - @MethodSource("secretStoreFactoryTestParameters") - void testFindAndCreateSecretStoreFactory(String kind, Class expectedFactoryClass) { - Configuration configuration = new Configuration(); - if (kind != null) { - configuration.set(CommonSecretOptions.TABLE_SECRET_STORE_KIND, kind); - } - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - - SecretStoreFactory factory = - TableFactoryUtil.findAndCreateSecretStoreFactory(configuration, classLoader); - - assertThat(factory).isInstanceOf(expectedFactoryClass); - } - - @Test - void testBuildSecretStoreFactoryContext() { - Configuration configuration = new Configuration(); - configuration.set(CommonSecretOptions.TABLE_SECRET_STORE_KIND, "generic_in_memory"); - configuration.setString("table.secret-store.generic_in_memory.option1", "value1"); - configuration.setString("table.secret-store.generic_in_memory.option2", "value2"); - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - - SecretStoreFactory.Context context = - TableFactoryUtil.buildSecretStoreFactoryContext(configuration, classLoader); - - assertThat(context).isNotNull(); - assertThat(context.getOptions()) - .containsExactlyInAnyOrderEntriesOf( - Map.of("option1", "value1", "option2", "value2")); - assertThat(context.getConfiguration()).isEqualTo(configuration); - assertThat(context.getClassLoader()).isEqualTo(classLoader); - } - - @Test - void testBuildSecretStoreFactoryContextWithoutOptions() { - Configuration configuration = new Configuration(); - configuration.set(CommonSecretOptions.TABLE_SECRET_STORE_KIND, "generic_in_memory"); - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - - SecretStoreFactory.Context context = - TableFactoryUtil.buildSecretStoreFactoryContext(configuration, classLoader); - - assertThat(context).isNotNull(); - assertThat(context.getOptions()).isEmpty(); - assertThat(context.getConfiguration()).isEqualTo(configuration); - assertThat(context.getClassLoader()).isEqualTo(classLoader); - } - - @Test - void testBuildSecretStoreFactoryContextOnlyExtractsRelevantOptions() { - Configuration configuration = new Configuration(); - configuration.set(CommonSecretOptions.TABLE_SECRET_STORE_KIND, "generic_in_memory"); - configuration.setString("table.secret-store.generic_in_memory.option1", "value1"); - configuration.setString("table.secret-store.generic_in_memory.option2", "value2"); - configuration.setString("table.secret-store.other.irrelevant", "should-not-appear"); - configuration.setString("other.config.key", "should-not-appear"); - ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); - - SecretStoreFactory.Context context = - TableFactoryUtil.buildSecretStoreFactoryContext(configuration, classLoader); - - assertThat(context).isNotNull(); - assertThat(context.getOptions()) - .containsExactlyInAnyOrderEntriesOf( - Map.of("option1", "value1", "option2", "value2")); - } - - private static Stream catalogStoreFactoryTestParameters() { - return Stream.of( - Arguments.of("generic_in_memory", GenericInMemoryCatalogStoreFactory.class), - Arguments.of("file", FileCatalogStoreFactory.class), - Arguments.of(null, GenericInMemoryCatalogStoreFactory.class)); - } - - private static Stream secretStoreFactoryTestParameters() { - return Stream.of( - Arguments.of("generic_in_memory", GenericInMemorySecretStoreFactory.class), - Arguments.of(null, GenericInMemorySecretStoreFactory.class)); - } -}