Skip to content

Commit

Permalink
Qu 1753 kafka producer config
Browse files Browse the repository at this point in the history
* initial commit

* savepoint

* savepoint

* savepoint

* kafka settings validate on route

* formatting

* make properties map optional

* added docstring

* simplify method call

* adddressed pr comments

* import

GitOrigin-RevId: 319fa861acd51fc00f8d35430fc3cc4823238de4
  • Loading branch information
stevenbenjamin authored and thatbot-copy[bot] committed Mar 4, 2024
1 parent 31beaa3 commit 401304f
Show file tree
Hide file tree
Showing 8 changed files with 240 additions and 159 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,11 @@ object StandingQueryResultOutputUserDef {
final case class WriteToKafka(
topic: String,
bootstrapServers: String,
format: OutputFormat = OutputFormat.JSON
format: OutputFormat = OutputFormat.JSON,
@docs(
"Map of Kafka producer properties. See <https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html>"
)
kafkaProperties: Map[String, String] = Map.empty[String, String]
) extends StandingQueryResultOutputUserDef

@unnamed
Expand Down
4 changes: 2 additions & 2 deletions quine/src/main/scala/com/thatdot/quine/app/Recipe.scala
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,11 @@ object Recipe {
(
url.subs
).map(PostToEndpoint(_, parallelism, onlyPositiveMatchData))
case WriteToKafka(topic, bootstrapServers, format) =>
case WriteToKafka(topic, bootstrapServers, format, properties) =>
(
topic.subs,
bootstrapServers.subs
).mapN(WriteToKafka(_, _, format))
).mapN(WriteToKafka(_, _, format, properties))
case WriteToSNS(credentialsOpt, regionOpt, topic) =>
(
credentialsOpt.traverse(_.subs),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,14 @@ object StandingQueryResultOutput extends LazyLogging {
}(system.dispatcher)
}

case WriteToKafka(topic, bootstrapServers, format) =>
case WriteToKafka(topic, bootstrapServers, format, properties) =>
val settings = ProducerSettings(
graph.system,
new ByteArraySerializer,
new ByteArraySerializer
).withBootstrapServers(bootstrapServers)

.withProperties(properties)
logger.info(s"Writing to kafka with properties $properties")
serialized(name, format, graph)
.map(bytes => ProducerMessage.single(new ProducerRecord[Array[Byte], Array[Byte]](topic, bytes)))
.via(KafkaProducer.flexiFlow(settings).named(s"sq-output-kafka-producer-for-$name"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ object KafkaSrcDef {
)

val complaintsFromValidator: ValidatedNel[String, Unit] =
KafkaSettingsValidator(consumerSettings.properties)
.validate(assumeConfigIsFinal = true)
KafkaSettingsValidator
.validateInput(consumerSettings.properties, assumeConfigIsFinal = true)
.toInvalid(())

complaintsFromValidator.map { _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,82 +9,59 @@ import cats.data.NonEmptyList
import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigValue}

import com.thatdot.quine.app.ingest.util.KafkaSettingsValidator.{ErrorString, underlyingValidator}
import com.thatdot.quine.app.ingest.util.KafkaSettingsValidator.ErrorString
import com.thatdot.quine.routes.KafkaIngest.KafkaProperties
import com.thatdot.quine.routes.KafkaOffsetCommitting

case class KafkaSettingsValidator(
properties: KafkaProperties,
explicitGroupId: Option[String] = None,
explicitOffsetCommitting: Option[KafkaOffsetCommitting] = None
) extends LazyLogging {
object KafkaSettingsValidator extends LazyLogging {
type ErrorString = String

/** Variables that have analogues in kafka properties. Settings in both properties
* and the direct setting via the api should generate errors. Use this when the
* setting must be provided via EITHER the API or the properties object, but not
* both
*/
private def findConflict(
keys: Set[String],
ingestField: Option[_]
): Option[ErrorString] = ingestField match {
case Some(_) =>
val usedKeys: Set[ErrorString] = properties.keySet.intersect(keys)
if (usedKeys.nonEmpty) Some(f"Property value conflicts with property ${usedKeys.mkString(",")}") else None
case _ => None
private def underlyingValidator[C <: AbstractConfig](c: Class[C]): ConfigDef = Try {
val config: Field = c.getDeclaredField("CONFIG")
config.setAccessible(true)
config.get(null).asInstanceOf[ConfigDef]
} match {
case Failure(e) =>
// Should be impossible.
logger.error(
s"Expected Kafka settings validator to be available at ${c.getName}.CONFIG -- " +
s"did you override your classpath with a custom kafka JAR? Kafka config validation will now fail."
)
throw e
case Success(validator) => validator
}

private def disallowSubstring(forbiddenValue: String)(key: String): Option[ErrorString] =
if (properties.get(key).exists((userSetValue: String) => userSetValue.contains(forbiddenValue)))
Some(s"$key may not be set to: ${properties(key)}, as it contains: $forbiddenValue")
else None

/** Field conflicts with an explicitly set property on the ingest. Use this when
* the setting MUST be provided via the API
*/
private def disallowField(key: String, errorString: String): Option[ErrorString] =
if (properties.keySet.contains(key)) Some(s"$key is not allowed in the kafkaProperties Map. $errorString") else None

/** Will return error strings or None.
* If [[assumeConfigIsFinal]] is true, the properties will also be checked against kafka's internal property
* validator (additional checks include things like verifying that values fall within enumerated options and that
* all required fields to construct a Kafka Consumer are present)
*/
def validate(assumeConfigIsFinal: Boolean = false): Option[NonEmptyList[ErrorString]] = {
val underlyingKnownKeys: Set[String] = underlyingValidator match {
case Failure(e) =>
// Should be impossible.
logger.error(
s"Expected Kafka settings validator to be available at ${classOf[ConsumerConfig].getName}.CONFIG -- " +
s"did you override your classpath with a custom kafka JAR? Kafka config validation will now fail."
)
throw e
case Success(validator) => validator.configKeys.values.asScala.map(_.name).toSet
}
val validator: ConfigDef = underlyingValidator.get

val unrecognizedPropertiesError: List[String] = properties.keySet.diff(underlyingKnownKeys) match {
case s if s.isEmpty => Nil
case s @ _ =>
List(s"Unrecognized properties: ${s.mkString(",")}")
}
def validateInput(
properties: KafkaProperties,
explicitGroupId: Option[String] = None,
explicitOffsetCommitting: Option[KafkaOffsetCommitting] = None,
assumeConfigIsFinal: Boolean = false
): Option[NonEmptyList[String]] = {
val v = new KafkaSettingsValidator(underlyingValidator(classOf[ConsumerConfig]), properties)

/*
these values have no direct analogues in Kafka settings:
these values have no direct analogues in Kafka settings:
- parallelism: Int
- ingest.topics
- ingest.format
- parallelism: Int
- ingest.topics
- ingest.format
*/

val errors: Seq[String] =
if (assumeConfigIsFinal) {
// config is already merged, so we can rely on the kafka-provided validator for any errors
for {
validatedConfigEntry <- validator.validate(properties.asJava).asScala.toVector
validatedConfigEntry <- v.underlyingValues
configName = validatedConfigEntry.name()
// TODO why does a finalized config not have key.deserializer set?
// Does pekko tack it on in settings.consumerFactory?
Expand All @@ -94,62 +71,124 @@ case class KafkaSettingsValidator(
} yield s"Error in Kafka setting $configName: $err"
} else {
// config is not yet merged (multiple sources of truth), so we can look for conflicts between the parts of config
(
List(
findConflict(Set(CommonClientConfigs.GROUP_ID_CONFIG), explicitGroupId),
findConflict(
Set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
Some(explicitOffsetCommitting)
),
//boostrap servers is mandatory on ingest. If it is set in properties that's a conflict
disallowField(
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
"Please use the Kafka ingest `bootstrapServers` field."
),
disallowField(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"Please use one of the `format` field cypher options, which rely on their hard-coded deserializers."
),
disallowField(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"Please use one of the `format` field cypher options, which rely on their hard-coded deserializers."
),
disallowField(
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
"Please use the Kafka ingest `securityProtocol` field."
),
disallowField(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"Please use the Kafka ingest `autoOffsetReset` field."
)
) ::: { // Conservative fix for CVE-2023-25194: disable keys including ${SaslConfigs.SASL_JAAS_CONFIG}
val forbiddenJaasModule = "com.sun.security.auth.module.JndiLoginModule"
(disallowSubstring(forbiddenJaasModule)(SASL_JAAS_CONFIG) :: List(
// these 3 config scopes may allow "overrides" -- the security advisory at https://archive.ph/P6q2A
// recommends blacklisting the `override` subkey for each scope. These are already considered
// invalid by `unrecognizedProperties`, but better safe than sorry.
"producer",
"consumer",
"admin"
)
.map(scope => s"$scope.override.$SASL_JAAS_CONFIG")
.map(disallowSubstring(forbiddenJaasModule)))
}
List(
v.findConflict(Set(CommonClientConfigs.GROUP_ID_CONFIG), explicitGroupId),
v.findConflict(
Set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
Some(explicitOffsetCommitting)
),
//boostrap servers is mandatory on ingest. If it is set in properties that's a conflict
v.disallowField(
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
"Please use the Kafka ingest `bootstrapServers` field."
),
v.disallowField(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"Please use one of the `format` field cypher options, which rely on their hard-coded deserializers."
),
v.disallowField(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"Please use one of the `format` field cypher options, which rely on their hard-coded deserializers."
),
v.disallowField(
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
"Please use the Kafka ingest `securityProtocol` field."
),
v.disallowField(
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"Please use the Kafka ingest `autoOffsetReset` field."
),
//
// --- if any of these keys points to something containing "com.sun.security.auth.module.JndiLoginModule"
//
// Conservative fix for CVE-2023-25194: disable keys including ${SaslConfigs.SASL_JAAS_CONFIG}
v.disallowJaasSubstring(SASL_JAAS_CONFIG),
// these 3 config scopes may allow "overrides" -- the security advisory at https://archive.ph/P6q2A
// recommends blacklisting the `override` subkey for each scope. These are already considered
// invalid by `unrecognizedProperties`, but better safe than sorry.
v.disallowJaasSubstring(s"producer.override.$SASL_JAAS_CONFIG"),
v.disallowJaasSubstring(s"consumer.override.$SASL_JAAS_CONFIG"),
v.disallowJaasSubstring(s"admin.override.$SASL_JAAS_CONFIG")
).flatten
}

NonEmptyList.fromList(unrecognizedPropertiesError ++ errors)
v.withUnrecognizedErrors(errors)
}

def validateOutput(properties: KafkaProperties): Option[NonEmptyList[String]] = {
val v = new KafkaSettingsValidator(underlyingValidator(classOf[ProducerConfig]), properties)

val errors: Seq[ErrorString] = List(
//boostrap servers is mandatory. If it is set in properties that's a conflict
v.disallowField(
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
"Please use the result output `bootstrapServers` field."
),
v.disallowField(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"Please use one of the `format` field cypher options, which rely on their hard-coded deserializers."
),
//
// --- if any of these keys points to something containing "com.sun.security.auth.module.JndiLoginModule"
//
// Conservative fix for CVE-2023-25194: disable keys including ${SaslConfigs.SASL_JAAS_CONFIG}
v.disallowJaasSubstring(SASL_JAAS_CONFIG),
// these 3 config scopes may allow "overrides" -- the security advisory at https://archive.ph/P6q2A
// recommends blacklisting the `override` subkey for each scope. These are already considered
// invalid by `unrecognizedProperties`, but better safe than sorry.
v.disallowJaasSubstring(s"producer.override.$SASL_JAAS_CONFIG"),
v.disallowJaasSubstring(s"consumer.override.$SASL_JAAS_CONFIG"),
v.disallowJaasSubstring(s"admin.override.$SASL_JAAS_CONFIG")
).flatten

v.withUnrecognizedErrors(errors)
}
}

object KafkaSettingsValidator {
type ErrorString = String
class KafkaSettingsValidator(
validator: ConfigDef,
properties: KafkaProperties
) extends LazyLogging {

private val underlyingKnownKeys: Set[String] = validator.configKeys.values.asScala.map(_.name).toSet
def underlyingValues: Seq[ConfigValue] = validator.validate(properties.asJava).asScala.toVector

// Reflectively load the ConfigDef at ConsumerConfig.CONFIG, which is what Kafka uses for validation
val underlyingValidator: Try[ConfigDef] = Try {
val consumerConfig: Field = classOf[ConsumerConfig].getDeclaredField("CONFIG")
consumerConfig.setAccessible(true)
consumerConfig.get(null).asInstanceOf[ConfigDef]
/** Variables that have analogues in kafka properties. Settings in both properties
* and the direct setting via the api should generate errors. Use this when the
* setting must be provided via EITHER the API or the properties object, but not
* both
*/
protected def findConflict(
keys: Set[String],
ingestField: Option[_]
): Option[ErrorString] = ingestField match {
case Some(_) =>
val usedKeys: Set[ErrorString] = properties.keySet.intersect(keys)
if (usedKeys.nonEmpty) Some(f"Property value conflicts with property ${usedKeys.mkString(",")}") else None
case _ => None
}

protected def disallowJaasSubstring(key: String): Option[ErrorString] = {
val forbiddenJaasModule = "com.sun.security.auth.module.JndiLoginModule"
if (properties.get(key).exists((userSetValue: String) => userSetValue.contains(forbiddenJaasModule)))
Some(s"$key may not be set to: ${properties(key)}, as it contains: $forbiddenJaasModule")
else None
}

/** Field conflicts with an explicitly set property on the ingest. Use this when
* the setting MUST be provided via the API
*/

protected def disallowField(key: String, errorString: String): Option[ErrorString] =
if (properties.keySet.contains(key)) Some(s"$key is not allowed in the kafkaProperties Map. $errorString") else None

val unrecognizedPropertiesError: List[String] = properties.keySet.diff(underlyingKnownKeys) match {
case s if s.isEmpty => Nil
case s @ _ =>
List(s"Unrecognized properties: ${s.mkString(",")}")
}

def withUnrecognizedErrors(errors: Seq[String]): Option[NonEmptyList[String]] =
NonEmptyList.fromList(unrecognizedPropertiesError ++ errors)

}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ final private[thatdot] case class IngestStreamWithControl[+Conf](
case Some(Success(Done)) => IngestStreamStatus.Completed
case Some(Failure(e)) =>
// If exception occurs, it means that the ingest stream has failed
logger.warn(s"Ingest stream: ${settings} failed.", e)
logger.warn(s"Ingest stream: $settings failed.", e)
IngestStreamStatus.Failed
case None => IngestStreamStatus.Running
}
Expand Down Expand Up @@ -231,7 +231,11 @@ trait IngestRoutesImpl
ingestStreamStart.implementedBy {
case (ingestName, namespaceParam, settings: KafkaIngest) =>
val namespace = namespaceFromParam(namespaceParam)
KafkaSettingsValidator(settings.kafkaProperties, settings.groupId, settings.offsetCommitting).validate() match {
KafkaSettingsValidator.validateInput(
settings.kafkaProperties,
settings.groupId,
settings.offsetCommitting
) match {
case Some(errors) =>
http400(
endpoints4s.Invalid(
Expand All @@ -250,7 +254,7 @@ trait IngestRoutesImpl
private val ingestStreamStopRoute = ingestStreamStop.implementedByAsync { case (ingestName, namespaceParam) =>
quineApp.removeIngestStream(ingestName, namespaceFromParam(namespaceParam)) match {
case None => Future.successful(None)
case Some(control @ IngestStreamWithControl(settings, metrics, valve @ _, terminated, close, _, _)) =>
case Some(control @ IngestStreamWithControl(settings, metrics, _, terminated, close, _, _)) =>
val finalStatus = control.status.map { previousStatus =>
import IngestStreamStatus._
previousStatus match {
Expand Down Expand Up @@ -343,9 +347,9 @@ trait IngestRoutesImpl
private def mkPauseOperationError(operation: String): PartialFunction[Throwable, Either[Invalid, Nothing]] = {
case _: StreamDetachedException =>
// A StreamDetachedException always occurs when the ingest has failed
Left(endpoints4s.Invalid(s"Cannot ${operation} a failed ingest."))
Left(endpoints4s.Invalid(s"Cannot $operation a failed ingest."))
case e: PauseOperationException =>
Left(endpoints4s.Invalid(s"Cannot ${operation} a ${e.statusMsg} ingest."))
Left(endpoints4s.Invalid(s"Cannot $operation a ${e.statusMsg} ingest."))
}

private val ingestStreamPauseRoute = ingestStreamPause.implementedByAsync { case (ingestName, namespaceParam) =>
Expand Down
Loading

0 comments on commit 401304f

Please sign in to comment.