diff --git a/core/src/main/java/org/apache/gravitino/listener/ModelEventDispatcher.java b/core/src/main/java/org/apache/gravitino/listener/ModelEventDispatcher.java new file mode 100644 index 00000000000..7e9b0ca7532 --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/listener/ModelEventDispatcher.java @@ -0,0 +1,400 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.listener; + +import java.util.Map; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.Namespace; +import org.apache.gravitino.catalog.ModelDispatcher; +import org.apache.gravitino.exceptions.ModelAlreadyExistsException; +import org.apache.gravitino.exceptions.ModelVersionAliasesAlreadyExistException; +import org.apache.gravitino.exceptions.NoSuchModelException; +import org.apache.gravitino.exceptions.NoSuchModelVersionException; +import org.apache.gravitino.exceptions.NoSuchSchemaException; +import org.apache.gravitino.listener.api.event.DropModelEvent; +import org.apache.gravitino.listener.api.event.DropModelVersionEvent; +import org.apache.gravitino.listener.api.event.GetModelEvent; +import org.apache.gravitino.listener.api.event.GetModelVersionEvent; +import org.apache.gravitino.listener.api.event.LinkModelVersionEvent; +import org.apache.gravitino.listener.api.event.ListModelEvent; +import org.apache.gravitino.listener.api.event.ListModelVersionsEvent; +import org.apache.gravitino.listener.api.event.RegisterModelEvent; +import org.apache.gravitino.listener.api.info.ModelInfo; +import org.apache.gravitino.listener.api.info.ModelVersionInfo; +import org.apache.gravitino.model.Model; +import org.apache.gravitino.model.ModelCatalog; +import org.apache.gravitino.model.ModelVersion; +import org.apache.gravitino.utils.PrincipalUtils; + +/** + * {@code ModelEventDispatcher} is a decorator for {@link ModelDispatcher} that not only delegates + * model operations to the underlying catalog dispatcher but also dispatches corresponding events to + * an {@link org.apache.gravitino.listener.EventBus} after each operation is completed. This allows + * for event-driven workflows or monitoring of model operations. + */ +public class ModelEventDispatcher implements ModelDispatcher { + private final EventBus eventBus; + private final ModelDispatcher dispatcher; + + /** + * Constructs a {@link ModelEventDispatcher} with a specified EventBus and {@link + * ModelDispatcher}. + * + * @param eventBus The EventBus to which events will be dispatched. + * @param dispatcher The underlying {@link ModelDispatcher} that will perform the actual model + * operations. + */ + public ModelEventDispatcher(EventBus eventBus, ModelDispatcher dispatcher) { + this.eventBus = eventBus; + this.dispatcher = dispatcher; + } + + /** + * Register a model in the catalog if the model is not existed, otherwise the {@link + * ModelAlreadyExistsException} will be thrown. The {@link Model} object will be created when the + * model is registered, users can call {@link ModelCatalog#linkModelVersion(NameIdentifier, + * String, String[], String, Map)} to link the model version to the registered {@link Model}. + * + * @param ident The name identifier of the model. + * @param comment The comment of the model. The comment is optional and can be null. + * @param properties The properties of the model. The properties are optional and can be null or + * empty. + * @return The registered model object. + * @throws NoSuchSchemaException If the schema does not exist. + * @throws ModelAlreadyExistsException If the model already registered. + */ + @Override + public Model registerModel(NameIdentifier ident, String comment, Map properties) + throws NoSuchSchemaException, ModelAlreadyExistsException { + // TODO: preEvent + try { + Model model = dispatcher.registerModel(ident, comment, properties); + ModelInfo modelInfo = new ModelInfo(model); + eventBus.dispatchEvent( + new RegisterModelEvent(PrincipalUtils.getCurrentUserName(), ident, modelInfo)); + return model; + } catch (Exception e) { + // TODO: failureEvent + throw e; + } + } + + /** + * Register a model in the catalog if the model is not existed, otherwise the {@link + * ModelAlreadyExistsException} will be thrown. The {@link Model} object will be created when the + * model is registered, in the meantime, the model version (version 0) will also be created and + * linked to the registered model. + * + * @param ident The name identifier of the model. + * @param uri The model artifact URI. + * @param aliases The aliases of the model version. The aliases should be unique in this model, + * otherwise the {@link ModelVersionAliasesAlreadyExistException} will be thrown. The aliases + * are optional and can be empty. Also, be aware that the alias cannot be a number or a number + * string. + * @param comment The comment of the model. The comment is optional and can be null. + * @param properties The properties of the model. The properties are optional and can be null or + * empty. + * @return The registered model object. + * @throws NoSuchSchemaException If the schema does not exist when register a model. + * @throws ModelAlreadyExistsException If the model already registered. + * @throws ModelVersionAliasesAlreadyExistException If the aliases already exist in the model. + */ + @Override + public Model registerModel( + NameIdentifier ident, + String uri, + String[] aliases, + String comment, + Map properties) + throws NoSuchSchemaException, ModelAlreadyExistsException, + ModelVersionAliasesAlreadyExistException { + return dispatcher.registerModel(ident, uri, aliases, comment, properties); + } + + /** + * Get a model metadata by {@link NameIdentifier} from the catalog. + * + * @param ident A model identifier. + * @return The model metadata. + * @throws NoSuchModelException If the model does not exist. + */ + @Override + public Model getModel(NameIdentifier ident) throws NoSuchModelException { + // TODO: preEvent + try { + Model model = dispatcher.getModel(ident); + ModelInfo modelInfo = new ModelInfo(model); + eventBus.dispatchEvent( + new GetModelEvent(PrincipalUtils.getCurrentUserName(), ident, modelInfo)); + return model; + } catch (Exception e) { + // TODO: failureEvent + throw e; + } + } + + /** + * Delete the model from the catalog. If the model does not exist, return false. Otherwise, return + * true. The deletion of the model will also delete all the model versions linked to this model. + * + * @param ident The name identifier of the model. + * @return True if the model is deleted, false if the model does not exist. + */ + @Override + public boolean deleteModel(NameIdentifier ident) { + // TODO: preEvent + try { + boolean isExists = dispatcher.deleteModel(ident); + eventBus.dispatchEvent( + new DropModelEvent(PrincipalUtils.getCurrentUserName(), ident, isExists)); + return isExists; + } catch (Exception e) { + // TODO: failureEvent + throw e; + } + } + + /** + * List the models in a schema namespace from the catalog. + * + * @param namespace A schema namespace. + * @return An array of model identifiers in the namespace. + * @throws NoSuchModelException If the model does not exist. + */ + @Override + public NameIdentifier[] listModels(Namespace namespace) throws NoSuchSchemaException { + // TODO: preEvent + try { + NameIdentifier[] models = dispatcher.listModels(namespace); + eventBus.dispatchEvent(new ListModelEvent(PrincipalUtils.getCurrentUserName(), namespace)); + return models; + } catch (Exception e) { + // TODO: failureEvent + throw e; + } + } + + /** + * Link a new model version to the registered model object. The new model version will be added to + * the model object. If the model object does not exist, it will throw an exception. If the + * version alias already exists in the model, it will throw an exception. + * + * @param ident The name identifier of the model. + * @param uri The URI of the model version artifact. + * @param aliases The aliases of the model version. The aliases should be unique in this model, + * otherwise the {@link ModelVersionAliasesAlreadyExistException} will be thrown. The aliases + * are optional and can be empty. Also, be aware that the alias cannot be a number or a number + * string. + * @param comment The comment of the model version. The comment is optional and can be null. + * @param properties The properties of the model version. The properties are optional and can be + * null or empty. + * @throws NoSuchModelException If the model does not exist. + * @throws ModelVersionAliasesAlreadyExistException If the aliases already exist in the model. + */ + @Override + public void linkModelVersion( + NameIdentifier ident, + String uri, + String[] aliases, + String comment, + Map properties) + throws NoSuchModelException, ModelVersionAliasesAlreadyExistException { + // TODO: preEvent + try { + Model model = dispatcher.getModel(ident); + ModelVersionInfo modelVersionInfo = new ModelVersionInfo(uri, aliases, comment, properties); + ModelInfo modelInfo = new ModelInfo(model, new ModelVersionInfo[] {modelVersionInfo}); + dispatcher.linkModelVersion(ident, uri, aliases, comment, properties); + eventBus.dispatchEvent( + new LinkModelVersionEvent(PrincipalUtils.getCurrentUserName(), ident, modelInfo)); + } catch (Exception e) { + // TODO: failureEvent + throw e; + } + } + + /** + * Get a model version by the {@link NameIdentifier} and version number from the catalog. + * + * @param ident The name identifier of the model. + * @param version The version number of the model. + * @return The model version object. + * @throws NoSuchModelVersionException If the model version does not exist. + */ + @Override + public ModelVersion getModelVersion(NameIdentifier ident, int version) + throws NoSuchModelVersionException { + // TODO: preEvent + try { + ModelVersion modelVersion = dispatcher.getModelVersion(ident, version); + ModelInfo modelInfo = getModelInfo(ident, version); + eventBus.dispatchEvent( + new GetModelVersionEvent(PrincipalUtils.getCurrentUserName(), ident, modelInfo)); + return modelVersion; + } catch (Exception e) { + // TODO: failureEvent + throw e; + } + } + + /** + * Get a model version by the {@link NameIdentifier} and version alias from the catalog. + * + * @param ident The name identifier of the model. + * @param alias The version alias of the model. + * @return The model version object. + * @throws NoSuchModelVersionException If the model version does not exist. + */ + @Override + public ModelVersion getModelVersion(NameIdentifier ident, String alias) + throws NoSuchModelVersionException { + // TODO: preEvent + try { + ModelVersion modelVersion = dispatcher.getModelVersion(ident, alias); + ModelInfo modelInfo = getModelInfo(ident, alias); + eventBus.dispatchEvent( + new GetModelVersionEvent(PrincipalUtils.getCurrentUserName(), ident, modelInfo)); + return modelVersion; + } catch (Exception e) { + // TODO: failureEvent + throw e; + } + } + + /** + * Delete the model version by the {@link NameIdentifier} and version number. If the model version + * does not exist, return false. If the model version is deleted, return true. + * + * @param ident The name identifier of the model. + * @param version The version number of the model. + * @return True if the model version is deleted, false if the model version does not exist. + */ + @Override + public boolean deleteModelVersion(NameIdentifier ident, int version) { + // TODO: preEvent + try { + boolean isExists = dispatcher.deleteModelVersion(ident, version); + eventBus.dispatchEvent( + new DropModelVersionEvent(PrincipalUtils.getCurrentUserName(), ident, isExists)); + return isExists; + } catch (Exception e) { + // TODO: failureEvent + throw e; + } + } + + /** + * Delete the model version by the {@link NameIdentifier} and version alias. If the model version + * does not exist, return false. If the model version is deleted, return true. + * + * @param ident The name identifier of the model. + * @param alias The version alias of the model. + * @return True if the model version is deleted, false if the model version does not exist. + */ + @Override + public boolean deleteModelVersion(NameIdentifier ident, String alias) { + // TODO: preEvent + try { + boolean isExists = dispatcher.deleteModelVersion(ident, alias); + eventBus.dispatchEvent( + new DropModelVersionEvent(PrincipalUtils.getCurrentUserName(), ident, isExists)); + return isExists; + } catch (Exception e) { + // TODO: failureEvent + throw e; + } + } + + /** + * List all the versions of the register model by {@link NameIdentifier} in the catalog. + * + * @param ident The name identifier of the model. + * @return An array of version numbers of the model. + * @throws NoSuchModelException If the model does not exist. + */ + @Override + public int[] listModelVersions(NameIdentifier ident) throws NoSuchModelException { + // TODO: preEvent + try { + int[] versions = dispatcher.listModelVersions(ident); + Model model = dispatcher.getModel(ident); + ModelInfo modelInfo = new ModelInfo(model); + eventBus.dispatchEvent( + new ListModelVersionsEvent(PrincipalUtils.getCurrentUserName(), ident, modelInfo)); + return versions; + } catch (Exception e) { + // TODO: failureEvent + throw e; + } + } + + /** + * Check if a model exists using an {@link NameIdentifier} from the catalog. + * + * @param ident A model identifier. + * @return true If the model exists, false if the model does not exist. + */ + @Override + public boolean modelExists(NameIdentifier ident) { + return dispatcher.modelExists(ident); + } + + /** + * Check if the model version exists by the {@link NameIdentifier} and version number. If the + * model version exists, return true, otherwise return false. + * + * @param ident The name identifier of the model. + * @param version The version number of the model. + * @return True if the model version exists, false if the model version does not exist. + */ + @Override + public boolean modelVersionExists(NameIdentifier ident, int version) { + return dispatcher.modelVersionExists(ident, version); + } + + /** + * Check if the model version exists by the {@link NameIdentifier} and version alias. If the model + * version exists, return true, otherwise return false. + * + * @param ident The name identifier of the model. + * @param alias The version alias of the model. + * @return True if the model version exists, false if the model version does not exist. + */ + @Override + public boolean modelVersionExists(NameIdentifier ident, String alias) { + return dispatcher.modelVersionExists(ident, alias); + } + + private ModelInfo getModelInfo(NameIdentifier modelIdent, int version) { + Model model = getModel(modelIdent); + ModelVersion modelVersion = dispatcher.getModelVersion(modelIdent, version); + ModelVersionInfo modelVersionInfo = new ModelVersionInfo(modelVersion); + + return new ModelInfo(model, new ModelVersionInfo[] {modelVersionInfo}); + } + + private ModelInfo getModelInfo(NameIdentifier modelIdent, String alias) { + Model model = getModel(modelIdent); + ModelVersion modelVersion = dispatcher.getModelVersion(modelIdent, alias); + ModelVersionInfo modelVersionInfo = new ModelVersionInfo(modelVersion); + + return new ModelInfo(model, new ModelVersionInfo[] {modelVersionInfo}); + } +} diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/DropModelEvent.java b/core/src/main/java/org/apache/gravitino/listener/api/event/DropModelEvent.java index 5a063ea0ee6..7a8ec3cb95a 100644 --- a/core/src/main/java/org/apache/gravitino/listener/api/event/DropModelEvent.java +++ b/core/src/main/java/org/apache/gravitino/listener/api/event/DropModelEvent.java @@ -20,11 +20,9 @@ package org.apache.gravitino.listener.api.event; import org.apache.gravitino.NameIdentifier; -import org.apache.gravitino.listener.api.info.ModelInfo; /** Event representing the successful drop of a model. */ public class DropModelEvent extends ModelEvent { - private final ModelInfo dropModelInfo; private final boolean isExists; /** @@ -33,26 +31,14 @@ public class DropModelEvent extends ModelEvent { * * @param user The username of the individual who initiated the model drop operation. * @param identifier The unique identifier of the model that was dropped. - * @param dropModelInfo The state of the model post-drop operation. * @param isExists A boolean flag indicating whether the model existed at the time of the drop * operation. */ - public DropModelEvent( - String user, NameIdentifier identifier, ModelInfo dropModelInfo, boolean isExists) { + public DropModelEvent(String user, NameIdentifier identifier, boolean isExists) { super(user, identifier); - this.dropModelInfo = dropModelInfo; this.isExists = isExists; } - /** - * Retrieves the state of the model post-drop operation. - * - * @return The state of the model post-drop operation. - */ - public ModelInfo DropModelInfo() { - return dropModelInfo; - } - /** * Retrieves the existence status of the model at the time of the drop operation. * @@ -72,4 +58,14 @@ public boolean isExists() { public OperationType operationType() { return OperationType.DROP_MODEL; } + + /** + * The status of the operation. + * + * @return The operation status. + */ + @Override + public OperationStatus operationStatus() { + return isExists() ? OperationStatus.SUCCESS : OperationStatus.UNPROCESSED; + } } diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/DropModelVersionEvent.java b/core/src/main/java/org/apache/gravitino/listener/api/event/DropModelVersionEvent.java index d7118565fac..e74b4416fe8 100644 --- a/core/src/main/java/org/apache/gravitino/listener/api/event/DropModelVersionEvent.java +++ b/core/src/main/java/org/apache/gravitino/listener/api/event/DropModelVersionEvent.java @@ -20,7 +20,6 @@ package org.apache.gravitino.listener.api.event; import org.apache.gravitino.NameIdentifier; -import org.apache.gravitino.listener.api.info.ModelInfo; /** * Represents an event that is generated after a model version is successfully dropped from the @@ -28,7 +27,6 @@ */ public class DropModelVersionEvent extends ModelEvent { - private final ModelInfo dropModelVersionInfo; private final boolean isExists; /** @@ -40,22 +38,11 @@ public class DropModelVersionEvent extends ModelEvent { * @param isExists A boolean flag indicating whether the model version existed at the time of the * drop operation. */ - public DropModelVersionEvent( - String user, NameIdentifier identifier, ModelInfo dropModelVersionInfo, boolean isExists) { + public DropModelVersionEvent(String user, NameIdentifier identifier, boolean isExists) { super(user, identifier); - this.dropModelVersionInfo = dropModelVersionInfo; this.isExists = isExists; } - /** - * Retrieves the state of the model after the drop version operation. - * - * @return The state of the model after the drop version operation. - */ - public ModelInfo DropModelVersionInfo() { - return dropModelVersionInfo; - } - /** * Retrieves the existence status of the model version at the time of the drop operation. * @@ -75,4 +62,9 @@ public boolean isExists() { public OperationType operationType() { return OperationType.DROP_MODEL_VERSION; } + + @Override + public OperationStatus operationStatus() { + return isExists() ? OperationStatus.SUCCESS : OperationStatus.UNPROCESSED; + } } diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/GetModelVersionEvent.java b/core/src/main/java/org/apache/gravitino/listener/api/event/GetModelVersionEvent.java index 527fdd485e9..1f26b949bc0 100644 --- a/core/src/main/java/org/apache/gravitino/listener/api/event/GetModelVersionEvent.java +++ b/core/src/main/java/org/apache/gravitino/listener/api/event/GetModelVersionEvent.java @@ -24,19 +24,18 @@ /** Represents an event triggered upon the successful getting the version of a model. */ public class GetModelVersionEvent extends ModelEvent { - public final ModelInfo getModelVersionInfo; + public final ModelInfo modelInfo; /** * Constructs an instance of {@code GetModelVersionEvent}. * * @param user The username of the individual who initiated the get model version event. * @param identifier The unique identifier of the model that was getting the version. - * @param getModelVersionInfo The state of the model after the version was loaded. + * @param modelInfo The state of the model after the version was loaded. */ - public GetModelVersionEvent( - String user, NameIdentifier identifier, ModelInfo getModelVersionInfo) { + public GetModelVersionEvent(String user, NameIdentifier identifier, ModelInfo modelInfo) { super(user, identifier); - this.getModelVersionInfo = getModelVersionInfo; + this.modelInfo = modelInfo; } /** @@ -46,7 +45,7 @@ public GetModelVersionEvent( * @return A {@link ModelInfo} instance encapsulating the details of the model version. */ public ModelInfo getModelVersionInfo() { - return getModelVersionInfo; + return modelInfo; } /** diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/LinkModelVersionEvent.java b/core/src/main/java/org/apache/gravitino/listener/api/event/LinkModelVersionEvent.java index 17eae5d5de0..7683c8fe701 100644 --- a/core/src/main/java/org/apache/gravitino/listener/api/event/LinkModelVersionEvent.java +++ b/core/src/main/java/org/apache/gravitino/listener/api/event/LinkModelVersionEvent.java @@ -24,7 +24,7 @@ /** Represents an event triggered upon the successful linking of a model version. */ public class LinkModelVersionEvent extends ModelEvent { - private ModelInfo linkModelVersionInfo; + private ModelInfo modelInfo; /** * Constructs an instance of {@code LinkModelVersionEvent}, capturing essential details about the @@ -32,12 +32,11 @@ public class LinkModelVersionEvent extends ModelEvent { * * @param user The username of the individual who initiated the model version linking. * @param identifier The unique identifier of the model that was linked. - * @param linkModelVersionInfo The final state of the model after linking. + * @param modelInfo The final state of the model after linking. */ - public LinkModelVersionEvent( - String user, NameIdentifier identifier, ModelInfo linkModelVersionInfo) { + public LinkModelVersionEvent(String user, NameIdentifier identifier, ModelInfo modelInfo) { super(user, identifier); - this.linkModelVersionInfo = linkModelVersionInfo; + this.modelInfo = modelInfo; } /** @@ -48,7 +47,7 @@ public LinkModelVersionEvent( * version. */ public ModelInfo linkModelVersionInfo() { - return linkModelVersionInfo; + return modelInfo; } /** diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/ListModelVersionsEvent.java b/core/src/main/java/org/apache/gravitino/listener/api/event/ListModelVersionsEvent.java index 5892b2d73ea..029558f695e 100644 --- a/core/src/main/java/org/apache/gravitino/listener/api/event/ListModelVersionsEvent.java +++ b/core/src/main/java/org/apache/gravitino/listener/api/event/ListModelVersionsEvent.java @@ -27,19 +27,18 @@ */ public class ListModelVersionsEvent extends ModelEvent { - private final ModelInfo listModelVersionInfo; + private final ModelInfo modelInfo; /** * Constructs an instance of {@code ListModelVersionsEvent}. * * @param user The username of the individual who initiated the model version listing. * @param identifier The unique identifier of the model that it's version was listed. - * @param listModelVersionInfo The model information containing the list of model versions. + * @param modelInfo The model information containing the list of model versions. */ - public ListModelVersionsEvent( - String user, NameIdentifier identifier, ModelInfo listModelVersionInfo) { + public ListModelVersionsEvent(String user, NameIdentifier identifier, ModelInfo modelInfo) { super(user, identifier); - this.listModelVersionInfo = listModelVersionInfo; + this.modelInfo = modelInfo; } /** @@ -48,7 +47,7 @@ public ListModelVersionsEvent( * @return A {@link ModelInfo} instance containing the list of model versions. */ public ModelInfo listModelVersionInfo() { - return listModelVersionInfo; + return modelInfo; } /** diff --git a/core/src/main/java/org/apache/gravitino/listener/api/event/RegisterModelEvent.java b/core/src/main/java/org/apache/gravitino/listener/api/event/RegisterModelEvent.java index feeb1a1dfdf..376c79e4267 100644 --- a/core/src/main/java/org/apache/gravitino/listener/api/event/RegisterModelEvent.java +++ b/core/src/main/java/org/apache/gravitino/listener/api/event/RegisterModelEvent.java @@ -35,8 +35,7 @@ public class RegisterModelEvent extends ModelEvent { * details such as the metalake, catalog, schema, and Model name. * @param registeredTopicInfo The final state of the model post-creation. */ - protected RegisterModelEvent( - String user, NameIdentifier identifier, ModelInfo registeredTopicInfo) { + public RegisterModelEvent(String user, NameIdentifier identifier, ModelInfo registeredTopicInfo) { super(user, identifier); this.registeredTopicInfo = registeredTopicInfo; } diff --git a/core/src/main/java/org/apache/gravitino/listener/api/info/ModelInfo.java b/core/src/main/java/org/apache/gravitino/listener/api/info/ModelInfo.java index c62dacc942c..c5aeaeb1af5 100644 --- a/core/src/main/java/org/apache/gravitino/listener/api/info/ModelInfo.java +++ b/core/src/main/java/org/apache/gravitino/listener/api/info/ModelInfo.java @@ -25,7 +25,6 @@ import org.apache.gravitino.Audit; import org.apache.gravitino.annotation.DeveloperApi; import org.apache.gravitino.model.Model; -import org.apache.gravitino.model.ModelVersion; /** * ModelInfo exposes model information for event listener, it's supposed to be read only. Most of @@ -38,7 +37,7 @@ public class ModelInfo { @Getter private final Map properties; @Nullable private final Audit audit; @Getter private final int lastVersion; - private final ModelVersion[] versions; + private final ModelVersionInfo[] versions; /** * Constructs model information based on a given model. @@ -55,7 +54,7 @@ public ModelInfo(Model model) { * @param model the model to expose information for. * @param modelVersions the versions of the model. */ - public ModelInfo(Model model, ModelVersion[] modelVersions) { + public ModelInfo(Model model, ModelVersionInfo[] modelVersions) { this.name = model.name(); this.properties = model.properties(); this.comment = model.comment(); @@ -90,7 +89,7 @@ public Audit getAudit() { * @return the versions of the model or null if not set. */ @Nullable - public ModelVersion[] getModelVersion() { + public ModelVersionInfo[] modelVersions() { return versions; } } diff --git a/core/src/main/java/org/apache/gravitino/listener/api/info/ModelVersionInfo.java b/core/src/main/java/org/apache/gravitino/listener/api/info/ModelVersionInfo.java new file mode 100644 index 00000000000..8c196d35c50 --- /dev/null +++ b/core/src/main/java/org/apache/gravitino/listener/api/info/ModelVersionInfo.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.listener.api.info; + +import java.util.Map; +import javax.annotation.Nullable; +import lombok.Getter; +import org.apache.gravitino.Audit; +import org.apache.gravitino.model.ModelVersion; + +/** + * {@link ModelVersionInfo} exposes model version information for event listener, it's supposed to + * be read only. Most of the fields are shallow copied internally not deep copies for performance. + */ +public class ModelVersionInfo { + @Getter private final String uri; + @Getter @Nullable private final String[] aliases; + @Nullable private final String comment; + @Getter private final Map properties; + @Nullable private final Audit auditInfo; + + /** + * Constructs model version information based on a given {@link ModelVersion}. + * + * @param modelVersion the model version to expose information for. + */ + public ModelVersionInfo(ModelVersion modelVersion) { + this.uri = modelVersion.uri(); + this.aliases = modelVersion.aliases(); + this.comment = modelVersion.comment(); + this.properties = modelVersion.properties(); + this.auditInfo = modelVersion.auditInfo(); + } + + /** + * Constructs model version information based on a given arguments. + * + * @param uri + * @param aliases + * @param comment + * @param properties + */ + public ModelVersionInfo( + String uri, String[] aliases, String comment, Map properties) { + this.uri = uri; + this.aliases = aliases; + this.comment = comment; + this.properties = properties; + this.auditInfo = null; + } + + /** + * Returns the comment of the model version. + * + * @return the comment of the model version or null if not set. + */ + @Nullable + public String getComment() { + return comment; + } + + /** + * Returns the audit information of the model version. + * + * @return the audit information of the model version or null if not set. + */ + @Nullable + public Audit getAudit() { + return auditInfo; + } +} diff --git a/core/src/test/java/org/apache/gravitino/listener/api/event/TestModelEvent.java b/core/src/test/java/org/apache/gravitino/listener/api/event/TestModelEvent.java index 4638d22314d..05c7b7ba42b 100644 --- a/core/src/test/java/org/apache/gravitino/listener/api/event/TestModelEvent.java +++ b/core/src/test/java/org/apache/gravitino/listener/api/event/TestModelEvent.java @@ -19,14 +19,363 @@ package org.apache.gravitino.listener.api.event; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableMap; +import java.util.Collections; +import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.Namespace; +import org.apache.gravitino.catalog.ModelDispatcher; +import org.apache.gravitino.exceptions.GravitinoRuntimeException; +import org.apache.gravitino.listener.DummyEventListener; +import org.apache.gravitino.listener.EventBus; +import org.apache.gravitino.listener.ModelEventDispatcher; +import org.apache.gravitino.listener.api.info.ModelInfo; +import org.apache.gravitino.listener.api.info.ModelVersionInfo; +import org.apache.gravitino.model.Model; +import org.apache.gravitino.model.ModelVersion; +import org.apache.gravitino.utils.NameIdentifierUtil; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; @TestInstance(TestInstance.Lifecycle.PER_CLASS) public class TestModelEvent { + private ModelEventDispatcher dispatcher; + private ModelEventDispatcher failureDispatcher; + private DummyEventListener dummyEventListener; + private Model modelA; + private Model modelB; + private NameIdentifier existingIdentA; + private NameIdentifier existingIdentB; + private NameIdentifier notExistingIdent; + private Namespace namespace; + private ModelVersion firstModelVersion; + private ModelVersion secondModelVersion; @BeforeAll void init() { - // TODO: Implement tests for ModelEvent + this.namespace = Namespace.of("metalake", "catalog", "schema"); + this.modelA = mockModel("modelA", "commentA"); + this.modelB = mockModel("modelB", "commentB"); + this.firstModelVersion = + mockModelVersion("uriA", new String[] {"aliasProduction"}, "versionInfoA"); + this.secondModelVersion = mockModelVersion("uriB", new String[] {"aliasTest"}, "versionInfoB"); + System.out.println(secondModelVersion.toString()); + this.existingIdentA = NameIdentifierUtil.ofModel("metalake", "catalog", "schema", "modelA"); + this.existingIdentB = NameIdentifierUtil.ofModel("metalake", "catalog", "schema", "modelB"); + this.notExistingIdent = + NameIdentifierUtil.ofModel("metalake", "catalog", "schema", "not_exist"); + this.dummyEventListener = new DummyEventListener(); + + EventBus eventBus = new EventBus(Collections.singletonList(dummyEventListener)); + this.dispatcher = new ModelEventDispatcher(eventBus, mockTagDispatcher()); + this.failureDispatcher = new ModelEventDispatcher(eventBus, mockExceptionModelDispatcher()); + // TODO: add failure dispatcher tests. + System.out.println(this.failureDispatcher.toString()); + } + + @Test + void testRegisterModelEvent() { + dispatcher.registerModel(existingIdentA, "comment", ImmutableMap.of("color", "#FFFFFF")); + Event event = dummyEventListener.popPostEvent(); + Assertions.assertEquals(RegisterModelEvent.class, event.getClass()); + Assertions.assertEquals(OperationType.REGISTER_MODEL, event.operationType()); + Assertions.assertEquals(OperationStatus.SUCCESS, event.operationStatus()); + + RegisterModelEvent registerModelEvent = (RegisterModelEvent) event; + ModelInfo modelInfo = registerModelEvent.registeredModelInfo(); + Assertions.assertEquals("modelA", modelInfo.getName()); + Assertions.assertEquals("commentA", modelInfo.getComment()); + Assertions.assertEquals(ImmutableMap.of("color", "#FFFFFF"), modelInfo.getProperties()); + Assertions.assertNull(modelInfo.modelVersions()); + } + + @Test + void testGetModelEvent() { + Model model = dispatcher.getModel(existingIdentA); + Event event = dummyEventListener.popPostEvent(); + Assertions.assertEquals(GetModelEvent.class, event.getClass()); + Assertions.assertEquals(OperationType.GET_MODEL, event.operationType()); + Assertions.assertEquals(OperationStatus.SUCCESS, event.operationStatus()); + Assertions.assertNotNull(model); + + // validate model info + GetModelEvent getModelEvent = (GetModelEvent) event; + ModelInfo modelInfo = getModelEvent.getModelInfo(); + Assertions.assertEquals("modelA", modelInfo.getName()); + Assertions.assertEquals("commentA", modelInfo.getComment()); + Assertions.assertEquals(ImmutableMap.of("color", "#FFFFFF"), modelInfo.getProperties()); + Assertions.assertNull(modelInfo.modelVersions()); + } + + @Test + void testDeleteExistsModelEvent() { + boolean isExists = dispatcher.deleteModel(existingIdentA); + Event event = dummyEventListener.popPostEvent(); + Assertions.assertEquals(DropModelEvent.class, event.getClass()); + Assertions.assertEquals(OperationType.DROP_MODEL, event.operationType()); + Assertions.assertEquals(OperationStatus.SUCCESS, event.operationStatus()); + Assertions.assertTrue(isExists); + + DropModelEvent dropModelEvent = (DropModelEvent) event; + Assertions.assertEquals(existingIdentA, dropModelEvent.identifier()); + Assertions.assertEquals(DropModelEvent.class, dropModelEvent.getClass()); + Assertions.assertEquals(OperationType.DROP_MODEL, dropModelEvent.operationType()); + Assertions.assertEquals(OperationStatus.SUCCESS, dropModelEvent.operationStatus()); + } + + @Test + void testDeleteNotExistsModelEvent() { + boolean isExists = dispatcher.deleteModel(notExistingIdent); + Event event = dummyEventListener.popPostEvent(); + Assertions.assertEquals(DropModelEvent.class, event.getClass()); + Assertions.assertEquals(OperationType.DROP_MODEL, event.operationType()); + Assertions.assertEquals(OperationStatus.UNPROCESSED, event.operationStatus()); + Assertions.assertFalse(isExists); + + DropModelEvent dropModelEvent = (DropModelEvent) event; + Assertions.assertEquals(DropModelEvent.class, dropModelEvent.getClass()); + Assertions.assertEquals(OperationType.DROP_MODEL, dropModelEvent.operationType()); + Assertions.assertEquals(OperationStatus.UNPROCESSED, dropModelEvent.operationStatus()); + } + + @Test + void testListModelEvent() { + NameIdentifier[] nameIdentifiers = dispatcher.listModels(namespace); + Event event = dummyEventListener.popPostEvent(); + Assertions.assertEquals(ListModelEvent.class, event.getClass()); + Assertions.assertEquals(OperationType.LIST_MODEL, event.operationType()); + Assertions.assertEquals(OperationStatus.SUCCESS, event.operationStatus()); + + ListModelEvent listModelEvent = (ListModelEvent) event; + Assertions.assertEquals(namespace, listModelEvent.namespace()); + + Assertions.assertEquals(2, nameIdentifiers.length); + Assertions.assertEquals(existingIdentA, nameIdentifiers[0]); + Assertions.assertEquals(existingIdentB, nameIdentifiers[1]); + } + + @Test + void testLinkModelVersionEvent() { + dispatcher.linkModelVersion( + existingIdentA, + "uriA", + new String[] {"aliasProduction"}, + "versionInfoA", + ImmutableMap.of("color", "#FFFFFF")); + Event event = dummyEventListener.popPostEvent(); + Assertions.assertEquals(LinkModelVersionEvent.class, event.getClass()); + Assertions.assertEquals(OperationType.LINK_MODEL_VERSION, event.operationType()); + Assertions.assertEquals(OperationStatus.SUCCESS, event.operationStatus()); + + // validate model info + LinkModelVersionEvent linkModelVersionEvent = (LinkModelVersionEvent) event; + ModelInfo modelInfo = linkModelVersionEvent.linkModelVersionInfo(); + Assertions.assertEquals("modelA", modelInfo.getName()); + Assertions.assertEquals("commentA", modelInfo.getComment()); + Assertions.assertEquals(ImmutableMap.of("color", "#FFFFFF"), modelInfo.getProperties()); + + // validate model version info + ModelVersionInfo[] modelVersionInfos = modelInfo.modelVersions(); + Assertions.assertNotNull(modelVersionInfos); + Assertions.assertEquals(1, modelVersionInfos.length); + Assertions.assertEquals("versionInfoA", modelVersionInfos[0].getComment()); + Assertions.assertEquals("uriA", modelVersionInfos[0].getUri()); + Assertions.assertEquals("aliasProduction", modelVersionInfos[0].getAliases()[0]); + } + + @Test + void testGetModelVersionEventViaVersion() { + dispatcher.getModelVersion(existingIdentA, 1); + Event event = dummyEventListener.popPostEvent(); + Assertions.assertEquals(GetModelVersionEvent.class, event.getClass()); + Assertions.assertEquals(OperationType.GET_MODEL_VERSION, event.operationType()); + Assertions.assertEquals(OperationStatus.SUCCESS, event.operationStatus()); + + // validate model info + GetModelVersionEvent getModelVersionEvent = (GetModelVersionEvent) event; + ModelInfo modelInfo = getModelVersionEvent.getModelVersionInfo(); + Assertions.assertEquals("modelA", modelInfo.getName()); + Assertions.assertEquals("commentA", modelInfo.getComment()); + Assertions.assertEquals(ImmutableMap.of("color", "#FFFFFF"), modelInfo.getProperties()); + + // validate model version info + ModelVersionInfo[] modelVersionInfos = modelInfo.modelVersions(); + Assertions.assertNotNull(modelVersionInfos); + Assertions.assertEquals(1, modelVersionInfos.length); + Assertions.assertEquals("model version versionInfoA", modelVersionInfos[0].getComment()); + Assertions.assertEquals("uriA", modelVersionInfos[0].getUri()); + Assertions.assertEquals("aliasProduction", modelVersionInfos[0].getAliases()[0]); + } + + @Test + void testGetModelVersionEventViaAlias() { + dispatcher.getModelVersion(existingIdentB, "aliasTest"); + Event event = dummyEventListener.popPostEvent(); + Assertions.assertEquals(GetModelVersionEvent.class, event.getClass()); + Assertions.assertEquals(OperationType.GET_MODEL_VERSION, event.operationType()); + Assertions.assertEquals(OperationStatus.SUCCESS, event.operationStatus()); + + // validate model info + GetModelVersionEvent getModelVersionEvent = (GetModelVersionEvent) event; + ModelInfo modelInfo = getModelVersionEvent.getModelVersionInfo(); + Assertions.assertEquals("modelB", modelInfo.getName()); + Assertions.assertEquals("commentB", modelInfo.getComment()); + Assertions.assertEquals(ImmutableMap.of("color", "#FFFFFF"), modelInfo.getProperties()); + + // validate model version info + ModelVersionInfo[] modelVersionInfos = modelInfo.modelVersions(); + Assertions.assertNotNull(modelVersionInfos); + Assertions.assertEquals(1, modelVersionInfos.length); + Assertions.assertEquals("model version versionInfoB", modelVersionInfos[0].getComment()); + Assertions.assertEquals("uriB", modelVersionInfos[0].getUri()); + Assertions.assertEquals("aliasTest", modelVersionInfos[0].getAliases()[0]); + } + + @Test + void testDeleteModelVersionEventViaVersion() { + boolean isExists = dispatcher.deleteModelVersion(existingIdentA, 1); + Event event = dummyEventListener.popPostEvent(); + Assertions.assertEquals(DropModelVersionEvent.class, event.getClass()); + Assertions.assertEquals(OperationType.DROP_MODEL_VERSION, event.operationType()); + Assertions.assertEquals(OperationStatus.SUCCESS, event.operationStatus()); + Assertions.assertTrue(isExists); + + // validate model info + DropModelVersionEvent dropModelVersionEvent = (DropModelVersionEvent) event; + Assertions.assertEquals(existingIdentA, dropModelVersionEvent.identifier()); + Assertions.assertEquals(DropModelVersionEvent.class, dropModelVersionEvent.getClass()); + Assertions.assertEquals( + OperationType.DROP_MODEL_VERSION, dropModelVersionEvent.operationType()); + Assertions.assertEquals(OperationStatus.SUCCESS, dropModelVersionEvent.operationStatus()); + } + + @Test + void testDeleteModelVersionEventViaAlias() { + boolean isExists = dispatcher.deleteModelVersion(existingIdentB, "aliasTest"); + Event event = dummyEventListener.popPostEvent(); + Assertions.assertEquals(DropModelVersionEvent.class, event.getClass()); + Assertions.assertEquals(OperationType.DROP_MODEL_VERSION, event.operationType()); + Assertions.assertEquals(OperationStatus.SUCCESS, event.operationStatus()); + Assertions.assertTrue(isExists); + + // validate model info + DropModelVersionEvent dropModelVersionEvent = (DropModelVersionEvent) event; + Assertions.assertEquals(existingIdentB, dropModelVersionEvent.identifier()); + Assertions.assertEquals(DropModelVersionEvent.class, dropModelVersionEvent.getClass()); + Assertions.assertEquals( + OperationType.DROP_MODEL_VERSION, dropModelVersionEvent.operationType()); + Assertions.assertEquals(OperationStatus.SUCCESS, dropModelVersionEvent.operationStatus()); + } + + @Test + void testDeleteModelVersionEventViaVersionNotExists() { + boolean isExists = dispatcher.deleteModelVersion(existingIdentA, 3); + Event event = dummyEventListener.popPostEvent(); + Assertions.assertEquals(DropModelVersionEvent.class, event.getClass()); + Assertions.assertEquals(OperationType.DROP_MODEL_VERSION, event.operationType()); + Assertions.assertEquals(OperationStatus.UNPROCESSED, event.operationStatus()); + Assertions.assertFalse(isExists); + + // validate model info + DropModelVersionEvent dropModelVersionEvent = (DropModelVersionEvent) event; + Assertions.assertEquals(DropModelVersionEvent.class, dropModelVersionEvent.getClass()); + Assertions.assertEquals( + OperationType.DROP_MODEL_VERSION, dropModelVersionEvent.operationType()); + Assertions.assertEquals(OperationStatus.UNPROCESSED, dropModelVersionEvent.operationStatus()); + } + + @Test + void testListModelVersionsEvent() { + int[] versions = dispatcher.listModelVersions(existingIdentA); + Event event = dummyEventListener.popPostEvent(); + Assertions.assertEquals(ListModelVersionsEvent.class, event.getClass()); + Assertions.assertEquals(OperationType.LIST_MODEL_VERSIONS, event.operationType()); + Assertions.assertEquals(OperationStatus.SUCCESS, event.operationStatus()); + Assertions.assertEquals(2, versions.length); + Assertions.assertEquals(1, versions[0]); + Assertions.assertEquals(2, versions[1]); + + // validate model info + ListModelVersionsEvent listModelVersionsEvent = (ListModelVersionsEvent) event; + ModelInfo modelInfo = listModelVersionsEvent.listModelVersionInfo(); + Assertions.assertEquals("modelA", modelInfo.getName()); + Assertions.assertEquals("commentA", modelInfo.getComment()); + Assertions.assertEquals(ImmutableMap.of("color", "#FFFFFF"), modelInfo.getProperties()); + Assertions.assertNull(modelInfo.modelVersions()); + } + + private ModelDispatcher mockExceptionModelDispatcher() { + return mock( + ModelDispatcher.class, + invocation -> { + throw new GravitinoRuntimeException("Exception for all methods"); + }); + } + + private ModelDispatcher mockTagDispatcher() { + ModelDispatcher dispatcher = mock(ModelDispatcher.class); + + when(dispatcher.registerModel(existingIdentA, "comment", ImmutableMap.of("color", "#FFFFFF"))) + .thenReturn(modelA); + when(dispatcher.registerModel(existingIdentB, "comment", ImmutableMap.of("color", "#FFFFFF"))) + .thenReturn(modelB); + + when(dispatcher.getModel(existingIdentA)).thenReturn(modelA); + when(dispatcher.getModel(existingIdentB)).thenReturn(modelB); + + when(dispatcher.deleteModel(existingIdentA)).thenReturn(true); + when(dispatcher.deleteModel(notExistingIdent)).thenReturn(false); + + when(dispatcher.listModels(namespace)) + .thenReturn(new NameIdentifier[] {existingIdentA, existingIdentB}); + + when(dispatcher.getModelVersion(existingIdentA, 1)).thenReturn(firstModelVersion); + when(dispatcher.getModelVersion(existingIdentB, "aliasTest")).thenReturn(secondModelVersion); + + when(dispatcher.deleteModelVersion(existingIdentA, 1)).thenReturn(true); + when(dispatcher.deleteModelVersion(existingIdentB, "aliasTest")).thenReturn(true); + when(dispatcher.deleteModelVersion(existingIdentA, 3)).thenReturn(false); + + when(dispatcher.listModelVersions(existingIdentA)).thenReturn(new int[] {1, 2}); + + return dispatcher; + } + + /** + * Mock a model with given name and comment. + * + * @param name name of the model + * @param comment comment of the model + * @return a mock model + */ + private Model mockModel(String name, String comment) { + Model model = mock(Model.class); + when(model.name()).thenReturn(name); + when(model.comment()).thenReturn(comment); + when(model.properties()).thenReturn(ImmutableMap.of("color", "#FFFFFF")); + return model; + } + + /** + * Mock a model version with given uri, aliases, and comment. + * + * @param uri uri of the model version + * @param aliases aliases of the model version + * @param comment comment of the model version, added "model version " prefix. + * @return a mock model version + */ + private ModelVersion mockModelVersion(String uri, String[] aliases, String comment) { + ModelVersion modelVersion = mock(ModelVersion.class); + when(modelVersion.version()).thenReturn(1); + when(modelVersion.uri()).thenReturn(uri); + when(modelVersion.aliases()).thenReturn(aliases); + when(modelVersion.comment()).thenReturn("model version " + comment); + when(modelVersion.properties()).thenReturn(ImmutableMap.of("color", "#FFFFFF")); + + return modelVersion; } }