Skip to content

Commit

Permalink
[ANCHOR-858] Do not require keystore and truststore if ssl_cert_verif…
Browse files Browse the repository at this point in the history
…y is false (#1554)

### Description

Do not search for the `keystore` and `truststore` files if
`ssl_cert_verify` is to `false`.

When ` ssl_cert_verify` is set to `false`, the `KafkaSession` still look
for the file and cause IOException.
  • Loading branch information
lijamie98 authored Oct 28, 2024
2 parents 755e2c6 + 33f28d1 commit 5e1e174
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import org.stellar.anchor.api.callback.SendEventRequest;
import org.stellar.anchor.api.callback.SendEventResponse;
import org.stellar.anchor.api.event.AnchorEvent;
import org.stellar.anchor.api.exception.AnchorException;
import org.stellar.anchor.api.exception.InvalidConfigException;
import org.stellar.anchor.apiclient.CallbackApiClient;
import org.stellar.anchor.platform.config.CallbackApiConfig;
Expand All @@ -33,7 +32,7 @@ boolean handleEvent(AnchorEvent event) throws IOException {
sendEventResponse.getCode(),
sendEventResponse.getMessage());
return true;
} catch (AnchorException e) {
} catch (Exception e) {
errorEx("Failed to send event to callback API. Error code: {}", e);
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import static org.stellar.anchor.util.Log.debugF;
import static org.stellar.anchor.util.StringHelper.isEmpty;

import com.google.gson.JsonSyntaxException;
import io.micrometer.core.instrument.Metrics;
import java.io.IOException;
import java.time.Duration;
Expand Down Expand Up @@ -54,12 +55,12 @@ public KafkaSession(KafkaConfig kafkaConfig, String sessionName, EventQueue queu
this.sessionName = sessionName;
this.topic = queue.name();

if (kafkaConfig.getSecurityProtocol() == KafkaConfig.SecurityProtocol.SASL_SSL) {
// If the keystore and truststore files exist, use them, otherwise, use the resources
sslKeystoreLocation =
findFileThenResource(kafkaConfig.getSslKeystoreLocation()).getAbsolutePath();
sslTruststoreLocation =
findFileThenResource(kafkaConfig.getSslTruststoreLocation()).getAbsolutePath();
if (kafkaConfig.getSslVerifyCert()) {
if (kafkaConfig.getSecurityProtocol() == KafkaConfig.SecurityProtocol.SASL_SSL) {
// If the keystore and truststore files exist, use them, otherwise, use the resources
sslKeystoreLocation = find(kafkaConfig.getSslKeystoreLocation());
sslTruststoreLocation = find(kafkaConfig.getSslTruststoreLocation());
}
}
}

Expand Down Expand Up @@ -108,9 +109,17 @@ public EventService.ReadResponse read() throws AnchorException {
} else {
Log.infoF("Received {} Kafka records", consumerRecords.count());
for (ConsumerRecord<String, String> record : consumerRecords) {
AnchorEvent deserialized =
GsonUtils.getInstance().fromJson(record.value(), AnchorEvent.class);
events.add(deserialized);
try {
AnchorEvent deserialized =
GsonUtils.getInstance().fromJson(record.value(), AnchorEvent.class);
if (deserialized.getType() == null) {
throw new EventPublishException("null event type");
}
events.add(deserialized);
} catch (JsonSyntaxException | AnchorException ex) {
Log.debugF(
"Skipping mal-formatted event from Kafka. ex={}, message={}", ex, ex.getMessage());
}
}
// TOOD: emit metrics here.
}
Expand Down Expand Up @@ -258,4 +267,8 @@ void configureAuth(Properties props) throws InvalidConfigException {
throw new IllegalStateException("Unexpected value: " + kafkaConfig.getSecurityProtocol());
}
}

String find(String sslKeystoreLocation) throws IOException {
return findFileThenResource(sslKeystoreLocation).getAbsolutePath();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import io.mockk.MockKAnnotations
import io.mockk.every
import io.mockk.impl.annotations.MockK
import io.mockk.spyk
import io.mockk.verify
import java.util.*
import org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG
import org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM
Expand All @@ -14,7 +15,6 @@ import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.springframework.context.ConfigurableApplicationContext
import org.stellar.anchor.LockAndMockStatic
import org.stellar.anchor.LockAndMockTest
import org.stellar.anchor.event.EventService.EventQueue
Expand All @@ -23,6 +23,7 @@ import org.stellar.anchor.platform.config.KafkaConfig.SecurityProtocol.PLAINTEXT
import org.stellar.anchor.platform.config.PropertySecretConfig.SECRET_EVENTS_QUEUE_KAFKA_PASSWORD
import org.stellar.anchor.platform.config.PropertySecretConfig.SECRET_EVENTS_QUEUE_KAFKA_USERNAME
import org.stellar.anchor.platform.configurator.SecretManager
import org.stellar.anchor.platform.utils.ResourceHelper
import org.stellar.anchor.platform.utils.TrustAllSslEngineFactory

@ExtendWith(LockAndMockTest::class)
Expand All @@ -34,7 +35,6 @@ class KafkaSessionTest {

@MockK(relaxed = true) lateinit var kafkaConfig: KafkaConfig
@MockK(relaxed = true) lateinit var eventQueue: EventQueue
@MockK(relaxed = true) lateinit var appContext: ConfigurableApplicationContext
private lateinit var kafkaSession: KafkaSession

@BeforeEach
Expand Down Expand Up @@ -71,7 +71,7 @@ class KafkaSessionTest {
)
}

@LockAndMockStatic([SecretManager::class])
@LockAndMockStatic([SecretManager::class, ResourceHelper::class])
@ParameterizedTest
@ValueSource(booleans = [true, false])
fun `test security-protocol SASL_SSL `(sslVerifyCert: Boolean) {
Expand Down Expand Up @@ -102,6 +102,9 @@ class KafkaSessionTest {
assertEquals("", properties.getProperty(SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG))
val cls = properties.get(SSL_ENGINE_FACTORY_CLASS_CONFIG) as Class<TrustAllSslEngineFactory>
assertEquals("org.stellar.anchor.platform.utils.TrustAllSslEngineFactory", cls.name)

// make sure we don't look for keystore and truststore
verify(exactly = 0) { kafkaSession.find(any()) }
}
}
}

0 comments on commit 5e1e174

Please sign in to comment.