From 03c0d521e5b6c79c8361df1ea5463f184c9d81d4 Mon Sep 17 00:00:00 2001 From: sunxiaojian Date: Thu, 16 Jan 2025 19:48:25 +0800 Subject: [PATCH] Extract flink converter common logic. --- .../iceberg/IcebergPropertiesUtils.java | 9 ++++ .../flink/connector/PropertiesConverter.java | 36 +++++++++++++- .../hive/HivePropertiesConverter.java | 43 +++++------------ .../iceberg/IcebergPropertiesConverter.java | 48 +++++++------------ .../paimon/PaimonPropertiesConverter.java | 45 +++++++---------- .../TestIcebergPropertiesConverter.java | 12 +++-- .../test/iceberg/FlinkIcebergCatalogIT.java | 2 +- 7 files changed, 99 insertions(+), 96 deletions(-) diff --git a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergPropertiesUtils.java b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergPropertiesUtils.java index df1340c947e..92c5d18a129 100644 --- a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergPropertiesUtils.java +++ b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergPropertiesUtils.java @@ -33,6 +33,7 @@ public class IcebergPropertiesUtils { // will only need to set the configuration 'catalog-backend' in Gravitino and Gravitino will // change it to `catalogType` automatically and pass it to Iceberg. public static final Map GRAVITINO_CONFIG_TO_ICEBERG; + public static final Map ICEBERG_CATALOG_CONFIG_TO_GRAVITINO; static { Map map = new HashMap(); @@ -65,6 +66,14 @@ public class IcebergPropertiesUtils { AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_KEY, IcebergConstants.ICEBERG_ADLS_STORAGE_ACCOUNT_KEY); GRAVITINO_CONFIG_TO_ICEBERG = Collections.unmodifiableMap(map); + + Map icebergCatalogConfigToGravitino = new HashMap<>(); + map.forEach( + (key, value) -> { + icebergCatalogConfigToGravitino.put(value, key); + }); + ICEBERG_CATALOG_CONFIG_TO_GRAVITINO = + Collections.unmodifiableMap(icebergCatalogConfigToGravitino); } /** diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PropertiesConverter.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PropertiesConverter.java index c9fbb8a491c..efc6411b951 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PropertiesConverter.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PropertiesConverter.java @@ -19,8 +19,10 @@ package org.apache.gravitino.flink.connector; +import com.google.common.collect.Maps; import java.util.Map; import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.catalog.CommonCatalogOptions; /** * PropertiesConverter is used to convert properties between Flink properties and Apache Gravitino @@ -38,7 +40,18 @@ public interface PropertiesConverter { * @return properties for the Gravitino catalog. */ default Map toGravitinoCatalogProperties(Configuration flinkConf) { - return flinkConf.toMap(); + Map gravitinoProperties = Maps.newHashMap(); + for (Map.Entry entry : flinkConf.toMap().entrySet()) { + String gravitinoKey = transformPropertiesToGravitinoCatalog(entry.getKey()); + if (gravitinoKey != null) { + gravitinoProperties.put(gravitinoKey, entry.getValue()); + } else if (!entry.getKey().startsWith(FLINK_PROPERTY_PREFIX)) { + gravitinoProperties.put(FLINK_PROPERTY_PREFIX + entry.getKey(), entry.getValue()); + } else { + gravitinoProperties.put(entry.getKey(), entry.getValue()); + } + } + return gravitinoProperties; } /** @@ -48,7 +61,24 @@ default Map toGravitinoCatalogProperties(Configuration flinkConf * @return properties for the Flink connector. */ default Map toFlinkCatalogProperties(Map gravitinoProperties) { - return gravitinoProperties; + Map all = Maps.newHashMap(); + gravitinoProperties.forEach( + (key, value) -> { + String flinkConfigKey = key; + if (key.startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX)) { + flinkConfigKey = key.substring(PropertiesConverter.FLINK_PROPERTY_PREFIX.length()); + } + all.put(flinkConfigKey, value); + }); + Map allProperties = transformPropertiesToFlinkCatalog(all); + allProperties.put(CommonCatalogOptions.CATALOG_TYPE.key(), getFlinkCatalogType()); + return allProperties; + } + + String transformPropertiesToGravitinoCatalog(String configKey); + + default Map transformPropertiesToFlinkCatalog(Map allProperties) { + return allProperties; } /** @@ -90,4 +120,6 @@ default Map toFlinkTableProperties(Map gravitino default Map toGravitinoTableProperties(Map flinkProperties) { return flinkProperties; } + + String getFlinkCatalogType(); } diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/HivePropertiesConverter.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/HivePropertiesConverter.java index 18095867498..fbd17002df0 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/HivePropertiesConverter.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/HivePropertiesConverter.java @@ -23,8 +23,6 @@ import com.google.common.collect.Maps; import java.util.Map; import java.util.stream.Collectors; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.table.catalog.CommonCatalogOptions; import org.apache.gravitino.catalog.hive.HiveConstants; import org.apache.gravitino.flink.connector.PropertiesConverter; import org.apache.hadoop.hive.conf.HiveConf; @@ -34,46 +32,24 @@ public class HivePropertiesConverter implements PropertiesConverter { private HivePropertiesConverter() {} public static final HivePropertiesConverter INSTANCE = new HivePropertiesConverter(); - private static final Map HIVE_CATALOG_CONFIG_TO_GRAVITINO = ImmutableMap.of(HiveConf.ConfVars.METASTOREURIS.varname, HiveConstants.METASTORE_URIS); private static final Map GRAVITINO_CONFIG_TO_HIVE = ImmutableMap.of(HiveConstants.METASTORE_URIS, HiveConf.ConfVars.METASTOREURIS.varname); @Override - public Map toGravitinoCatalogProperties(Configuration flinkConf) { - Map gravitinoProperties = Maps.newHashMap(); - - for (Map.Entry entry : flinkConf.toMap().entrySet()) { - String gravitinoKey = HIVE_CATALOG_CONFIG_TO_GRAVITINO.get(entry.getKey()); - if (gravitinoKey != null) { - gravitinoProperties.put(gravitinoKey, entry.getValue()); - } else if (!entry.getKey().startsWith(FLINK_PROPERTY_PREFIX)) { - gravitinoProperties.put(FLINK_PROPERTY_PREFIX + entry.getKey(), entry.getValue()); - } else { - gravitinoProperties.put(entry.getKey(), entry.getValue()); - } - } - - return gravitinoProperties; + public String transformPropertiesToGravitinoCatalog(String configKey) { + return HIVE_CATALOG_CONFIG_TO_GRAVITINO.get(configKey); } @Override - public Map toFlinkCatalogProperties(Map gravitinoProperties) { - Map flinkCatalogProperties = Maps.newHashMap(); - flinkCatalogProperties.put( - CommonCatalogOptions.CATALOG_TYPE.key(), GravitinoHiveCatalogFactoryOptions.IDENTIFIER); - - gravitinoProperties.forEach( + public Map transformPropertiesToFlinkCatalog(Map allProperties) { + Map all = Maps.newHashMap(); + allProperties.forEach( (key, value) -> { - String flinkConfigKey = key; - if (key.startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX)) { - flinkConfigKey = key.substring(PropertiesConverter.FLINK_PROPERTY_PREFIX.length()); - } - flinkCatalogProperties.put( - GRAVITINO_CONFIG_TO_HIVE.getOrDefault(flinkConfigKey, flinkConfigKey), value); + all.put(GRAVITINO_CONFIG_TO_HIVE.getOrDefault(key, key), value); }); - return flinkCatalogProperties; + return all; } @Override @@ -95,4 +71,9 @@ public Map toFlinkTableProperties(Map gravitinoP properties.put("connector", "hive"); return properties; } + + @Override + public String getFlinkCatalogType() { + return GravitinoHiveCatalogFactoryOptions.IDENTIFIER; + } } diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java index 7684d3eadbb..5b0acd2f78b 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java @@ -19,11 +19,10 @@ package org.apache.gravitino.flink.connector.iceberg; -import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; import java.util.HashMap; import java.util.Map; -import org.apache.flink.table.catalog.CommonCatalogOptions; import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants; import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergPropertiesUtils; import org.apache.gravitino.flink.connector.PropertiesConverter; @@ -38,36 +37,23 @@ private IcebergPropertiesConverter() {} IcebergConstants.CATALOG_BACKEND, IcebergPropertiesConstants.ICEBERG_CATALOG_TYPE); @Override - public Map toFlinkCatalogProperties(Map gravitinoProperties) { - Preconditions.checkArgument( - gravitinoProperties != null, "Iceberg Catalog properties should not be null."); - - Map all = new HashMap<>(); - if (gravitinoProperties != null) { - gravitinoProperties.forEach( - (k, v) -> { - if (k.startsWith(FLINK_PROPERTY_PREFIX)) { - String newKey = k.substring(FLINK_PROPERTY_PREFIX.length()); - all.put(newKey, v); - } - }); - } - Map transformedProperties = - IcebergPropertiesUtils.toIcebergCatalogProperties(gravitinoProperties); + public String transformPropertiesToGravitinoCatalog(String configKey) { + return IcebergPropertiesUtils.ICEBERG_CATALOG_CONFIG_TO_GRAVITINO.get(configKey); + } - if (transformedProperties != null) { - all.putAll(transformedProperties); - } - all.put( - CommonCatalogOptions.CATALOG_TYPE.key(), GravitinoIcebergCatalogFactoryOptions.IDENTIFIER); - // Map "catalog-backend" to "catalog-type". - // TODO If catalog backend is CUSTOM, we need special compatibility logic. - GRAVITINO_CONFIG_TO_FLINK_ICEBERG.forEach( + @Override + public Map transformPropertiesToFlinkCatalog(Map allProperties) { + Map all = Maps.newHashMap(); + allProperties.forEach( (key, value) -> { - if (all.containsKey(key)) { - String config = all.remove(key); - all.put(value, config); + String icebergConfigKey = key; + if (IcebergPropertiesUtils.GRAVITINO_CONFIG_TO_ICEBERG.containsKey(key)) { + icebergConfigKey = IcebergPropertiesUtils.GRAVITINO_CONFIG_TO_ICEBERG.get(key); } + if (GRAVITINO_CONFIG_TO_FLINK_ICEBERG.containsKey(key)) { + icebergConfigKey = GRAVITINO_CONFIG_TO_FLINK_ICEBERG.get(key); + } + all.put(icebergConfigKey, value); }); return all; } @@ -78,7 +64,7 @@ public Map toGravitinoTableProperties(Map proper } @Override - public Map toFlinkTableProperties(Map properties) { - return new HashMap<>(properties); + public String getFlinkCatalogType() { + return GravitinoIcebergCatalogFactoryOptions.IDENTIFIER; } } diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java index 58613bee37d..0551c8720d1 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java @@ -19,11 +19,8 @@ package org.apache.gravitino.flink.connector.paimon; -import com.google.common.collect.Maps; -import java.util.HashMap; import java.util.Map; import org.apache.flink.configuration.Configuration; -import org.apache.flink.table.catalog.CommonCatalogOptions; import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants; import org.apache.gravitino.catalog.lakehouse.paimon.PaimonPropertiesUtils; import org.apache.gravitino.flink.connector.PropertiesConverter; @@ -37,44 +34,38 @@ private PaimonPropertiesConverter() {} @Override public Map toGravitinoCatalogProperties(Configuration flinkConf) { - Map gravitinoProperties = Maps.newHashMap(); + Map gravitinoProperties = + PropertiesConverter.super.toGravitinoCatalogProperties(flinkConf); Map flinkConfMap = flinkConf.toMap(); - for (Map.Entry entry : flinkConfMap.entrySet()) { - String gravitinoKey = - PaimonPropertiesUtils.PAIMON_CATALOG_CONFIG_TO_GRAVITINO.get(entry.getKey()); - if (gravitinoKey != null) { - gravitinoProperties.put(gravitinoKey, entry.getValue()); - } else if (!entry.getKey().startsWith(FLINK_PROPERTY_PREFIX)) { - gravitinoProperties.put(FLINK_PROPERTY_PREFIX + entry.getKey(), entry.getValue()); - } else { - gravitinoProperties.put(entry.getKey(), entry.getValue()); - } - } gravitinoProperties.put( PaimonConstants.CATALOG_BACKEND, flinkConfMap.getOrDefault(PaimonConstants.METASTORE, FileSystemCatalogFactory.IDENTIFIER)); return gravitinoProperties; } + @Override + public String transformPropertiesToGravitinoCatalog(String configKey) { + return PaimonPropertiesUtils.PAIMON_CATALOG_CONFIG_TO_GRAVITINO.get(configKey); + } + @Override public Map toFlinkCatalogProperties(Map gravitinoProperties) { - Map all = new HashMap<>(); - gravitinoProperties.forEach( - (key, value) -> { - String flinkConfigKey = key; - if (key.startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX)) { - flinkConfigKey = key.substring(PropertiesConverter.FLINK_PROPERTY_PREFIX.length()); - } - all.put(flinkConfigKey, value); - }); Map paimonCatalogProperties = - PaimonPropertiesUtils.toPaimonCatalogProperties(all); + PropertiesConverter.super.toFlinkCatalogProperties(gravitinoProperties); paimonCatalogProperties.put( PaimonConstants.METASTORE, paimonCatalogProperties.getOrDefault( PaimonConstants.CATALOG_BACKEND, FileSystemCatalogFactory.IDENTIFIER)); - paimonCatalogProperties.put( - CommonCatalogOptions.CATALOG_TYPE.key(), GravitinoPaimonCatalogFactoryOptions.IDENTIFIER); return paimonCatalogProperties; } + + @Override + public Map transformPropertiesToFlinkCatalog(Map allProperties) { + return PaimonPropertiesUtils.toPaimonCatalogProperties(allProperties); + } + + @Override + public String getFlinkCatalogType() { + return GravitinoPaimonCatalogFactoryOptions.IDENTIFIER; + } } diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/iceberg/TestIcebergPropertiesConverter.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/iceberg/TestIcebergPropertiesConverter.java index d6de522f396..a11207b8a11 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/iceberg/TestIcebergPropertiesConverter.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/iceberg/TestIcebergPropertiesConverter.java @@ -39,7 +39,7 @@ void testCatalogPropertiesWithHiveBackend() { "hive-uri", IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE, "hive-warehouse", - "key1", + "flink.bypass.key1", "value1")); Assertions.assertEquals( ImmutableMap.of( @@ -50,7 +50,9 @@ void testCatalogPropertiesWithHiveBackend() { IcebergPropertiesConstants.ICEBERG_CATALOG_URI, "hive-uri", IcebergPropertiesConstants.ICEBERG_CATALOG_WAREHOUSE, - "hive-warehouse"), + "hive-warehouse", + "key1", + "value1"), properties); } @@ -65,7 +67,7 @@ void testCatalogPropertiesWithRestBackend() { "rest-uri", IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE, "rest-warehouse", - "key1", + "flink.bypass.key1", "value1")); Assertions.assertEquals( ImmutableMap.of( @@ -76,7 +78,9 @@ void testCatalogPropertiesWithRestBackend() { IcebergPropertiesConstants.ICEBERG_CATALOG_URI, "rest-uri", IcebergPropertiesConstants.ICEBERG_CATALOG_WAREHOUSE, - "rest-warehouse"), + "rest-warehouse", + "key1", + "value1"), properties); } } diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java index 0834def90b7..027d4e41f96 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java @@ -159,7 +159,7 @@ public void testCreateGravitinoIcebergUsingSQL() { Assertions.assertEquals( GravitinoIcebergCatalogFactoryOptions.IDENTIFIER, - properties.get(CommonCatalogOptions.CATALOG_TYPE.key())); + properties.get(flinkByPass(CommonCatalogOptions.CATALOG_TYPE.key()))); // Get the created catalog. Optional catalog = tableEnv.getCatalog(catalogName);