diff --git a/platform/src/main/java/org/stellar/anchor/platform/event/CallbackApiEventHandler.java b/platform/src/main/java/org/stellar/anchor/platform/event/CallbackApiEventHandler.java index c7aef7f629..ba9abf2325 100644 --- a/platform/src/main/java/org/stellar/anchor/platform/event/CallbackApiEventHandler.java +++ b/platform/src/main/java/org/stellar/anchor/platform/event/CallbackApiEventHandler.java @@ -6,7 +6,6 @@ import org.stellar.anchor.api.callback.SendEventRequest; import org.stellar.anchor.api.callback.SendEventResponse; import org.stellar.anchor.api.event.AnchorEvent; -import org.stellar.anchor.api.exception.AnchorException; import org.stellar.anchor.api.exception.InvalidConfigException; import org.stellar.anchor.apiclient.CallbackApiClient; import org.stellar.anchor.platform.config.CallbackApiConfig; @@ -33,7 +32,7 @@ boolean handleEvent(AnchorEvent event) throws IOException { sendEventResponse.getCode(), sendEventResponse.getMessage()); return true; - } catch (AnchorException e) { + } catch (Exception e) { errorEx("Failed to send event to callback API. Error code: {}", e); return false; } diff --git a/platform/src/main/java/org/stellar/anchor/platform/event/KafkaSession.java b/platform/src/main/java/org/stellar/anchor/platform/event/KafkaSession.java index b7b1f47b2f..837cb46e5f 100644 --- a/platform/src/main/java/org/stellar/anchor/platform/event/KafkaSession.java +++ b/platform/src/main/java/org/stellar/anchor/platform/event/KafkaSession.java @@ -10,6 +10,7 @@ import static org.stellar.anchor.util.Log.debugF; import static org.stellar.anchor.util.StringHelper.isEmpty; +import com.google.gson.JsonSyntaxException; import io.micrometer.core.instrument.Metrics; import java.io.IOException; import java.time.Duration; @@ -54,12 +55,12 @@ public KafkaSession(KafkaConfig kafkaConfig, String sessionName, EventQueue queu this.sessionName = sessionName; this.topic = queue.name(); - if (kafkaConfig.getSecurityProtocol() == KafkaConfig.SecurityProtocol.SASL_SSL) { - // If the keystore and truststore files exist, use them, otherwise, use the resources - sslKeystoreLocation = - findFileThenResource(kafkaConfig.getSslKeystoreLocation()).getAbsolutePath(); - sslTruststoreLocation = - findFileThenResource(kafkaConfig.getSslTruststoreLocation()).getAbsolutePath(); + if (kafkaConfig.getSslVerifyCert()) { + if (kafkaConfig.getSecurityProtocol() == KafkaConfig.SecurityProtocol.SASL_SSL) { + // If the keystore and truststore files exist, use them, otherwise, use the resources + sslKeystoreLocation = find(kafkaConfig.getSslKeystoreLocation()); + sslTruststoreLocation = find(kafkaConfig.getSslTruststoreLocation()); + } } } @@ -108,9 +109,17 @@ public EventService.ReadResponse read() throws AnchorException { } else { Log.infoF("Received {} Kafka records", consumerRecords.count()); for (ConsumerRecord record : consumerRecords) { - AnchorEvent deserialized = - GsonUtils.getInstance().fromJson(record.value(), AnchorEvent.class); - events.add(deserialized); + try { + AnchorEvent deserialized = + GsonUtils.getInstance().fromJson(record.value(), AnchorEvent.class); + if (deserialized.getType() == null) { + throw new EventPublishException("null event type"); + } + events.add(deserialized); + } catch (JsonSyntaxException | AnchorException ex) { + Log.debugF( + "Skipping mal-formatted event from Kafka. ex={}, message={}", ex, ex.getMessage()); + } } // TOOD: emit metrics here. } @@ -258,4 +267,8 @@ void configureAuth(Properties props) throws InvalidConfigException { throw new IllegalStateException("Unexpected value: " + kafkaConfig.getSecurityProtocol()); } } + + String find(String sslKeystoreLocation) throws IOException { + return findFileThenResource(sslKeystoreLocation).getAbsolutePath(); + } } diff --git a/platform/src/test/kotlin/org/stellar/anchor/platform/event/KafkaSessionTest.kt b/platform/src/test/kotlin/org/stellar/anchor/platform/event/KafkaSessionTest.kt index b390ce1d14..2af9ebc451 100644 --- a/platform/src/test/kotlin/org/stellar/anchor/platform/event/KafkaSessionTest.kt +++ b/platform/src/test/kotlin/org/stellar/anchor/platform/event/KafkaSessionTest.kt @@ -4,6 +4,7 @@ import io.mockk.MockKAnnotations import io.mockk.every import io.mockk.impl.annotations.MockK import io.mockk.spyk +import io.mockk.verify import java.util.* import org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG import org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM @@ -14,7 +15,6 @@ import org.junit.jupiter.api.Test import org.junit.jupiter.api.extension.ExtendWith import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.ValueSource -import org.springframework.context.ConfigurableApplicationContext import org.stellar.anchor.LockAndMockStatic import org.stellar.anchor.LockAndMockTest import org.stellar.anchor.event.EventService.EventQueue @@ -23,6 +23,7 @@ import org.stellar.anchor.platform.config.KafkaConfig.SecurityProtocol.PLAINTEXT import org.stellar.anchor.platform.config.PropertySecretConfig.SECRET_EVENTS_QUEUE_KAFKA_PASSWORD import org.stellar.anchor.platform.config.PropertySecretConfig.SECRET_EVENTS_QUEUE_KAFKA_USERNAME import org.stellar.anchor.platform.configurator.SecretManager +import org.stellar.anchor.platform.utils.ResourceHelper import org.stellar.anchor.platform.utils.TrustAllSslEngineFactory @ExtendWith(LockAndMockTest::class) @@ -34,7 +35,6 @@ class KafkaSessionTest { @MockK(relaxed = true) lateinit var kafkaConfig: KafkaConfig @MockK(relaxed = true) lateinit var eventQueue: EventQueue - @MockK(relaxed = true) lateinit var appContext: ConfigurableApplicationContext private lateinit var kafkaSession: KafkaSession @BeforeEach @@ -71,7 +71,7 @@ class KafkaSessionTest { ) } - @LockAndMockStatic([SecretManager::class]) + @LockAndMockStatic([SecretManager::class, ResourceHelper::class]) @ParameterizedTest @ValueSource(booleans = [true, false]) fun `test security-protocol SASL_SSL `(sslVerifyCert: Boolean) { @@ -102,6 +102,9 @@ class KafkaSessionTest { assertEquals("", properties.getProperty(SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG)) val cls = properties.get(SSL_ENGINE_FACTORY_CLASS_CONFIG) as Class assertEquals("org.stellar.anchor.platform.utils.TrustAllSslEngineFactory", cls.name) + + // make sure we don't look for keystore and truststore + verify(exactly = 0) { kafkaSession.find(any()) } } } }