Skip to content

Commit

Permalink
feat(webserver): use a shared queue consumer from the log follow endp…
Browse files Browse the repository at this point in the history
…oint
  • Loading branch information
loicmathieu committed Feb 7, 2025
1 parent 129ba6a commit 2a95aee
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 57 deletions.
Original file line number Diff line number Diff line change
@@ -1,22 +1,14 @@
package io.kestra.core.services;

import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.kestra.core.repositories.LogRepositoryInterface;
import io.micronaut.http.sse.Event;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.slf4j.event.Level;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Schedulers;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -25,56 +17,9 @@
*/
@Singleton
public class ExecutionLogService {

@Inject
private LogRepositoryInterface logRepository;

@Inject
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
protected QueueInterface<LogEntry> logQueue;

public Flux<Event<LogEntry>> streamExecutionLogs(final String tenantId,
final String executionId,
final Level minLevel,
final boolean withAccessControl) {

final AtomicReference<Runnable> disposable = new AtomicReference<>();

return Flux.<Event<LogEntry>>create(emitter -> {

// send a first "empty" event so the SSE is correctly initialized in the frontend in case there are no logs
emitter.next(Event.of(LogEntry.builder().build()).id("start"));

// fetch repository first
getExecutionLogs(tenantId, executionId, minLevel, List.of(), withAccessControl)
.forEach(logEntry -> emitter.next(Event.of(logEntry).id("progress")));

final List<String> levels = LogEntry.findLevelsByMin(minLevel).stream().map(Enum::name).toList();

// consume in realtime
disposable.set(this.logQueue.receive(either -> {
if (either.isRight()) {
return;
}

LogEntry current = either.getLeft();

if (current.getExecutionId() != null && current.getExecutionId().equals(executionId)) {
if (levels.contains(current.getLevel().name())) {
emitter.next(Event.of(current).id("progress"));
}
}
}));
}, FluxSink.OverflowStrategy.BUFFER)
.doFinally(ignored -> {
Schedulers.boundedElastic().schedule(() -> {
if (disposable.get() != null) {
disposable.get().run();
}
});
});
}

public InputStream getExecutionLogsAsStream(String tenantId,
String executionId,
Level minLevel,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.kestra.core.tenant.TenantService;
import io.kestra.webserver.converters.QueryFilterFormat;
import io.kestra.webserver.responses.PagedResults;
import io.kestra.webserver.services.LogStreamingService;
import io.kestra.webserver.utils.PageableUtils;
import io.kestra.webserver.utils.QueryFilterUtils;
import io.kestra.webserver.utils.RequestUtils;
Expand All @@ -28,11 +29,13 @@
import jakarta.validation.constraints.Min;
import org.slf4j.event.Level;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

import java.io.InputStream;
import java.time.ZonedDateTime;
import java.util.List;
import java.util.Optional;
import java.util.UUID;

import static io.kestra.core.utils.DateUtils.validateTimeline;

Expand All @@ -50,6 +53,9 @@ public class LogController {
@Inject
private TenantService tenantService;

@Inject
private LogStreamingService logStreamingService;

@ExecuteOn(TaskExecutors.IO)
@Get(uri = "logs/search")
@Operation(tags = {"Logs"}, summary = "Search for logs")
Expand Down Expand Up @@ -152,7 +158,21 @@ public Flux<Event<LogEntry>> follow(
@Parameter(description = "The execution id") @PathVariable String executionId,
@Parameter(description = "The min log level filter") @Nullable @QueryValue Level minLevel
) {
return logService.streamExecutionLogs(tenantService.resolveTenant(), executionId, minLevel, true);
String subscriberId = UUID.randomUUID().toString();
final List<String> levels = LogEntry.findLevelsByMin(minLevel).stream().map(Enum::name).toList();

return Flux.<Event<LogEntry>>create(emitter -> {
// send a first "empty" event so the SSE is correctly initialized in the frontend in case there are no logs
emitter.next(Event.of(LogEntry.builder().build()).id("start"));

// fetch repository first
logService.getExecutionLogs(tenantService.resolveTenant(), executionId, minLevel, List.of(), true)
.forEach(logEntry -> emitter.next(Event.of(logEntry).id("progress")));

// consume in realtime
logStreamingService.registerSubscriber(executionId, subscriberId, emitter, levels);
}, FluxSink.OverflowStrategy.BUFFER)
.doFinally(ignored -> logStreamingService.unregisterSubscriber(executionId, subscriberId));
}

@ExecuteOn(TaskExecutors.IO)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public boolean isStopFollow(Flow flow, Execution execution) {
}

@PreDestroy
public void shutdown() {
void shutdown() {
if (queueConsumer != null) {
queueConsumer.run();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package io.kestra.webserver.services;

import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.micronaut.http.sse.Event;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import reactor.core.publisher.FluxSink;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
* This service offers a fanout mechanism so a single consumer of the log queue can dispatch log messages to multiple consumers.
* It is designed to be used for 'follow' endpoints that using SSE to follow a flow logs.
* <p>
* Consumers need first to register themselves via {@link #registerSubscriber(String, String, FluxSink, List)},
* then unregister (ideally in a finally block to avoid any memory leak) via {@link #unregisterSubscriber(String, String)}.
*/
@Slf4j
@Singleton
public class LogStreamingService {
private final Map<String, Map<String, Pair<FluxSink<Event<LogEntry>>, List<String>>>> subscribers = new ConcurrentHashMap<>();
private final Object subscriberLock = new Object();

@Inject
@Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
protected QueueInterface<LogEntry> logQueue;

private Runnable queueConsumer;

@PostConstruct
void startQueueConsumer() {
this.queueConsumer = logQueue.receive(either -> {
if (either.isRight()) {
log.error("Unable to deserialize log: {}", either.getRight().getMessage());
return;
}

LogEntry current = either.getLeft();
// Get all subscribers for this execution
Map<String, Pair<FluxSink<Event<LogEntry>>, List<String>>> executionSubscribers = subscribers.get(current.getExecutionId());

if (executionSubscribers != null && !executionSubscribers.isEmpty()) {
executionSubscribers.values().forEach(pair -> {
var sink = pair.getLeft();
var levels = pair.getRight();

if (levels.contains(current.getLevel().name())) {
sink.next(Event.of(current).id("progress"));
}
});
}
});
}

/**
* Register a subscriber to an execution logs.
* All subscribers must ensure to call {@link #unregisterSubscriber(String, String)} to avoid any memory leak.
*/
public void registerSubscriber(String executionId, String subscriberId, FluxSink<Event<LogEntry>> sink, List<String> levels) {
// it needs to be synchronized as we get and remove if empty, so we must be sure that nobody else is adding a new one in-between
synchronized (subscriberLock) {
subscribers.computeIfAbsent(executionId, k -> new ConcurrentHashMap<>())
.put(subscriberId, Pair.of(sink, levels));
}
}

/**
* Unregister a subscribers.
* This is advised to do it in a finally block to be sure to free resources.
*/
public void unregisterSubscriber(String executionId, String subscriberId) {
// it needs to be synchronized as we get and remove if empty, so we must be sure that nobody else is adding a new one in-between
synchronized (subscriberLock) {
Map<String, Pair<FluxSink<Event<LogEntry>>, List<String>>> executionSubscribers = subscribers.get(executionId);
if (executionSubscribers != null) {
executionSubscribers.remove(subscriberId);
if (executionSubscribers.isEmpty()) {
subscribers.remove(executionId);
}
}
}
}

@PreDestroy
void shutdown() {
if (queueConsumer != null) {
queueConsumer.run();
}
}
}

0 comments on commit 2a95aee

Please sign in to comment.