Skip to content

Commit

Permalink
fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Jan 21, 2025
1 parent 92d0edb commit 9668495
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,19 @@ default Map<String, String> toGravitinoCatalogProperties(Configuration flinkConf
* both transformed properties and the Flink catalog type.
*/
default Map<String, String> toFlinkCatalogProperties(Map<String, String> gravitinoProperties) {
Map<String, String> all = Maps.newHashMap();
Map<String, String> allProperties = Maps.newHashMap();
gravitinoProperties.forEach(
(key, value) -> {
String flinkConfigKey = key;
if (key.startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX)) {
flinkConfigKey = key.substring(PropertiesConverter.FLINK_PROPERTY_PREFIX.length());
}
all.put(flinkConfigKey, value);

String convertedKey = transformPropertyToFlinkCatalog(flinkConfigKey);
if (convertedKey != null) {
allProperties.put(convertedKey, value);
}
});
Map<String, String> allProperties = transformPropertiesToFlinkCatalog(all);
allProperties.put(CommonCatalogOptions.CATALOG_TYPE.key(), getFlinkCatalogType());
return allProperties;
}
Expand All @@ -95,16 +98,16 @@ default Map<String, String> toFlinkCatalogProperties(Map<String, String> graviti
String transformPropertyToGravitinoCatalog(String configKey);

/**
* Transforms a map of properties from Gravitino catalog properties to Flink connector properties.
* This method is used to convert properties that are specific to Gravitino into a format that can
* be understood by the Flink connector.
* Transforms a specific configuration key from Gravitino catalog properties to Flink connector
* properties. This method is used to convert a property key that is specific to Gravitino into a
* format that can be understood by the Flink connector.
*
* @param allProperties A map of properties to be transformed.
* @return A map of properties transformed into Flink connector properties.
* @param configKey The configuration key from Gravitino catalog properties to be transformed.
* @return The transformed configuration key that is compatible with the Flink connector.
* @throws IllegalArgumentException If the provided configuration key cannot be transformed or is
* invalid.
*/
default Map<String, String> transformPropertiesToFlinkCatalog(Map<String, String> allProperties) {
return allProperties;
}
String transformPropertyToFlinkCatalog(String configKey);

/**
* Converts properties from Flink connector schema properties to Gravitino schema properties.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.gravitino.flink.connector.hive;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.gravitino.catalog.hive.HiveConstants;
Expand All @@ -43,13 +42,8 @@ public String transformPropertyToGravitinoCatalog(String configKey) {
}

@Override
public Map<String, String> transformPropertiesToFlinkCatalog(Map<String, String> allProperties) {
Map<String, String> all = Maps.newHashMap();
allProperties.forEach(
(key, value) -> {
all.put(GRAVITINO_CONFIG_TO_HIVE.getOrDefault(key, key), value);
});
return all;
public String transformPropertyToFlinkCatalog(String configKey) {
return GRAVITINO_CONFIG_TO_HIVE.getOrDefault(configKey, configKey);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.gravitino.flink.connector.iceberg;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.Map;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
Expand All @@ -42,20 +41,16 @@ public String transformPropertyToGravitinoCatalog(String configKey) {
}

@Override
public Map<String, String> transformPropertiesToFlinkCatalog(Map<String, String> allProperties) {
Map<String, String> all = Maps.newHashMap();
allProperties.forEach(
(key, value) -> {
String icebergConfigKey = key;
if (IcebergPropertiesUtils.GRAVITINO_CONFIG_TO_ICEBERG.containsKey(key)) {
icebergConfigKey = IcebergPropertiesUtils.GRAVITINO_CONFIG_TO_ICEBERG.get(key);
}
if (GRAVITINO_CONFIG_TO_FLINK_ICEBERG.containsKey(key)) {
icebergConfigKey = GRAVITINO_CONFIG_TO_FLINK_ICEBERG.get(key);
}
all.put(icebergConfigKey, value);
});
return all;
public String transformPropertyToFlinkCatalog(String configKey) {

String icebergConfigKey = configKey;
if (IcebergPropertiesUtils.GRAVITINO_CONFIG_TO_ICEBERG.containsKey(configKey)) {
icebergConfigKey = IcebergPropertiesUtils.GRAVITINO_CONFIG_TO_ICEBERG.get(configKey);
}
if (GRAVITINO_CONFIG_TO_FLINK_ICEBERG.containsKey(configKey)) {
icebergConfigKey = GRAVITINO_CONFIG_TO_FLINK_ICEBERG.get(configKey);
}
return icebergConfigKey;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@

package org.apache.gravitino.flink.connector.paimon;

import java.util.Map;
import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants;
import org.apache.gravitino.catalog.lakehouse.paimon.PaimonPropertiesUtils;
import org.apache.gravitino.flink.connector.PropertiesConverter;
import org.apache.paimon.catalog.FileSystemCatalogFactory;

public class PaimonPropertiesConverter implements PropertiesConverter {

Expand All @@ -40,15 +38,11 @@ public String transformPropertyToGravitinoCatalog(String configKey) {
}

@Override
public Map<String, String> transformPropertiesToFlinkCatalog(Map<String, String> allProperties) {
Map<String, String> converted = PaimonPropertiesUtils.toPaimonCatalogProperties(allProperties);
if (allProperties.containsKey(PaimonConstants.CATALOG_BACKEND)) {
converted.put(
PaimonConstants.METASTORE,
allProperties.getOrDefault(
PaimonConstants.CATALOG_BACKEND, FileSystemCatalogFactory.IDENTIFIER));
public String transformPropertyToFlinkCatalog(String configKey) {
if (configKey.equals(PaimonConstants.CATALOG_BACKEND)) {
return PaimonConstants.METASTORE;
}
return converted;
return PaimonPropertiesUtils.GRAVITINO_CONFIG_TO_PAIMON.get(configKey);
}

@Override
Expand Down

0 comments on commit 9668495

Please sign in to comment.