diff --git a/CHANGELOG.md b/CHANGELOG.md index 0578f57b60..afbe59e3f9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## 1.2.2 +* Detects and handle silent and errored SSEStream. [#632](https://github.com/stellar/java-stellar-anchor-sdk/issues/632) +* When the health status is RED, set the status code to 500. + ## 1.2.1 * Fix gson serialization error of the Refunds object. [#626](https://github.com/stellar/java-stellar-anchor-sdk/issues/626) diff --git a/anchor-reference-server/src/main/java/org/stellar/anchor/reference/event/KafkaListener.java b/anchor-reference-server/src/main/java/org/stellar/anchor/reference/event/KafkaListener.java index a35426ff1e..0b9e1437e7 100644 --- a/anchor-reference-server/src/main/java/org/stellar/anchor/reference/event/KafkaListener.java +++ b/anchor-reference-server/src/main/java/org/stellar/anchor/reference/event/KafkaListener.java @@ -43,8 +43,8 @@ Consumer createKafkaConsumer() { props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaListenerSettings.getBootStrapServer()); props.put(ConsumerConfig.GROUP_ID_CONFIG, "group_one1"); - // props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // start reading from - // earliest available message + // props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + // start reading from the earliest available message props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); props.put(JsonDeserializer.TRUSTED_PACKAGES, "*"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); @@ -141,7 +141,7 @@ public HealthCheckResult check() { return KafkaHealthCheckResult.builder() .name(getName()) - .status(status.getName()) + .status(status) .kafkaAvailable(kafkaAvailable) .running(!executor.isTerminated()) .build(); @@ -162,9 +162,9 @@ boolean validateKafka() { class KafkaHealthCheckResult implements HealthCheckResult { transient String name; - List statuses = List.of(GREEN.getName(), RED.getName()); + List statuses = List.of(GREEN, RED); - String status; + HealthCheckStatus status; boolean running; diff --git a/api-schema/src/main/java/org/stellar/anchor/api/platform/HealthCheckResult.java b/api-schema/src/main/java/org/stellar/anchor/api/platform/HealthCheckResult.java index d22ca3717b..747b1916ba 100644 --- a/api-schema/src/main/java/org/stellar/anchor/api/platform/HealthCheckResult.java +++ b/api-schema/src/main/java/org/stellar/anchor/api/platform/HealthCheckResult.java @@ -10,12 +10,12 @@ public interface HealthCheckResult { * * @return the list of health check status. */ - List getStatuses(); + List getStatuses(); /** * the current status of the health check result. * * @return the status. */ - String getStatus(); + HealthCheckStatus getStatus(); } diff --git a/build.gradle.kts b/build.gradle.kts index 82f10f304e..4ad3137142 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -118,7 +118,7 @@ subprojects { allprojects { group = "org.stellar.anchor-sdk" - version = "1.2.1" + version = "1.2.2" tasks.jar { manifest { diff --git a/core/src/main/java/org/stellar/anchor/util/ExponentialBackoffTimer.java b/core/src/main/java/org/stellar/anchor/util/ExponentialBackoffTimer.java index ebf0059361..fccedc780e 100644 --- a/core/src/main/java/org/stellar/anchor/util/ExponentialBackoffTimer.java +++ b/core/src/main/java/org/stellar/anchor/util/ExponentialBackoffTimer.java @@ -42,4 +42,12 @@ public void reset() { public void sleep() throws InterruptedException { Thread.sleep(sleepSeconds * 1000); } + + public long currentTimer() { + return sleepSeconds; + } + + public boolean isTimerMaxed() { + return sleepSeconds >= maxSleepSeconds; + } } diff --git a/docs/resources/docker-examples/kafka/docker-compose.yaml b/docs/resources/docker-examples/kafka/docker-compose.yaml index b01b3f3238..5b26dfe082 100644 --- a/docs/resources/docker-examples/kafka/docker-compose.yaml +++ b/docs/resources/docker-examples/kafka/docker-compose.yaml @@ -1,4 +1,4 @@ -version: '2' +version: '2.4' services: zookeeper: platform: linux/x86_64 diff --git a/helm-charts/sep-service/templates/ingress.yaml b/helm-charts/sep-service/templates/ingress.yaml index 56c22d6027..8f01de0792 100644 --- a/helm-charts/sep-service/templates/ingress.yaml +++ b/helm-charts/sep-service/templates/ingress.yaml @@ -46,56 +46,3 @@ spec: number: {{ tpl .backend.servicePort $ }} {{- end }} {{- end }} - -# Stellar Observer -{{- $root := . }} -{{- $stellarObserver := (.Values.stellarObserver) }} -{{- if $stellarObserver.enabled }} ---- -apiVersion: networking.k8s.io/v1 -kind: Ingress -metadata: - name: {{ .Values.fullName }}-ing-{{ .Values.service.name }}-observer - {{- if .Values.ingress.metadata }} - {{- range $key, $value := .Values.ingress.metadata }} - {{- end }} - {{- end }} - {{- if .Values.ingress.annotations }} - annotations: - {{- range $key, $value := .Values.ingress.annotations }} - {{ $key }}: {{ $value | quote }} - {{- end }} - {{- end }} - labels: - app.kubernetes.io/name: {{ .Values.fullName }}-ing-{{ .Values.service.name }}-observer - helm.sh/chart: {{ $.Chart.Name }}-{{ $.Chart.Version }} - app.kubernetes.io/instance: {{ .Release.Name }} - app.kubernetes.io/managed-by: {{ .Release.Service }} -spec: - {{- if .Values.ingress.ingressClassName }} - ingressClassName: {{ .Values.ingress.ingressClassName }} - {{- end }} - {{- if .Values.ingress.tls }} - tls: - - hosts: - - {{ $stellarObserver.ingress.tls.host }} - {{- if .Values.ingress.tls.secretName }} - secretName: {{ .Values.ingress.tls.secretName }} - {{- end }} - {{- end }} - {{- if .Values.ingress.rules }} - rules: - {{- range $stellarObserver.ingress.rules.hosts }} - - host: {{ tpl .host $ }} - http: - paths: - - path: {{ .path }} - pathType: {{ .pathType | quote }} - backend: - service: - name: {{ $root.Values.fullName }}-svc-{{ $root.Values.service.name }}-observer - port: - number: {{ $stellarObserver.deployment.port | default 8083 }} - {{- end }} - {{- end }} -{{- end }} \ No newline at end of file diff --git a/integration-tests/src/test/kotlin/org/stellar/anchor/platform/AnchorPlatformIntegrationTest.kt b/integration-tests/src/test/kotlin/org/stellar/anchor/platform/AnchorPlatformIntegrationTest.kt index 6d14bf866f..0aebac8494 100644 --- a/integration-tests/src/test/kotlin/org/stellar/anchor/platform/AnchorPlatformIntegrationTest.kt +++ b/integration-tests/src/test/kotlin/org/stellar/anchor/platform/AnchorPlatformIntegrationTest.kt @@ -320,7 +320,6 @@ class AnchorPlatformIntegrationTest { "sep38.quoteIntegrationEndPoint" to "http://localhost:8081", "payment-gateway.circle.name" to "circle", "payment-gateway.circle.enabled" to "true", - "spring.jpa.database-platform" to "org.stellar.anchor.platform.sqlite.SQLiteDialect", "logging.level.root" to "INFO" ) @@ -393,16 +392,16 @@ class AnchorPlatformIntegrationTest { val stellarPaymentObserverCheck = checks["stellar_payment_observer"] as Map<*, *> assertEquals(2, stellarPaymentObserverCheck.size) - assertEquals("green", stellarPaymentObserverCheck["status"]) + assertEquals("GREEN", stellarPaymentObserverCheck["status"]) val observerStreams = stellarPaymentObserverCheck["streams"] as List<*> assertEquals(1, observerStreams.size) val stream1 = observerStreams[0] as Map<*, *> - assertEquals(4, stream1.size) + assertEquals(5, stream1.size) assertEquals(false, stream1["thread_shutdown"]) assertEquals(false, stream1["thread_terminated"]) assertEquals(false, stream1["stopped"]) - assertNotNull(stream1["lastEventId"]) + assertNotNull(stream1["last_event_id"]) } } diff --git a/integration-tests/src/test/resources/integration-test.anchor-config.yaml b/integration-tests/src/test/resources/integration-test.anchor-config.yaml index 5686c27fcb..98f79653db 100644 --- a/integration-tests/src/test/resources/integration-test.anchor-config.yaml +++ b/integration-tests/src/test/resources/integration-test.anchor-config.yaml @@ -8,7 +8,7 @@ stellar: settings: app-config # The location of the configuration data data-access: type: data-spring-jdbc # Activate [config-spring-jdbc] module. - settings: data-spring-jdbc-sqlite # The location of the configuration data in this file. + settings: data-spring-jdbc-h2 # The location of the configuration data in this file. logging: type: logging-logback settings: logging-logback-settings diff --git a/platform/src/main/java/org/stellar/anchor/platform/controller/HealthController.java b/platform/src/main/java/org/stellar/anchor/platform/controller/HealthController.java index 57fedf862a..bf28bc962f 100644 --- a/platform/src/main/java/org/stellar/anchor/platform/controller/HealthController.java +++ b/platform/src/main/java/org/stellar/anchor/platform/controller/HealthController.java @@ -1,8 +1,11 @@ package org.stellar.anchor.platform.controller; import java.util.List; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; import org.stellar.anchor.api.platform.HealthCheckResponse; +import org.stellar.anchor.api.platform.HealthCheckStatus; import org.stellar.anchor.platform.service.HealthCheckService; @RestController @@ -16,10 +19,20 @@ public class HealthController { } @RequestMapping(method = {RequestMethod.GET}) - public HealthCheckResponse health(@RequestParam(required = false) List checks) { + public ResponseEntity health( + @RequestParam(required = false) List checks) { if (checks == null) { checks = List.of("all"); } - return healthCheckService.check(checks); + HealthCheckResponse healthCheckResponse = healthCheckService.check(checks); + + boolean unhealthy = + healthCheckResponse.getChecks().values().stream() + .anyMatch(result -> result.getStatus() == HealthCheckStatus.RED); + if (unhealthy) { + return new ResponseEntity<>(healthCheckResponse, HttpStatus.INTERNAL_SERVER_ERROR); + } else { + return ResponseEntity.ok(healthCheckResponse); + } } } diff --git a/platform/src/main/java/org/stellar/anchor/platform/payment/observer/stellar/StellarPaymentObserver.java b/platform/src/main/java/org/stellar/anchor/platform/payment/observer/stellar/StellarPaymentObserver.java index 56473ab9a8..327659a164 100644 --- a/platform/src/main/java/org/stellar/anchor/platform/payment/observer/stellar/StellarPaymentObserver.java +++ b/platform/src/main/java/org/stellar/anchor/platform/payment/observer/stellar/StellarPaymentObserver.java @@ -1,13 +1,20 @@ package org.stellar.anchor.platform.payment.observer.stellar; -import static org.stellar.anchor.api.platform.HealthCheckStatus.GREEN; -import static org.stellar.anchor.api.platform.HealthCheckStatus.RED; +import static org.stellar.anchor.api.platform.HealthCheckStatus.*; +import static org.stellar.anchor.platform.payment.observer.stellar.ObserverStatus.*; +import static org.stellar.anchor.util.Log.debugF; +import static org.stellar.anchor.util.Log.infoF; import static org.stellar.anchor.util.ReflectionUtil.getField; import com.google.gson.annotations.SerializedName; import java.io.IOException; +import java.time.Duration; +import java.time.Instant; import java.util.*; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import lombok.Builder; @@ -40,49 +47,198 @@ public class StellarPaymentObserver implements HealthCheckable { /** The minimum number of results the Stellar Blockchain can return. */ private static final int MIN_RESULTS = 1; + // If the observer had been silent for longer than SILENC_TIMEOUT, a SilenceTimeoutException will + // be thrown to trigger reconnections. + private static final long SILENCE_TIMEOUT = 90; + // If the observer has more than 2 SILENCE_TIMEOUT_RETRIES, it will be marked unhealthy + private static final long SILENCE_TIMEOUT_RETRIES = 2; + + // The time interval between silence checks + private static final long SILENCE_CHECK_INTERVAL = 5; + final Server server; - final Set observers; + final Set paymentListeners; final StellarPaymentStreamerCursorStore paymentStreamerCursorStore; final Map, String> mapStreamToAccount = new HashMap<>(); final PaymentObservingAccountsManager paymentObservingAccountsManager; SSEStream stream; - private final ExponentialBackoffTimer exponentialBackoffTimer = new ExponentialBackoffTimer(); + final ExponentialBackoffTimer publishingBackoffTimer = new ExponentialBackoffTimer(); + final ExponentialBackoffTimer streamBackoffTimer = new ExponentialBackoffTimer(); + int silenceTimeoutCount = 0; + + ObserverStatus status = RUNNING; + + Instant lastActivityTime; + + ScheduledExecutorService silenceWatcher = Executors.newSingleThreadScheduledExecutor(); + ScheduledExecutorService statusWatcher = Executors.newSingleThreadScheduledExecutor(); StellarPaymentObserver( String horizonServer, - Set observers, + Set paymentListeners, PaymentObservingAccountsManager paymentObservingAccountsManager, StellarPaymentStreamerCursorStore paymentStreamerCursorStore) { this.server = new Server(horizonServer); - this.observers = observers; + this.paymentListeners = paymentListeners; this.paymentObservingAccountsManager = paymentObservingAccountsManager; this.paymentStreamerCursorStore = paymentStreamerCursorStore; } - /** Start watching the accounts. */ + /** Start the observer. */ public void start() { - this.stream = watch(); + infoF("Starting the SSEStream"); + startStream(); + + infoF("Starting the observer silence watcher"); + silenceWatcher.scheduleAtFixedRate( + this::checkSilence, + 1, + SILENCE_CHECK_INTERVAL, + TimeUnit.SECONDS); // TODO: The period should be made configurable in version 2.x + + infoF("Starting the status watcher"); + statusWatcher.scheduleWithFixedDelay(this::checkStatus, 1, 1, TimeUnit.SECONDS); + + setStatus(RUNNING); } - /** Graceful shutdown. */ + /** Graceful shut down the observer */ public void shutdown() { - this.stream.close(); - this.stream = null; + infoF("Shutting down the SSEStream"); + stopStream(); + + infoF("Stopping the silence watcher"); + silenceWatcher.shutdown(); + + infoF("Stopping the status watcher"); + statusWatcher.shutdown(); + setStatus(SHUTDOWN); + } + + void startStream() { + this.stream = startSSEStream(); + } + + SSEStream startSSEStream() { + String latestCursor = fetchStreamingCursor(); + debugF("SSEStream last cursor={}", latestCursor); + + PaymentsRequestBuilder paymentsRequest = + server + .payments() + .includeTransactions(true) + .cursor(latestCursor) + .order(RequestBuilder.Order.ASC) + .limit(MAX_RESULTS); + return paymentsRequest.stream( + new EventListener<>() { + @Override + public void onEvent(OperationResponse operationResponse) { + debugF("received event {}", operationResponse.getId()); + // clear stream timeout/reconnect status + lastActivityTime = Instant.now(); + silenceTimeoutCount = 0; + streamBackoffTimer.reset(); + handleEvent(operationResponse); + } + + @Override + public void onFailure(Optional exception, Optional statusCode) { + handleFailure(exception); + } + }); } - private void restart() { - Log.info("Restarting the Stellar observer."); + void stopStream() { if (this.stream != null) { - this.shutdown(); + this.stream.close(); + this.stream = null; } + } - try { - exponentialBackoffTimer.sleep(); - exponentialBackoffTimer.increase(); - this.start(); - } catch (InterruptedException ex) { - Log.errorEx(ex); + void checkSilence() { + if (status != NEEDS_SHUTDOWN && status != SHUTDOWN) { + Instant now = Instant.now(); + if (lastActivityTime != null) { + Duration silenceDuration = Duration.between(lastActivityTime, now); + if (silenceDuration.getSeconds() > SILENCE_TIMEOUT) { + infoF("The observer had been silent for {} seconds.", silenceDuration.getSeconds()); + setStatus(SILENCE_ERROR); + } else { + debugF("The observer had been silent for {} seconds.", silenceDuration.getSeconds()); + } + } + } + } + + void restartStream() { + infoF("Restarting the stream"); + stopStream(); + startStream(); + setStatus(RUNNING); + } + + void checkStatus() { + switch (status) { + case NEEDS_SHUTDOWN: + infoF("shut down the observer"); + shutdown(); + break; + case STREAM_ERROR: + // We got stream connection error. We will use the backoff timer to reconnect. + // If the backoff timer reaches max, we will shut down the observer + if (streamBackoffTimer.isTimerMaxed()) { + infoF("The streamer backoff timer is maxed. Shutdown the observer"); + setStatus(NEEDS_SHUTDOWN); + } else { + try { + infoF( + "The streamer needs restart. Start backoff timer: {} seconds", + streamBackoffTimer.currentTimer()); + streamBackoffTimer.sleep(); + streamBackoffTimer.increase(); + restartStream(); + } catch (InterruptedException e) { + // if this thread is interrupted, we are shutting down the status watcher. + infoF("The status watcher is interrupted. Shutdown the observer"); + setStatus(NEEDS_SHUTDOWN); + } + } + break; + case SILENCE_ERROR: + infoF("The silence reconnection count: {}", silenceTimeoutCount); + // We got the silence error. If silence reconnect too many times, we will shut down the + // observer. + if (silenceTimeoutCount >= SILENCE_TIMEOUT_RETRIES) { + infoF( + "The silence error has happened for too many times:{}. Shutdown the observer", + silenceTimeoutCount); + setStatus(NEEDS_SHUTDOWN); + } else { + restartStream(); + lastActivityTime = Instant.now(); + silenceTimeoutCount++; + } + break; + case PUBLISHER_ERROR: + try { + infoF( + "Start the publishing backoff timer: {} seconds", + publishingBackoffTimer.currentTimer()); + publishingBackoffTimer.sleep(); + publishingBackoffTimer.increase(); + restartStream(); + } catch (InterruptedException e) { + // if this thread is interrupted, we are shutting down the status watcher. + setStatus(NEEDS_SHUTDOWN); + } + break; + case RUNNING: + case SHUTDOWN: + default: + // NOOP + break; } } @@ -118,85 +274,74 @@ String fetchStreamingCursor() { return pageOpResponse.getRecords().get(0).getPagingToken(); } - SSEStream watch() { - String latestCursor = fetchStreamingCursor(); - PaymentsRequestBuilder paymentsRequest = - server - .payments() - .includeTransactions(true) - .cursor(latestCursor) - .order(RequestBuilder.Order.ASC) - .limit(MAX_RESULTS); + void handleEvent(OperationResponse operationResponse) { + if (!operationResponse.isTransactionSuccessful()) { + paymentStreamerCursorStore.save(operationResponse.getPagingToken()); + return; + } - return paymentsRequest.stream( - new EventListener<>() { - @Override - public void onEvent(OperationResponse operationResponse) { - if (!operationResponse.isTransactionSuccessful()) { - paymentStreamerCursorStore.save(operationResponse.getPagingToken()); - return; - } - - ObservedPayment observedPayment = null; - try { - if (operationResponse instanceof PaymentOperationResponse) { - PaymentOperationResponse payment = (PaymentOperationResponse) operationResponse; - observedPayment = ObservedPayment.fromPaymentOperationResponse(payment); - } else if (operationResponse instanceof PathPaymentBaseOperationResponse) { - PathPaymentBaseOperationResponse pathPayment = - (PathPaymentBaseOperationResponse) operationResponse; - observedPayment = ObservedPayment.fromPathPaymentOperationResponse(pathPayment); - } - } catch (SepException ex) { - Log.warn( - String.format( - "Payment of id %s contains unsupported memo %s.", - operationResponse.getId(), - operationResponse.getTransaction().get().getMemo().toString())); - Log.warnEx(ex); - } - - if (observedPayment != null) { - try { - if (paymentObservingAccountsManager.lookupAndUpdate(observedPayment.getTo())) { - for (PaymentListener listener : observers) { - listener.onReceived(observedPayment); - } - } - - if (paymentObservingAccountsManager.lookupAndUpdate(observedPayment.getFrom()) - && !observedPayment.getTo().equals(observedPayment.getFrom())) { - final ObservedPayment finalObservedPayment = observedPayment; - observers.forEach(observer -> observer.onSent(finalObservedPayment)); - } - - } catch (EventPublishException ex) { - // restart the observer from where it stopped, in case the queue fails to - // publish the message. - Log.errorEx("Failed to send event to observer.", ex); - restart(); - return; - } catch (Throwable t) { - Log.errorEx("Something went wrong in the streamer", t); - if (!Thread.interrupted()) { - restart(); - } - return; - } - - exponentialBackoffTimer.reset(); - } - - paymentStreamerCursorStore.save(operationResponse.getPagingToken()); - } + ObservedPayment observedPayment = null; + try { + if (operationResponse instanceof PaymentOperationResponse) { + PaymentOperationResponse payment = (PaymentOperationResponse) operationResponse; + observedPayment = ObservedPayment.fromPaymentOperationResponse(payment); + } else if (operationResponse instanceof PathPaymentBaseOperationResponse) { + PathPaymentBaseOperationResponse pathPayment = + (PathPaymentBaseOperationResponse) operationResponse; + observedPayment = ObservedPayment.fromPathPaymentOperationResponse(pathPayment); + } + } catch (SepException ex) { + Log.warn( + String.format( + "Payment of id %s contains unsupported memo %s.", + operationResponse.getId(), + operationResponse.getTransaction().get().getMemo().toString())); + Log.warnEx(ex); + } - @Override - public void onFailure(Optional exception, Optional statusCode) { - Log.errorEx("stellar payment observer error: ", exception.get()); - // TODO: The stream seems closed when failure happens. Improve the reliability of the - // stream. + if (observedPayment == null) { + paymentStreamerCursorStore.save(operationResponse.getPagingToken()); + } else { + try { + if (paymentObservingAccountsManager.lookupAndUpdate(observedPayment.getTo())) { + for (PaymentListener listener : paymentListeners) { + listener.onReceived(observedPayment); } - }); + } + + if (paymentObservingAccountsManager.lookupAndUpdate(observedPayment.getFrom()) + && !observedPayment.getTo().equals(observedPayment.getFrom())) { + final ObservedPayment finalObservedPayment = observedPayment; + paymentListeners.forEach(observer -> observer.onSent(finalObservedPayment)); + } + + publishingBackoffTimer.reset(); + paymentStreamerCursorStore.save(operationResponse.getPagingToken()); + } catch (EventPublishException ex) { + // restart the observer from where it stopped, in case the queue fails to + // publish the message. + Log.errorEx("Failed to send event to payment listeners.", ex); + setStatus(PUBLISHER_ERROR); + } catch (Throwable t) { + Log.errorEx("Something went wrong in the observer while sending the event", t); + setStatus(PUBLISHER_ERROR); + } + } + } + + void handleFailure(Optional exception) { + // The SSEStreamer has internal errors. We will give up and let the container + // manager to + // restart. + Log.errorEx("stellar payment observer stream error: ", exception.get()); + + // Mark the observer unhealthy + setStatus(STREAM_ERROR); + } + + void setStatus(ObserverStatus status) { + debugF("Setting status to {}", status); + this.status = status; } public static Builder builder() { @@ -258,40 +403,65 @@ public List getTags() { @Override public HealthCheckResult check() { List results = new ArrayList<>(); - HealthCheckStatus status = GREEN; + + HealthCheckStatus status; + switch (this.status) { + case STREAM_ERROR: + case SILENCE_ERROR: + case PUBLISHER_ERROR: + status = YELLOW; + break; + case NEEDS_SHUTDOWN: + case SHUTDOWN: + status = RED; + break; + case RUNNING: + default: + status = GREEN; + break; + } StreamHealth.StreamHealthBuilder healthBuilder = StreamHealth.builder(); healthBuilder.account(mapStreamToAccount.get(stream)); // populate executorService information - ExecutorService executorService = getField(stream, "executorService", null); - if (executorService != null) { - healthBuilder.threadShutdown(executorService.isShutdown()); - healthBuilder.threadTerminated(executorService.isTerminated()); - if (executorService.isShutdown() || executorService.isTerminated()) { + if (stream != null) { + ExecutorService executorService = getField(stream, "executorService", null); + if (executorService != null) { + healthBuilder.threadShutdown(executorService.isShutdown()); + healthBuilder.threadTerminated(executorService.isTerminated()); + if (executorService.isShutdown() || executorService.isTerminated()) { + status = RED; + } + } else { status = RED; } - } else { - status = RED; - } - boolean isStopped = getField(stream, "isStopped", new AtomicBoolean(false)).get(); - healthBuilder.stopped(isStopped); - if (isStopped) { - status = RED; + AtomicBoolean isStopped = getField(stream, "isStopped", new AtomicBoolean(false)); + if (isStopped != null) { + healthBuilder.stopped(isStopped.get()); + if (isStopped.get()) { + status = RED; + } + } + + AtomicReference lastEventId = getField(stream, "lastEventId", null); + if (lastEventId != null && lastEventId.get() != null) { + healthBuilder.lastEventId(lastEventId.get()); + } else { + healthBuilder.lastEventId("-1"); + } } - AtomicReference lastEventId = getField(stream, "lastEventId", null); - if (lastEventId != null) { - healthBuilder.lastEventId(lastEventId.get()); + if (lastActivityTime == null) { + healthBuilder.silenceSinceLastEvent("0"); + } else { + healthBuilder.silenceSinceLastEvent( + String.valueOf(Duration.between(lastActivityTime, Instant.now()).getSeconds())); } results.add(healthBuilder.build()); - return SPOHealthCheckResult.builder() - .name(getName()) - .streams(results) - .status(status.getName()) - .build(); + return SPOHealthCheckResult.builder().name(getName()).streams(results).status(status).build(); } } @@ -301,9 +471,9 @@ public HealthCheckResult check() { class SPOHealthCheckResult implements HealthCheckResult { transient String name; - List statuses; + List statuses; - String status; + HealthCheckStatus status; List streams; @@ -324,5 +494,19 @@ class StreamHealth { boolean threadTerminated; boolean stopped; + + @SerializedName("last_event_id") String lastEventId; + + @SerializedName("seconds_since_last_event") + String silenceSinceLastEvent; +} + +enum ObserverStatus { + RUNNING, + STREAM_ERROR, + SILENCE_ERROR, + PUBLISHER_ERROR, + NEEDS_SHUTDOWN, + SHUTDOWN, } diff --git a/platform/src/main/java/org/stellar/anchor/platform/service/HealthCheckService.java b/platform/src/main/java/org/stellar/anchor/platform/service/HealthCheckService.java index 4df8dd2cba..4bb49c296b 100644 --- a/platform/src/main/java/org/stellar/anchor/platform/service/HealthCheckService.java +++ b/platform/src/main/java/org/stellar/anchor/platform/service/HealthCheckService.java @@ -1,17 +1,22 @@ package org.stellar.anchor.platform.service; import java.util.List; +import java.util.Optional; import org.springframework.stereotype.Service; import org.stellar.anchor.api.platform.HealthCheckResponse; import org.stellar.anchor.healthcheck.HealthCheckProcessor; -import org.stellar.anchor.healthcheck.HealthCheckable; +import org.stellar.anchor.platform.payment.observer.stellar.StellarPaymentObserver; @Service public class HealthCheckService { HealthCheckProcessor processor; - public HealthCheckService(List checkables) { - processor = new HealthCheckProcessor(checkables); + public HealthCheckService(Optional stellarPaymentObserver) { + if (!stellarPaymentObserver.isEmpty()) { + processor = new HealthCheckProcessor(List.of(stellarPaymentObserver.get())); + } else { + processor = new HealthCheckProcessor(List.of()); + } } public HealthCheckResponse check(List checks) { diff --git a/platform/src/test/kotlin/org/stellar/anchor/platform/controller/HealthControllerTest.kt b/platform/src/test/kotlin/org/stellar/anchor/platform/controller/HealthControllerTest.kt new file mode 100644 index 0000000000..e0a013c33f --- /dev/null +++ b/platform/src/test/kotlin/org/stellar/anchor/platform/controller/HealthControllerTest.kt @@ -0,0 +1,72 @@ +package org.stellar.anchor.platform.controller + +import io.mockk.MockKAnnotations +import io.mockk.clearAllMocks +import io.mockk.every +import io.mockk.impl.annotations.MockK +import io.mockk.unmockkAll +import java.util.* +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.springframework.http.HttpStatus +import org.stellar.anchor.api.platform.HealthCheckResult +import org.stellar.anchor.api.platform.HealthCheckStatus +import org.stellar.anchor.api.platform.HealthCheckStatus.* +import org.stellar.anchor.platform.payment.observer.stellar.StellarPaymentObserver +import org.stellar.anchor.platform.service.HealthCheckService + +class HealthControllerTest { + @MockK private lateinit var stellarPaymentObserver: StellarPaymentObserver + + @BeforeEach + fun setup() { + MockKAnnotations.init(this, relaxed = true) + } + + @AfterEach + fun teardown() { + clearAllMocks() + unmockkAll() + } + + @Test + fun `health controller and stellar payment observer status code checks`() { + every { stellarPaymentObserver.tags } returns listOf("all", "observer") + + val healthCheckService = HealthCheckService(Optional.of(stellarPaymentObserver)) + val healthController = HealthController(healthCheckService) + + // RED should result 500 + every { stellarPaymentObserver.check() } returns PojoHealthCheckResult("observer", RED) + var response = healthController.health(listOf("all")) + assertEquals(HttpStatus.INTERNAL_SERVER_ERROR, response.statusCode) + + // GREEN should result 200 + every { stellarPaymentObserver.check() } returns PojoHealthCheckResult("observer", GREEN) + response = healthController.health(listOf("all")) + assertEquals(HttpStatus.OK, response.statusCode) + + // YELLOW should result 200 + every { stellarPaymentObserver.check() } returns PojoHealthCheckResult("observer", YELLOW) + response = healthController.health(listOf("all")) + assertEquals(HttpStatus.OK, response.statusCode) + } +} + +class PojoHealthCheckResult(private val name: String, private val status: HealthCheckStatus) : + HealthCheckResult { + + override fun name(): String { + return name + } + + override fun getStatuses(): MutableList? { + return mutableListOf(GREEN, RED) + } + + override fun getStatus(): HealthCheckStatus { + return status + } +} diff --git a/platform/src/test/kotlin/org/stellar/anchor/platform/payment/observer/stellar/StellarPaymentObserverTest.kt b/platform/src/test/kotlin/org/stellar/anchor/platform/payment/observer/stellar/StellarPaymentObserverTest.kt index f4a22dfc78..4fc8d26b5e 100644 --- a/platform/src/test/kotlin/org/stellar/anchor/platform/payment/observer/stellar/StellarPaymentObserverTest.kt +++ b/platform/src/test/kotlin/org/stellar/anchor/platform/payment/observer/stellar/StellarPaymentObserverTest.kt @@ -4,15 +4,20 @@ import com.google.gson.reflect.TypeToken import io.mockk.* import io.mockk.impl.annotations.MockK import java.io.IOException +import javax.net.ssl.SSLProtocolException import org.junit.jupiter.api.AfterEach -import org.junit.jupiter.api.Assertions.* +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertNull import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test +import org.stellar.anchor.api.platform.HealthCheckStatus.RED import org.stellar.sdk.Server import org.stellar.sdk.requests.RequestBuilder +import org.stellar.sdk.requests.SSEStream import org.stellar.sdk.responses.GsonSingleton import org.stellar.sdk.responses.Page import org.stellar.sdk.responses.operations.OperationResponse +import shadow.com.google.common.base.Optional class StellarPaymentObserverTest { companion object { @@ -125,4 +130,18 @@ class StellarPaymentObserverTest { } assertEquals("4322708489777153", gotCursor) } + + @Test + fun `test if SSEStream exception will leave the observer in STREAM_ERROR state`() { + val stream: SSEStream = mockk(relaxed = true) + val observer = + spyk(StellarPaymentObserver(TEST_HORIZON_URI, null, null, paymentStreamerCursorStore)) + every { observer.startSSEStream() } returns stream + observer.start() + observer.handleFailure(Optional.of(SSLProtocolException(""))) + assertEquals(ObserverStatus.STREAM_ERROR, observer.status) + + val checkResult = observer.check() + assertEquals(RED, checkResult.status) + } }