Skip to content

Commit

Permalink
IGNITE-24247 Indexes created with a table must be in the AVAILABLE st…
Browse files Browse the repository at this point in the history
…ate (#5113)
  • Loading branch information
xtern authored Jan 28, 2025
1 parent 2725790 commit 292cef1
Show file tree
Hide file tree
Showing 51 changed files with 465 additions and 240 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.catalog;

import java.util.ArrayList;
import java.util.List;
import org.apache.ignite.internal.catalog.storage.UpdateEntry;

/**
* Update producer that is used to group updates
* when executing a batch of catalog commands.
*/
class BulkUpdateProducer implements UpdateProducer {
private final List<? extends UpdateProducer> commands;

BulkUpdateProducer(List<? extends UpdateProducer> producers) {
this.commands = producers;
}

@Override
public List<UpdateEntry> get(UpdateContext updateContext) {
List<UpdateEntry> bulkUpdateEntries = new ArrayList<>();

for (UpdateProducer producer : commands) {
List<UpdateEntry> entries = producer.get(updateContext);

for (UpdateEntry entry : entries) {
updateContext.updateCatalog(
catalog -> entry.applyUpdate(catalog, CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN)
);
}

bulkUpdateEntries.addAll(entries);
}

return bulkUpdateEntries;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,10 @@ public CompletableFuture<Integer> execute(List<CatalogCommand> commands) {
return nullCompletedFuture();
}

if (commands.size() == 1) {
return execute(commands.get(0));
}

return saveUpdateAndWaitForActivation(new BulkUpdateProducer(List.copyOf(commands)));
}

Expand Down Expand Up @@ -282,7 +286,7 @@ private CompletableFuture<Void> initCatalog(Catalog emptyCatalog) {
CreateSchemaCommand.builder().name(SYSTEM_SCHEMA_NAME).build()
);

List<UpdateEntry> entries = new BulkUpdateProducer(initCommands).get(emptyCatalog);
List<UpdateEntry> entries = new BulkUpdateProducer(initCommands).get(new UpdateContext(emptyCatalog));

return updateLog.append(new VersionedUpdate(emptyCatalog.version() + 1, 0L, entries))
.handle((result, error) -> {
Expand Down Expand Up @@ -393,7 +397,7 @@ private CompletableFuture<Integer> saveUpdate(UpdateProducer updateProducer, int

List<UpdateEntry> updates;
try {
updates = updateProducer.get(catalog);
updates = updateProducer.get(new UpdateContext(catalog));
} catch (CatalogValidationException ex) {
return failedFuture(new CatalogVersionAwareValidationException(ex, catalog.version()));
} catch (Exception ex) {
Expand Down Expand Up @@ -511,28 +515,4 @@ private static Catalog applyUpdateFinal(Catalog catalog, VersionedUpdate update,
);
}

private static class BulkUpdateProducer implements UpdateProducer {
private final List<? extends UpdateProducer> commands;

BulkUpdateProducer(List<? extends UpdateProducer> producers) {
this.commands = producers;
}

@Override
public List<UpdateEntry> get(Catalog catalog) {
List<UpdateEntry> bulkUpdateEntries = new ArrayList<>();

for (UpdateProducer producer : commands) {
List<UpdateEntry> entries = producer.get(catalog);

for (UpdateEntry entry : entries) {
catalog = entry.applyUpdate(catalog, INITIAL_CAUSALITY_TOKEN);
}

bulkUpdateEntries.addAll(entries);
}

return bulkUpdateEntries;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.catalog;

import java.util.function.Function;

/**
* Context contains two instances of the catalog: the base one and the updated one.
*
* <p>During batch command processing, changes are generated and applied to
* the updated instance. The base catalog instance can be used by a command
* to determine whether certain changes have been made to the catalog during
* processing of the current batch of commands.
*
* @see BulkUpdateProducer
*/
public class UpdateContext {
/** The base catalog descriptor. */
private final Catalog baseCatalog;

/** The updatable catalog descriptor. */
private Catalog updatableCatalog;

/** Constructor. */
public UpdateContext(Catalog catalog) {
this.baseCatalog = catalog;
this.updatableCatalog = catalog;
}

/**
* Returns the catalog descriptor on the basis of which the command generates the list of updates.
*
* <p>In the case of batch processing, this catalog instance contains the updates made by previous
* commands in the batch.
*
* @return Catalog descriptor.
*/
public Catalog catalog() {
return updatableCatalog;
}

/** Returns base catalog as it was before any updates from the batch were applied. */
public Catalog baseCatalog() {
return baseCatalog;
}

/** Applies specified action to the catalog. */
public void updateCatalog(Function<Catalog, Catalog> updater) {
updatableCatalog = updater.apply(updatableCatalog);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ public interface UpdateProducer {
* Returns list of {@link UpdateEntry entries} to be applied to catalog to bring it to the state
* described in the command.
*
* @param catalog Catalog on the basis of which to generate the list of updates.
* @param updateContext Context containing the catalog on the basis of which to generate the list of updates.
* @return List of updates. Should be empty if no updates actually required.
*/
List<UpdateEntry> get(Catalog catalog);
List<UpdateEntry> get(UpdateContext updateContext);
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.Set;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogValidationException;
import org.apache.ignite.internal.catalog.UpdateContext;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
Expand All @@ -50,8 +51,6 @@ public abstract class AbstractCreateIndexCommand extends AbstractIndexCommand {

protected final List<String> columns;

protected final boolean isCreatedWithTable;

private final boolean ifNotExists;

AbstractCreateIndexCommand(
Expand All @@ -60,8 +59,7 @@ public abstract class AbstractCreateIndexCommand extends AbstractIndexCommand {
boolean ifNotExists,
String tableName,
boolean unique,
List<String> columns,
boolean isCreatedWithTable
List<String> columns
) throws CatalogValidationException {
super(schemaName, indexName);

Expand All @@ -71,17 +69,18 @@ public abstract class AbstractCreateIndexCommand extends AbstractIndexCommand {
this.tableName = tableName;
this.unique = unique;
this.columns = copyOrNull(columns);
this.isCreatedWithTable = isCreatedWithTable;
}

public boolean ifNotExists() {
return ifNotExists;
}

protected abstract CatalogIndexDescriptor createDescriptor(int indexId, int tableId, CatalogIndexStatus status);
protected abstract CatalogIndexDescriptor createDescriptor(int indexId, int tableId, CatalogIndexStatus status,
boolean createdWithTable);

@Override
public List<UpdateEntry> get(Catalog catalog) {
public List<UpdateEntry> get(UpdateContext context) {
Catalog catalog = context.catalog();
CatalogSchemaDescriptor schema = schemaOrThrow(catalog, schemaName);

ensureNoTableIndexOrSysViewExistsWithGivenName(schema, indexName);
Expand All @@ -101,10 +100,14 @@ public List<UpdateEntry> get(Catalog catalog) {
throw new CatalogValidationException("Unique index must include all colocation columns");
}

CatalogIndexStatus status = isCreatedWithTable ? CatalogIndexStatus.AVAILABLE : CatalogIndexStatus.REGISTERED;
boolean indexCreatedWithTable = context.baseCatalog().table(table.id()) == null;

CatalogIndexStatus status = indexCreatedWithTable
? CatalogIndexStatus.AVAILABLE
: CatalogIndexStatus.REGISTERED;

return List.of(
new NewIndexEntry(createDescriptor(catalog.objectIdGenState(), table.id(), status)),
new NewIndexEntry(createDescriptor(catalog.objectIdGenState(), table.id(), status, indexCreatedWithTable)),
new ObjectIdGenUpdateEntry(1)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,4 @@ interface AbstractCreateIndexCommandBuilder<T extends AbstractIndexCommandBuilde

/** List of the columns to index. There must be at least one column. */
T columns(List<String> columns);

/** Flag indicating that this index has been created at the same time as its table. */
T isCreatedWithTable(boolean isCreatedWithTable);
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogCommand;
import org.apache.ignite.internal.catalog.CatalogValidationException;
import org.apache.ignite.internal.catalog.UpdateContext;
import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
Expand Down Expand Up @@ -71,7 +72,8 @@ private AlterTableAddColumnCommand(
}

@Override
public List<UpdateEntry> get(Catalog catalog) {
public List<UpdateEntry> get(UpdateContext updateContext) {
Catalog catalog = updateContext.catalog();
CatalogSchemaDescriptor schema = schemaOrThrow(catalog, schemaName);

CatalogTableDescriptor table = tableOrThrow(schema, tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogCommand;
import org.apache.ignite.internal.catalog.CatalogValidationException;
import org.apache.ignite.internal.catalog.UpdateContext;
import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
Expand Down Expand Up @@ -88,7 +89,8 @@ private AlterTableAlterColumnCommand(
}

@Override
public List<UpdateEntry> get(Catalog catalog) {
public List<UpdateEntry> get(UpdateContext updateContext) {
Catalog catalog = updateContext.catalog();
CatalogSchemaDescriptor schema = schemaOrThrow(catalog, schemaName);

CatalogTableDescriptor table = tableOrThrow(schema, tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogCommand;
import org.apache.ignite.internal.catalog.CatalogValidationException;
import org.apache.ignite.internal.catalog.UpdateContext;
import org.apache.ignite.internal.catalog.descriptors.CatalogHashIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexColumnDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
Expand Down Expand Up @@ -75,7 +76,8 @@ private AlterTableDropColumnCommand(
}

@Override
public List<UpdateEntry> get(Catalog catalog) {
public List<UpdateEntry> get(UpdateContext updateContext) {
Catalog catalog = updateContext.catalog();
CatalogSchemaDescriptor schema = schemaOrThrow(catalog, schemaName);

CatalogTableDescriptor table = tableOrThrow(schema, tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogCommand;
import org.apache.ignite.internal.catalog.CatalogValidationException;
import org.apache.ignite.internal.catalog.UpdateContext;
import org.apache.ignite.internal.catalog.descriptors.CatalogStorageProfilesDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.storage.AlterZoneEntry;
Expand Down Expand Up @@ -104,7 +105,8 @@ public boolean ifExists() {
}

@Override
public List<UpdateEntry> get(Catalog catalog) {
public List<UpdateEntry> get(UpdateContext updateContext) {
Catalog catalog = updateContext.catalog();
CatalogZoneDescriptor zone = zoneOrThrow(catalog, zoneName);

CatalogZoneDescriptor descriptor = fromParamsAndPreviousValue(zone);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogCommand;
import org.apache.ignite.internal.catalog.CatalogValidationException;
import org.apache.ignite.internal.catalog.UpdateContext;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.storage.SetDefaultZoneEntry;
import org.apache.ignite.internal.catalog.storage.UpdateEntry;
Expand Down Expand Up @@ -53,7 +54,8 @@ private AlterZoneSetDefaultCommand(String zoneName, boolean ifExists) throws Cat
}

@Override
public List<UpdateEntry> get(Catalog catalog) {
public List<UpdateEntry> get(UpdateContext updateContext) {
Catalog catalog = updateContext.catalog();
CatalogZoneDescriptor zone = zoneOrThrow(catalog, zoneName);

CatalogZoneDescriptor defaultZone = catalog.defaultZone();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,14 @@ private CreateHashIndexCommand(
boolean ifNotExists,
String tableName,
boolean unique,
List<String> columns,
boolean isCreatedWithTable
List<String> columns
) throws CatalogValidationException {
super(schemaName, indexName, ifNotExists, tableName, unique, columns, isCreatedWithTable);
super(schemaName, indexName, ifNotExists, tableName, unique, columns);
}

@Override
protected CatalogIndexDescriptor createDescriptor(int indexId, int tableId, CatalogIndexStatus status) {
return new CatalogHashIndexDescriptor(indexId, indexName, tableId, unique, status, columns, isCreatedWithTable);
protected CatalogIndexDescriptor createDescriptor(int indexId, int tableId, CatalogIndexStatus status, boolean createdWithTable) {
return new CatalogHashIndexDescriptor(indexId, indexName, tableId, unique, status, columns, createdWithTable);
}

private static class Builder implements CreateHashIndexCommandBuilder {
Expand All @@ -68,7 +67,6 @@ private static class Builder implements CreateHashIndexCommandBuilder {
private String tableName;
private List<String> columns;
private boolean unique;
private boolean isCreatedWithTable;

@Override
public Builder tableName(String tableName) {
Expand Down Expand Up @@ -112,16 +110,9 @@ public CreateHashIndexCommandBuilder ifNotExists(boolean ifNotExists) {
return this;
}

@Override
public CreateHashIndexCommandBuilder isCreatedWithTable(boolean isCreatedWithTable) {
this.isCreatedWithTable = isCreatedWithTable;

return this;
}

@Override
public CatalogCommand build() {
return new CreateHashIndexCommand(schemaName, indexName, ifNotExists, tableName, unique, columns, isCreatedWithTable);
return new CreateHashIndexCommand(schemaName, indexName, ifNotExists, tableName, unique, columns);
}
}
}
Loading

0 comments on commit 292cef1

Please sign in to comment.