diff --git a/core/src/main/java/com/netflix/conductor/core/events/SimpleEventProcessor.java b/core/src/main/java/com/netflix/conductor/core/events/SimpleEventProcessor.java index 0cd5770bcd..46916fa2d0 100644 --- a/core/src/main/java/com/netflix/conductor/core/events/SimpleEventProcessor.java +++ b/core/src/main/java/com/netflix/conductor/core/events/SimpleEventProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2017 Netflix, Inc. + * Copyright 2020 Netflix, Inc. *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,11 +31,6 @@ import com.netflix.conductor.service.ExecutionService; import com.netflix.conductor.service.MetadataService; import com.spotify.futures.CompletableFutures; -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.inject.Inject; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -50,6 +45,10 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import javax.inject.Inject; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * @author Viren @@ -167,6 +166,7 @@ private void handle(ObservableQueue queue, Message msg) { } } catch (Exception e) { logger.error("Error handling message: {} on queue:{}", msg, queue.getName(), e); + Monitors.recordEventQueueMessagesError(queue.getType(), queue.getName()); } finally { Monitors.recordEventQueueMessagesHandled(queue.getType(), queue.getName()); } @@ -261,12 +261,14 @@ EventExecution execute(EventExecution eventExecution, Action action, Object payl eventExecution.getOutput().putAll(output); } eventExecution.setStatus(Status.COMPLETED); + Monitors.recordEventExecutionSuccess(eventExecution.getEvent(), eventExecution.getName(), eventExecution.getAction().name()); } catch (RuntimeException e) { logger.error("Error executing action: {} for event: {} with messageId: {}", action.getAction(), eventExecution.getEvent(), eventExecution.getMessageId(), e); if (!isTransientException(e.getCause())) { // not a transient error, fail the event execution eventExecution.setStatus(Status.FAILED); eventExecution.getOutput().put("exception", e.getMessage()); + Monitors.recordEventExecutionError(eventExecution.getEvent(), eventExecution.getName(), eventExecution.getAction().name(), e.getClass().getSimpleName()); } } return eventExecution; diff --git a/core/src/main/java/com/netflix/conductor/metrics/Monitors.java b/core/src/main/java/com/netflix/conductor/metrics/Monitors.java index 191ea5c258..a9c05a4c61 100644 --- a/core/src/main/java/com/netflix/conductor/metrics/Monitors.java +++ b/core/src/main/java/com/netflix/conductor/metrics/Monitors.java @@ -242,6 +242,18 @@ public static void recordEventQueueMessagesHandled(String queueType, String queu counter(classQualifier, "event_queue_messages_handled", "queueType", queueType, "queueName", queueName); } + public static void recordEventQueueMessagesError(String queueType, String queueName) { + counter(classQualifier, "event_queue_messages_error", "queueType", queueType, "queueName", queueName); + } + + public static void recordEventExecutionSuccess(String event, String handler, String action) { + counter(classQualifier, "event_execution_success", "event", event, "handler", handler, "action", action); + } + + public static void recordEventExecutionError(String event, String handler, String action, String exceptionClazz) { + counter(classQualifier, "event_execution_error", "event", event, "handler", handler, "action", action, "exception", exceptionClazz); + } + public static void recordEventActionError(String action, String entityName, String event) { counter(classQualifier, "event_action_error", "action", action, "entityName", entityName, "event", event); } diff --git a/core/src/test/java/com/netflix/conductor/core/events/TestSimpleEventProcessor.java b/core/src/test/java/com/netflix/conductor/core/events/TestSimpleEventProcessor.java index 6d82a8cf96..b08be5389a 100644 --- a/core/src/test/java/com/netflix/conductor/core/events/TestSimpleEventProcessor.java +++ b/core/src/test/java/com/netflix/conductor/core/events/TestSimpleEventProcessor.java @@ -1,5 +1,5 @@ /* - * Copyright 2017 Netflix, Inc. + * Copyright 2020 Netflix, Inc. *

* Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,6 +15,20 @@ */ package com.netflix.conductor.core.events; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atMost; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.util.concurrent.Uninterruptibles; import com.netflix.conductor.common.metadata.events.EventExecution; @@ -36,11 +50,6 @@ import com.netflix.conductor.core.utils.JsonUtils; import com.netflix.conductor.service.ExecutionService; import com.netflix.conductor.service.MetadataService; -import org.junit.Before; -import org.junit.Test; -import org.mockito.stubbing.Answer; -import rx.Observable; - import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; @@ -49,22 +58,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyMap; -import static org.mockito.ArgumentMatchers.anyMapOf; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.atMost; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import org.junit.Before; +import org.junit.Test; +import org.mockito.stubbing.Answer; +import rx.Observable; /** * @author Viren @@ -312,9 +309,11 @@ public void testExecuteInvalidAction() { SimpleEventProcessor eventProcessor = new SimpleEventProcessor(executionService, metadataService, actionProcessor, eventQueues, jsonUtils, new TestConfiguration(), objectMapper); EventExecution eventExecution = new EventExecution("id", "messageId"); + eventExecution.setName("handler"); eventExecution.setStatus(EventExecution.Status.IN_PROGRESS); eventExecution.setEvent("event"); Action action = new Action(); + eventExecution.setAction(Type.start_workflow); eventProcessor.execute(eventExecution, action, "payload"); assertEquals(1, executeInvoked.get()); @@ -334,8 +333,10 @@ public void testExecuteNonRetriableApplicationException() { EventExecution eventExecution = new EventExecution("id", "messageId"); eventExecution.setStatus(EventExecution.Status.IN_PROGRESS); eventExecution.setEvent("event"); + eventExecution.setName("handler"); Action action = new Action(); action.setAction(Type.start_workflow); + eventExecution.setAction(Type.start_workflow); eventProcessor.execute(eventExecution, action, "payload"); assertEquals(1, executeInvoked.get());