Skip to content

Commit

Permalink
fixup! fixup! Add support for case-insensitive object names in Iceber…
Browse files Browse the repository at this point in the history
…g REST catalog - fix invalid cache state
  • Loading branch information
mayankvadariya committed Nov 4, 2024
1 parent 72f3ba7 commit 6a5a081
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 57 deletions.
4 changes: 2 additions & 2 deletions docs/src/main/sphinx/object-storage/metastores.md
Original file line number Diff line number Diff line change
Expand Up @@ -503,8 +503,8 @@ following properties:
* - `iceberg.rest-catalog.case-insensitive-name-matching`
- Match namespace, table, and view names case insensitively. Defaults to `false`.
* - `iceberg.rest-catalog.case-insensitive-name-matching.cache-ttl`
- [Duration](prop-type-duration) for which case-insensitive namespace, table, and view
names are cached. Defaults to `1m`.
- [Duration](prop-type-duration) for which case-insensitive namespace, table,
and view names are cached. Defaults to `1m`.
:::

The following example shows a minimal catalog configuration using an Iceberg
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.function.Predicate;
import java.util.function.Supplier;
Expand Down Expand Up @@ -322,7 +323,7 @@ public Transaction newCreateTableTransaction(
String location,
Map<String, String> properties)
{
return restSessionCatalog.buildTable(convert(session), toRemoteTable(session, schemaTableName), schema)
return restSessionCatalog.buildTable(convert(session), toRemoteTable(session, schemaTableName, true), schema)
.withPartitionSpec(partitionSpec)
.withSortOrder(sortOrder)
.withLocation(location)
Expand All @@ -340,7 +341,7 @@ public Transaction newCreateOrReplaceTableTransaction(
String location,
Map<String, String> properties)
{
return restSessionCatalog.buildTable(convert(session), toRemoteTable(session, schemaTableName), schema)
return restSessionCatalog.buildTable(convert(session), toRemoteTable(session, schemaTableName, true), schema)
.withPartitionSpec(partitionSpec)
.withSortOrder(sortOrder)
.withLocation(location)
Expand All @@ -358,7 +359,7 @@ public void registerTable(ConnectorSession session, SchemaTableName tableName, T
@Override
public void unregisterTable(ConnectorSession session, SchemaTableName tableName)
{
if (!restSessionCatalog.dropTable(convert(session), toRemoteTable(session, tableName))) {
if (!restSessionCatalog.dropTable(convert(session), toRemoteTable(session, tableName, true))) {
throw new TableNotFoundException(tableName);
}
invalidateTableCache(tableName);
Expand All @@ -370,7 +371,7 @@ public void unregisterTable(ConnectorSession session, SchemaTableName tableName)
@Override
public void dropTable(ConnectorSession session, SchemaTableName schemaTableName)
{
if (!restSessionCatalog.purgeTable(convert(session), toRemoteTable(session, schemaTableName))) {
if (!restSessionCatalog.purgeTable(convert(session), toRemoteTable(session, schemaTableName, true))) {
throw new TrinoException(ICEBERG_CATALOG_ERROR, format("Failed to drop table: %s", schemaTableName));
}
invalidateTableCache(schemaTableName);
Expand All @@ -391,7 +392,7 @@ public void dropCorruptedTable(ConnectorSession session, SchemaTableName schemaT
public void renameTable(ConnectorSession session, SchemaTableName from, SchemaTableName to)
{
try {
restSessionCatalog.renameTable(convert(session), toRemoteTable(session, from), toRemoteTable(session, to));
restSessionCatalog.renameTable(convert(session), toRemoteTable(session, from, true), toRemoteTable(session, to, true));
}
catch (RESTException e) {
throw new TrinoException(ICEBERG_CATALOG_ERROR, format("Failed to rename table %s to %s", from, to), e);
Expand Down Expand Up @@ -426,12 +427,12 @@ public Table loadTable(ConnectorSession session, SchemaTableName schemaTableName

private TableIdentifier toRemoteObject(ConnectorSession session, SchemaTableName schemaTableName)
{
TableIdentifier remoteTable = toLiveRemoteTable(session, schemaTableName);
TableIdentifier remoteTable = toRemoteTable(session, schemaTableName, false);
if (!remoteTable.name().equals(schemaTableName.getTableName())) {
return remoteTable;
}

TableIdentifier remoteView = toLiveRemoteView(session, schemaTableName);
TableIdentifier remoteView = toRemoteView(session, schemaTableName, false);
if (!remoteView.name().equals(schemaTableName.getTableName())) {
return remoteView;
}
Expand All @@ -450,7 +451,7 @@ public Map<SchemaTableName, List<ColumnMetadata>> tryGetColumnMetadata(Connector
@Override
public void updateTableComment(ConnectorSession session, SchemaTableName schemaTableName, Optional<String> comment)
{
Table icebergTable = restSessionCatalog.loadTable(convert(session), toRemoteTable(session, schemaTableName));
Table icebergTable = restSessionCatalog.loadTable(convert(session), toRemoteTable(session, schemaTableName, true));
if (comment.isEmpty()) {
icebergTable.updateProperties().remove(TABLE_COMMENT).commit();
}
Expand Down Expand Up @@ -494,7 +495,7 @@ public void createView(ConnectorSession session, SchemaTableName schemaViewName,
definition.getOwner().ifPresent(owner -> properties.put(ICEBERG_VIEW_RUN_AS_OWNER, owner));
definition.getComment().ifPresent(comment -> properties.put(COMMENT, comment));
Schema schema = IcebergUtil.schemaFromViewColumns(typeManager, definition.getColumns());
ViewBuilder viewBuilder = restSessionCatalog.buildView(convert(session), toRemoteView(session, schemaViewName));
ViewBuilder viewBuilder = restSessionCatalog.buildView(convert(session), toRemoteView(session, schemaViewName, true));
viewBuilder = viewBuilder.withSchema(schema)
.withQuery("trino", definition.getOriginalSql())
.withDefaultNamespace(toRemoteNamespace(session, toNamespace(schemaViewName.getSchemaName())))
Expand All @@ -513,7 +514,7 @@ public void createView(ConnectorSession session, SchemaTableName schemaViewName,
@Override
public void renameView(ConnectorSession session, SchemaTableName source, SchemaTableName target)
{
restSessionCatalog.renameView(convert(session), toRemoteView(session, source), toRemoteView(session, target));
restSessionCatalog.renameView(convert(session), toRemoteView(session, source, true), toRemoteView(session, target, true));
if (caseInsensitiveNameMatching) {
remoteTableMappingCache.get().invalidate(toIdentifier(source));
}
Expand All @@ -528,7 +529,7 @@ public void setViewPrincipal(ConnectorSession session, SchemaTableName schemaVie
@Override
public void dropView(ConnectorSession session, SchemaTableName schemaViewName)
{
restSessionCatalog.dropView(convert(session), toRemoteView(session, schemaViewName));
restSessionCatalog.dropView(convert(session), toRemoteView(session, schemaViewName, true));
if (caseInsensitiveNameMatching) {
remoteTableMappingCache.get().invalidate(toIdentifier(schemaViewName));
}
Expand Down Expand Up @@ -561,7 +562,7 @@ public Map<SchemaTableName, ConnectorViewDefinition> getViews(ConnectorSession s
@Override
public Optional<ConnectorViewDefinition> getView(ConnectorSession session, SchemaTableName viewName)
{
return getIcebergView(session, viewName, true).flatMap(view -> {
return getIcebergView(session, viewName, false).flatMap(view -> {
SQLViewRepresentation sqlView = view.sqlFor("trino");
if (!sqlView.dialect().equalsIgnoreCase("trino")) {
throw new TrinoException(ICEBERG_UNSUPPORTED_VIEW_DIALECT, "Cannot read unsupported dialect '%s' for view '%s'".formatted(sqlView.dialect(), viewName));
Expand All @@ -581,11 +582,10 @@ public Optional<ConnectorViewDefinition> getView(ConnectorSession session, Schem
});
}

private Optional<View> getIcebergView(ConnectorSession session, SchemaTableName viewName, boolean findLiveView)
private Optional<View> getIcebergView(ConnectorSession session, SchemaTableName viewName, boolean getCached)
{
try {
return Optional.of(
restSessionCatalog.loadView(convert(session), caseInsensitiveNameMatching && findLiveView ? toLiveRemoteView(session, viewName) : toRemoteView(session, viewName)));
return Optional.of(restSessionCatalog.loadView(convert(session), toRemoteView(session, viewName, getCached)));
}
catch (NoSuchViewException e) {
return Optional.empty();
Expand Down Expand Up @@ -657,7 +657,7 @@ public Optional<CatalogSchemaTableName> redirectTable(ConnectorSession session,
@Override
public void updateViewComment(ConnectorSession session, SchemaTableName schemaViewName, Optional<String> comment)
{
View view = getIcebergView(session, schemaViewName, false).orElseThrow(() -> new ViewNotFoundException(schemaViewName));
View view = getIcebergView(session, schemaViewName, true).orElseThrow(() -> new ViewNotFoundException(schemaViewName));
UpdateViewProperties updateViewProperties = view.updateProperties();
comment.ifPresentOrElse(
value -> updateViewProperties.set(COMMENT, value),
Expand All @@ -668,7 +668,7 @@ public void updateViewComment(ConnectorSession session, SchemaTableName schemaVi
@Override
public void updateViewColumnComment(ConnectorSession session, SchemaTableName schemaViewName, String columnName, Optional<String> comment)
{
View view = getIcebergView(session, schemaViewName, false)
View view = getIcebergView(session, schemaViewName, true)
.orElseThrow(() -> new ViewNotFoundException(schemaViewName));

ViewVersion current = view.currentVersion();
Expand Down Expand Up @@ -759,27 +759,10 @@ private List<Namespace> listNamespaces(ConnectorSession session, Optional<String
return ImmutableList.of(toNamespace(namespace.get()));
}

private TableIdentifier toRemoteTable(ConnectorSession session, SchemaTableName schemaTableName)
private TableIdentifier toRemoteTable(ConnectorSession session, SchemaTableName schemaTableName, boolean getCached)
{
TableIdentifier tableIdentifier = toIdentifier(schemaTableName);
if (caseInsensitiveNameMatching) {
try {
return remoteTableMappingCache.get().get(tableIdentifier, () -> findRemoteTable(session, tableIdentifier));
}
catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
return tableIdentifier;
}

private TableIdentifier toLiveRemoteTable(ConnectorSession session, SchemaTableName schemaTableName)
{
TableIdentifier tableIdentifier = toIdentifier(schemaTableName);
if (caseInsensitiveNameMatching) {
return findRemoteTable(session, tableIdentifier);
}
return tableIdentifier;
return toRemoteObject(tableIdentifier, () -> findRemoteTable(session, tableIdentifier), getCached);
}

private TableIdentifier findRemoteTable(ConnectorSession session, TableIdentifier tableIdentifier)
Expand All @@ -799,27 +782,10 @@ private TableIdentifier findRemoteTable(ConnectorSession session, TableIdentifie
return matchingTable == null ? TableIdentifier.of(remoteNamespace, tableIdentifier.name()) : matchingTable;
}

private TableIdentifier toRemoteView(ConnectorSession session, SchemaTableName schemaViewName)
{
TableIdentifier tableIdentifier = toIdentifier(schemaViewName);
if (caseInsensitiveNameMatching) {
try {
return remoteTableMappingCache.get().get(tableIdentifier, () -> findRemoteView(session, tableIdentifier));
}
catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
return tableIdentifier;
}

private TableIdentifier toLiveRemoteView(ConnectorSession session, SchemaTableName schemaViewName)
private TableIdentifier toRemoteView(ConnectorSession session, SchemaTableName schemaViewName, boolean getCached)
{
TableIdentifier tableIdentifier = toIdentifier(schemaViewName);
if (caseInsensitiveNameMatching) {
return findRemoteView(session, tableIdentifier);
}
return tableIdentifier;
return toRemoteObject(tableIdentifier, () -> findRemoteView(session, tableIdentifier), getCached);
}

private TableIdentifier findRemoteView(ConnectorSession session, TableIdentifier tableIdentifier)
Expand All @@ -839,6 +805,27 @@ private TableIdentifier findRemoteView(ConnectorSession session, TableIdentifier
return matchingView == null ? TableIdentifier.of(remoteNamespace, tableIdentifier.name()) : matchingView;
}

private TableIdentifier toRemoteObject(TableIdentifier tableIdentifier, Callable<TableIdentifier> remoteObjectProvider, boolean getCached)
{
if (caseInsensitiveNameMatching) {
if (getCached) {
try {
return remoteTableMappingCache.get().get(tableIdentifier, remoteObjectProvider);
}
catch (ExecutionException e) {
throw new RuntimeException(e);
}
}
try {
return remoteObjectProvider.call();
}
catch (Exception e) {
throw new RuntimeException(e);
}
}
return tableIdentifier;
}

private Namespace toRemoteNamespace(ConnectorSession session, Namespace trinoNamespace)
{
if (caseInsensitiveNameMatching) {
Expand Down

0 comments on commit 6a5a081

Please sign in to comment.