From 364591cb5329c4b484d335b4e96a8a61bbacc39c Mon Sep 17 00:00:00 2001 From: sunxiaojian Date: Thu, 16 Jan 2025 19:48:25 +0800 Subject: [PATCH] Extract flink converter common logic. --- .../iceberg/IcebergPropertiesUtils.java | 9 ++ .../flink/connector/PropertiesConverter.java | 83 +++++++++++++++++-- .../hive/HivePropertiesConverter.java | 43 ++-------- .../iceberg/IcebergPropertiesConverter.java | 47 ++++------- .../paimon/PaimonPropertiesConverter.java | 56 ++++--------- .../TestIcebergPropertiesConverter.java | 16 +++- .../test/iceberg/FlinkIcebergCatalogIT.java | 2 +- .../test/paimon/FlinkPaimonCatalogIT.java | 2 +- 8 files changed, 135 insertions(+), 123 deletions(-) diff --git a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergPropertiesUtils.java b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergPropertiesUtils.java index df1340c947e..92c5d18a129 100644 --- a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergPropertiesUtils.java +++ b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergPropertiesUtils.java @@ -33,6 +33,7 @@ public class IcebergPropertiesUtils { // will only need to set the configuration 'catalog-backend' in Gravitino and Gravitino will // change it to `catalogType` automatically and pass it to Iceberg. 
public static final Map GRAVITINO_CONFIG_TO_ICEBERG; + public static final Map ICEBERG_CATALOG_CONFIG_TO_GRAVITINO; static { Map map = new HashMap(); @@ -65,6 +66,14 @@ public class IcebergPropertiesUtils { AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_KEY, IcebergConstants.ICEBERG_ADLS_STORAGE_ACCOUNT_KEY); GRAVITINO_CONFIG_TO_ICEBERG = Collections.unmodifiableMap(map); + + Map icebergCatalogConfigToGravitino = new HashMap<>(); + map.forEach( + (key, value) -> { + icebergCatalogConfigToGravitino.put(value, key); + }); + ICEBERG_CATALOG_CONFIG_TO_GRAVITINO = + Collections.unmodifiableMap(icebergCatalogConfigToGravitino); } /** diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PropertiesConverter.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PropertiesConverter.java index c9fbb8a491c..15d1a12fa3b 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PropertiesConverter.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PropertiesConverter.java @@ -19,8 +19,10 @@ package org.apache.gravitino.flink.connector; +import com.google.common.collect.Maps; import java.util.Map; import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.catalog.CommonCatalogOptions; /** * PropertiesConverter is used to convert properties between Flink properties and Apache Gravitino @@ -32,25 +34,82 @@ public interface PropertiesConverter { /** * Converts properties from application provided properties and Flink connector properties to - * Gravitino properties. + * Gravitino properties. This method processes the Flink configuration and transforms it into a + * format suitable for the Gravitino catalog. * - * @param flinkConf The configuration provided by Flink. - * @return properties for the Gravitino catalog. + * @param flinkConf The Flink configuration containing connector properties. 
This includes both + * Flink-specific properties and any user-provided properties. + * @return A map of properties converted for use in the Gravitino catalog. The returned map + * includes both directly transformed properties and bypass properties prefixed with {@link + * #FLINK_PROPERTY_PREFIX}. */ default Map toGravitinoCatalogProperties(Configuration flinkConf) { - return flinkConf.toMap(); + Map gravitinoProperties = Maps.newHashMap(); + for (Map.Entry entry : flinkConf.toMap().entrySet()) { + String gravitinoKey = transformPropertyToGravitinoCatalog(entry.getKey()); + if (gravitinoKey != null) { + gravitinoProperties.put(gravitinoKey, entry.getValue()); + } else if (!entry.getKey().startsWith(FLINK_PROPERTY_PREFIX)) { + gravitinoProperties.put(FLINK_PROPERTY_PREFIX + entry.getKey(), entry.getValue()); + } else { + gravitinoProperties.put(entry.getKey(), entry.getValue()); + } + } + return gravitinoProperties; } /** - * Converts properties from Gravitino properties to Flink connector properties. + * Converts properties from Gravitino catalog properties to Flink connector properties. This + * method processes the Gravitino properties and transforms them into a format suitable for the + * Flink connector. * - * @param gravitinoProperties The properties provided by Gravitino. - * @return properties for the Flink connector. + * @param gravitinoProperties The properties provided by the Gravitino catalog. This includes both + * Gravitino-specific properties and any bypass properties prefixed with {@link + * #FLINK_PROPERTY_PREFIX}. + * @return A map of properties converted for use in the Flink connector. The returned map includes + * both transformed properties and the Flink catalog type. 
*/ default Map toFlinkCatalogProperties(Map gravitinoProperties) { - return gravitinoProperties; + Map allProperties = Maps.newHashMap(); + gravitinoProperties.forEach( + (key, value) -> { + String flinkConfigKey = key; + if (key.startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX)) { + flinkConfigKey = key.substring(PropertiesConverter.FLINK_PROPERTY_PREFIX.length()); + allProperties.put(flinkConfigKey, value); + } else { + String convertedKey = transformPropertyToFlinkCatalog(flinkConfigKey); + if (convertedKey != null) { + allProperties.put(convertedKey, value); + } + } + }); + allProperties.put(CommonCatalogOptions.CATALOG_TYPE.key(), getFlinkCatalogType()); + return allProperties; } + /** + * Transforms a Flink configuration key to a corresponding Gravitino catalog property key. This + * method is used to map Flink-specific configuration keys to Gravitino catalog properties. + * + * @param configKey The Flink configuration key to be transformed. + * @return The corresponding Gravitino catalog property key, or {@code null} if no transformation + * is needed. + */ + String transformPropertyToGravitinoCatalog(String configKey); + + /** + * Transforms a specific configuration key from Gravitino catalog properties to Flink connector + * properties. This method is used to convert a property key that is specific to Gravitino into a + * format that can be understood by the Flink connector. + * + * @param configKey The configuration key from Gravitino catalog properties to be transformed. + * @return The transformed configuration key that is compatible with the Flink connector, or + * {@code null} if the property has no Flink counterpart and should be dropped from the + * resulting Flink catalog properties. + */ + String transformPropertyToFlinkCatalog(String configKey); + /** * Converts properties from Flink connector schema properties to Gravitino schema properties. 
* @@ -90,4 +149,12 @@ default Map toFlinkTableProperties(Map gravitino default Map toGravitinoTableProperties(Map flinkProperties) { return flinkProperties; } + + /** + * Retrieves the Flink catalog type associated with this converter. This method is used to + * determine the type of Flink catalog that this converter is designed for. + * + * @return The Flink catalog type. + */ + String getFlinkCatalogType(); } diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/HivePropertiesConverter.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/HivePropertiesConverter.java index 18095867498..1435a028d74 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/HivePropertiesConverter.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/HivePropertiesConverter.java @@ -20,11 +20,8 @@ package org.apache.gravitino.flink.connector.hive; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Maps; import java.util.Map; import java.util.stream.Collectors; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.table.catalog.CommonCatalogOptions; import org.apache.gravitino.catalog.hive.HiveConstants; import org.apache.gravitino.flink.connector.PropertiesConverter; import org.apache.hadoop.hive.conf.HiveConf; @@ -34,46 +31,19 @@ public class HivePropertiesConverter implements PropertiesConverter { private HivePropertiesConverter() {} public static final HivePropertiesConverter INSTANCE = new HivePropertiesConverter(); - private static final Map HIVE_CATALOG_CONFIG_TO_GRAVITINO = ImmutableMap.of(HiveConf.ConfVars.METASTOREURIS.varname, HiveConstants.METASTORE_URIS); private static final Map GRAVITINO_CONFIG_TO_HIVE = ImmutableMap.of(HiveConstants.METASTORE_URIS, HiveConf.ConfVars.METASTOREURIS.varname); @Override - public Map toGravitinoCatalogProperties(Configuration flinkConf) { - Map 
gravitinoProperties = Maps.newHashMap(); - - for (Map.Entry entry : flinkConf.toMap().entrySet()) { - String gravitinoKey = HIVE_CATALOG_CONFIG_TO_GRAVITINO.get(entry.getKey()); - if (gravitinoKey != null) { - gravitinoProperties.put(gravitinoKey, entry.getValue()); - } else if (!entry.getKey().startsWith(FLINK_PROPERTY_PREFIX)) { - gravitinoProperties.put(FLINK_PROPERTY_PREFIX + entry.getKey(), entry.getValue()); - } else { - gravitinoProperties.put(entry.getKey(), entry.getValue()); - } - } - - return gravitinoProperties; + public String transformPropertyToGravitinoCatalog(String configKey) { + return HIVE_CATALOG_CONFIG_TO_GRAVITINO.get(configKey); } @Override - public Map toFlinkCatalogProperties(Map gravitinoProperties) { - Map flinkCatalogProperties = Maps.newHashMap(); - flinkCatalogProperties.put( - CommonCatalogOptions.CATALOG_TYPE.key(), GravitinoHiveCatalogFactoryOptions.IDENTIFIER); - - gravitinoProperties.forEach( - (key, value) -> { - String flinkConfigKey = key; - if (key.startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX)) { - flinkConfigKey = key.substring(PropertiesConverter.FLINK_PROPERTY_PREFIX.length()); - } - flinkCatalogProperties.put( - GRAVITINO_CONFIG_TO_HIVE.getOrDefault(flinkConfigKey, flinkConfigKey), value); - }); - return flinkCatalogProperties; + public String transformPropertyToFlinkCatalog(String configKey) { + return GRAVITINO_CONFIG_TO_HIVE.getOrDefault(configKey, configKey); } @Override @@ -95,4 +65,9 @@ public Map toFlinkTableProperties(Map gravitinoP properties.put("connector", "hive"); return properties; } + + @Override + public String getFlinkCatalogType() { + return GravitinoHiveCatalogFactoryOptions.IDENTIFIER; + } } diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java index 7684d3eadbb..2e6b604ea38 100644 --- 
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java @@ -19,11 +19,9 @@ package org.apache.gravitino.flink.connector.iceberg; -import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import java.util.HashMap; import java.util.Map; -import org.apache.flink.table.catalog.CommonCatalogOptions; import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants; import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergPropertiesUtils; import org.apache.gravitino.flink.connector.PropertiesConverter; @@ -38,38 +36,21 @@ private IcebergPropertiesConverter() {} IcebergConstants.CATALOG_BACKEND, IcebergPropertiesConstants.ICEBERG_CATALOG_TYPE); @Override - public Map toFlinkCatalogProperties(Map gravitinoProperties) { - Preconditions.checkArgument( - gravitinoProperties != null, "Iceberg Catalog properties should not be null."); + public String transformPropertyToGravitinoCatalog(String configKey) { + return IcebergPropertiesUtils.ICEBERG_CATALOG_CONFIG_TO_GRAVITINO.get(configKey); + } - Map all = new HashMap<>(); - if (gravitinoProperties != null) { - gravitinoProperties.forEach( - (k, v) -> { - if (k.startsWith(FLINK_PROPERTY_PREFIX)) { - String newKey = k.substring(FLINK_PROPERTY_PREFIX.length()); - all.put(newKey, v); - } - }); - } - Map transformedProperties = - IcebergPropertiesUtils.toIcebergCatalogProperties(gravitinoProperties); + @Override + public String transformPropertyToFlinkCatalog(String configKey) { - if (transformedProperties != null) { - all.putAll(transformedProperties); + String icebergConfigKey = configKey; + if (IcebergPropertiesUtils.GRAVITINO_CONFIG_TO_ICEBERG.containsKey(configKey)) { + icebergConfigKey = IcebergPropertiesUtils.GRAVITINO_CONFIG_TO_ICEBERG.get(configKey); } - all.put( - CommonCatalogOptions.CATALOG_TYPE.key(), 
GravitinoIcebergCatalogFactoryOptions.IDENTIFIER); - // Map "catalog-backend" to "catalog-type". - // TODO If catalog backend is CUSTOM, we need special compatibility logic. - GRAVITINO_CONFIG_TO_FLINK_ICEBERG.forEach( - (key, value) -> { - if (all.containsKey(key)) { - String config = all.remove(key); - all.put(value, config); - } - }); - return all; + if (GRAVITINO_CONFIG_TO_FLINK_ICEBERG.containsKey(configKey)) { + icebergConfigKey = GRAVITINO_CONFIG_TO_FLINK_ICEBERG.get(configKey); + } + return icebergConfigKey; } @Override @@ -78,7 +59,7 @@ public Map toGravitinoTableProperties(Map proper } @Override - public Map toFlinkTableProperties(Map properties) { - return new HashMap<>(properties); + public String getFlinkCatalogType() { + return GravitinoIcebergCatalogFactoryOptions.IDENTIFIER; } } diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java index 58613bee37d..99e402bcb88 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java @@ -19,15 +19,9 @@ package org.apache.gravitino.flink.connector.paimon; -import com.google.common.collect.Maps; -import java.util.HashMap; -import java.util.Map; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.table.catalog.CommonCatalogOptions; import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants; import org.apache.gravitino.catalog.lakehouse.paimon.PaimonPropertiesUtils; import org.apache.gravitino.flink.connector.PropertiesConverter; -import org.apache.paimon.catalog.FileSystemCatalogFactory; public class PaimonPropertiesConverter implements PropertiesConverter { @@ -36,45 +30,23 @@ public class PaimonPropertiesConverter 
implements PropertiesConverter { private PaimonPropertiesConverter() {} @Override - public Map toGravitinoCatalogProperties(Configuration flinkConf) { - Map gravitinoProperties = Maps.newHashMap(); - Map flinkConfMap = flinkConf.toMap(); - for (Map.Entry entry : flinkConfMap.entrySet()) { - String gravitinoKey = - PaimonPropertiesUtils.PAIMON_CATALOG_CONFIG_TO_GRAVITINO.get(entry.getKey()); - if (gravitinoKey != null) { - gravitinoProperties.put(gravitinoKey, entry.getValue()); - } else if (!entry.getKey().startsWith(FLINK_PROPERTY_PREFIX)) { - gravitinoProperties.put(FLINK_PROPERTY_PREFIX + entry.getKey(), entry.getValue()); - } else { - gravitinoProperties.put(entry.getKey(), entry.getValue()); - } + public String transformPropertyToGravitinoCatalog(String configKey) { + if (configKey.equalsIgnoreCase(PaimonConstants.METASTORE)) { + return PaimonConstants.CATALOG_BACKEND; } - gravitinoProperties.put( - PaimonConstants.CATALOG_BACKEND, - flinkConfMap.getOrDefault(PaimonConstants.METASTORE, FileSystemCatalogFactory.IDENTIFIER)); - return gravitinoProperties; + return PaimonPropertiesUtils.PAIMON_CATALOG_CONFIG_TO_GRAVITINO.get(configKey); } @Override - public Map toFlinkCatalogProperties(Map gravitinoProperties) { - Map all = new HashMap<>(); - gravitinoProperties.forEach( - (key, value) -> { - String flinkConfigKey = key; - if (key.startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX)) { - flinkConfigKey = key.substring(PropertiesConverter.FLINK_PROPERTY_PREFIX.length()); - } - all.put(flinkConfigKey, value); - }); - Map paimonCatalogProperties = - PaimonPropertiesUtils.toPaimonCatalogProperties(all); - paimonCatalogProperties.put( - PaimonConstants.METASTORE, - paimonCatalogProperties.getOrDefault( - PaimonConstants.CATALOG_BACKEND, FileSystemCatalogFactory.IDENTIFIER)); - paimonCatalogProperties.put( - CommonCatalogOptions.CATALOG_TYPE.key(), GravitinoPaimonCatalogFactoryOptions.IDENTIFIER); - return paimonCatalogProperties; + public String 
transformPropertyToFlinkCatalog(String configKey) { + if (configKey.equals(PaimonConstants.CATALOG_BACKEND)) { + return PaimonConstants.METASTORE; + } + return PaimonPropertiesUtils.GRAVITINO_CONFIG_TO_PAIMON.get(configKey); + } + + @Override + public String getFlinkCatalogType() { + return GravitinoPaimonCatalogFactoryOptions.IDENTIFIER; } } diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/iceberg/TestIcebergPropertiesConverter.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/iceberg/TestIcebergPropertiesConverter.java index d6de522f396..8fa1208163f 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/iceberg/TestIcebergPropertiesConverter.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/iceberg/TestIcebergPropertiesConverter.java @@ -40,7 +40,9 @@ void testCatalogPropertiesWithHiveBackend() { IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE, "hive-warehouse", "key1", - "value1")); + "value1", + "flink.bypass.key2", + "value2")); Assertions.assertEquals( ImmutableMap.of( CommonCatalogOptions.CATALOG_TYPE.key(), @@ -50,7 +52,11 @@ void testCatalogPropertiesWithHiveBackend() { IcebergPropertiesConstants.ICEBERG_CATALOG_URI, "hive-uri", IcebergPropertiesConstants.ICEBERG_CATALOG_WAREHOUSE, - "hive-warehouse"), + "hive-warehouse", + "key1", + "value1", + "key2", + "value2"), properties); } @@ -65,7 +71,7 @@ void testCatalogPropertiesWithRestBackend() { "rest-uri", IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE, "rest-warehouse", - "key1", + "flink.bypass.key1", "value1")); Assertions.assertEquals( ImmutableMap.of( @@ -76,7 +82,9 @@ void testCatalogPropertiesWithRestBackend() { IcebergPropertiesConstants.ICEBERG_CATALOG_URI, "rest-uri", IcebergPropertiesConstants.ICEBERG_CATALOG_WAREHOUSE, - "rest-warehouse"), + "rest-warehouse", + "key1", + "value1"), properties); } } diff --git 
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java index 0834def90b7..027d4e41f96 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java @@ -159,7 +159,7 @@ public void testCreateGravitinoIcebergUsingSQL() { Assertions.assertEquals( GravitinoIcebergCatalogFactoryOptions.IDENTIFIER, - properties.get(CommonCatalogOptions.CATALOG_TYPE.key())); + properties.get(flinkByPass(CommonCatalogOptions.CATALOG_TYPE.key()))); // Get the created catalog. Optional catalog = tableEnv.getCatalog(catalogName); diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java index a03b4a198e1..66458ba8e74 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java @@ -98,7 +98,7 @@ public void testCreateGravitinoPaimonCatalogUsingSQL() { "create catalog %s with (" + "'type'='gravitino-paimon', " + "'warehouse'='%s'," - + "'catalog.backend'='filesystem'" + + "'metastore'='filesystem'" + ")", catalogName, warehouse)); String[] catalogs = tableEnv.listCatalogs();