From 96684952744508cc119a3c97ff8edc9e5b45535d Mon Sep 17 00:00:00 2001 From: sunxiaojian Date: Tue, 21 Jan 2025 18:23:18 +0800 Subject: [PATCH] fixed --- .../flink/connector/PropertiesConverter.java | 25 +++++++++++-------- .../hive/HivePropertiesConverter.java | 10 ++------ .../iceberg/IcebergPropertiesConverter.java | 25 ++++++++----------- .../paimon/PaimonPropertiesConverter.java | 14 +++-------- 4 files changed, 30 insertions(+), 44 deletions(-) diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PropertiesConverter.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PropertiesConverter.java index b47c9dbaaa0..c86e2806f21 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PropertiesConverter.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PropertiesConverter.java @@ -70,16 +70,19 @@ default Map toGravitinoCatalogProperties(Configuration flinkConf * both transformed properties and the Flink catalog type. */ default Map toFlinkCatalogProperties(Map gravitinoProperties) { - Map all = Maps.newHashMap(); + Map allProperties = Maps.newHashMap(); gravitinoProperties.forEach( (key, value) -> { String flinkConfigKey = key; if (key.startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX)) { flinkConfigKey = key.substring(PropertiesConverter.FLINK_PROPERTY_PREFIX.length()); } - all.put(flinkConfigKey, value); + + String convertedKey = transformPropertyToFlinkCatalog(flinkConfigKey); + if (convertedKey != null) { + allProperties.put(convertedKey, value); + } }); - Map allProperties = transformPropertiesToFlinkCatalog(all); allProperties.put(CommonCatalogOptions.CATALOG_TYPE.key(), getFlinkCatalogType()); return allProperties; } @@ -95,16 +98,16 @@ default Map toFlinkCatalogProperties(Map graviti String transformPropertyToGravitinoCatalog(String configKey); /** - * Transforms a map of properties from Gravitino catalog properties to Flink connector properties. - * This method is used to convert properties that are specific to Gravitino into a format that can - * be understood by the Flink connector. + * Transforms a specific configuration key from Gravitino catalog properties to Flink connector + * properties. This method is used to convert a property key that is specific to Gravitino into a + * format that can be understood by the Flink connector. * - * @param allProperties A map of properties to be transformed. - * @return A map of properties transformed into Flink connector properties. + * @param configKey The configuration key from Gravitino catalog properties to be transformed. + * @return The transformed configuration key that is compatible with the Flink connector. + * @throws IllegalArgumentException If the provided configuration key cannot be transformed or is + * invalid. */ - default Map transformPropertiesToFlinkCatalog(Map allProperties) { - return allProperties; - } + String transformPropertyToFlinkCatalog(String configKey); /** * Converts properties from Flink connector schema properties to Gravitino schema properties. diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/HivePropertiesConverter.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/HivePropertiesConverter.java index 34eda6d81ed..1435a028d74 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/HivePropertiesConverter.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/HivePropertiesConverter.java @@ -20,7 +20,6 @@ package org.apache.gravitino.flink.connector.hive; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Maps; import java.util.Map; import java.util.stream.Collectors; import org.apache.gravitino.catalog.hive.HiveConstants; @@ -43,13 +42,8 @@ public String transformPropertyToGravitinoCatalog(String configKey) { } @Override - public Map transformPropertiesToFlinkCatalog(Map allProperties) { - Map all = Maps.newHashMap(); - allProperties.forEach( - (key, value) -> { - all.put(GRAVITINO_CONFIG_TO_HIVE.getOrDefault(key, key), value); - }); - return all; + public String transformPropertyToFlinkCatalog(String configKey) { + return GRAVITINO_CONFIG_TO_HIVE.getOrDefault(configKey, configKey); } @Override diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java index d2a237f0ee7..2e6b604ea38 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/IcebergPropertiesConverter.java @@ -20,7 +20,6 @@ package org.apache.gravitino.flink.connector.iceberg; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Maps; import java.util.HashMap; import java.util.Map; import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants; @@ -42,20 +41,16 @@ public String transformPropertyToGravitinoCatalog(String configKey) { } @Override - public Map transformPropertiesToFlinkCatalog(Map allProperties) { - Map all = Maps.newHashMap(); - allProperties.forEach( - (key, value) -> { - String icebergConfigKey = key; - if (IcebergPropertiesUtils.GRAVITINO_CONFIG_TO_ICEBERG.containsKey(key)) { - icebergConfigKey = IcebergPropertiesUtils.GRAVITINO_CONFIG_TO_ICEBERG.get(key); - } - if (GRAVITINO_CONFIG_TO_FLINK_ICEBERG.containsKey(key)) { - icebergConfigKey = GRAVITINO_CONFIG_TO_FLINK_ICEBERG.get(key); - } - all.put(icebergConfigKey, value); - }); - return all; + public String transformPropertyToFlinkCatalog(String configKey) { + + String icebergConfigKey = configKey; + if (IcebergPropertiesUtils.GRAVITINO_CONFIG_TO_ICEBERG.containsKey(configKey)) { + icebergConfigKey = IcebergPropertiesUtils.GRAVITINO_CONFIG_TO_ICEBERG.get(configKey); + } + if (GRAVITINO_CONFIG_TO_FLINK_ICEBERG.containsKey(configKey)) { + icebergConfigKey = GRAVITINO_CONFIG_TO_FLINK_ICEBERG.get(configKey); + } + return icebergConfigKey; } @Override diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java index 2f4defd72cd..99e402bcb88 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/PaimonPropertiesConverter.java @@ -19,11 +19,9 @@ package org.apache.gravitino.flink.connector.paimon; -import java.util.Map; import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants; import org.apache.gravitino.catalog.lakehouse.paimon.PaimonPropertiesUtils; import org.apache.gravitino.flink.connector.PropertiesConverter; -import org.apache.paimon.catalog.FileSystemCatalogFactory; public class PaimonPropertiesConverter implements PropertiesConverter { @@ -40,15 +38,11 @@ public String transformPropertyToGravitinoCatalog(String configKey) { } @Override - public Map transformPropertiesToFlinkCatalog(Map allProperties) { - Map converted = PaimonPropertiesUtils.toPaimonCatalogProperties(allProperties); - if (allProperties.containsKey(PaimonConstants.CATALOG_BACKEND)) { - converted.put( - PaimonConstants.METASTORE, - allProperties.getOrDefault( - PaimonConstants.CATALOG_BACKEND, FileSystemCatalogFactory.IDENTIFIER)); + public String transformPropertyToFlinkCatalog(String configKey) { + if (configKey.equals(PaimonConstants.CATALOG_BACKEND)) { + return PaimonConstants.METASTORE; } - return converted; + return PaimonPropertiesUtils.GRAVITINO_CONFIG_TO_PAIMON.get(configKey); } @Override