Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[admin] Add create_time to database and table metadata. #289

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import com.alibaba.fluss.exception.TableNotExistException;
import com.alibaba.fluss.exception.TableNotPartitionedException;
import com.alibaba.fluss.lakehouse.LakeStorageInfo;
import com.alibaba.fluss.metadata.DatabaseDescriptor;
import com.alibaba.fluss.metadata.DatabaseInfo;
import com.alibaba.fluss.metadata.PartitionInfo;
import com.alibaba.fluss.metadata.PhysicalTablePath;
import com.alibaba.fluss.metadata.SchemaInfo;
Expand Down Expand Up @@ -95,9 +97,44 @@ public interface Admin extends AutoCloseable {
* @throws InvalidDatabaseException if the database name is invalid, e.g., contains illegal
* characters, or exceeds the maximum length.
*/
@Deprecated
CompletableFuture<Void> createDatabase(String databaseName, boolean ignoreIfExists)
throws InvalidDatabaseException;

/**
* Create a new database asynchronously.
*
* <p>The following exceptions can be anticipated when calling {@code get()} on returned future.
*
* <ul>
* <li>{@link DatabaseAlreadyExistException} if the database already exists and {@code
* ignoreIfExists} is false.
* </ul>
*
* @param databaseName The name of the database to create.
* @param databaseDescriptor The descriptor of the database to create.
* @param ignoreIfExists Flag to specify behavior when a database with the given name already
* exists: if set to false, throw a DatabaseAlreadyExistException, if set to true, do
* nothing.
* @throws InvalidDatabaseException if the database name is invalid, e.g., contains illegal
* characters, or exceeds the maximum length.
*/
CompletableFuture<Void> createDatabase(
String databaseName, DatabaseDescriptor databaseDescriptor, boolean ignoreIfExists);

/**
* Get the database with the given database name asynchronously.
*
* <p>The following exceptions can be anticipated when calling {@code get()} on returned future.
*
* <ul>
* <li>{@link DatabaseNotExistException} if the database does not exist.
* </ul>
*
* @param databaseName The database name of the database.
*/
CompletableFuture<DatabaseInfo> getDatabase(String databaseName);

/**
* Delete the database with the given name asynchronously.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.alibaba.fluss.client.table.snapshot.PartitionSnapshotInfo;
import com.alibaba.fluss.client.utils.ClientRpcMessageUtils;
import com.alibaba.fluss.lakehouse.LakeStorageInfo;
import com.alibaba.fluss.metadata.DatabaseDescriptor;
import com.alibaba.fluss.metadata.DatabaseInfo;
import com.alibaba.fluss.metadata.PartitionInfo;
import com.alibaba.fluss.metadata.PhysicalTablePath;
import com.alibaba.fluss.metadata.Schema;
Expand All @@ -41,6 +43,7 @@
import com.alibaba.fluss.rpc.messages.DescribeLakeStorageRequest;
import com.alibaba.fluss.rpc.messages.DropDatabaseRequest;
import com.alibaba.fluss.rpc.messages.DropTableRequest;
import com.alibaba.fluss.rpc.messages.GetDatabaseRequest;
import com.alibaba.fluss.rpc.messages.GetKvSnapshotRequest;
import com.alibaba.fluss.rpc.messages.GetLakeTableSnapshotRequest;
import com.alibaba.fluss.rpc.messages.GetPartitionSnapshotRequest;
Expand Down Expand Up @@ -120,16 +123,39 @@ public CompletableFuture<SchemaInfo> getTableSchema(TablePath tablePath, int sch

@Override
public CompletableFuture<Void> createDatabase(String databaseName, boolean ignoreIfExists) {
return createDatabase(databaseName, DatabaseDescriptor.builder().build(), ignoreIfExists);
}

@Override
public CompletableFuture<Void> createDatabase(
String databaseName, DatabaseDescriptor databaseDescriptor, boolean ignoreIfExists) {
TablePath.validateDatabaseName(databaseName);
CreateDatabaseRequest request = new CreateDatabaseRequest();
request.setDatabaseName(databaseName).setIgnoreIfExists(ignoreIfExists);
request.setDatabaseJson(databaseDescriptor.toJsonBytes())
.setDatabaseName(databaseName)
.setIgnoreIfExists(ignoreIfExists);
return gateway.createDatabase(request).thenApply(r -> null);
}

@Override
public CompletableFuture<DatabaseInfo> getDatabase(String databaseName) {
GetDatabaseRequest request = new GetDatabaseRequest();
request.setDatabaseName(databaseName);
return gateway.getDatabase(request)
.thenApply(
r ->
new DatabaseInfo(
databaseName,
DatabaseDescriptor.fromJsonBytes(r.getDatabaseJson()),
r.getCreateTime(),
r.getModifyTime()));
}

@Override
public CompletableFuture<Void> deleteDatabase(
String databaseName, boolean ignoreIfNotExists, boolean cascade) {
DropDatabaseRequest request = new DropDatabaseRequest();

request.setIgnoreIfNotExists(ignoreIfNotExists)
.setCascade(cascade)
.setDatabaseName(databaseName);
Expand Down Expand Up @@ -176,7 +202,9 @@ public CompletableFuture<TableInfo> getTable(TablePath tablePath) {
tablePath,
r.getTableId(),
TableDescriptor.fromJsonBytes(r.getTableJson()),
r.getSchemaId()));
r.getSchemaId(),
r.getCreateTime(),
r.getModifyTime()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import com.alibaba.fluss.exception.TableNotExistException;
import com.alibaba.fluss.exception.TableNotPartitionedException;
import com.alibaba.fluss.fs.FsPathAndFileName;
import com.alibaba.fluss.metadata.DatabaseDescriptor;
import com.alibaba.fluss.metadata.DatabaseInfo;
import com.alibaba.fluss.metadata.PartitionInfo;
import com.alibaba.fluss.metadata.Schema;
import com.alibaba.fluss.metadata.SchemaInfo;
Expand Down Expand Up @@ -106,18 +108,41 @@ void testMultiClient() throws Exception {
admin2.close();
}

@Test
void testGetDatabase() throws Exception {
long timestampBeforeCreate = System.currentTimeMillis();
admin.createDatabase(
"test_db_2",
DatabaseDescriptor.builder()
.comment("test comment")
.property("key1", "value1")
.build(),
false);
DatabaseInfo databaseInfo = admin.getDatabase("test_db_2").get();
long timestampAfterCreate = System.currentTimeMillis();
assertThat(databaseInfo.getDatabaseName()).isEqualTo("test_db_2");
assertThat(databaseInfo.getDatabaseDescriptor().getComment().get())
.isEqualTo("test comment");
assertThat(databaseInfo.getDatabaseDescriptor().getProperties())
.containsEntry("key1", "value1");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assert the properties size is 1

assertThat(databaseInfo.getCreateTime())
.isBetween(timestampBeforeCreate, timestampAfterCreate);
}

@Test
void testGetTableAndSchema() throws Exception {
SchemaInfo schemaInfo = admin.getTableSchema(DEFAULT_TABLE_PATH).get();
assertThat(schemaInfo.getSchema()).isEqualTo(DEFAULT_SCHEMA);
assertThat(schemaInfo.getSchemaId()).isEqualTo(1);
SchemaInfo schemaInfo2 = admin.getTableSchema(DEFAULT_TABLE_PATH, 1).get();
assertThat(schemaInfo2).isEqualTo(schemaInfo);

// get default table.
long timestampAfterCreate = System.currentTimeMillis();
TableInfo tableInfo = admin.getTable(DEFAULT_TABLE_PATH).get();
assertThat(tableInfo.getSchemaId()).isEqualTo(schemaInfo.getSchemaId());
assertThat(tableInfo.getTableDescriptor()).isEqualTo(DEFAULT_TABLE_DESCRIPTOR);
assertThat(schemaInfo2).isEqualTo(schemaInfo);
assertThat(tableInfo.getCreateTime()).isLessThan(timestampAfterCreate);

// unknown table
assertThatThrownBy(() -> admin.getTable(TablePath.of("test_db", "unknown_table")).get())
Expand All @@ -127,6 +152,18 @@ void testGetTableAndSchema() throws Exception {
() -> admin.getTableSchema(TablePath.of("test_db", "unknown_table")).get())
.cause()
.isInstanceOf(SchemaNotExistException.class);

// create and get a new table
long timestampBeforeCreate = System.currentTimeMillis();
TablePath tablePath = TablePath.of("test_db", "table_2");
admin.createTable(tablePath, DEFAULT_TABLE_DESCRIPTOR, false);
tableInfo = admin.getTable(tablePath).get();
timestampAfterCreate = System.currentTimeMillis();
assertThat(tableInfo.getSchemaId()).isEqualTo(schemaInfo.getSchemaId());
assertThat(tableInfo.getTableDescriptor()).isEqualTo(DEFAULT_TABLE_DESCRIPTOR);
assertThat(schemaInfo2).isEqualTo(schemaInfo);
assertThat(tableInfo.getCreateTime())
.isBetween(timestampBeforeCreate, timestampAfterCreate);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Copyright (c) 2024 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.metadata;

import com.alibaba.fluss.annotation.PublicEvolving;
import com.alibaba.fluss.config.ConfigOption;
import com.alibaba.fluss.config.ConfigurationUtils;
import com.alibaba.fluss.utils.Preconditions;
import com.alibaba.fluss.utils.json.DatabaseDescriptorJsonSerde;
import com.alibaba.fluss.utils.json.JsonSerdeUtils;

import javax.annotation.Nullable;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
* Represents the metadata of a database in Fluss.
*
* <p>It contains all characteristics that can be expressed in a SQL {@code CREATE Database}
* statement, such as schema, primary keys, partition keys, bucket keys, and options.
*
* @since 0.6
*/
public class DatabaseDescriptor {
private final Map<String, String> properties;
private final String comment;

private DatabaseDescriptor(Map<String, String> properties, @Nullable String comment) {
this.properties = properties;
this.comment = comment;
}

public Map<String, String> getProperties() {
return properties;
}

public Optional<String> getComment() {
return Optional.ofNullable(comment);
}

/**
* Serialize the table descriptor to a JSON byte array.
*
* @see DatabaseDescriptorJsonSerde
*/
public byte[] toJsonBytes() {
return JsonSerdeUtils.writeValueAsBytes(this, DatabaseDescriptorJsonSerde.INSTANCE);
}

/**
* Deserialize from JSON byte array to an instance of {@link DatabaseDescriptor}.
*
* @see DatabaseDescriptor
*/
public static DatabaseDescriptor fromJsonBytes(byte[] json) {
return JsonSerdeUtils.readValue(json, DatabaseDescriptorJsonSerde.INSTANCE);
}

/** Creates a builder for building database descriptor. */
public static Builder builder() {
return new Builder();
}

// ---------------------------------------------------------------------------------------------

/** Builder for {@link TableDescriptor}. */
@PublicEvolving
public static class Builder {

private final Map<String, String> properties;
private @Nullable String comment;

protected Builder() {
this.properties = new HashMap<>();
}

protected Builder(TableDescriptor descriptor) {
this.properties = new HashMap<>(descriptor.getProperties());
this.comment = descriptor.getComment().orElse(null);
}

/**
* Sets table property on the table.
*
* <p>Table properties are controlled by Fluss and will change the behavior of the table.
*/
public <T> Builder property(ConfigOption<T> configOption, T value) {
Preconditions.checkNotNull(configOption, "Config option must not be null.");
Preconditions.checkNotNull(value, "Value must not be null.");
properties.put(
configOption.key(), ConfigurationUtils.convertValue(value, String.class));
return this;
}

/**
* Sets table property on the table.
*
* <p>Table properties are controlled by Fluss and will change the behavior of the table.
*/
public Builder property(String key, String value) {
Preconditions.checkNotNull(key, "Key must not be null.");
Preconditions.checkNotNull(value, "Value must not be null.");
properties.put(key, value);
return this;
}

/**
* Sets table properties on the table.
*
* <p>Table properties are controlled by Fluss and will change the behavior of the table.
*/
public Builder properties(Map<String, String> properties) {
Preconditions.checkNotNull(properties, "properties must not be null.");
this.properties.putAll(properties);
return this;
}

/** Define the comment for this table. */
public Builder comment(@Nullable String comment) {
this.comment = comment;
return this;
}

/** Returns an immutable instance of {@link TableDescriptor}. */
public DatabaseDescriptor build() {
return new DatabaseDescriptor(properties, comment);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright (c) 2024 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.fluss.metadata;

/**
* Information of a database metadata, includes {@link DatabaseDescriptor}.
*
* @since 0.6
*/
public class DatabaseInfo {
private final String databaseName;
private final DatabaseDescriptor databaseDescriptor;
private final long createTime;
private final long modifyTime;

public DatabaseInfo(
String databaseName,
DatabaseDescriptor databaseDescriptor,
long createTime,
long modifyTime) {
this.databaseName = databaseName;
this.databaseDescriptor = databaseDescriptor;
this.createTime = createTime;
this.modifyTime = modifyTime;
}

public String getDatabaseName() {
return databaseName;
}

public DatabaseDescriptor getDatabaseDescriptor() {
return databaseDescriptor;
}

public long getCreateTime() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getCreatedTime

return createTime;
}

public long getModifyTime() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getModifiedTime

return modifyTime;
}
}
Loading