diff --git a/quine-endpoints/src/main/scala/com/thatdot/quine/routes/StandingQueryRoutes.scala b/quine-endpoints/src/main/scala/com/thatdot/quine/routes/StandingQueryRoutes.scala index 28e0a42a..0e3a9c19 100644 --- a/quine-endpoints/src/main/scala/com/thatdot/quine/routes/StandingQueryRoutes.scala +++ b/quine-endpoints/src/main/scala/com/thatdot/quine/routes/StandingQueryRoutes.scala @@ -156,7 +156,11 @@ object StandingQueryResultOutputUserDef { final case class WriteToKafka( topic: String, bootstrapServers: String, - format: OutputFormat = OutputFormat.JSON + format: OutputFormat = OutputFormat.JSON, + @docs( + "Map of Kafka producer properties. See " + ) + kafkaProperties: Map[String, String] = Map.empty[String, String] ) extends StandingQueryResultOutputUserDef @unnamed diff --git a/quine/src/main/scala/com/thatdot/quine/app/Recipe.scala b/quine/src/main/scala/com/thatdot/quine/app/Recipe.scala index 9636c8b8..d76e828f 100644 --- a/quine/src/main/scala/com/thatdot/quine/app/Recipe.scala +++ b/quine/src/main/scala/com/thatdot/quine/app/Recipe.scala @@ -122,11 +122,11 @@ object Recipe { ( url.subs ).map(PostToEndpoint(_, parallelism, onlyPositiveMatchData)) - case WriteToKafka(topic, bootstrapServers, format) => + case WriteToKafka(topic, bootstrapServers, format, properties) => ( topic.subs, bootstrapServers.subs - ).mapN(WriteToKafka(_, _, format)) + ).mapN(WriteToKafka(_, _, format, properties)) case WriteToSNS(credentialsOpt, regionOpt, topic) => ( credentialsOpt.traverse(_.subs), diff --git a/quine/src/main/scala/com/thatdot/quine/app/StandingQueryResultOutput.scala b/quine/src/main/scala/com/thatdot/quine/app/StandingQueryResultOutput.scala index 805bffaa..3690df39 100644 --- a/quine/src/main/scala/com/thatdot/quine/app/StandingQueryResultOutput.scala +++ b/quine/src/main/scala/com/thatdot/quine/app/StandingQueryResultOutput.scala @@ -134,13 +134,14 @@ object StandingQueryResultOutput extends LazyLogging { }(system.dispatcher) } - case WriteToKafka(topic, bootstrapServers, format) => + case WriteToKafka(topic, bootstrapServers, format, properties) => val settings = ProducerSettings( graph.system, new ByteArraySerializer, new ByteArraySerializer ).withBootstrapServers(bootstrapServers) - + .withProperties(properties) + logger.info(s"Writing to kafka with properties $properties") serialized(name, format, graph) .map(bytes => ProducerMessage.single(new ProducerRecord[Array[Byte], Array[Byte]](topic, bytes))) .via(KafkaProducer.flexiFlow(settings).named(s"sq-output-kafka-producer-for-$name")) diff --git a/quine/src/main/scala/com/thatdot/quine/app/ingest/KafkaSrcDef.scala b/quine/src/main/scala/com/thatdot/quine/app/ingest/KafkaSrcDef.scala index fcca48d2..47de50de 100644 --- a/quine/src/main/scala/com/thatdot/quine/app/ingest/KafkaSrcDef.scala +++ b/quine/src/main/scala/com/thatdot/quine/app/ingest/KafkaSrcDef.scala @@ -121,8 +121,8 @@ object KafkaSrcDef { ) val complaintsFromValidator: ValidatedNel[String, Unit] = - KafkaSettingsValidator(consumerSettings.properties) - .validate(assumeConfigIsFinal = true) + KafkaSettingsValidator + .validateInput(consumerSettings.properties, assumeConfigIsFinal = true) .toInvalid(()) complaintsFromValidator.map { _ => diff --git a/quine/src/main/scala/com/thatdot/quine/app/ingest/util/KafkaSettingsValidator.scala b/quine/src/main/scala/com/thatdot/quine/app/ingest/util/KafkaSettingsValidator.scala index 4550ca60..ffc1e0d3 100644 --- a/quine/src/main/scala/com/thatdot/quine/app/ingest/util/KafkaSettingsValidator.scala +++ b/quine/src/main/scala/com/thatdot/quine/app/ingest/util/KafkaSettingsValidator.scala @@ -9,82 +9,59 @@ import cats.data.NonEmptyList import com.typesafe.scalalogging.LazyLogging import org.apache.kafka.clients.CommonClientConfigs import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.common.config.ConfigDef +import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG +import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigValue} -import com.thatdot.quine.app.ingest.util.KafkaSettingsValidator.{ErrorString, underlyingValidator} +import com.thatdot.quine.app.ingest.util.KafkaSettingsValidator.ErrorString import com.thatdot.quine.routes.KafkaIngest.KafkaProperties import com.thatdot.quine.routes.KafkaOffsetCommitting -case class KafkaSettingsValidator( - properties: KafkaProperties, - explicitGroupId: Option[String] = None, - explicitOffsetCommitting: Option[KafkaOffsetCommitting] = None -) extends LazyLogging { +object KafkaSettingsValidator extends LazyLogging { + type ErrorString = String - /** Variables that have analogues in kafka properties. Settings in both properties - * and the direct setting via the api should generate errors. Use this when the - * setting must be provided via EITHER the API or the properties object, but not - * both - */ - private def findConflict( - keys: Set[String], - ingestField: Option[_] - ): Option[ErrorString] = ingestField match { - case Some(_) => - val usedKeys: Set[ErrorString] = properties.keySet.intersect(keys) - if (usedKeys.nonEmpty) Some(f"Property value conflicts with property ${usedKeys.mkString(",")}") else None - case _ => None + private def underlyingValidator[C <: AbstractConfig](c: Class[C]): ConfigDef = Try { + val config: Field = c.getDeclaredField("CONFIG") + config.setAccessible(true) + config.get(null).asInstanceOf[ConfigDef] + } match { + case Failure(e) => + // Should be impossible. + logger.error( + s"Expected Kafka settings validator to be available at ${c.getName}.CONFIG -- " + + s"did you override your classpath with a custom kafka JAR? Kafka config validation will now fail." + ) + throw e + case Success(validator) => validator } - private def disallowSubstring(forbiddenValue: String)(key: String): Option[ErrorString] = - if (properties.get(key).exists((userSetValue: String) => userSetValue.contains(forbiddenValue))) - Some(s"$key may not be set to: ${properties(key)}, as it contains: $forbiddenValue") - else None - - /** Field conflicts with an explicitly set property on the ingest. Use this when - * the setting MUST be provided via the API - */ - private def disallowField(key: String, errorString: String): Option[ErrorString] = - if (properties.keySet.contains(key)) Some(s"$key is not allowed in the kafkaProperties Map. $errorString") else None - /** Will return error strings or None. * If [[assumeConfigIsFinal]] is true, the properties will also be checked against kafka's internal property * validator (additional checks include things like verifying that values fall within enumerated options and that * all required fields to construct a Kafka Consumer are present) */ - def validate(assumeConfigIsFinal: Boolean = false): Option[NonEmptyList[ErrorString]] = { - val underlyingKnownKeys: Set[String] = underlyingValidator match { - case Failure(e) => - // Should be impossible. - logger.error( - s"Expected Kafka settings validator to be available at ${classOf[ConsumerConfig].getName}.CONFIG -- " + - s"did you override your classpath with a custom kafka JAR? Kafka config validation will now fail." - ) - throw e - case Success(validator) => validator.configKeys.values.asScala.map(_.name).toSet - } - val validator: ConfigDef = underlyingValidator.get - - val unrecognizedPropertiesError: List[String] = properties.keySet.diff(underlyingKnownKeys) match { - case s if s.isEmpty => Nil - case s @ _ => - List(s"Unrecognized properties: ${s.mkString(",")}") - } + def validateInput( + properties: KafkaProperties, + explicitGroupId: Option[String] = None, + explicitOffsetCommitting: Option[KafkaOffsetCommitting] = None, + assumeConfigIsFinal: Boolean = false + ): Option[NonEmptyList[String]] = { + val v = new KafkaSettingsValidator(underlyingValidator(classOf[ConsumerConfig]), properties) /* - these values have no direct analogues in Kafka settings: + these values have no direct analogues in Kafka settings: - - parallelism: Int - - ingest.topics - - ingest.format + - parallelism: Int + - ingest.topics + - ingest.format */ + val errors: Seq[String] = if (assumeConfigIsFinal) { // config is already merged, so we can rely on the kafka-provided validator for any errors for { - validatedConfigEntry <- validator.validate(properties.asJava).asScala.toVector + validatedConfigEntry <- v.underlyingValues configName = validatedConfigEntry.name() // TODO why does a finalized config not have key.deserializer set? // Does pekko tack it on in settings.consumerFactory? @@ -94,62 +71,124 @@ case class KafkaSettingsValidator( } yield s"Error in Kafka setting $configName: $err" } else { // config is not yet merged (multiple sources of truth), so we can look for conflicts between the parts of config - ( - List( - findConflict(Set(CommonClientConfigs.GROUP_ID_CONFIG), explicitGroupId), - findConflict( - Set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG), - Some(explicitOffsetCommitting) - ), - //boostrap servers is mandatory on ingest. If it is set in properties that's a conflict - disallowField( - CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, - "Please use the Kafka ingest `bootstrapServers` field." - ), - disallowField( - ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, - "Please use one of the `format` field cypher options, which rely on their hard-coded deserializers." - ), - disallowField( - ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, - "Please use one of the `format` field cypher options, which rely on their hard-coded deserializers." - ), - disallowField( - CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, - "Please use the Kafka ingest `securityProtocol` field." - ), - disallowField( - ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, - "Please use the Kafka ingest `autoOffsetReset` field." - ) - ) ::: { // Conservative fix for CVE-2023-25194: disable keys including ${SaslConfigs.SASL_JAAS_CONFIG} - val forbiddenJaasModule = "com.sun.security.auth.module.JndiLoginModule" - (disallowSubstring(forbiddenJaasModule)(SASL_JAAS_CONFIG) :: List( - // these 3 config scopes may allow "overrides" -- the security advisory at https://archive.ph/P6q2A - // recommends blacklisting the `override` subkey for each scope. These are already considered - // invalid by `unrecognizedProperties`, but better safe than sorry. - "producer", - "consumer", - "admin" - ) - .map(scope => s"$scope.override.$SASL_JAAS_CONFIG") - .map(disallowSubstring(forbiddenJaasModule))) - } + List( + v.findConflict(Set(CommonClientConfigs.GROUP_ID_CONFIG), explicitGroupId), + v.findConflict( + Set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG), + Some(explicitOffsetCommitting) + ), + //boostrap servers is mandatory on ingest. If it is set in properties that's a conflict + v.disallowField( + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, + "Please use the Kafka ingest `bootstrapServers` field." + ), + v.disallowField( + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + "Please use one of the `format` field cypher options, which rely on their hard-coded deserializers." + ), + v.disallowField( + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + "Please use one of the `format` field cypher options, which rely on their hard-coded deserializers." + ), + v.disallowField( + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, + "Please use the Kafka ingest `securityProtocol` field." + ), + v.disallowField( + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, + "Please use the Kafka ingest `autoOffsetReset` field." + ), + // + // --- if any of these keys points to something containing "com.sun.security.auth.module.JndiLoginModule" + // + // Conservative fix for CVE-2023-25194: disable keys including ${SaslConfigs.SASL_JAAS_CONFIG} + v.disallowJaasSubstring(SASL_JAAS_CONFIG), + // these 3 config scopes may allow "overrides" -- the security advisory at https://archive.ph/P6q2A + // recommends blacklisting the `override` subkey for each scope. These are already considered + // invalid by `unrecognizedProperties`, but better safe than sorry. + v.disallowJaasSubstring(s"producer.override.$SASL_JAAS_CONFIG"), + v.disallowJaasSubstring(s"consumer.override.$SASL_JAAS_CONFIG"), + v.disallowJaasSubstring(s"admin.override.$SASL_JAAS_CONFIG") ).flatten } - NonEmptyList.fromList(unrecognizedPropertiesError ++ errors) + v.withUnrecognizedErrors(errors) } + def validateOutput(properties: KafkaProperties): Option[NonEmptyList[String]] = { + val v = new KafkaSettingsValidator(underlyingValidator(classOf[ProducerConfig]), properties) + + val errors: Seq[ErrorString] = List( + //boostrap servers is mandatory. If it is set in properties that's a conflict + v.disallowField( + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, + "Please use the result output `bootstrapServers` field." + ), + v.disallowField( + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + "Please use one of the `format` field cypher options, which rely on their hard-coded deserializers." + ), + // + // --- if any of these keys points to something containing "com.sun.security.auth.module.JndiLoginModule" + // + // Conservative fix for CVE-2023-25194: disable keys including ${SaslConfigs.SASL_JAAS_CONFIG} + v.disallowJaasSubstring(SASL_JAAS_CONFIG), + // these 3 config scopes may allow "overrides" -- the security advisory at https://archive.ph/P6q2A + // recommends blacklisting the `override` subkey for each scope. These are already considered + // invalid by `unrecognizedProperties`, but better safe than sorry. + v.disallowJaasSubstring(s"producer.override.$SASL_JAAS_CONFIG"), + v.disallowJaasSubstring(s"consumer.override.$SASL_JAAS_CONFIG"), + v.disallowJaasSubstring(s"admin.override.$SASL_JAAS_CONFIG") + ).flatten + + v.withUnrecognizedErrors(errors) + } } -object KafkaSettingsValidator { - type ErrorString = String +class KafkaSettingsValidator( + validator: ConfigDef, + properties: KafkaProperties +) extends LazyLogging { + + private val underlyingKnownKeys: Set[String] = validator.configKeys.values.asScala.map(_.name).toSet + def underlyingValues: Seq[ConfigValue] = validator.validate(properties.asJava).asScala.toVector - // Reflectively load the ConfigDef at ConsumerConfig.CONFIG, which is what Kafka uses for validation - val underlyingValidator: Try[ConfigDef] = Try { - val consumerConfig: Field = classOf[ConsumerConfig].getDeclaredField("CONFIG") - consumerConfig.setAccessible(true) - consumerConfig.get(null).asInstanceOf[ConfigDef] + /** Variables that have analogues in kafka properties. Settings in both properties + * and the direct setting via the api should generate errors. Use this when the + * setting must be provided via EITHER the API or the properties object, but not + * both + */ + protected def findConflict( + keys: Set[String], + ingestField: Option[_] + ): Option[ErrorString] = ingestField match { + case Some(_) => + val usedKeys: Set[ErrorString] = properties.keySet.intersect(keys) + if (usedKeys.nonEmpty) Some(f"Property value conflicts with property ${usedKeys.mkString(",")}") else None + case _ => None + } + + protected def disallowJaasSubstring(key: String): Option[ErrorString] = { + val forbiddenJaasModule = "com.sun.security.auth.module.JndiLoginModule" + if (properties.get(key).exists((userSetValue: String) => userSetValue.contains(forbiddenJaasModule))) + Some(s"$key may not be set to: ${properties(key)}, as it contains: $forbiddenJaasModule") + else None } + + /** Field conflicts with an explicitly set property on the ingest. Use this when + * the setting MUST be provided via the API + */ + + protected def disallowField(key: String, errorString: String): Option[ErrorString] = + if (properties.keySet.contains(key)) Some(s"$key is not allowed in the kafkaProperties Map. $errorString") else None + + val unrecognizedPropertiesError: List[String] = properties.keySet.diff(underlyingKnownKeys) match { + case s if s.isEmpty => Nil + case s @ _ => + List(s"Unrecognized properties: ${s.mkString(",")}") + } + + def withUnrecognizedErrors(errors: Seq[String]): Option[NonEmptyList[String]] = + NonEmptyList.fromList(unrecognizedPropertiesError ++ errors) + } diff --git a/quine/src/main/scala/com/thatdot/quine/app/routes/IngestRoutesImpl.scala b/quine/src/main/scala/com/thatdot/quine/app/routes/IngestRoutesImpl.scala index e4e8a6e0..158f0b03 100644 --- a/quine/src/main/scala/com/thatdot/quine/app/routes/IngestRoutesImpl.scala +++ b/quine/src/main/scala/com/thatdot/quine/app/routes/IngestRoutesImpl.scala @@ -82,7 +82,7 @@ final private[thatdot] case class IngestStreamWithControl[+Conf]( case Some(Success(Done)) => IngestStreamStatus.Completed case Some(Failure(e)) => // If exception occurs, it means that the ingest stream has failed - logger.warn(s"Ingest stream: ${settings} failed.", e) + logger.warn(s"Ingest stream: $settings failed.", e) IngestStreamStatus.Failed case None => IngestStreamStatus.Running } @@ -231,7 +231,11 @@ trait IngestRoutesImpl ingestStreamStart.implementedBy { case (ingestName, namespaceParam, settings: KafkaIngest) => val namespace = namespaceFromParam(namespaceParam) - KafkaSettingsValidator(settings.kafkaProperties, settings.groupId, settings.offsetCommitting).validate() match { + KafkaSettingsValidator.validateInput( + settings.kafkaProperties, + settings.groupId, + settings.offsetCommitting + ) match { case Some(errors) => http400( endpoints4s.Invalid( @@ -250,7 +254,7 @@ trait IngestRoutesImpl private val ingestStreamStopRoute = ingestStreamStop.implementedByAsync { case (ingestName, namespaceParam) => quineApp.removeIngestStream(ingestName, namespaceFromParam(namespaceParam)) match { case None => Future.successful(None) - case Some(control @ IngestStreamWithControl(settings, metrics, valve @ _, terminated, close, _, _)) => + case Some(control @ IngestStreamWithControl(settings, metrics, _, terminated, close, _, _)) => val finalStatus = control.status.map { previousStatus => import IngestStreamStatus._ previousStatus match { @@ -343,9 +347,9 @@ trait IngestRoutesImpl private def mkPauseOperationError(operation: String): PartialFunction[Throwable, Either[Invalid, Nothing]] = { case _: StreamDetachedException => // A StreamDetachedException always occurs when the ingest has failed - Left(endpoints4s.Invalid(s"Cannot ${operation} a failed ingest.")) + Left(endpoints4s.Invalid(s"Cannot $operation a failed ingest.")) case e: PauseOperationException => - Left(endpoints4s.Invalid(s"Cannot ${operation} a ${e.statusMsg} ingest.")) + Left(endpoints4s.Invalid(s"Cannot $operation a ${e.statusMsg} ingest.")) } private val ingestStreamPauseRoute = ingestStreamPause.implementedByAsync { case (ingestName, namespaceParam) => diff --git a/quine/src/main/scala/com/thatdot/quine/app/routes/StandingQueryRoutesImpl.scala b/quine/src/main/scala/com/thatdot/quine/app/routes/StandingQueryRoutesImpl.scala index 0c4f9c6f..0d50a69b 100644 --- a/quine/src/main/scala/com/thatdot/quine/app/routes/StandingQueryRoutesImpl.scala +++ b/quine/src/main/scala/com/thatdot/quine/app/routes/StandingQueryRoutesImpl.scala @@ -11,9 +11,12 @@ import org.apache.pekko.stream.scaladsl.{Flow, Sink} import org.apache.pekko.stream.{Materializer, OverflowStrategy} import org.apache.pekko.util.Timeout +import cats.data.NonEmptyList import endpoints4s.{Invalid, Valid} import com.thatdot.quine.app.NamespaceNotFoundException +import com.thatdot.quine.app.ingest.util.KafkaSettingsValidator +import com.thatdot.quine.app.ingest.util.KafkaSettingsValidator.ErrorString import com.thatdot.quine.graph.cypher.CypherException import com.thatdot.quine.graph.{ InvalidQueryPattern, @@ -22,6 +25,7 @@ import com.thatdot.quine.graph.{ StandingQueryOpsGraph, StandingQueryResult } +import com.thatdot.quine.routes.StandingQueryResultOutputUserDef.WriteToKafka import com.thatdot.quine.routes._ trait StandingQueryStore { @@ -66,6 +70,12 @@ trait StandingQueryRoutesImpl def quineApp: StandingQueryStore + private def validateOutputDef(outputDef: StandingQueryResultOutputUserDef): Option[NonEmptyList[ErrorString]] = + outputDef match { + case k: WriteToKafka => KafkaSettingsValidator.validateOutput(k.kafkaProperties) + case _ => None + } + private val standingIssueRoute = standingIssue.implementedByAsync { case (name, namespaceParam, query) => try quineApp .addStandingQuery(name, namespaceFromParam(namespaceParam), query) @@ -95,15 +105,22 @@ trait StandingQueryRoutesImpl } private val standingAddOutRoute = standingAddOut.implementedByAsync { + case (name, outputName, namespaceParam, sqResultOutput) => - quineApp - .addStandingQueryOutput(name, outputName, namespaceFromParam(namespaceParam), sqResultOutput) - .map { - _.map { - case false => Left(endpoints4s.Invalid(s"There is already a standing query output named '$outputName'")) - case true => Right(()) - } - }(graph.shardDispatcherEC) + validateOutputDef(sqResultOutput) match { + case Some(errors) => + Future.successful(Some(Left(Invalid(s"Cannot create output `$outputName`: ${errors.toList.mkString(",")}")))) + + case None => + quineApp + .addStandingQueryOutput(name, outputName, namespaceFromParam(namespaceParam), sqResultOutput) + .map { + _.map { + case false => Left(endpoints4s.Invalid(s"There is already a standing query output named '$outputName'")) + case true => Right(()) + } + }(graph.shardDispatcherEC) + } } private val standingGetWebsocketRoute = diff --git a/quine/src/test/scala/com/thatdot/quine/app/ingest/KafkaSettingsValidatorTest.scala b/quine/src/test/scala/com/thatdot/quine/app/ingest/KafkaSettingsValidatorTest.scala index 35604e72..37d837d1 100644 --- a/quine/src/test/scala/com/thatdot/quine/app/ingest/KafkaSettingsValidatorTest.scala +++ b/quine/src/test/scala/com/thatdot/quine/app/ingest/KafkaSettingsValidatorTest.scala @@ -11,81 +11,96 @@ import com.thatdot.quine.routes.KafkaOffsetCommitting.ExplicitCommit */ class KafkaSettingsValidatorTest extends AnyFunSuite { - test("Underlying Kafka ConfigDef is accessible") { - KafkaSettingsValidator.underlyingValidator.isSuccess + test("empty input settings map accepted") { + assert(KafkaSettingsValidator.validateInput(Map()).isEmpty) } - test("empty settings map accepted") { - assert(KafkaSettingsValidator(Map()).validate().isEmpty) - } - test("final empty settings map accepted") { - assert(KafkaSettingsValidator(Map()).validate(assumeConfigIsFinal = true).isEmpty) + test("final empty input settings map accepted") { + assert(KafkaSettingsValidator.validateInput(Map(), assumeConfigIsFinal = true).isEmpty) } - test("Unrecognized setting disallowed") { + test("Unrecognized input setting disallowed") { assert( - KafkaSettingsValidator(Map("Unrecognized.property.name" -> "anything")).validate().get.size == 1 + KafkaSettingsValidator.validateInput(Map("Unrecognized.property.name" -> "anything")).get.size == 1 ) } - test("Conflicting settings disallowed") { + test("Conflicting input settings disallowed") { //group.id assert( - KafkaSettingsValidator(Map("group.id" -> "a"), explicitGroupId = Some("group")) - .validate() - .get - .size == 1 + KafkaSettingsValidator.validateInput(Map("group.id" -> "a"), explicitGroupId = Some("group")).get.size == 1 ) //enable.auto.commit assert( - KafkaSettingsValidator( - Map("enable.auto.commit" -> "a"), - explicitOffsetCommitting = Some(ExplicitCommit(1000, 1000, 1100)) - ).validate(false).get.size == 1 + KafkaSettingsValidator + .validateInput( + Map("enable.auto.commit" -> "a"), + explicitOffsetCommitting = Some(ExplicitCommit(1000, 1000, 1100)) + ) + .get + .size == 1 ) //auto.commit.interval.ms assert( - KafkaSettingsValidator( - Map("auto.commit.interval.ms" -> "true"), - explicitOffsetCommitting = Some(ExplicitCommit(1000, 1000, 1100)) - ).validate(false).get.size == 1 + KafkaSettingsValidator + .validateInput( + Map("auto.commit.interval.ms" -> "true"), + explicitOffsetCommitting = Some(ExplicitCommit(1000, 1000, 1100)) + ) + .get + .size == 1 ) } - test("Unsupported settings disallowed") { + test("Unsupported input settings disallowed") { //value.deserializer - assert(KafkaSettingsValidator(Map("value.deserializer" -> "a")).validate(false).get.size == 1) + assert(KafkaSettingsValidator.validateInput(Map("value.deserializer" -> "a")).get.size == 1) //bootstrap.servers - assert(KafkaSettingsValidator(Map("bootstrap.servers" -> "a")).validate(false).get.size == 1) + assert(KafkaSettingsValidator.validateInput(Map("bootstrap.servers" -> "a")).get.size == 1) //security.protocol - assert(KafkaSettingsValidator(Map("security.protocol" -> "a")).validate(false).get.size == 1) + assert(KafkaSettingsValidator.validateInput(Map("security.protocol" -> "a")).get.size == 1) + + //completely made up + assert(KafkaSettingsValidator.validateInput(Map("my.super.cool.property" -> "false")).get.size == 1) + + } + test("Unsupported output settings disallowed") { + //value.deserializer + assert(KafkaSettingsValidator.validateOutput(Map("value.deserializer" -> "a")).get.size == 1) + + //bootstrap.servers + assert(KafkaSettingsValidator.validateOutput(Map("bootstrap.servers" -> "a")).get.size == 1) //completely made up - assert(KafkaSettingsValidator(Map("my.super.cool.property" -> "false")).validate(false).get.size == 1) + assert(KafkaSettingsValidator.validateOutput(Map("my.super.cool.property" -> "false")).get.size == 1) } test("non-member settings disallowed") { - assert(KafkaSettingsValidator(Map("auto.offset.reset" -> "a")).validate(false).get.size == 1) + assert(KafkaSettingsValidator.validateOutput(Map("auto.offset.reset" -> "a")).get.size == 1) } test("SSL selections allowed") { // truststore assert( - KafkaSettingsValidator( - Map("ssl.truststore.location" -> "alpha", "ssl.truststore.password" -> "beta") - ).validate(false).isEmpty + KafkaSettingsValidator + .validateInput( + Map("ssl.truststore.location" -> "alpha", "ssl.truststore.password" -> "beta") + ) + .isEmpty ) // keystore assert( - KafkaSettingsValidator( - Map("ssl.keystore.location" -> "gamma", "ssl.keystore.password" -> "delta") - ).validate(false).isEmpty + KafkaSettingsValidator + .validateInput( + Map("ssl.keystore.location" -> "gamma", "ssl.keystore.password" -> "delta") + ) + .isEmpty ) // key - assert(KafkaSettingsValidator(Map("ssl.key.password" -> "epsilon")).validate(false).isEmpty) + assert(KafkaSettingsValidator.validateInput(Map("ssl.key.password" -> "epsilon")).isEmpty) } test("Spooky SASL selections disallowed") { // CVE-2023-25194 @@ -99,8 +114,9 @@ class KafkaSettingsValidatorTest extends AnyFunSuite { // Each of these settings should be rejected for at least 1 reason forAll(bannedSettings) { setting => assert( - KafkaSettingsValidator(Map(setting)).validate(false).nonEmpty + KafkaSettingsValidator.validateInput(Map(setting)).nonEmpty ) + assert(KafkaSettingsValidator.validateOutput(Map(setting)).nonEmpty) } }