diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/admin/Admin.java b/fluss-client/src/main/java/com/alibaba/fluss/client/admin/Admin.java index dcfa86a8..af3a8e9b 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/admin/Admin.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/admin/Admin.java @@ -32,6 +32,8 @@ import com.alibaba.fluss.exception.TableNotExistException; import com.alibaba.fluss.exception.TableNotPartitionedException; import com.alibaba.fluss.lakehouse.LakeStorageInfo; +import com.alibaba.fluss.metadata.DatabaseDescriptor; +import com.alibaba.fluss.metadata.DatabaseInfo; import com.alibaba.fluss.metadata.PartitionInfo; import com.alibaba.fluss.metadata.PhysicalTablePath; import com.alibaba.fluss.metadata.SchemaInfo; @@ -95,9 +97,44 @@ public interface Admin extends AutoCloseable { * @throws InvalidDatabaseException if the database name is invalid, e.g., contains illegal * characters, or exceeds the maximum length. */ + @Deprecated CompletableFuture createDatabase(String databaseName, boolean ignoreIfExists) throws InvalidDatabaseException; + /** + * Create a new database asynchronously. + * + *

The following exceptions can be anticipated when calling {@code get()} on returned future. + * + *

+ * + * @param databaseName The name of the database to create. + * @param databaseDescriptor The descriptor of the database to create. + * @param ignoreIfExists Flag to specify behavior when a database with the given name already + * exists: if set to false, throw a DatabaseAlreadyExistException, if set to true, do + * nothing. + * @throws InvalidDatabaseException if the database name is invalid, e.g., contains illegal + * characters, or exceeds the maximum length. + */ + CompletableFuture createDatabase( + String databaseName, DatabaseDescriptor databaseDescriptor, boolean ignoreIfExists); + + /** + * Get the database with the given database name asynchronously. + * + *

The following exceptions can be anticipated when calling {@code get()} on returned future. + * + *

+ * + * @param databaseName The database name of the database. + */ + CompletableFuture getDatabase(String databaseName); + /** * Delete the database with the given name asynchronously. * diff --git a/fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java b/fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java index 948596c3..b3ef2ca7 100644 --- a/fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java +++ b/fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java @@ -22,6 +22,8 @@ import com.alibaba.fluss.client.table.snapshot.PartitionSnapshotInfo; import com.alibaba.fluss.client.utils.ClientRpcMessageUtils; import com.alibaba.fluss.lakehouse.LakeStorageInfo; +import com.alibaba.fluss.metadata.DatabaseDescriptor; +import com.alibaba.fluss.metadata.DatabaseInfo; import com.alibaba.fluss.metadata.PartitionInfo; import com.alibaba.fluss.metadata.PhysicalTablePath; import com.alibaba.fluss.metadata.Schema; @@ -41,6 +43,7 @@ import com.alibaba.fluss.rpc.messages.DescribeLakeStorageRequest; import com.alibaba.fluss.rpc.messages.DropDatabaseRequest; import com.alibaba.fluss.rpc.messages.DropTableRequest; +import com.alibaba.fluss.rpc.messages.GetDatabaseRequest; import com.alibaba.fluss.rpc.messages.GetKvSnapshotRequest; import com.alibaba.fluss.rpc.messages.GetLakeTableSnapshotRequest; import com.alibaba.fluss.rpc.messages.GetPartitionSnapshotRequest; @@ -120,16 +123,39 @@ public CompletableFuture getTableSchema(TablePath tablePath, int sch @Override public CompletableFuture createDatabase(String databaseName, boolean ignoreIfExists) { + return createDatabase(databaseName, DatabaseDescriptor.builder().build(), ignoreIfExists); + } + + @Override + public CompletableFuture createDatabase( + String databaseName, DatabaseDescriptor databaseDescriptor, boolean ignoreIfExists) { TablePath.validateDatabaseName(databaseName); CreateDatabaseRequest request = new CreateDatabaseRequest(); - request.setDatabaseName(databaseName).setIgnoreIfExists(ignoreIfExists); + request.setDatabaseJson(databaseDescriptor.toJsonBytes()) + .setDatabaseName(databaseName) + .setIgnoreIfExists(ignoreIfExists); return gateway.createDatabase(request).thenApply(r -> null); } + @Override + public CompletableFuture getDatabase(String databaseName) { + GetDatabaseRequest request = new GetDatabaseRequest(); + request.setDatabaseName(databaseName); + return gateway.getDatabase(request) + .thenApply( + r -> + new DatabaseInfo( + databaseName, + DatabaseDescriptor.fromJsonBytes(r.getDatabaseJson()), + r.getCreateTime(), + r.getModifyTime())); + } + @Override public CompletableFuture deleteDatabase( String databaseName, boolean ignoreIfNotExists, boolean cascade) { DropDatabaseRequest request = new DropDatabaseRequest(); + request.setIgnoreIfNotExists(ignoreIfNotExists) .setCascade(cascade) .setDatabaseName(databaseName); @@ -176,7 +202,9 @@ public CompletableFuture getTable(TablePath tablePath) { tablePath, r.getTableId(), TableDescriptor.fromJsonBytes(r.getTableJson()), - r.getSchemaId())); + r.getSchemaId(), + r.getCreateTime(), + r.getModifyTime())); } @Override diff --git a/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java b/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java index f957e55c..e5c3b7e6 100644 --- a/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java +++ b/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java @@ -34,6 +34,8 @@ import com.alibaba.fluss.exception.TableNotExistException; import com.alibaba.fluss.exception.TableNotPartitionedException; import com.alibaba.fluss.fs.FsPathAndFileName; +import com.alibaba.fluss.metadata.DatabaseDescriptor; +import com.alibaba.fluss.metadata.DatabaseInfo; import com.alibaba.fluss.metadata.PartitionInfo; import com.alibaba.fluss.metadata.Schema; import com.alibaba.fluss.metadata.SchemaInfo; @@ -106,18 +108,41 @@ void testMultiClient() throws Exception { admin2.close(); } + @Test + void testGetDatabase() throws Exception { + long timestampBeforeCreate = System.currentTimeMillis(); + admin.createDatabase( + "test_db_2", + DatabaseDescriptor.builder() + .comment("test comment") + .property("key1", "value1") + .build(), + false); + DatabaseInfo databaseInfo = admin.getDatabase("test_db_2").get(); + long timestampAfterCreate = System.currentTimeMillis(); + assertThat(databaseInfo.getDatabaseName()).isEqualTo("test_db_2"); + assertThat(databaseInfo.getDatabaseDescriptor().getComment().get()) + .isEqualTo("test comment"); + assertThat(databaseInfo.getDatabaseDescriptor().getProperties()) + .containsEntry("key1", "value1"); + assertThat(databaseInfo.getCreateTime()) + .isBetween(timestampBeforeCreate, timestampAfterCreate); + } + @Test void testGetTableAndSchema() throws Exception { SchemaInfo schemaInfo = admin.getTableSchema(DEFAULT_TABLE_PATH).get(); assertThat(schemaInfo.getSchema()).isEqualTo(DEFAULT_SCHEMA); assertThat(schemaInfo.getSchemaId()).isEqualTo(1); SchemaInfo schemaInfo2 = admin.getTableSchema(DEFAULT_TABLE_PATH, 1).get(); - assertThat(schemaInfo2).isEqualTo(schemaInfo); // get default table. + long timestampAfterCreate = System.currentTimeMillis(); TableInfo tableInfo = admin.getTable(DEFAULT_TABLE_PATH).get(); assertThat(tableInfo.getSchemaId()).isEqualTo(schemaInfo.getSchemaId()); assertThat(tableInfo.getTableDescriptor()).isEqualTo(DEFAULT_TABLE_DESCRIPTOR); + assertThat(schemaInfo2).isEqualTo(schemaInfo); + assertThat(tableInfo.getCreateTime()).isLessThan(timestampAfterCreate); // unknown table assertThatThrownBy(() -> admin.getTable(TablePath.of("test_db", "unknown_table")).get()) @@ -127,6 +152,18 @@ void testGetTableAndSchema() throws Exception { () -> admin.getTableSchema(TablePath.of("test_db", "unknown_table")).get()) .cause() .isInstanceOf(SchemaNotExistException.class); + + // create and get a new table + long timestampBeforeCreate = System.currentTimeMillis(); + TablePath tablePath = TablePath.of("test_db", "table_2"); + admin.createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false); + tableInfo = admin.getTable(tablePath).get(); + timestampAfterCreate = System.currentTimeMillis(); + assertThat(tableInfo.getSchemaId()).isEqualTo(schemaInfo.getSchemaId()); + assertThat(tableInfo.getTableDescriptor()).isEqualTo(DEFAULT_TABLE_DESCRIPTOR); + assertThat(schemaInfo2).isEqualTo(schemaInfo); + assertThat(tableInfo.getCreateTime()) + .isBetween(timestampBeforeCreate, timestampAfterCreate); } @Test diff --git a/fluss-common/src/main/java/com/alibaba/fluss/metadata/DatabaseDescriptor.java b/fluss-common/src/main/java/com/alibaba/fluss/metadata/DatabaseDescriptor.java new file mode 100644 index 00000000..3a2bcca9 --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/metadata/DatabaseDescriptor.java @@ -0,0 +1,145 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.metadata; + +import com.alibaba.fluss.annotation.PublicEvolving; +import com.alibaba.fluss.config.ConfigOption; +import com.alibaba.fluss.config.ConfigurationUtils; +import com.alibaba.fluss.utils.Preconditions; +import com.alibaba.fluss.utils.json.DatabaseDescriptorJsonSerde; +import com.alibaba.fluss.utils.json.JsonSerdeUtils; + +import javax.annotation.Nullable; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +/** + * Represents the metadata of a database in Fluss. + * + *

It contains all characteristics that can be expressed in a SQL {@code CREATE Database} + * statement, such as schema, primary keys, partition keys, bucket keys, and options. + * + * @since 0.6 + */ +public class DatabaseDescriptor { + private final Map properties; + private final String comment; + + private DatabaseDescriptor(Map properties, @Nullable String comment) { + this.properties = properties; + this.comment = comment; + } + + public Map getProperties() { + return properties; + } + + public Optional getComment() { + return Optional.ofNullable(comment); + } + + /** + * Serialize the table descriptor to a JSON byte array. + * + * @see DatabaseDescriptorJsonSerde + */ + public byte[] toJsonBytes() { + return JsonSerdeUtils.writeValueAsBytes(this, DatabaseDescriptorJsonSerde.INSTANCE); + } + + /** + * Deserialize from JSON byte array to an instance of {@link DatabaseDescriptor}. + * + * @see DatabaseDescriptor + */ + public static DatabaseDescriptor fromJsonBytes(byte[] json) { + return JsonSerdeUtils.readValue(json, DatabaseDescriptorJsonSerde.INSTANCE); + } + + /** Creates a builder for building database descriptor. */ + public static Builder builder() { + return new Builder(); + } + + // --------------------------------------------------------------------------------------------- + + /** Builder for {@link TableDescriptor}. */ + @PublicEvolving + public static class Builder { + + private final Map properties; + private @Nullable String comment; + + protected Builder() { + this.properties = new HashMap<>(); + } + + protected Builder(TableDescriptor descriptor) { + this.properties = new HashMap<>(descriptor.getProperties()); + this.comment = descriptor.getComment().orElse(null); + } + + /** + * Sets table property on the table. + * + *

Table properties are controlled by Fluss and will change the behavior of the table. + */ + public Builder property(ConfigOption configOption, T value) { + Preconditions.checkNotNull(configOption, "Config option must not be null."); + Preconditions.checkNotNull(value, "Value must not be null."); + properties.put( + configOption.key(), ConfigurationUtils.convertValue(value, String.class)); + return this; + } + + /** + * Sets table property on the table. + * + *

Table properties are controlled by Fluss and will change the behavior of the table. + */ + public Builder property(String key, String value) { + Preconditions.checkNotNull(key, "Key must not be null."); + Preconditions.checkNotNull(value, "Value must not be null."); + properties.put(key, value); + return this; + } + + /** + * Sets table properties on the table. + * + *

Table properties are controlled by Fluss and will change the behavior of the table. + */ + public Builder properties(Map properties) { + Preconditions.checkNotNull(properties, "properties must not be null."); + this.properties.putAll(properties); + return this; + } + + /** Define the comment for this table. */ + public Builder comment(@Nullable String comment) { + this.comment = comment; + return this; + } + + /** Returns an immutable instance of {@link TableDescriptor}. */ + public DatabaseDescriptor build() { + return new DatabaseDescriptor(properties, comment); + } + } +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/metadata/DatabaseInfo.java b/fluss-common/src/main/java/com/alibaba/fluss/metadata/DatabaseInfo.java new file mode 100644 index 00000000..acc7139c --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/metadata/DatabaseInfo.java @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.metadata; + +/** + * Information of a database metadata, includes {@link DatabaseDescriptor}. + * + * @since 0.6 + */ +public class DatabaseInfo { + private final String databaseName; + private final DatabaseDescriptor databaseDescriptor; + private final long createTime; + private final long modifyTime; + + public DatabaseInfo( + String databaseName, + DatabaseDescriptor databaseDescriptor, + long createTime, + long modifyTime) { + this.databaseName = databaseName; + this.databaseDescriptor = databaseDescriptor; + this.createTime = createTime; + this.modifyTime = modifyTime; + } + + public String getDatabaseName() { + return databaseName; + } + + public DatabaseDescriptor getDatabaseDescriptor() { + return databaseDescriptor; + } + + public long getCreateTime() { + return createTime; + } + + public long getModifyTime() { + return modifyTime; + } +} diff --git a/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableInfo.java b/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableInfo.java index cdf8c69a..1a7159d2 100644 --- a/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableInfo.java +++ b/fluss-common/src/main/java/com/alibaba/fluss/metadata/TableInfo.java @@ -37,13 +37,27 @@ public final class TableInfo { private final TableDescriptor tableDescriptor; private final long tableId; private final int schemaId; + private final long createTime; + private final long modifyTime; public TableInfo( TablePath tablePath, long tableId, TableDescriptor tableDescriptor, int schemaId) { + this(tablePath, tableId, tableDescriptor, schemaId, -1, -1); + } + + public TableInfo( + TablePath tablePath, + long tableId, + TableDescriptor tableDescriptor, + int schemaId, + long createTime, + long modifyTime) { this.tablePath = tablePath; this.tableId = tableId; this.tableDescriptor = tableDescriptor; this.schemaId = schemaId; + this.createTime = createTime; + this.modifyTime = modifyTime; } public TablePath getTablePath() { @@ -62,6 +76,14 @@ public int getSchemaId() { return schemaId; } + public long getCreateTime() { + return createTime; + } + + public long getModifyTime() { + return modifyTime; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -79,7 +101,7 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hash(tablePath, tableDescriptor, tableId, schemaId); + return Objects.hash(tablePath, tableDescriptor, tableId, schemaId, createTime); } @Override @@ -91,6 +113,8 @@ public String toString() { + tableId + ", schemaId=" + schemaId + + ", createTime=" + + createTime + ", tableDescriptor=" + tableDescriptor + '}'; diff --git a/fluss-common/src/main/java/com/alibaba/fluss/utils/json/DatabaseDescriptorJsonSerde.java b/fluss-common/src/main/java/com/alibaba/fluss/utils/json/DatabaseDescriptorJsonSerde.java new file mode 100644 index 00000000..0690da05 --- /dev/null +++ b/fluss-common/src/main/java/com/alibaba/fluss/utils/json/DatabaseDescriptorJsonSerde.java @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.utils.json; + +import com.alibaba.fluss.metadata.DatabaseDescriptor; +import com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; +import com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +/** Json serializer and deserializer for {@link DatabaseDescriptor}. */ +public class DatabaseDescriptorJsonSerde + implements JsonSerializer, JsonDeserializer { + + public static final DatabaseDescriptorJsonSerde INSTANCE = new DatabaseDescriptorJsonSerde(); + + static final String PROPERTIES_NAME = "properties"; + static final String COMMENT_NAME = "comment"; + + private static final String VERSION_KEY = "version"; + private static final int VERSION = 1; + + @Override + public void serialize(DatabaseDescriptor databaseDescriptor, JsonGenerator generator) + throws IOException { + generator.writeStartObject(); + + // serialize data version. + generator.writeNumberField(VERSION_KEY, VERSION); + + // serialize comment. + if (databaseDescriptor.getComment().isPresent()) { + generator.writeStringField(COMMENT_NAME, databaseDescriptor.getComment().get()); + } + + // serialize properties. + generator.writeObjectFieldStart(PROPERTIES_NAME); + for (Map.Entry entry : databaseDescriptor.getProperties().entrySet()) { + generator.writeObjectField(entry.getKey(), entry.getValue()); + } + generator.writeEndObject(); + + generator.writeEndObject(); + } + + @Override + public DatabaseDescriptor deserialize(JsonNode node) { + DatabaseDescriptor.Builder builder = DatabaseDescriptor.builder(); + + JsonNode commentNode = node.get(COMMENT_NAME); + if (commentNode != null) { + builder.comment(commentNode.asText()); + } + + builder.properties(deserializeProperties(node.get(PROPERTIES_NAME))); + + return builder.build(); + } + + private Map deserializeProperties(JsonNode node) { + HashMap properties = new HashMap<>(); + Iterator optionsKeys = node.fieldNames(); + while (optionsKeys.hasNext()) { + String key = optionsKeys.next(); + properties.put(key, node.get(key).asText()); + } + return properties; + } +} diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalog.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalog.java index ddbc8a5f..704987f3 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalog.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalog.java @@ -163,7 +163,11 @@ public void createDatabase( String databaseName, CatalogDatabase database, boolean ignoreIfExists) throws DatabaseAlreadyExistException, CatalogException { try { - admin.createDatabase(databaseName, ignoreIfExists).get(); + admin.createDatabase( + databaseName, + FlinkConversions.toFlussDatabase(database), + ignoreIfExists) + .get(); } catch (Exception e) { Throwable t = ExceptionUtils.stripExecutionException(e); if (CatalogExceptionUtils.isDatabaseAlreadyExist(t)) { diff --git a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/FlinkConversions.java b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/FlinkConversions.java index 7eb26706..c2ee5990 100644 --- a/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/FlinkConversions.java +++ b/fluss-connectors/fluss-connector-flink/src/main/java/com/alibaba/fluss/connector/flink/utils/FlinkConversions.java @@ -23,6 +23,7 @@ import com.alibaba.fluss.config.Password; import com.alibaba.fluss.connector.flink.FlinkConnectorOptions; import com.alibaba.fluss.connector.flink.catalog.FlinkCatalogFactory; +import com.alibaba.fluss.metadata.DatabaseDescriptor; import com.alibaba.fluss.metadata.Schema; import com.alibaba.fluss.metadata.TableDescriptor; import com.alibaba.fluss.metadata.TableInfo; @@ -32,6 +33,7 @@ import com.alibaba.fluss.utils.TimeUtils; import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.catalog.CatalogDatabase; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ResolvedCatalogTable; @@ -242,6 +244,15 @@ public static TableDescriptor toFlussTable(ResolvedCatalogTable catalogTable) { .build(); } + /** Convert Flink's table to Fluss's database. */ + public static DatabaseDescriptor toFlussDatabase(CatalogDatabase catalogDatabase) { + return DatabaseDescriptor.builder() + .comment(catalogDatabase.getComment()) + // todo: flink databse catalog will support option later. + .properties(Collections.emptyMap()) + .build(); + } + /** Convert Fluss's ConfigOptions to Flink's ConfigOptions. */ public static List> toFlinkOptions( Collection> flussOption) { diff --git a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalogTest.java b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalogTest.java index b27deda1..b484165e 100644 --- a/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalogTest.java +++ b/fluss-connectors/fluss-connector-flink/src/test/java/com/alibaba/fluss/connector/flink/catalog/FlinkCatalogTest.java @@ -27,6 +27,7 @@ import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogBaseTable; import org.apache.flink.table.catalog.CatalogDatabase; +import org.apache.flink.table.catalog.CatalogDatabaseImpl; import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ObjectPath; @@ -123,7 +124,8 @@ static void afterAll() { @BeforeEach void beforeEach() throws Exception { try { - catalog.createDatabase(DEFAULT_DB, null, true); + catalog.createDatabase( + DEFAULT_DB, new CatalogDatabaseImpl(Collections.emptyMap(), null), true); } catch (CatalogException e) { // the auto partitioned manager may create the db zk node // in an another thread, so if exception is NodeExistsException, just ignore @@ -320,14 +322,19 @@ void testUnsupportedTable() { @Test void testDatabase() throws Exception { // test create db1 - catalog.createDatabase("db1", null, false); + catalog.createDatabase("db1", new CatalogDatabaseImpl(Collections.emptyMap(), null), false); // test create db2 - catalog.createDatabase("db2", null, false); + catalog.createDatabase("db2", new CatalogDatabaseImpl(Collections.emptyMap(), null), false); assertThat(catalog.databaseExists("db2")).isTrue(); // create the database again should throw exception with ignore if exist = false - assertThatThrownBy(() -> catalog.createDatabase("db2", null, false)); + assertThatThrownBy( + () -> + catalog.createDatabase( + "db2", + new CatalogDatabaseImpl(Collections.emptyMap(), null), + false)); // should be ok since we set ignore if exist = true - catalog.createDatabase("db2", null, true); + catalog.createDatabase("db2", new CatalogDatabaseImpl(Collections.emptyMap(), null), true); CatalogDatabase db2 = catalog.getDatabase("db2"); assertThat(db2.getProperties()).isEmpty(); // test create table in db1 diff --git a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/AdminReadOnlyGateway.java b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/AdminReadOnlyGateway.java index 0ab45046..e571fd07 100644 --- a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/AdminReadOnlyGateway.java +++ b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/AdminReadOnlyGateway.java @@ -21,6 +21,8 @@ import com.alibaba.fluss.rpc.messages.DatabaseExistsResponse; import com.alibaba.fluss.rpc.messages.DescribeLakeStorageRequest; import com.alibaba.fluss.rpc.messages.DescribeLakeStorageResponse; +import com.alibaba.fluss.rpc.messages.GetDatabaseRequest; +import com.alibaba.fluss.rpc.messages.GetDatabaseResponse; import com.alibaba.fluss.rpc.messages.GetFileSystemSecurityTokenRequest; import com.alibaba.fluss.rpc.messages.GetFileSystemSecurityTokenResponse; import com.alibaba.fluss.rpc.messages.GetKvSnapshotRequest; @@ -63,6 +65,16 @@ public interface AdminReadOnlyGateway extends RpcGateway { @RPC(api = ApiKeys.LIST_DATABASES) CompletableFuture listDatabases(ListDatabasesRequest request); + /** + * Return a {@link com.alibaba.fluss.rpc.messages.GetDatabaseResponse} by the given {@link + * com.alibaba.fluss.rpc.messages.GetDatabaseRequest}. + * + * @param request Name of the database + * @return The response of requested database. + */ + @RPC(api = ApiKeys.GET_DATABASE) + CompletableFuture getDatabase(GetDatabaseRequest request); + /** * Check if a database exists in this catalog. * diff --git a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/ApiKeys.java b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/ApiKeys.java index 949917b2..81ce0577 100644 --- a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/ApiKeys.java +++ b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/ApiKeys.java @@ -61,7 +61,8 @@ public enum ApiKeys { NOTIFY_LAKE_TABLE_OFFSET(1031, 0, 0, PRIVATE), DESCRIBE_LAKE_STORAGE(1032, 0, 0, PUBLIC), GET_LAKE_TABLE_SNAPSHOT(1033, 0, 0, PUBLIC), - LIMIT_SCAN(1034, 0, 0, PUBLIC); + LIMIT_SCAN(1034, 0, 0, PUBLIC), + GET_DATABASE(1035, 0, 0, PUBLIC); private static final Map ID_TO_TYPE = Arrays.stream(ApiKeys.values()) diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto index 4c68064a..9e5d604b 100644 --- a/fluss-rpc/src/main/proto/FlussApi.proto +++ b/fluss-rpc/src/main/proto/FlussApi.proto @@ -49,11 +49,23 @@ message GetTableSchemaResponse { message CreateDatabaseRequest { required string database_name = 1; required bool ignore_if_exists = 2; + optional bytes database_json = 3; } message CreateDatabaseResponse { } +// get table request and response +message GetDatabaseRequest { + required string database_name = 1; +} + +message GetDatabaseResponse { + required bytes database_json = 3; + optional int64 create_time = 4; + optional int64 modify_time = 5; +} + // drop database request and response message DropDatabaseRequest { required string database_name = 1; @@ -97,6 +109,8 @@ message GetTableResponse { required int64 table_id = 1; required int32 schema_id = 2; required bytes table_json = 3; + optional int64 create_time = 4; + optional int64 modify_time = 5; } // list tables request and response diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/RpcServiceBase.java b/fluss-server/src/main/java/com/alibaba/fluss/server/RpcServiceBase.java index 57dc8228..d1630b9c 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/RpcServiceBase.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/RpcServiceBase.java @@ -31,6 +31,7 @@ import com.alibaba.fluss.fs.FileSystem; import com.alibaba.fluss.fs.token.ObtainedSecurityToken; import com.alibaba.fluss.lakehouse.LakeStorageInfo; +import com.alibaba.fluss.metadata.DatabaseInfo; import com.alibaba.fluss.metadata.PhysicalTablePath; import com.alibaba.fluss.metadata.SchemaInfo; import com.alibaba.fluss.metadata.TableBucket; @@ -45,6 +46,8 @@ import com.alibaba.fluss.rpc.messages.DatabaseExistsResponse; import com.alibaba.fluss.rpc.messages.DescribeLakeStorageRequest; import com.alibaba.fluss.rpc.messages.DescribeLakeStorageResponse; +import com.alibaba.fluss.rpc.messages.GetDatabaseRequest; +import com.alibaba.fluss.rpc.messages.GetDatabaseResponse; import com.alibaba.fluss.rpc.messages.GetFileSystemSecurityTokenRequest; import com.alibaba.fluss.rpc.messages.GetFileSystemSecurityTokenResponse; import com.alibaba.fluss.rpc.messages.GetKvSnapshotRequest; @@ -177,6 +180,16 @@ public CompletableFuture listDatabases(ListDatabasesReque return CompletableFuture.completedFuture(response); } + @Override + public CompletableFuture getDatabase(GetDatabaseRequest request) { + GetDatabaseResponse response = new GetDatabaseResponse(); + DatabaseInfo databaseInfo = metadataManager.getDatabase(request.getDatabaseName()); + response.setDatabaseJson(databaseInfo.getDatabaseDescriptor().toJsonBytes()) + .setCreateTime(databaseInfo.getCreateTime()) + .setModifyTime(databaseInfo.getModifyTime()); + return CompletableFuture.completedFuture(response); + } + @Override public CompletableFuture databaseExists(DatabaseExistsRequest request) { DatabaseExistsResponse response = new DatabaseExistsResponse(); @@ -201,6 +214,8 @@ public CompletableFuture getTable(GetTableRequest request) { response.setTableJson(tableInfo.getTableDescriptor().toJsonBytes()); response.setSchemaId(tableInfo.getSchemaId()); response.setTableId(tableInfo.getTableId()); + response.setCreateTime(tableInfo.getCreateTime()); + response.setModifyTime(tableInfo.getModifyTime()); return CompletableFuture.completedFuture(response); } diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java index 20bf38a0..7544d91b 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java @@ -20,6 +20,7 @@ import com.alibaba.fluss.config.ConfigOptions; import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.exception.IllegalConfigurationException; +import com.alibaba.fluss.metadata.DatabaseDescriptor; import com.alibaba.fluss.metrics.registry.MetricRegistry; import com.alibaba.fluss.rpc.RpcClient; import com.alibaba.fluss.rpc.RpcServer; @@ -225,7 +226,8 @@ private void createDefaultDatabase() { MetaDataManager metaDataManager = new MetaDataManager(zkClient); List databases = metaDataManager.listDatabases(); if (databases.isEmpty()) { - metaDataManager.createDatabase(DEFAULT_DATABASE, true); + metaDataManager.createDatabase( + DEFAULT_DATABASE, DatabaseDescriptor.builder().build(), true); LOG.info("Created default database '{}' because no database exists.", DEFAULT_DATABASE); } } diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java index 0271b5c2..f12cd425 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java @@ -22,6 +22,7 @@ import com.alibaba.fluss.exception.InvalidDatabaseException; import com.alibaba.fluss.exception.InvalidTableException; import com.alibaba.fluss.fs.FileSystem; +import com.alibaba.fluss.metadata.DatabaseDescriptor; import com.alibaba.fluss.metadata.Schema; import com.alibaba.fluss.metadata.TableDescriptor; import com.alibaba.fluss.metadata.TablePath; @@ -110,7 +111,15 @@ public CompletableFuture createDatabase(CreateDatabaseRe } catch (InvalidDatabaseException e) { return FutureUtils.failedFuture(e); } - metadataManager.createDatabase(request.getDatabaseName(), request.isIgnoreIfExists()); + + DatabaseDescriptor databaseDescriptor = null; + if (request.getDatabaseJson() != null) { + databaseDescriptor = DatabaseDescriptor.fromJsonBytes(request.getDatabaseJson()); + } else { + databaseDescriptor = DatabaseDescriptor.builder().build(); + } + metadataManager.createDatabase( + request.getDatabaseName(), databaseDescriptor, request.isIgnoreIfExists()); return CompletableFuture.completedFuture(response); } diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/MetaDataManager.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/MetaDataManager.java index bea0b3a3..04e1c286 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/MetaDataManager.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/MetaDataManager.java @@ -25,11 +25,14 @@ import com.alibaba.fluss.exception.TableAlreadyExistException; import com.alibaba.fluss.exception.TableNotExistException; import com.alibaba.fluss.exception.TableNotPartitionedException; +import com.alibaba.fluss.metadata.DatabaseDescriptor; +import com.alibaba.fluss.metadata.DatabaseInfo; import com.alibaba.fluss.metadata.SchemaInfo; import com.alibaba.fluss.metadata.TableDescriptor; import com.alibaba.fluss.metadata.TableInfo; import com.alibaba.fluss.metadata.TablePath; import com.alibaba.fluss.server.zk.ZooKeeperClient; +import com.alibaba.fluss.server.zk.data.DatabaseRegistration; import com.alibaba.fluss.server.zk.data.TableAssignment; import com.alibaba.fluss.server.zk.data.TableRegistration; import com.alibaba.fluss.shaded.zookeeper3.org.apache.zookeeper.KeeperException; @@ -57,7 +60,8 @@ public MetaDataManager(ZooKeeperClient zookeeperClient) { this.zookeeperClient = zookeeperClient; } - public void createDatabase(String databaseName, boolean ignoreIfExists) + public void createDatabase( + String databaseName, DatabaseDescriptor databaseDescriptor, boolean ignoreIfExists) throws DatabaseAlreadyExistException { if (databaseExists(databaseName)) { if (ignoreIfExists) { @@ -67,11 +71,41 @@ public void createDatabase(String databaseName, boolean ignoreIfExists) "Database " + databaseName + " already exists."); } + DatabaseRegistration databaseRegistration = DatabaseRegistration.of(databaseDescriptor); + uncheck( - () -> zookeeperClient.registerDatabase(databaseName), + () -> zookeeperClient.registerDatabase(databaseName, databaseRegistration), "Fail to create database: " + databaseName); } + public DatabaseInfo getDatabase(String databaseName) throws DatabaseNotExistException { + if (!databaseExists(databaseName)) { + throw new DatabaseNotExistException("Database " + databaseName + " does not exist."); + } + + Optional optionalTable; + try { + optionalTable = zookeeperClient.getDatabase(databaseName); + } catch (Exception e) { + throw new FlussRuntimeException( + String.format("Fail to get database '%s'.", databaseName), e); + } + + DatabaseRegistration databaseReg = null; + if (optionalTable.isPresent()) { + databaseReg = optionalTable.get(); + } else { + // the database node before 0.6 is empty. + databaseReg = DatabaseRegistration.of(DatabaseDescriptor.builder().build()); + } + + return new DatabaseInfo( + databaseName, + databaseReg.toDatabaseDescriptor(), + databaseReg.createTime, + databaseReg.modifyTime); + } + public boolean databaseExists(String databaseName) { return uncheck( () -> zookeeperClient.databaseExists(databaseName), @@ -230,7 +264,9 @@ public TableInfo getTable(TablePath tablePath) throws TableNotExistException { tablePath, tableReg.tableId, tableReg.toTableDescriptor(schemaInfo.getSchema()), - schemaInfo.getSchemaId()); + schemaInfo.getSchemaId(), + tableReg.createTime, + tableReg.modifyTime); } public SchemaInfo getLatestSchema(TablePath tablePath) throws SchemaNotExistException { diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/zk/ZooKeeperClient.java b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/ZooKeeperClient.java index 255b9d12..fc07f4da 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/zk/ZooKeeperClient.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/ZooKeeperClient.java @@ -24,6 +24,7 @@ import com.alibaba.fluss.metadata.TablePath; import com.alibaba.fluss.server.zk.data.BucketSnapshot; import com.alibaba.fluss.server.zk.data.CoordinatorAddress; +import com.alibaba.fluss.server.zk.data.DatabaseRegistration; import com.alibaba.fluss.server.zk.data.LakeTableSnapshot; import com.alibaba.fluss.server.zk.data.LeaderAndIsr; import com.alibaba.fluss.server.zk.data.PartitionAssignment; @@ -259,12 +260,29 @@ public void deleteLeaderAndIsr(TableBucket tableBucket) throws Exception { // -------------------------------------------------------------------------------------------- /** Register a database to zk. */ + @Deprecated public void registerDatabase(String database) throws Exception { String path = DatabaseZNode.path(database); zkClient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath(path); LOG.info("Registered database {}", database); } + public void registerDatabase(String database, DatabaseRegistration databaseRegistration) + throws Exception { + String path = DatabaseZNode.path(database); + zkClient.create() + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath(path, DatabaseZNode.encode(databaseRegistration)); + LOG.info("Registered database {}", database); + } + + /** Get the database in ZK. */ + public Optional getDatabase(String database) throws Exception { + Optional bytes = getOrEmpty(DatabaseZNode.path(database)); + return bytes.map(DatabaseZNode::decode); + } + public void deleteDatabase(String database) throws Exception { String path = DatabaseZNode.path(database); zkClient.delete().deletingChildrenIfNeeded().forPath(path); diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/DatabaseRegistration.java b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/DatabaseRegistration.java new file mode 100644 index 00000000..a07fa009 --- /dev/null +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/DatabaseRegistration.java @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.server.zk.data; + +import com.alibaba.fluss.metadata.DatabaseDescriptor; + +import javax.annotation.Nullable; + +import java.util.Map; +import java.util.Objects; + +/** + * The registration information of database in {@link ZkData.TableZNode}. It is used to store the + * database information in zookeeper. Basically, it contains the same information with {@link + * com.alibaba.fluss.metadata.DatabaseInfo}. + * + * @see TableRegistrationJsonSerde for json serialization and deserialization. + */ +public class DatabaseRegistration { + public final @Nullable String comment; + public final Map properties; + public final long createTime; + public final long modifyTime; + + public DatabaseRegistration( + @Nullable String comment, + Map properties, + long createTime, + long modifyTime) { + this.comment = comment; + this.properties = properties; + this.createTime = createTime; + this.modifyTime = modifyTime; + } + + public DatabaseDescriptor toDatabaseDescriptor() { + DatabaseDescriptor.Builder builder = DatabaseDescriptor.builder().comment(comment); + properties.forEach(builder::property); + return builder.build(); + } + + public static DatabaseRegistration of(DatabaseDescriptor databaseDescriptor) { + return new DatabaseRegistration( + databaseDescriptor.getComment().orElse(null), + databaseDescriptor.getProperties(), + System.currentTimeMillis(), + System.currentTimeMillis()); + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + DatabaseRegistration that = (DatabaseRegistration) o; + return createTime == that.createTime + && Objects.equals(comment, that.comment) + && Objects.equals(properties, that.properties); + } + + @Override + public int hashCode() { + return Objects.hash(comment, properties, createTime); + } + + @Override + public String toString() { + return "DatabaseRegistration{" + + "comment='" + + comment + + '\'' + + ", properties=" + + properties + + ", createTime=" + + createTime + + ", modifyTime=" + + modifyTime + + '}'; + } +} diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/DatabaseRegistrationJsonSerde.java b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/DatabaseRegistrationJsonSerde.java new file mode 100644 index 00000000..963d73f5 --- /dev/null +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/DatabaseRegistrationJsonSerde.java @@ -0,0 +1,96 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.server.zk.data; + +import com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; +import com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import com.alibaba.fluss.utils.json.JsonDeserializer; +import com.alibaba.fluss.utils.json.JsonSerializer; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +/** Json serializer and deserializer for {@link DatabaseRegistration}. */ +public class DatabaseRegistrationJsonSerde + implements JsonSerializer, JsonDeserializer { + + public static final DatabaseRegistrationJsonSerde INSTANCE = + new DatabaseRegistrationJsonSerde(); + + static final String COMMENT_NAME = "comment"; + static final String PROPERTIES_NAME = "properties"; + + static final String CREATE_TIME = "create_time"; + static final String MODIFY_TIME = "modify_time"; + private static final String VERSION_KEY = "version"; + private static final int VERSION = 1; + + @Override + public void serialize(DatabaseRegistration tableReg, JsonGenerator generator) + throws IOException { + generator.writeStartObject(); + + // serialize data version. + generator.writeNumberField(VERSION_KEY, VERSION); + + // serialize comment. + if (tableReg.comment != null) { + generator.writeStringField(COMMENT_NAME, tableReg.comment); + } + + // serialize properties. + generator.writeObjectFieldStart(PROPERTIES_NAME); + for (Map.Entry entry : tableReg.properties.entrySet()) { + generator.writeObjectField(entry.getKey(), entry.getValue()); + } + generator.writeEndObject(); + + // serialize create time. + generator.writeNumberField(CREATE_TIME, tableReg.createTime); + // serialize modify time. + generator.writeNumberField(MODIFY_TIME, tableReg.modifyTime); + + generator.writeEndObject(); + } + + @Override + public DatabaseRegistration deserialize(JsonNode node) { + JsonNode commentNode = node.get(COMMENT_NAME); + String comment = null; + if (commentNode != null) { + comment = commentNode.asText(); + } + + Map properties = deserializeProperties(node.get(PROPERTIES_NAME)); + long createTime = node.get(CREATE_TIME).asLong(-1); + long modifyTime = node.get(MODIFY_TIME).asLong(-1); + + return new DatabaseRegistration(comment, properties, createTime, modifyTime); + } + + private Map deserializeProperties(JsonNode node) { + HashMap properties = new HashMap<>(); + Iterator optionsKeys = node.fieldNames(); + while (optionsKeys.hasNext()) { + String key = optionsKeys.next(); + properties.put(key, node.get(key).asText()); + } + return properties; + } +} diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/TableRegistration.java b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/TableRegistration.java index 1dfb00f4..74aca077 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/TableRegistration.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/TableRegistration.java @@ -42,6 +42,8 @@ public class TableRegistration { public final @Nullable TableDescriptor.TableDistribution tableDistribution; public final Map properties; public final Map customProperties; + public final long createTime; + public final long modifyTime; public TableRegistration( long tableId, @@ -49,13 +51,17 @@ public TableRegistration( List partitionKeys, @Nullable TableDescriptor.TableDistribution tableDistribution, Map properties, - Map customProperties) { + Map customProperties, + long createTime, + long modifyTime) { this.tableId = tableId; this.comment = comment; this.partitionKeys = partitionKeys; this.tableDistribution = tableDistribution; this.properties = properties; this.customProperties = customProperties; + this.createTime = createTime; + this.modifyTime = modifyTime; } public TableDescriptor toTableDescriptor(Schema schema) { @@ -81,19 +87,33 @@ public static TableRegistration of(long tableId, TableDescriptor tableDescriptor tableDescriptor.getPartitionKeys(), tableDescriptor.getTableDistribution().orElse(null), tableDescriptor.getProperties(), - tableDescriptor.getCustomProperties()); + tableDescriptor.getCustomProperties(), + System.currentTimeMillis(), + System.currentTimeMillis()); + } + + public static TableRegistration of( + long tableId, TableDescriptor tableDescriptor, long createdTime) { + return new TableRegistration( + tableId, + tableDescriptor.getComment().orElse(null), + tableDescriptor.getPartitionKeys(), + tableDescriptor.getTableDistribution().orElse(null), + tableDescriptor.getProperties(), + tableDescriptor.getCustomProperties(), + createdTime, + System.currentTimeMillis()); } @Override public boolean equals(Object o) { - if (this == o) { - return true; - } if (o == null || getClass() != o.getClass()) { return false; } TableRegistration that = (TableRegistration) o; return tableId == that.tableId + && createTime == that.createTime + && modifyTime == that.modifyTime && Objects.equals(comment, that.comment) && Objects.equals(partitionKeys, that.partitionKeys) && Objects.equals(tableDistribution, that.tableDistribution) @@ -104,7 +124,14 @@ public boolean equals(Object o) { @Override public int hashCode() { return Objects.hash( - tableId, comment, partitionKeys, tableDistribution, properties, customProperties); + tableId, + comment, + partitionKeys, + tableDistribution, + properties, + customProperties, + createTime, + modifyTime); } @Override @@ -123,6 +150,10 @@ public String toString() { + properties + ", customProperties=" + customProperties + + ", createdTime=" + + createTime + + ", modifiedTime=" + + modifyTime + '}'; } } diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/TableRegistrationJsonSerde.java b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/TableRegistrationJsonSerde.java index 7fe3058a..0f8e240f 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/TableRegistrationJsonSerde.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/TableRegistrationJsonSerde.java @@ -44,9 +44,10 @@ public class TableRegistrationJsonSerde static final String BUCKET_COUNT_NAME = "bucket_count"; static final String PROPERTIES_NAME = "properties"; static final String CUSTOM_PROPERTIES_NAME = "custom_properties"; - + static final String CREATE_TIME = "create_time"; + static final String MODIFY_TIME = "modify_time"; private static final String VERSION_KEY = "version"; - private static final int VERSION = 1; + private static final int VERSION = 2; @Override public void serialize(TableRegistration tableReg, JsonGenerator generator) throws IOException { @@ -97,11 +98,21 @@ public void serialize(TableRegistration tableReg, JsonGenerator generator) throw } generator.writeEndObject(); + // serialize createdTime + generator.writeNumberField(CREATE_TIME, tableReg.createTime); + + // serialize modifiedTime + generator.writeNumberField(MODIFY_TIME, tableReg.createTime); + generator.writeEndObject(); } @Override public TableRegistration deserialize(JsonNode node) { + return deserialize(node, node.get(VERSION_KEY).asInt()); + } + + public TableRegistration deserialize(JsonNode node, int version) { long tableId = node.get(TABLE_ID_NAME).asLong(); JsonNode commentNode = node.get(COMMENT_NAME); @@ -136,8 +147,22 @@ public TableRegistration deserialize(JsonNode node) { Map customProperties = deserializeProperties(node.get(CUSTOM_PROPERTIES_NAME)); + long createdTime = -1; + long modifiedTime = -1; + if (version >= 2) { + createdTime = node.get(CREATE_TIME).asLong(); + modifiedTime = node.get(MODIFY_TIME).asLong(); + } + return new TableRegistration( - tableId, comment, partitionKeys, distribution, properties, customProperties); + tableId, + comment, + partitionKeys, + distribution, + properties, + customProperties, + createdTime, + modifiedTime); } private Map deserializeProperties(JsonNode node) { diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/ZkData.java b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/ZkData.java index 28233abe..80526563 100644 --- a/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/ZkData.java +++ b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/ZkData.java @@ -52,6 +52,15 @@ public static final class DatabaseZNode { public static String path(String databaseName) { return DatabasesZNode.path() + "/" + databaseName; } + + public static byte[] encode(DatabaseRegistration databaseRegistration) { + return JsonSerdeUtils.writeValueAsBytes( + databaseRegistration, DatabaseRegistrationJsonSerde.INSTANCE); + } + + public static DatabaseRegistration decode(byte[] json) { + return JsonSerdeUtils.readValue(json, DatabaseRegistrationJsonSerde.INSTANCE); + } } /** diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessorTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessorTest.java index a4571dc7..18ed59ac 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessorTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/CoordinatorEventProcessorTest.java @@ -20,6 +20,7 @@ import com.alibaba.fluss.config.Configuration; import com.alibaba.fluss.exception.FencedLeaderEpochException; import com.alibaba.fluss.exception.InvalidCoordinatorException; +import com.alibaba.fluss.metadata.DatabaseDescriptor; import com.alibaba.fluss.metadata.Schema; import com.alibaba.fluss.metadata.TableBucket; import com.alibaba.fluss.metadata.TableBucketReplica; @@ -146,7 +147,8 @@ void beforeEach() { autoPartitionManager, TestingMetricGroups.COORDINATOR_METRICS); eventProcessor.startup(); - metaDataManager.createDatabase(defaultDatabase, false); + metaDataManager.createDatabase( + defaultDatabase, DatabaseDescriptor.builder().build(), false); } @AfterEach diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TestCoordinatorGateway.java b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TestCoordinatorGateway.java index a8473dcc..a1315f0b 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TestCoordinatorGateway.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TestCoordinatorGateway.java @@ -40,6 +40,8 @@ import com.alibaba.fluss.rpc.messages.DropDatabaseResponse; import com.alibaba.fluss.rpc.messages.DropTableRequest; import com.alibaba.fluss.rpc.messages.DropTableResponse; +import com.alibaba.fluss.rpc.messages.GetDatabaseRequest; +import com.alibaba.fluss.rpc.messages.GetDatabaseResponse; import com.alibaba.fluss.rpc.messages.GetFileSystemSecurityTokenRequest; import com.alibaba.fluss.rpc.messages.GetFileSystemSecurityTokenResponse; import com.alibaba.fluss.rpc.messages.GetKvSnapshotRequest; @@ -139,6 +141,11 @@ public CompletableFuture listDatabases(ListDatabasesReque throw new UnsupportedOperationException(); } + @Override + public CompletableFuture getDatabase(GetDatabaseRequest request) { + throw new UnsupportedOperationException(); + } + @Override public CompletableFuture databaseExists(DatabaseExistsRequest request) { throw new UnsupportedOperationException(); diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java index 0e78164c..5961a7a2 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/event/watcher/TableChangeWatcherTest.java @@ -17,6 +17,7 @@ package com.alibaba.fluss.server.coordinator.event.watcher; import com.alibaba.fluss.config.ConfigOptions; +import com.alibaba.fluss.metadata.DatabaseDescriptor; import com.alibaba.fluss.metadata.Schema; import com.alibaba.fluss.metadata.SchemaInfo; import com.alibaba.fluss.metadata.TableDescriptor; @@ -78,7 +79,7 @@ static void beforeAll() { .getCustomExtension() .getZooKeeperClient(NOPErrorHandler.INSTANCE); metaDataManager = new MetaDataManager(zookeeperClient); - metaDataManager.createDatabase(DEFAULT_DB, false); + metaDataManager.createDatabase(DEFAULT_DB, DatabaseDescriptor.builder().build(), false); } @BeforeEach diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TestTabletServerGateway.java b/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TestTabletServerGateway.java index cd9df1d5..4670eb8e 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TestTabletServerGateway.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TestTabletServerGateway.java @@ -29,6 +29,8 @@ import com.alibaba.fluss.rpc.messages.DescribeLakeStorageResponse; import com.alibaba.fluss.rpc.messages.FetchLogRequest; import com.alibaba.fluss.rpc.messages.FetchLogResponse; +import com.alibaba.fluss.rpc.messages.GetDatabaseRequest; +import com.alibaba.fluss.rpc.messages.GetDatabaseResponse; import com.alibaba.fluss.rpc.messages.GetFileSystemSecurityTokenRequest; import com.alibaba.fluss.rpc.messages.GetFileSystemSecurityTokenResponse; import com.alibaba.fluss.rpc.messages.GetKvSnapshotRequest; @@ -203,6 +205,11 @@ public CompletableFuture listDatabases(ListDatabasesReque throw new UnsupportedOperationException(); } + @Override + public CompletableFuture getDatabase(GetDatabaseRequest request) { + throw new UnsupportedOperationException(); + } + @Override public CompletableFuture databaseExists(DatabaseExistsRequest request) { throw new UnsupportedOperationException(); diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/zk/ZooKeeperClientTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/zk/ZooKeeperClientTest.java index cd88d83b..38bb03da 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/zk/ZooKeeperClientTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/zk/ZooKeeperClientTest.java @@ -170,7 +170,9 @@ void testTable() throws Exception { Arrays.asList("a", "b"), new TableDescriptor.TableDistribution(16, Collections.singletonList("a")), options, - Collections.singletonMap("custom-1", "100")); + Collections.singletonMap("custom-1", "100"), + System.currentTimeMillis(), + System.currentTimeMillis()); zookeeperClient.registerTable(tablePath, tableReg); Optional optionalTable = zookeeperClient.getTable(tablePath); @@ -185,7 +187,9 @@ void testTable() throws Exception { Arrays.asList("a", "b"), new TableDescriptor.TableDistribution(16, Collections.singletonList("a")), options, - Collections.singletonMap("custom-2", "200")); + Collections.singletonMap("custom-2", "200"), + System.currentTimeMillis(), + System.currentTimeMillis()); zookeeperClient.updateTable(tablePath, tableReg); optionalTable = zookeeperClient.getTable(tablePath); assertThat(optionalTable.isPresent()).isTrue(); @@ -332,7 +336,9 @@ void testPartition() throws Exception { Arrays.asList("a", "b"), new TableDescriptor.TableDistribution(16, Collections.singletonList("a")), Collections.emptyMap(), - Collections.emptyMap()); + Collections.emptyMap(), + System.currentTimeMillis(), + System.currentTimeMillis()); zookeeperClient.registerTable(tablePath, tableReg); Set partitions = zookeeperClient.getPartitions(tablePath); diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/zk/data/DatabaseRegistrationJsonSerdeTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/zk/data/DatabaseRegistrationJsonSerdeTest.java new file mode 100644 index 00000000..d6c51208 --- /dev/null +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/zk/data/DatabaseRegistrationJsonSerdeTest.java @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2024 Alibaba Group Holding Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.alibaba.fluss.server.zk.data; + +import com.alibaba.fluss.shaded.guava32.com.google.common.collect.Maps; +import com.alibaba.fluss.utils.json.JsonSerdeTestBase; + +import java.util.Collections; + +/** Test for {@link DatabaseRegistrationJsonSerde}. */ +public class DatabaseRegistrationJsonSerdeTest extends JsonSerdeTestBase { + DatabaseRegistrationJsonSerdeTest() { + super(DatabaseRegistrationJsonSerde.INSTANCE); + } + + @Override + protected DatabaseRegistration[] createObjects() { + DatabaseRegistration[] databaseRegistrations = new DatabaseRegistration[2]; + + databaseRegistrations[0] = + new DatabaseRegistration( + null, + Collections.singletonMap("option-3", "300"), + 1735538268L, + 1735538268L); + + databaseRegistrations[1] = + new DatabaseRegistration("second-table", Maps.newHashMap(), -1, -1); + + return databaseRegistrations; + } + + @Override + protected String[] expectedJsons() { + return new String[] { + "{\"version\":1,\"properties\":{\"option-3\":\"300\"},\"create_time\":1735538268,\"modify_time\":1735538268}", + "{\"version\":1,\"comment\":\"second-table\",\"properties\":{},\"create_time\":-1,\"modify_time\":-1}", + }; + } +} diff --git a/fluss-server/src/test/java/com/alibaba/fluss/server/zk/data/TableRegistrationJsonSerdeTest.java b/fluss-server/src/test/java/com/alibaba/fluss/server/zk/data/TableRegistrationJsonSerdeTest.java index fe97e19c..54ec4f1e 100644 --- a/fluss-server/src/test/java/com/alibaba/fluss/server/zk/data/TableRegistrationJsonSerdeTest.java +++ b/fluss-server/src/test/java/com/alibaba/fluss/server/zk/data/TableRegistrationJsonSerdeTest.java @@ -40,7 +40,9 @@ protected TableRegistration[] createObjects() { Arrays.asList("a", "b"), new TableDistribution(16, Arrays.asList("b", "c")), Maps.newHashMap(), - Collections.singletonMap("custom-3", "\"300\"")); + Collections.singletonMap("custom-3", "\"300\""), + 1735538268L, + 1735538268L); tableRegistrations[1] = new TableRegistration( @@ -49,7 +51,9 @@ protected TableRegistration[] createObjects() { Collections.emptyList(), null, Collections.singletonMap("option-3", "300"), - Maps.newHashMap()); + Maps.newHashMap(), + -1, + -1); return tableRegistrations; } @@ -57,9 +61,9 @@ protected TableRegistration[] createObjects() { @Override protected String[] expectedJsons() { return new String[] { - "{\"version\":1,\"table_id\":1234,\"comment\":\"first-table\",\"partition_key\":[\"a\",\"b\"]," - + "\"bucket_key\":[\"b\",\"c\"],\"bucket_count\":16,\"properties\":{},\"custom_properties\":{\"custom-3\":\"\\\"300\\\"\"}}", - "{\"version\":1,\"table_id\":1234,\"comment\":\"second-table\",\"partition_key\":[],\"properties\":{\"option-3\":\"300\"},\"custom_properties\":{}}", + "{\"version\":2,\"table_id\":1234,\"comment\":\"first-table\",\"partition_key\":[\"a\",\"b\"]," + + "\"bucket_key\":[\"b\",\"c\"],\"bucket_count\":16,\"properties\":{},\"custom_properties\":{\"custom-3\":\"\\\"300\\\"\"},\"create_time\":1735538268,\"modify_time\":1735538268}", + "{\"version\":2,\"table_id\":1234,\"comment\":\"second-table\",\"partition_key\":[],\"properties\":{\"option-3\":\"300\"},\"custom_properties\":{},\"create_time\":-1,\"modify_time\":-1}", }; } }