Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #1663 from Netflix/event_processing_metrics
Browse files Browse the repository at this point in the history
added metrics for event execution
  • Loading branch information
apanicker-nflx authored Apr 29, 2020
2 parents 3cafdf9 + 6de3e62 commit 232eddd
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017 Netflix, Inc.
* Copyright 2020 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -31,11 +31,6 @@
import com.netflix.conductor.service.ExecutionService;
import com.netflix.conductor.service.MetadataService;
import com.spotify.futures.CompletableFutures;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.inject.Inject;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -50,6 +45,10 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.inject.Inject;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author Viren
Expand Down Expand Up @@ -167,6 +166,7 @@ private void handle(ObservableQueue queue, Message msg) {
}
} catch (Exception e) {
logger.error("Error handling message: {} on queue:{}", msg, queue.getName(), e);
Monitors.recordEventQueueMessagesError(queue.getType(), queue.getName());
} finally {
Monitors.recordEventQueueMessagesHandled(queue.getType(), queue.getName());
}
Expand Down Expand Up @@ -261,12 +261,14 @@ EventExecution execute(EventExecution eventExecution, Action action, Object payl
eventExecution.getOutput().putAll(output);
}
eventExecution.setStatus(Status.COMPLETED);
Monitors.recordEventExecutionSuccess(eventExecution.getEvent(), eventExecution.getName(), eventExecution.getAction().name());
} catch (RuntimeException e) {
logger.error("Error executing action: {} for event: {} with messageId: {}", action.getAction(), eventExecution.getEvent(), eventExecution.getMessageId(), e);
if (!isTransientException(e.getCause())) {
// not a transient error, fail the event execution
eventExecution.setStatus(Status.FAILED);
eventExecution.getOutput().put("exception", e.getMessage());
Monitors.recordEventExecutionError(eventExecution.getEvent(), eventExecution.getName(), eventExecution.getAction().name(), e.getClass().getSimpleName());
}
}
return eventExecution;
Expand Down
12 changes: 12 additions & 0 deletions core/src/main/java/com/netflix/conductor/metrics/Monitors.java
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,18 @@ public static void recordEventQueueMessagesHandled(String queueType, String queu
counter(classQualifier, "event_queue_messages_handled", "queueType", queueType, "queueName", queueName);
}

public static void recordEventQueueMessagesError(String queueType, String queueName) {
counter(classQualifier, "event_queue_messages_error", "queueType", queueType, "queueName", queueName);
}

public static void recordEventExecutionSuccess(String event, String handler, String action) {
counter(classQualifier, "event_execution_success", "event", event, "handler", handler, "action", action);
}

public static void recordEventExecutionError(String event, String handler, String action, String exceptionClazz) {
counter(classQualifier, "event_execution_error", "event", event, "handler", handler, "action", action, "exception", exceptionClazz);
}

public static void recordEventActionError(String action, String entityName, String event) {
counter(classQualifier, "event_action_error", "action", action, "entityName", entityName, "event", event);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017 Netflix, Inc.
* Copyright 2020 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,6 +15,20 @@
*/
package com.netflix.conductor.core.events;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.Uninterruptibles;
import com.netflix.conductor.common.metadata.events.EventExecution;
Expand All @@ -36,11 +50,6 @@
import com.netflix.conductor.core.utils.JsonUtils;
import com.netflix.conductor.service.ExecutionService;
import com.netflix.conductor.service.MetadataService;
import org.junit.Before;
import org.junit.Test;
import org.mockito.stubbing.Answer;
import rx.Observable;

import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
Expand All @@ -49,22 +58,10 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.anyMapOf;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.junit.Before;
import org.junit.Test;
import org.mockito.stubbing.Answer;
import rx.Observable;

/**
* @author Viren
Expand Down Expand Up @@ -312,9 +309,11 @@ public void testExecuteInvalidAction() {

SimpleEventProcessor eventProcessor = new SimpleEventProcessor(executionService, metadataService, actionProcessor, eventQueues, jsonUtils, new TestConfiguration(), objectMapper);
EventExecution eventExecution = new EventExecution("id", "messageId");
eventExecution.setName("handler");
eventExecution.setStatus(EventExecution.Status.IN_PROGRESS);
eventExecution.setEvent("event");
Action action = new Action();
eventExecution.setAction(Type.start_workflow);

eventProcessor.execute(eventExecution, action, "payload");
assertEquals(1, executeInvoked.get());
Expand All @@ -334,8 +333,10 @@ public void testExecuteNonRetriableApplicationException() {
EventExecution eventExecution = new EventExecution("id", "messageId");
eventExecution.setStatus(EventExecution.Status.IN_PROGRESS);
eventExecution.setEvent("event");
eventExecution.setName("handler");
Action action = new Action();
action.setAction(Type.start_workflow);
eventExecution.setAction(Type.start_workflow);

eventProcessor.execute(eventExecution, action, "payload");
assertEquals(1, executeInvoked.get());
Expand Down

0 comments on commit 232eddd

Please sign in to comment.