Skip to content

Commit

Permalink
Added queue metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
Robban1980 committed Dec 3, 2024
1 parent fe40298 commit d06262b
Showing 1 changed file with 5 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.core.events.queue.ObservableQueue;
import com.netflix.conductor.kafkaeq.config.KafkaEventQueueProperties;
import com.netflix.conductor.metrics.Monitors;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -164,12 +165,16 @@ public Observable<Message> observe() {
}
}

Monitors.recordEventQueueMessagesProcessed(
QUEUE_TYPE, this.topic, messages.size());
return Observable.from(messages);
} catch (Exception e) {
LOGGER.error(
"Error while polling Kafka for topic: {}",
topic,
e);
Monitors.recordObservableQMessageReceivedErrors(
QUEUE_TYPE);
return Observable.error(e);
}
})
Expand Down

0 comments on commit d06262b

Please sign in to comment.