Skip to content

Commit

Permalink
Extract flink converter common logic.
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Jan 17, 2025
1 parent 6fb45d6 commit 97bc2a7
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class IcebergPropertiesUtils {
// will only need to set the configuration 'catalog-backend' in Gravitino and Gravitino will
// change it to `catalogType` automatically and pass it to Iceberg.
public static final Map<String, String> GRAVITINO_CONFIG_TO_ICEBERG;
public static final Map<String, String> ICEBERG_CATALOG_CONFIG_TO_GRAVITINO;

static {
Map<String, String> map = new HashMap();
Expand Down Expand Up @@ -65,6 +66,14 @@ public class IcebergPropertiesUtils {
AzureProperties.GRAVITINO_AZURE_STORAGE_ACCOUNT_KEY,
IcebergConstants.ICEBERG_ADLS_STORAGE_ACCOUNT_KEY);
GRAVITINO_CONFIG_TO_ICEBERG = Collections.unmodifiableMap(map);

Map<String, String> icebergCatalogConfigToGravitino = new HashMap<>();
map.forEach(
(key, value) -> {
icebergCatalogConfigToGravitino.put(value, key);
});
ICEBERG_CATALOG_CONFIG_TO_GRAVITINO =
Collections.unmodifiableMap(icebergCatalogConfigToGravitino);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

package org.apache.gravitino.flink.connector;

import com.google.common.collect.Maps;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.CommonCatalogOptions;

/**
* PropertiesConverter is used to convert properties between Flink properties and Apache Gravitino
Expand All @@ -38,7 +40,18 @@ public interface PropertiesConverter {
* @return properties for the Gravitino catalog.
*/
default Map<String, String> toGravitinoCatalogProperties(Configuration flinkConf) {
return flinkConf.toMap();
Map<String, String> gravitinoProperties = Maps.newHashMap();
for (Map.Entry<String, String> entry : flinkConf.toMap().entrySet()) {
String gravitinoKey = transformPropertiesToGravitinoCatalog(entry.getKey());
if (gravitinoKey != null) {
gravitinoProperties.put(gravitinoKey, entry.getValue());
} else if (!entry.getKey().startsWith(FLINK_PROPERTY_PREFIX)) {
gravitinoProperties.put(FLINK_PROPERTY_PREFIX + entry.getKey(), entry.getValue());
} else {
gravitinoProperties.put(entry.getKey(), entry.getValue());
}
}
return gravitinoProperties;
}

/**
Expand All @@ -48,7 +61,24 @@ default Map<String, String> toGravitinoCatalogProperties(Configuration flinkConf
* @return properties for the Flink connector.
*/
default Map<String, String> toFlinkCatalogProperties(Map<String, String> gravitinoProperties) {
return gravitinoProperties;
Map<String, String> all = Maps.newHashMap();
gravitinoProperties.forEach(
(key, value) -> {
String flinkConfigKey = key;
if (key.startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX)) {
flinkConfigKey = key.substring(PropertiesConverter.FLINK_PROPERTY_PREFIX.length());
}
all.put(flinkConfigKey, value);
});
Map<String, String> allProperties = transformPropertiesToFlinkCatalog(all);
allProperties.put(CommonCatalogOptions.CATALOG_TYPE.key(), getFlinkCatalogType());
return allProperties;
}

String transformPropertiesToGravitinoCatalog(String configKey);

default Map<String, String> transformPropertiesToFlinkCatalog(Map<String, String> allProperties) {
return allProperties;
}

/**
Expand Down Expand Up @@ -90,4 +120,6 @@ default Map<String, String> toFlinkTableProperties(Map<String, String> gravitino
default Map<String, String> toGravitinoTableProperties(Map<String, String> flinkProperties) {
return flinkProperties;
}

String getFlinkCatalogType();
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@
import com.google.common.collect.Maps;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.gravitino.catalog.hive.HiveConstants;
import org.apache.gravitino.flink.connector.PropertiesConverter;
import org.apache.hadoop.hive.conf.HiveConf;
Expand All @@ -34,46 +32,24 @@ public class HivePropertiesConverter implements PropertiesConverter {
private HivePropertiesConverter() {}

public static final HivePropertiesConverter INSTANCE = new HivePropertiesConverter();

private static final Map<String, String> HIVE_CATALOG_CONFIG_TO_GRAVITINO =
ImmutableMap.of(HiveConf.ConfVars.METASTOREURIS.varname, HiveConstants.METASTORE_URIS);
private static final Map<String, String> GRAVITINO_CONFIG_TO_HIVE =
ImmutableMap.of(HiveConstants.METASTORE_URIS, HiveConf.ConfVars.METASTOREURIS.varname);

@Override
public Map<String, String> toGravitinoCatalogProperties(Configuration flinkConf) {
Map<String, String> gravitinoProperties = Maps.newHashMap();

for (Map.Entry<String, String> entry : flinkConf.toMap().entrySet()) {
String gravitinoKey = HIVE_CATALOG_CONFIG_TO_GRAVITINO.get(entry.getKey());
if (gravitinoKey != null) {
gravitinoProperties.put(gravitinoKey, entry.getValue());
} else if (!entry.getKey().startsWith(FLINK_PROPERTY_PREFIX)) {
gravitinoProperties.put(FLINK_PROPERTY_PREFIX + entry.getKey(), entry.getValue());
} else {
gravitinoProperties.put(entry.getKey(), entry.getValue());
}
}

return gravitinoProperties;
public String transformPropertiesToGravitinoCatalog(String configKey) {
return HIVE_CATALOG_CONFIG_TO_GRAVITINO.get(configKey);
}

@Override
public Map<String, String> toFlinkCatalogProperties(Map<String, String> gravitinoProperties) {
Map<String, String> flinkCatalogProperties = Maps.newHashMap();
flinkCatalogProperties.put(
CommonCatalogOptions.CATALOG_TYPE.key(), GravitinoHiveCatalogFactoryOptions.IDENTIFIER);

gravitinoProperties.forEach(
public Map<String, String> transformPropertiesToFlinkCatalog(Map<String, String> allProperties) {
Map<String, String> all = Maps.newHashMap();
allProperties.forEach(
(key, value) -> {
String flinkConfigKey = key;
if (key.startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX)) {
flinkConfigKey = key.substring(PropertiesConverter.FLINK_PROPERTY_PREFIX.length());
}
flinkCatalogProperties.put(
GRAVITINO_CONFIG_TO_HIVE.getOrDefault(flinkConfigKey, flinkConfigKey), value);
all.put(GRAVITINO_CONFIG_TO_HIVE.getOrDefault(key, key), value);
});
return flinkCatalogProperties;
return all;
}

@Override
Expand All @@ -95,4 +71,9 @@ public Map<String, String> toFlinkTableProperties(Map<String, String> gravitinoP
properties.put("connector", "hive");
return properties;
}

@Override
public String getFlinkCatalogType() {
return GravitinoHiveCatalogFactoryOptions.IDENTIFIER;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@

package org.apache.gravitino.flink.connector.iceberg;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergPropertiesUtils;
import org.apache.gravitino.flink.connector.PropertiesConverter;
Expand All @@ -38,36 +37,23 @@ private IcebergPropertiesConverter() {}
IcebergConstants.CATALOG_BACKEND, IcebergPropertiesConstants.ICEBERG_CATALOG_TYPE);

@Override
public Map<String, String> toFlinkCatalogProperties(Map<String, String> gravitinoProperties) {
Preconditions.checkArgument(
gravitinoProperties != null, "Iceberg Catalog properties should not be null.");

Map<String, String> all = new HashMap<>();
if (gravitinoProperties != null) {
gravitinoProperties.forEach(
(k, v) -> {
if (k.startsWith(FLINK_PROPERTY_PREFIX)) {
String newKey = k.substring(FLINK_PROPERTY_PREFIX.length());
all.put(newKey, v);
}
});
}
Map<String, String> transformedProperties =
IcebergPropertiesUtils.toIcebergCatalogProperties(gravitinoProperties);
public String transformPropertiesToGravitinoCatalog(String configKey) {
return IcebergPropertiesUtils.ICEBERG_CATALOG_CONFIG_TO_GRAVITINO.get(configKey);
}

if (transformedProperties != null) {
all.putAll(transformedProperties);
}
all.put(
CommonCatalogOptions.CATALOG_TYPE.key(), GravitinoIcebergCatalogFactoryOptions.IDENTIFIER);
// Map "catalog-backend" to "catalog-type".
// TODO If catalog backend is CUSTOM, we need special compatibility logic.
GRAVITINO_CONFIG_TO_FLINK_ICEBERG.forEach(
@Override
public Map<String, String> transformPropertiesToFlinkCatalog(Map<String, String> allProperties) {
Map<String, String> all = Maps.newHashMap();
allProperties.forEach(
(key, value) -> {
if (all.containsKey(key)) {
String config = all.remove(key);
all.put(value, config);
String icebergConfigKey = key;
if (IcebergPropertiesUtils.GRAVITINO_CONFIG_TO_ICEBERG.containsKey(key)) {
icebergConfigKey = IcebergPropertiesUtils.GRAVITINO_CONFIG_TO_ICEBERG.get(key);
}
if (GRAVITINO_CONFIG_TO_FLINK_ICEBERG.containsKey(key)) {
icebergConfigKey = GRAVITINO_CONFIG_TO_FLINK_ICEBERG.get(key);
}
all.put(icebergConfigKey, value);
});
return all;
}
Expand All @@ -78,7 +64,7 @@ public Map<String, String> toGravitinoTableProperties(Map<String, String> proper
}

@Override
public Map<String, String> toFlinkTableProperties(Map<String, String> properties) {
return new HashMap<>(properties);
public String getFlinkCatalogType() {
return GravitinoIcebergCatalogFactoryOptions.IDENTIFIER;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,8 @@

package org.apache.gravitino.flink.connector.paimon;

import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.gravitino.catalog.lakehouse.paimon.PaimonConstants;
import org.apache.gravitino.catalog.lakehouse.paimon.PaimonPropertiesUtils;
import org.apache.gravitino.flink.connector.PropertiesConverter;
Expand All @@ -37,44 +34,38 @@ private PaimonPropertiesConverter() {}

@Override
public Map<String, String> toGravitinoCatalogProperties(Configuration flinkConf) {
Map<String, String> gravitinoProperties = Maps.newHashMap();
Map<String, String> gravitinoProperties =
PropertiesConverter.super.toGravitinoCatalogProperties(flinkConf);
Map<String, String> flinkConfMap = flinkConf.toMap();
for (Map.Entry<String, String> entry : flinkConfMap.entrySet()) {
String gravitinoKey =
PaimonPropertiesUtils.PAIMON_CATALOG_CONFIG_TO_GRAVITINO.get(entry.getKey());
if (gravitinoKey != null) {
gravitinoProperties.put(gravitinoKey, entry.getValue());
} else if (!entry.getKey().startsWith(FLINK_PROPERTY_PREFIX)) {
gravitinoProperties.put(FLINK_PROPERTY_PREFIX + entry.getKey(), entry.getValue());
} else {
gravitinoProperties.put(entry.getKey(), entry.getValue());
}
}
gravitinoProperties.put(
PaimonConstants.CATALOG_BACKEND,
flinkConfMap.getOrDefault(PaimonConstants.METASTORE, FileSystemCatalogFactory.IDENTIFIER));
return gravitinoProperties;
}

@Override
public String transformPropertiesToGravitinoCatalog(String configKey) {
return PaimonPropertiesUtils.PAIMON_CATALOG_CONFIG_TO_GRAVITINO.get(configKey);
}

@Override
public Map<String, String> toFlinkCatalogProperties(Map<String, String> gravitinoProperties) {
Map<String, String> all = new HashMap<>();
gravitinoProperties.forEach(
(key, value) -> {
String flinkConfigKey = key;
if (key.startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX)) {
flinkConfigKey = key.substring(PropertiesConverter.FLINK_PROPERTY_PREFIX.length());
}
all.put(flinkConfigKey, value);
});
Map<String, String> paimonCatalogProperties =
PaimonPropertiesUtils.toPaimonCatalogProperties(all);
PropertiesConverter.super.toFlinkCatalogProperties(gravitinoProperties);
paimonCatalogProperties.put(
PaimonConstants.METASTORE,
paimonCatalogProperties.getOrDefault(
PaimonConstants.CATALOG_BACKEND, FileSystemCatalogFactory.IDENTIFIER));
paimonCatalogProperties.put(
CommonCatalogOptions.CATALOG_TYPE.key(), GravitinoPaimonCatalogFactoryOptions.IDENTIFIER);
return paimonCatalogProperties;
}

@Override
public Map<String, String> transformPropertiesToFlinkCatalog(Map<String, String> allProperties) {
return PaimonPropertiesUtils.toPaimonCatalogProperties(allProperties);
}

@Override
public String getFlinkCatalogType() {
return GravitinoPaimonCatalogFactoryOptions.IDENTIFIER;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ void testCatalogPropertiesWithHiveBackend() {
"hive-uri",
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE,
"hive-warehouse",
"key1",
"flink.bypass.key1",
"value1"));
Assertions.assertEquals(
ImmutableMap.of(
Expand All @@ -50,7 +50,9 @@ void testCatalogPropertiesWithHiveBackend() {
IcebergPropertiesConstants.ICEBERG_CATALOG_URI,
"hive-uri",
IcebergPropertiesConstants.ICEBERG_CATALOG_WAREHOUSE,
"hive-warehouse"),
"hive-warehouse",
"key1",
"value1"),
properties);
}

Expand All @@ -65,7 +67,7 @@ void testCatalogPropertiesWithRestBackend() {
"rest-uri",
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE,
"rest-warehouse",
"key1",
"flink.bypass.key1",
"value1"));
Assertions.assertEquals(
ImmutableMap.of(
Expand All @@ -76,7 +78,9 @@ void testCatalogPropertiesWithRestBackend() {
IcebergPropertiesConstants.ICEBERG_CATALOG_URI,
"rest-uri",
IcebergPropertiesConstants.ICEBERG_CATALOG_WAREHOUSE,
"rest-warehouse"),
"rest-warehouse",
"key1",
"value1"),
properties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public void testCreateGravitinoIcebergUsingSQL() {

Assertions.assertEquals(
GravitinoIcebergCatalogFactoryOptions.IDENTIFIER,
properties.get(CommonCatalogOptions.CATALOG_TYPE.key()));
properties.get(flinkByPass(CommonCatalogOptions.CATALOG_TYPE.key())));

// Get the created catalog.
Optional<org.apache.flink.table.catalog.Catalog> catalog = tableEnv.getCatalog(catalogName);
Expand Down

0 comments on commit 97bc2a7

Please sign in to comment.