Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

IGNITE-24029 Sql schema. Extend Table API with supporting qualified names #5038

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.HashSet;
import java.util.Set;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.table.QualifiedName;

/**
* Broadcast job execution target.
Expand Down Expand Up @@ -68,10 +69,23 @@ static BroadcastJobTarget nodes(Set<ClusterNode> nodes) {
* Creates a job target for partitioned execution. For each partition in the provided table the job will be executed on a node that
* holds the primary replica.
*
* @param tableName Table name.
* @param tableName Name of the table with SQL-parser style quotation, e.g.
* "tbl0" - the table "TBL0" will be looked up, "\"Tbl0\"" - "Tbl0", etc.
* @return Job target.
*/
static BroadcastJobTarget table(String tableName) {
return new TableJobTarget(tableName);
return table(QualifiedName.parse(tableName));
ygerzhedovich marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Creates a job target for partitioned execution. For each partition in the provided table the job will be executed on a node that
* holds the primary replica.
*
* @param tableName Table name.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the same, seems it should be mention not just a table name, but a fully qualified table name together with schema

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why? The methods accept an object that represents table name.
We don't care how the object was created.

* @return Job target.
*/
static BroadcastJobTarget table(QualifiedName tableName) {
// TODO IGNITE-24033 Compute API must use QualifiedName.
return new TableJobTarget(tableName.objectName());
}
}
33 changes: 31 additions & 2 deletions modules/api/src/main/java/org/apache/ignite/compute/JobTarget.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.HashSet;
import java.util.Set;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.table.QualifiedName;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.mapper.Mapper;

Expand Down Expand Up @@ -91,7 +92,21 @@ static JobTarget anyNode(Set<ClusterNode> nodes) {
* @return Job target.
*/
static JobTarget colocated(String tableName, Tuple key) {
return new ColocatedJobTarget(tableName, key, null);
return colocated(QualifiedName.parse(tableName), key);
}

/**
* Creates a colocated job target for a specific table and key.
*
* <p>This target determines that a job should be executed on the same node that hosts the data for a given key of provided table.
*
* @param tableName Table name.
* @param key Key.
* @return Job target.
*/
static JobTarget colocated(QualifiedName tableName, Tuple key) {
// TODO IGNITE-24033 Compute API must use QualifiedName.
return new ColocatedJobTarget(tableName.objectName(), key, null);
}

/**
Expand All @@ -104,6 +119,20 @@ static JobTarget colocated(String tableName, Tuple key) {
* @return Job target.
*/
static <K> JobTarget colocated(String tableName, K key, Mapper<K> keyMapper) {
return new ColocatedJobTarget(tableName, key, keyMapper);
return colocated(QualifiedName.parse(tableName), key, keyMapper);
}

/**
* Creates a colocated job target for a specific table and key with mapper.
*
* <p>This target determines that a job should be executed on the same node that hosts the data for a given key of provided table.
*
* @param tableName Table name.
* @param key Key.
* @return Job target.
*/
static <K> JobTarget colocated(QualifiedName tableName, K key, Mapper<K> keyMapper) {
// TODO IGNITE-24033 Compute API must use QualifiedName.
return new ColocatedJobTarget(tableName.objectName(), key, keyMapper);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,23 @@
package org.apache.ignite.lang;

import static org.apache.ignite.lang.ErrorGroups.Table.TABLE_NOT_FOUND_ERR;
import static org.apache.ignite.lang.util.IgniteNameUtils.canonicalName;

import java.util.UUID;
import org.apache.ignite.table.QualifiedName;
import org.jetbrains.annotations.Nullable;

/**
* Exception is thrown when a specified table cannot be found.
*/
public class TableNotFoundException extends IgniteException {

/**
* Creates an exception with the given table name.
*
* @param schemaName Schema name.
* @param tableName Table name.
*/
public TableNotFoundException(String schemaName, String tableName) {
super(TABLE_NOT_FOUND_ERR, "The table does not exist [name=" + canonicalName(schemaName, tableName) + ']');
public TableNotFoundException(QualifiedName tableName) {
super(TABLE_NOT_FOUND_ERR, "The table does not exist [name=" + tableName.toCanonicalForm() + ']');
ygerzhedovich marked this conversation as resolved.
Show resolved Hide resolved
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,17 @@ public interface IgniteTables {
* "tbl0" - the table "TBL0" will be looked up, "\"Tbl0\"" - "Tbl0", etc.
* @return Table identified by name or {@code null} if table doesn't exist.
*/
Table table(String name);
default Table table(String name) {
return table(QualifiedName.parse(name));
}

/**
* Gets a table with the specified name if that table exists.
*
* @param name Table name.
* @return Table identified by name or {@code null} if table doesn't exist.
*/
Table table(QualifiedName name);

/**
* Gets a table with the specified name if that table exists.
Expand All @@ -54,5 +64,17 @@ public interface IgniteTables {
* "tbl0" - the table "TBL0" will be looked up, "\"Tbl0\"" - "Tbl0", etc.
* @return Future that represents the pending completion of the operation.
*/
CompletableFuture<Table> tableAsync(String name);
default CompletableFuture<Table> tableAsync(String name) {
return CompletableFuture.completedFuture(name)
.thenApply(QualifiedName::parse)
.thenCompose(this::tableAsync);
}

/**
* Gets a table with the specified name if that table exists.
*
* @param name Table name.
* @return Future that represents the pending completion of the operation.
*/
CompletableFuture<Table> tableAsync(QualifiedName name);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.ignite.lang.util.IgniteNameUtils.identifierStart;
import static org.apache.ignite.lang.util.IgniteNameUtils.quote;

import java.io.Serializable;
import java.util.NoSuchElementException;
import java.util.Objects;
import org.jetbrains.annotations.Nullable;
Expand All @@ -46,7 +47,9 @@
* for quoted names - the unnecessary quotes will be removed preserving escaped double-quote symbols.
* E.g. "tbl0" - is equivalent to "TBL0", "\"Tbl0\"" - "Tbl0", etc.
*/
public class QualifiedName {
public final class QualifiedName implements Serializable {
private static final long serialVersionUID = -7016402388810709149L;

/** Default schema name. */
public static final String DEFAULT_SCHEMA_NAME = "PUBLIC";

Expand Down Expand Up @@ -119,7 +122,7 @@ public static QualifiedName of(@Nullable String schemaName, String objectName) {
* @param schemaName Normalized schema name.
* @param objectName Normalized object name.
*/
private QualifiedName(String schemaName, String objectName) {
QualifiedName(String schemaName, String objectName) {
this.schemaIdentifier = schemaName;
this.objectIdentifier = objectName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public interface Table {
*
* @return Table name.
*/
String name();
QualifiedName name();

/**
* Gets the partition manager.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static org.apache.ignite.internal.PublicApiThreadingTests.asyncContinuationPool;
import static org.apache.ignite.internal.PublicApiThreadingTests.tryToSwitchFromUserThreadWithDelayedSchemaSync;
import static org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static org.apache.ignite.internal.catalog.ItCatalogDslTest.POJO_RECORD_TABLE_NAME;
import static org.apache.ignite.internal.catalog.ItCatalogDslTest.ZONE_NAME;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
Expand All @@ -32,7 +31,6 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.ignite.Ignite;
import org.apache.ignite.catalog.IgniteCatalog;
import org.apache.ignite.catalog.definitions.TableDefinition;
import org.apache.ignite.catalog.definitions.ZoneDefinition;
Expand All @@ -53,14 +51,8 @@ protected int initialNodes() {

@AfterEach
void clearDatabase() {
Ignite ignite = CLUSTER.aliveNode();

ignite.tables().tables().forEach(table -> sql("DROP TABLE " + table.name()));

CatalogManagerImpl catalogManager = (CatalogManagerImpl) unwrapIgniteImpl(ignite).catalogManager();
catalogManager.zones(catalogManager.latestCatalogVersion()).stream()
.filter(zone -> !CatalogManagerImpl.DEFAULT_ZONE_NAME.equals(zone.name()))
.forEach(zone -> sql("DROP ZONE " + zone.name()));
dropAllTables();
dropAllZonesExceptDefaultOne();
}

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ public void createAndGetDefinitionTest() {
@Test
public void createAllColumnTypesFromPojo() {
Table table = catalog().createTable(AllColumnTypesPojo.class);
assertEquals("ALLCOLUMNTYPESPOJO", table.name());
assertEquals("ALLCOLUMNTYPESPOJO", table.name().objectName());

TableDefinition tableDef = catalog().tableDefinition(table.name());
assertEquals(tableDef.tableName(), tableDef.tableName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ public CompletableFuture<Void> stopAsync(ComponentContext componentContext) {
}

@Override
public @Nullable CatalogTableDescriptor table(String tableName, long timestamp) {
CatalogSchemaDescriptor schema = catalogAt(timestamp).schema(SqlCommon.DEFAULT_SCHEMA_NAME);
public @Nullable CatalogTableDescriptor table(String schemaName, String tableName, long timestamp) {
CatalogSchemaDescriptor schema = catalogAt(timestamp).schema(schemaName);
if (schema == null) {
return null;
}
Expand All @@ -216,8 +216,8 @@ public Collection<CatalogTableDescriptor> tables(int catalogVersion) {
}

@Override
public @Nullable CatalogIndexDescriptor aliveIndex(String indexName, long timestamp) {
CatalogSchemaDescriptor schema = catalogAt(timestamp).schema(SqlCommon.DEFAULT_SCHEMA_NAME);
public @Nullable CatalogIndexDescriptor aliveIndex(String schemaName, String indexName, long timestamp) {
CatalogSchemaDescriptor schema = catalogAt(timestamp).schema(schemaName);
if (schema == null) {
return null;
}
Expand Down Expand Up @@ -379,7 +379,7 @@ private CompletableFuture<Void> initCatalog(Catalog emptyCatalog) {
.dataNodesAutoAdjustScaleDown(INFINITE_TIMER_VALUE)
.filter(DEFAULT_FILTER)
.storageProfilesParams(
List.of(StorageProfileParams.builder().storageProfile(CatalogService.DEFAULT_STORAGE_PROFILE).build())
List.of(StorageProfileParams.builder().storageProfile(DEFAULT_STORAGE_PROFILE).build())
)
.build(),
AlterZoneSetDefaultCommand.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,24 +65,45 @@ public interface CatalogService extends EventProducer<CatalogEvent, CatalogEvent

@Nullable Catalog catalog(int catalogVersion);

@Nullable CatalogTableDescriptor table(String tableName, long timestamp);
/**
* Returns table descriptor by the given schema name and table name at given timestamp.
*
* @return Table descriptor or {@code null} if table not found.
*/
@Nullable CatalogTableDescriptor table(String schemaName, String tableName, long timestamp);

/**
* Returns table descriptor by the given table ID and given timestamp.
*
* @return Table descriptor or {@code null} if table not found.
*/
@Nullable CatalogTableDescriptor table(int tableId, long timestamp);

/**
* Returns table descriptor by the given table ID and catalog version.
*
* @return Table descriptor or {@code null} if table not found.
*/
@Nullable CatalogTableDescriptor table(int tableId, int catalogVersion);

Collection<CatalogTableDescriptor> tables(int catalogVersion);

/**
* Returns an <em>alive</em> index with the given name, that is an index that has not been dropped yet at a given point in time.
* Returns a descriptor for <em>alive</em> index by the given schema name and index name at given timestamp,
* that is an index that has not been dropped yet at a given point in time.
*
* <p>This effectively means that the index must be present in the Catalog and not in the {@link CatalogIndexStatus#STOPPING}
* state.
*/
@Nullable CatalogIndexDescriptor aliveIndex(String indexName, long timestamp);
@Nullable CatalogIndexDescriptor aliveIndex(String schemaName, String indexName, long timestamp);

@Nullable CatalogIndexDescriptor index(int indexId, long timestamp);

/**
* Returns index descriptor by the given index ID and catalog version.
*
* @return Index descriptor or {@code null} if index not found.
*/
@Nullable CatalogIndexDescriptor index(int indexId, int catalogVersion);

Collection<CatalogIndexDescriptor> indexes(int catalogVersion);
Expand Down
Loading