diff --git a/core/src/main/java/io/kestra/core/services/ExecutionLogService.java b/core/src/main/java/io/kestra/core/services/ExecutionLogService.java index 2d070a28fb2..838ab40dd29 100644 --- a/core/src/main/java/io/kestra/core/services/ExecutionLogService.java +++ b/core/src/main/java/io/kestra/core/services/ExecutionLogService.java @@ -1,22 +1,14 @@ package io.kestra.core.services; import io.kestra.core.models.executions.LogEntry; -import io.kestra.core.queues.QueueFactoryInterface; -import io.kestra.core.queues.QueueInterface; import io.kestra.core.repositories.LogRepositoryInterface; -import io.micronaut.http.sse.Event; import jakarta.inject.Inject; -import jakarta.inject.Named; import jakarta.inject.Singleton; import org.slf4j.event.Level; -import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; -import reactor.core.scheduler.Schedulers; import java.io.ByteArrayInputStream; import java.io.InputStream; import java.util.List; -import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -25,56 +17,9 @@ */ @Singleton public class ExecutionLogService { - @Inject private LogRepositoryInterface logRepository; - @Inject - @Named(QueueFactoryInterface.WORKERTASKLOG_NAMED) - protected QueueInterface logQueue; - - public Flux> streamExecutionLogs(final String tenantId, - final String executionId, - final Level minLevel, - final boolean withAccessControl) { - - final AtomicReference disposable = new AtomicReference<>(); - - return Flux.>create(emitter -> { - - // send a first "empty" event so the SSE is correctly initialized in the frontend in case there are no logs - emitter.next(Event.of(LogEntry.builder().build()).id("start")); - - // fetch repository first - getExecutionLogs(tenantId, executionId, minLevel, List.of(), withAccessControl) - .forEach(logEntry -> emitter.next(Event.of(logEntry).id("progress"))); - - final List levels = 
LogEntry.findLevelsByMin(minLevel).stream().map(Enum::name).toList(); - - // consume in realtime - disposable.set(this.logQueue.receive(either -> { - if (either.isRight()) { - return; - } - - LogEntry current = either.getLeft(); - - if (current.getExecutionId() != null && current.getExecutionId().equals(executionId)) { - if (levels.contains(current.getLevel().name())) { - emitter.next(Event.of(current).id("progress")); - } - } - })); - }, FluxSink.OverflowStrategy.BUFFER) - .doFinally(ignored -> { - Schedulers.boundedElastic().schedule(() -> { - if (disposable.get() != null) { - disposable.get().run(); - } - }); - }); - } - public InputStream getExecutionLogsAsStream(String tenantId, String executionId, Level minLevel, diff --git a/webserver/src/main/java/io/kestra/webserver/controllers/api/LogController.java b/webserver/src/main/java/io/kestra/webserver/controllers/api/LogController.java index cf1b4bc3759..1b57b823516 100644 --- a/webserver/src/main/java/io/kestra/webserver/controllers/api/LogController.java +++ b/webserver/src/main/java/io/kestra/webserver/controllers/api/LogController.java @@ -7,6 +7,7 @@ import io.kestra.core.tenant.TenantService; import io.kestra.webserver.converters.QueryFilterFormat; import io.kestra.webserver.responses.PagedResults; +import io.kestra.webserver.services.LogStreamingService; import io.kestra.webserver.utils.PageableUtils; import io.kestra.webserver.utils.QueryFilterUtils; import io.kestra.webserver.utils.RequestUtils; @@ -28,11 +29,13 @@ import jakarta.validation.constraints.Min; import org.slf4j.event.Level; import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; import java.io.InputStream; import java.time.ZonedDateTime; import java.util.List; import java.util.Optional; +import java.util.UUID; import static io.kestra.core.utils.DateUtils.validateTimeline; @@ -50,6 +53,9 @@ public class LogController { @Inject private TenantService tenantService; + @Inject + private LogStreamingService 
logStreamingService; + @ExecuteOn(TaskExecutors.IO) @Get(uri = "logs/search") @Operation(tags = {"Logs"}, summary = "Search for logs") @@ -152,7 +158,21 @@ public Flux> follow( @Parameter(description = "The execution id") @PathVariable String executionId, @Parameter(description = "The min log level filter") @Nullable @QueryValue Level minLevel ) { - return logService.streamExecutionLogs(tenantService.resolveTenant(), executionId, minLevel, true); + String subscriberId = UUID.randomUUID().toString(); + final List levels = LogEntry.findLevelsByMin(minLevel).stream().map(Enum::name).toList(); + + return Flux.>create(emitter -> { + // send a first "empty" event so the SSE is correctly initialized in the frontend in case there are no logs + emitter.next(Event.of(LogEntry.builder().build()).id("start")); + + // fetch repository first + logService.getExecutionLogs(tenantService.resolveTenant(), executionId, minLevel, List.of(), true) + .forEach(logEntry -> emitter.next(Event.of(logEntry).id("progress"))); + + // consume in realtime + logStreamingService.registerSubscriber(executionId, subscriberId, emitter, levels); + }, FluxSink.OverflowStrategy.BUFFER) + .doFinally(ignored -> logStreamingService.unregisterSubscriber(executionId, subscriberId)); } @ExecuteOn(TaskExecutors.IO) diff --git a/webserver/src/main/java/io/kestra/webserver/services/ExecutionStreamingService.java b/webserver/src/main/java/io/kestra/webserver/services/ExecutionStreamingService.java index 6917935ee17..ccfce6ee643 100644 --- a/webserver/src/main/java/io/kestra/webserver/services/ExecutionStreamingService.java +++ b/webserver/src/main/java/io/kestra/webserver/services/ExecutionStreamingService.java @@ -120,7 +120,7 @@ public boolean isStopFollow(Flow flow, Execution execution) { } @PreDestroy - public void shutdown() { + void shutdown() { if (queueConsumer != null) { queueConsumer.run(); } diff --git a/webserver/src/main/java/io/kestra/webserver/services/LogStreamingService.java 
package io.kestra.webserver.services;

import io.kestra.core.models.executions.LogEntry;
import io.kestra.core.queues.QueueFactoryInterface;
import io.kestra.core.queues.QueueInterface;
import io.micronaut.http.sse.Event;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import reactor.core.publisher.FluxSink;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * This service offers a fan-out mechanism so that a single consumer of the log queue can dispatch log messages
 * to multiple subscribers.
 * It is designed to be used by 'follow' endpoints that use SSE to follow the logs of an execution.
 * <p>
 * Subscribers first need to register themselves via {@link #registerSubscriber(String, String, FluxSink, List)},
 * then unregister (ideally in a finally block to avoid any memory leak) via {@link #unregisterSubscriber(String, String)}.
 */
@Slf4j
@Singleton
public class LogStreamingService {
    /**
     * Registered subscribers: execution id -> (subscriber id -> (sink, accepted level names)).
     */
    private final Map<String, Map<String, Pair<FluxSink<Event<LogEntry>>, List<String>>>> subscribers = new ConcurrentHashMap<>();

    /**
     * Guards the compound "look up, mutate, remove when empty" operations on {@link #subscribers}.
     */
    private final Object subscriberLock = new Object();

    @Inject
    @Named(QueueFactoryInterface.WORKERTASKLOG_NAMED)
    protected QueueInterface<LogEntry> logQueue;

    /**
     * Handle returned by {@link QueueInterface#receive}; it is run on shutdown.
     * NOTE(review): running it presumably cancels the queue subscription - confirm against the queue implementation.
     */
    private Runnable queueConsumer;

    /**
     * Starts the single queue consumer that dispatches each received log entry to the subscribers
     * registered for the entry's execution, filtered by their accepted levels.
     */
    @PostConstruct
    void startQueueConsumer() {
        this.queueConsumer = logQueue.receive(either -> {
            if (either.isRight()) {
                log.error("Unable to deserialize log: {}", either.getRight().getMessage());
                return;
            }

            LogEntry current = either.getLeft();

            // A log entry is not guaranteed to carry an execution id, and ConcurrentHashMap#get
            // throws a NullPointerException on a null key: such entries have no subscriber, skip them.
            String executionId = current.getExecutionId();
            if (executionId == null) {
                return;
            }

            // Get all subscribers for this execution
            Map<String, Pair<FluxSink<Event<LogEntry>>, List<String>>> executionSubscribers = subscribers.get(executionId);
            if (executionSubscribers == null) {
                return;
            }

            executionSubscribers.values().forEach(pair -> {
                var sink = pair.getLeft();
                var levels = pair.getRight();

                if (levels.contains(current.getLevel().name())) {
                    sink.next(Event.of(current).id("progress"));
                }
            });
        });
    }

    /**
     * Registers a subscriber to the logs of an execution.
     * All subscribers must call {@link #unregisterSubscriber(String, String)} to avoid any memory leak.
     *
     * @param executionId  the id of the execution to follow
     * @param subscriberId the id of the subscriber; registering the same id again for the same execution replaces the previous entry
     * @param sink         the sink that receives the matching log entries as "progress" events
     * @param levels       the names of the log levels the subscriber accepts
     */
    public void registerSubscriber(String executionId, String subscriberId, FluxSink<Event<LogEntry>> sink, List<String> levels) {
        // it needs to be synchronized as unregisterSubscriber removes the per-execution map once it is empty,
        // so we must be sure it cannot be removed between its creation/lookup and the insertion of this subscriber
        synchronized (subscriberLock) {
            subscribers.computeIfAbsent(executionId, k -> new ConcurrentHashMap<>())
                .put(subscriberId, Pair.of(sink, levels));
        }
    }

    /**
     * Unregisters a subscriber.
     * It is advised to call this from a finally block to be sure to free resources.
     * Unregistering an unknown execution or subscriber is a no-op.
     *
     * @param executionId  the id of the execution the subscriber was registered on
     * @param subscriberId the id of the subscriber to remove
     */
    public void unregisterSubscriber(String executionId, String subscriberId) {
        // it needs to be synchronized as we get and remove if empty, so we must be sure that nobody else is adding a new one in-between
        synchronized (subscriberLock) {
            Map<String, Pair<FluxSink<Event<LogEntry>>, List<String>>> executionSubscribers = subscribers.get(executionId);
            if (executionSubscribers != null) {
                executionSubscribers.remove(subscriberId);
                if (executionSubscribers.isEmpty()) {
                    subscribers.remove(executionId);
                }
            }
        }
    }

    /**
     * Runs the handle returned by the queue when the consumer was started, if any.
     */
    @PreDestroy
    void shutdown() {
        if (queueConsumer != null) {
            queueConsumer.run();
        }
    }
}