Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ scalastyle-output.xml
.metals
.bloop
.cursor
.claude
.metadata
.settings
.project
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def java_class(cls):
def excluded_methods(cls):
# internal interfaces, no need to expose to users.
return {'getPlanner', 'getExecutor', 'getUserClassLoader', 'getCatalogStore',
'toConfiguration', 'fromConfiguration', 'getSqlFactory'}
'getSecretStore', 'toConfiguration', 'fromConfiguration', 'getSqlFactory'}


class EnvironmentSettingsBuilderCompletenessTests(PythonAPICompletenessTestCase, PyFlinkTestCase):
Expand All @@ -59,7 +59,7 @@ def java_class(cls):
def excluded_methods(cls):
# internal interfaces, no need to expose to users.
# withSqlFactory - needs to be implemented
return {'withClassLoader', 'withCatalogStore', 'withSqlFactory'}
return {'withClassLoader', 'withCatalogStore', 'withSecretStore', 'withSqlFactory'}

if __name__ == '__main__':
import unittest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.flink.table.catalog.CatalogStoreHolder;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.factories.ApiFactoryUtil;
import org.apache.flink.table.factories.CatalogStoreFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.TableFactoryUtil;
Expand Down Expand Up @@ -390,9 +391,9 @@ private static CatalogManager buildCatalogManager(
SessionEnvironment environment) {

CatalogStoreFactory catalogStoreFactory =
TableFactoryUtil.findAndCreateCatalogStoreFactory(configuration, userClassLoader);
ApiFactoryUtil.findAndCreateCatalogStoreFactory(configuration, userClassLoader);
CatalogStoreFactory.Context catalogStoreFactoryContext =
TableFactoryUtil.buildCatalogStoreFactoryContext(configuration, userClassLoader);
ApiFactoryUtil.buildCatalogStoreFactoryContext(configuration, userClassLoader);
catalogStoreFactory.open(catalogStoreFactoryContext);
CatalogStoreHolder catalogStore =
CatalogStoreHolder.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.ApiFactoryUtil;
import org.apache.flink.table.factories.CatalogStoreFactory;
import org.apache.flink.table.factories.PlannerFactoryUtil;
import org.apache.flink.table.factories.TableFactoryUtil;
Expand Down Expand Up @@ -112,17 +113,21 @@ public static StreamTableEnvironment create(
new ResourceManager(settings.getConfiguration(), userClassLoader);
final ModuleManager moduleManager = new ModuleManager();

final CatalogStoreFactory catalogStoreFactory =
TableFactoryUtil.findAndCreateCatalogStoreFactory(
settings.getConfiguration(), userClassLoader);
final CatalogStoreFactory.Context catalogStoreFactoryContext =
TableFactoryUtil.buildCatalogStoreFactoryContext(
settings.getConfiguration(), userClassLoader);
catalogStoreFactory.open(catalogStoreFactoryContext);
final CatalogStore catalogStore =
settings.getCatalogStore() != null
? settings.getCatalogStore()
: catalogStoreFactory.createCatalogStore();
final CatalogStore catalogStore;
final CatalogStoreFactory catalogStoreFactory;
if (settings.getCatalogStore().isPresent()) {
catalogStore = settings.getCatalogStore().get();
catalogStoreFactory = null;
} else {
catalogStoreFactory =
ApiFactoryUtil.findAndCreateCatalogStoreFactory(
settings.getConfiguration(), userClassLoader);
final CatalogStoreFactory.Context catalogStoreFactoryContext =
ApiFactoryUtil.buildCatalogStoreFactoryContext(
settings.getConfiguration(), userClassLoader);
catalogStoreFactory.open(catalogStoreFactoryContext);
catalogStore = catalogStoreFactory.createCatalogStore();
}

final CatalogManager catalogManager =
CatalogManager.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.table.catalog.CatalogStore;
import org.apache.flink.table.expressions.SqlFactory;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.secret.SecretStore;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -66,16 +67,19 @@ public class EnvironmentSettings {

private final @Nullable CatalogStore catalogStore;
private final @Nullable SqlFactory sqlFactory;
private final @Nullable SecretStore secretStore;

private EnvironmentSettings(
Configuration configuration,
ClassLoader classLoader,
CatalogStore catalogStore,
SqlFactory sqlFactory) {
SqlFactory sqlFactory,
SecretStore secretStore) {
this.configuration = configuration;
this.classLoader = classLoader;
this.catalogStore = catalogStore;
this.sqlFactory = sqlFactory;
this.secretStore = secretStore;
}

/**
Expand Down Expand Up @@ -144,16 +148,20 @@ public ClassLoader getUserClassLoader() {
}

@Internal
@Nullable
public CatalogStore getCatalogStore() {
return catalogStore;
public Optional<CatalogStore> getCatalogStore() {
return Optional.ofNullable(catalogStore);
}

@Internal
public Optional<SqlFactory> getSqlFactory() {
return Optional.ofNullable(sqlFactory);
}

@Internal
public Optional<SecretStore> getSecretStore() {
return Optional.ofNullable(secretStore);
}

/** A builder for {@link EnvironmentSettings}. */
@PublicEvolving
public static class Builder {
Expand All @@ -163,6 +171,7 @@ public static class Builder {

private @Nullable CatalogStore catalogStore;
private @Nullable SqlFactory sqlFactory;
private @Nullable SecretStore secretStore;

public Builder() {}

Expand Down Expand Up @@ -251,12 +260,28 @@ public Builder withSqlFactory(SqlFactory sqlFactory) {
return this;
}

/**
* Specifies the {@link SecretStore} to be used for managing secrets in the {@link
* TableEnvironment}.
*
* <p>The secret store allows for secure storage and retrieval of sensitive configuration
* data such as credentials, tokens, and passwords.
*
* @param secretStore the secret store instance to use
* @return this builder
*/
public Builder withSecretStore(SecretStore secretStore) {
this.secretStore = secretStore;
return this;
}

/** Returns an immutable instance of {@link EnvironmentSettings}. */
public EnvironmentSettings build() {
if (classLoader == null) {
classLoader = Thread.currentThread().getContextClassLoader();
}
return new EnvironmentSettings(configuration, classLoader, catalogStore, sqlFactory);
return new EnvironmentSettings(
configuration, classLoader, catalogStore, sqlFactory, secretStore);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@
import org.apache.flink.table.expressions.ModelReferenceExpression;
import org.apache.flink.table.expressions.TableReferenceExpression;
import org.apache.flink.table.expressions.utils.ApiExpressionDefaultVisitor;
import org.apache.flink.table.factories.ApiFactoryUtil;
import org.apache.flink.table.factories.CatalogStoreFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.PlannerFactoryUtil;
Expand Down Expand Up @@ -119,6 +120,8 @@
import org.apache.flink.table.resource.ResourceManager;
import org.apache.flink.table.resource.ResourceType;
import org.apache.flink.table.resource.ResourceUri;
import org.apache.flink.table.secret.SecretStore;
import org.apache.flink.table.secret.SecretStoreFactory;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.DataTypeUtils;
Expand Down Expand Up @@ -251,17 +254,38 @@ public static TableEnvironmentImpl create(EnvironmentSettings settings) {
userClassLoader, ExecutorFactory.class, ExecutorFactory.DEFAULT_IDENTIFIER);
final Executor executor = executorFactory.create(settings.getConfiguration());

final CatalogStoreFactory catalogStoreFactory =
TableFactoryUtil.findAndCreateCatalogStoreFactory(
settings.getConfiguration(), userClassLoader);
final CatalogStoreFactory.Context context =
TableFactoryUtil.buildCatalogStoreFactoryContext(
settings.getConfiguration(), userClassLoader);
catalogStoreFactory.open(context);
final CatalogStore catalogStore =
settings.getCatalogStore() != null
? settings.getCatalogStore()
: catalogStoreFactory.createCatalogStore();
final CatalogStore catalogStore;
final CatalogStoreFactory catalogStoreFactory;
if (settings.getCatalogStore().isPresent()) {
catalogStore = settings.getCatalogStore().get();
catalogStoreFactory = null;
} else {
catalogStoreFactory =
ApiFactoryUtil.findAndCreateCatalogStoreFactory(
settings.getConfiguration(), userClassLoader);
final CatalogStoreFactory.Context context =
ApiFactoryUtil.buildCatalogStoreFactoryContext(
settings.getConfiguration(), userClassLoader);
catalogStoreFactory.open(context);
catalogStore = catalogStoreFactory.createCatalogStore();
}

final SecretStore secretStore;
final SecretStoreFactory secretStoreFactory;
if (settings.getSecretStore().isPresent()) {
secretStore = settings.getSecretStore().get();
secretStoreFactory = null;
} else {
secretStoreFactory =
ApiFactoryUtil.findAndCreateSecretStoreFactory(
settings.getConfiguration(), userClassLoader);
final SecretStoreFactory.Context secretStoreContext =
ApiFactoryUtil.buildSecretStoreFactoryContext(
settings.getConfiguration(), userClassLoader);
secretStoreFactory.open(secretStoreContext);
secretStore = secretStoreFactory.createSecretStore();
}
// TODO (FLINK-38261): pass secret store to catalog manager for encryption/decryption

// use configuration to init table config
final TableConfig tableConfig = TableConfig.getDefault();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.factories;

import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DelegatingConfiguration;
import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.flink.table.secret.CommonSecretOptions;
import org.apache.flink.table.secret.SecretStoreFactory;

import java.util.Map;

/** Utility for dealing with catalog store factories. */
@Internal
public class ApiFactoryUtil {

/**
* Finds and creates a {@link CatalogStoreFactory} using the provided {@link Configuration} and
* user classloader.
*
* <p>The configuration format should be as follows:
*
* <pre>{@code
* table.catalog-store.kind: {identifier}
* table.catalog-store.{identifier}.{param1}: xxx
* table.catalog-store.{identifier}.{param2}: xxx
* }</pre>
*/
public static CatalogStoreFactory findAndCreateCatalogStoreFactory(
Configuration configuration, ClassLoader classLoader) {
String identifier = configuration.get(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND);

CatalogStoreFactory catalogStoreFactory =
FactoryUtil.discoverFactory(classLoader, CatalogStoreFactory.class, identifier);

return catalogStoreFactory;
}

/**
* Build a {@link CatalogStoreFactory.Context} for opening the {@link CatalogStoreFactory}.
*
* <p>The configuration format should be as follows:
*
* <pre>{@code
* table.catalog-store.kind: {identifier}
* table.catalog-store.{identifier}.{param1}: xxx
* table.catalog-store.{identifier}.{param2}: xxx
* }</pre>
*/
public static CatalogStoreFactory.Context buildCatalogStoreFactoryContext(
Configuration configuration, ClassLoader classLoader) {
String identifier = configuration.get(CommonCatalogOptions.TABLE_CATALOG_STORE_KIND);
String catalogStoreOptionPrefix =
CommonCatalogOptions.TABLE_CATALOG_STORE_OPTION_PREFIX + identifier + ".";
Map<String, String> options =
new DelegatingConfiguration(configuration, catalogStoreOptionPrefix).toMap();
CatalogStoreFactory.Context context =
new FactoryUtil.DefaultCatalogStoreContext(options, configuration, classLoader);

return context;
}

public static SecretStoreFactory findAndCreateSecretStoreFactory(
Configuration configuration, ClassLoader classLoader) {
String identifier = configuration.get(CommonSecretOptions.TABLE_SECRET_STORE_KIND);

SecretStoreFactory secretStoreFactory =
FactoryUtil.discoverFactory(classLoader, SecretStoreFactory.class, identifier);

return secretStoreFactory;
}

/**
* Build a {@link SecretStoreFactory.Context} for opening the {@link SecretStoreFactory}.
*
* <p>The configuration format should be as follows:
*
* <pre>{@code
* table.secret-store.kind: {identifier}
* table.secret-store.{identifier}.{param1}: xxx
* table.secret-store.{identifier}.{param2}: xxx
* }</pre>
*/
public static SecretStoreFactory.Context buildSecretStoreFactoryContext(
Configuration configuration, ClassLoader classLoader) {
String identifier = configuration.get(CommonSecretOptions.TABLE_SECRET_STORE_KIND);
String secretStoreOptionPrefix =
CommonSecretOptions.TABLE_SECRET_STORE_OPTION_PREFIX + identifier + ".";
Map<String, String> options =
new DelegatingConfiguration(configuration, secretStoreOptionPrefix).toMap();
SecretStoreFactory.Context context =
new FactoryUtil.DefaultSecretStoreContext(options, configuration, classLoader);

return context;
}
}
Loading