Qu 1512 avro decoding for v2 ingest
# Description

GitOrigin-RevId: ad6aefac0505dfd2090ca1773ccd8ea2df6fb8d1
david-masters authored and thatbot-copy[bot] committed Sep 23, 2024
1 parent b8b88f6 commit 8fdf09c
Showing 14 changed files with 314 additions and 13 deletions.
1 change: 1 addition & 0 deletions build.sbt
@@ -311,6 +311,7 @@ lazy val `quine`: Project = project
"org.webjars.npm" % "sugar-date" % sugarV,
"org.webjars.npm" % "vis-network" % visNetworkV,
"org.xerial.snappy" % "snappy-java" % snappyV,
"org.apache.avro" % "avro" % avroV,
),
)
.enablePlugins(WebScalaJSBundlerPlugin)
1 change: 1 addition & 0 deletions project/Dependencies.scala
@@ -83,4 +83,5 @@ object Dependencies {
val circeOpticsV = "0.15.0"
val visNetworkV = "8.2.0"
val webjarsLocatorV = "0.52"
val avroV = "1.12.0"
}
@@ -725,6 +725,7 @@ object FileIngestFormat {
require(delimiter != escapeChar, "Different characters must be used for `delimiter` and `escapeChar`.")
require(quoteChar != escapeChar, "Different characters must be used for `quoteChar` and `escapeChar`.")
}

}

@title("File Ingest Mode")
5 changes: 3 additions & 2 deletions quine/src/main/scala/com/thatdot/quine/app/QuineApp.scala
@@ -21,7 +21,7 @@ import com.thatdot.quine.app.ingest.serialization.{CypherParseProtobuf, CypherTo
import com.thatdot.quine.app.ingest.{IngestSrcDef, QuineIngestSource}
import com.thatdot.quine.app.ingest2.source.DecodedSource
import com.thatdot.quine.app.routes._
import com.thatdot.quine.app.serialization.ProtobufSchemaCache
import com.thatdot.quine.app.serialization.{AvroSchemaCache, ProtobufSchemaCache}
import com.thatdot.quine.app.v2api.endpoints.V2IngestEntities.{IngestConfiguration => V2IngestConfiguration}
import com.thatdot.quine.compiler.cypher
import com.thatdot.quine.compiler.cypher.{CypherStandingWiretap, registerUserDefinedProcedure}
@@ -406,6 +406,7 @@ final class QuineApp(graph: GraphService)(implicit val logConfig: LogConfig)
}

private[this] val protobufSchemaCache: ProtobufSchemaCache = new ProtobufSchemaCache.AsyncLoading(graph.dispatchers)
private[this] val avroSchemaCache: AvroSchemaCache = new AvroSchemaCache.AsyncLoading(graph.dispatchers)

def addIngestStream(
name: String,
@@ -521,7 +522,7 @@ final class QuineApp(graph: GraphService)(implicit val logConfig: LogConfig)
metrics,
meter,
graph,
)(protobufSchemaCache, logConfig)
)(protobufSchemaCache, avroSchemaCache, logConfig)

trySource.map { quineIngestSrc =>
val streamSource = quineIngestSrc.stream(
@@ -10,11 +10,15 @@ import scala.util.{Success, Try}

import com.google.protobuf.{Descriptors, DynamicMessage}
import io.circe.{Json, parser}
import org.apache.avro.Schema
import org.apache.avro.file.SeekableByteArrayInput
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory
import org.apache.commons.csv.CSVFormat

import com.thatdot.quine.app.ingest2.core.{DataFoldableFrom, DataFolderTo}
import com.thatdot.quine.app.ingest2.sources.DEFAULT_CHARSET
import com.thatdot.quine.app.serialization.ProtobufSchemaCache
import com.thatdot.quine.app.serialization.{AvroSchemaCache, ProtobufSchemaCache}
import com.thatdot.quine.app.v2api.endpoints.V2IngestEntities
import com.thatdot.quine.app.v2api.endpoints.V2IngestEntities.{IngestFormat => V2IngestFormat}
import com.thatdot.quine.graph.cypher
@@ -88,6 +92,32 @@ case class ProtobufDecoder(query: String, parameter: String = "that", schemaUrl:

}

case class AvroDecoder(schemaUrl: String)(implicit schemaCache: AvroSchemaCache) extends FrameDecoder[GenericRecord] {

// This is a blocking call, but it should only actually block until the first time the schema is
// successfully loaded.
//
// It was left blocking because lifting the effect to a broader context would mean either:
// - making ingest startup async, which would require extensive changes to QuineApp, startup, and potentially
//   clustering protocols, OR
// - making the byte-decoding step of ingest async, which violates the Kafka API's expectation that an
//   `org.apache.kafka.common.serialization.Deserializer` is synchronous.
val schema: Schema = Await.result(
schemaCache.getSchema(filenameOrUrl(schemaUrl)),
Duration.Inf,
)

val foldable: DataFoldableFrom[GenericRecord] = DataFoldableFrom.avroDataFoldable

def decode(bytes: Array[Byte]): Try[GenericRecord] = Try {
val datumReader = new GenericDatumReader[GenericRecord](schema)
val inputStream = new SeekableByteArrayInput(bytes)
val decoder = DecoderFactory.get.binaryDecoder(inputStream, null)
datumReader.read(null, decoder)
}

}
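The decoder above consumes raw binary-encoded records, not Avro object-container files. As a sanity check on that framing, here is a minimal round-trip sketch using only stock Avro APIs; the schema, field names, and values are hypothetical:

import java.io.ByteArrayOutputStream

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

object AvroRoundTripSketch {
  // Hypothetical record schema, inlined for the example.
  val schema: Schema = new Schema.Parser().parse(
    """{"type":"record","name":"User","fields":[
      |{"name":"name","type":"string"},{"name":"age","type":"int"}]}""".stripMargin,
  )

  def main(args: Array[String]): Unit = {
    // Encode one record as raw Avro binary (the framing `decode` above consumes).
    val record = new GenericData.Record(schema)
    record.put("name", "ada")
    record.put("age", 36)
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get.binaryEncoder(out, null)
    new GenericDatumWriter[GenericRecord](schema).write(record, encoder)
    encoder.flush()

    // Decode, mirroring AvroDecoder.decode above.
    val decoder = DecoderFactory.get.binaryDecoder(out.toByteArray, null)
    val decoded = new GenericDatumReader[GenericRecord](schema).read(null, decoder)
    assert(decoded.get("name").toString == "ada")
  }
}

The `null` arguments to `binaryDecoder` and `read` are the standard Avro reuse parameters: passing a previous decoder or record would let Avro recycle the allocation.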

case class CsvVecDecoder(delimiterChar: Char, quoteChar: Char, escapeChar: Char, charset: Charset = DEFAULT_CHARSET)
extends FrameDecoder[Iterable[String]] {

@@ -132,7 +162,9 @@ case class CsvMapDecoder(
}
object FrameDecoder {

def apply(format: V2IngestFormat)(implicit protobufCache: ProtobufSchemaCache): FrameDecoder[_] = format match {
def apply(
format: V2IngestFormat,
)(implicit protobufCache: ProtobufSchemaCache, avroCache: AvroSchemaCache): FrameDecoder[_] = format match {
case V2IngestEntities.JsonIngestFormat => JsonDecoder
case V2IngestEntities.CsvIngestFormat(headers, delimiter, quote, escape) =>
headers match {
@@ -152,6 +184,7 @@
ProtobufDecoder("query TBD", "parameter TBD", schemaUrl, typeName) // query, parameter TBD
case V2IngestEntities.RawIngestFormat => CypherRawDecoder
case V2IngestEntities.DropFormat => DropDecoder
case V2IngestEntities.AvroIngestFormat(schemaUrl) => AvroDecoder(schemaUrl = schemaUrl)
}

def apply(format: StreamedRecordFormat)(implicit protobufCache: ProtobufSchemaCache): FrameDecoder[_] =
@@ -3,6 +3,7 @@ package com.thatdot.quine.app.ingest2.core
import scala.collection.{SeqView, View, mutable}

import io.circe.{Json, JsonNumber, JsonObject}
import org.apache.avro.generic.{GenericArray, GenericEnumSymbol, GenericFixed, GenericRecord}

import com.thatdot.quine.graph.cypher.Expr
import com.thatdot.quine.util.Log.{LazySafeLogging, Safe, SafeLoggableInterpolator}
@@ -213,4 +214,46 @@ object DataFoldableFrom {
}
}

implicit val avroDataFoldable: DataFoldableFrom[GenericRecord] = new DataFoldableFrom[GenericRecord] {

private def foldMapLike[B](kv: Iterable[(String, Any)], folder: DataFolderTo[B]): B = {
val mapBuilder = folder.mapBuilder()
kv.foreach { case (k, v) => mapBuilder.add(k, foldField(v, folder)) }
mapBuilder.finish()
}

// All of the underlying Java types for Avro generic data were taken from: https://stackoverflow.com/questions/34070028/get-a-typed-value-from-an-avro-genericrecord/34234039#34234039
private def foldField[B](field: Any, folder: DataFolderTo[B]): B = field match {
case b: java.lang.Boolean if b => folder.trueValue
case b: java.lang.Boolean if !b => folder.falseValue
case i: java.lang.Integer => folder.integer(i.longValue)
case i: java.lang.Long => folder.integer(i)
case f: java.lang.Float => folder.floating(f.doubleValue)
case d: java.lang.Double => folder.floating(d)
case bytes: java.nio.ByteBuffer => folder.bytes(bytes.array)
case str: CharSequence => folder.string(str.toString)
case record: GenericRecord =>
foldMapLike(
record.getSchema.getFields.asScala.collect {
case k if record.hasField(k.name) => (k.name, record.get(k.name))
},
folder,
)
case map: java.util.Map[_, _] => foldMapLike(map.asScala.map { case (k, v) => (k.toString, v) }, folder)
case symbol: GenericEnumSymbol[_] => folder.string(symbol.toString)
case array: GenericArray[_] =>
val vector = folder.vectorBuilder()
array.forEach(elem => vector.add(foldField(elem, folder)))
vector.finish()
case fixed: GenericFixed => folder.bytes(fixed.bytes)
case null => folder.nullValue
case other =>
throw new IllegalArgumentException(
s"Got an unexpected value: ${other} of type: ${other.getClass.getName} from avro. This shouldn't happen...",
)
}

override def fold[B](record: GenericRecord, folder: DataFolderTo[B]): B = foldField(record, folder)
}

}
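One detail worth flagging in the match arms above: Avro's generic representation hands strings back as org.apache.avro.util.Utf8 rather than java.lang.String, which is why the fold matches on CharSequence. A small sketch, with a hypothetical schema and value:

import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.avro.util.Utf8

object Utf8Sketch {
  val schema: Schema = new Schema.Parser().parse(
    """{"type":"record","name":"T","fields":[{"name":"s","type":"string"}]}""",
  )

  def main(args: Array[String]): Unit = {
    val rec = new GenericData.Record(schema)
    rec.put("s", new Utf8("hello")) // decoded records hold Utf8, not java.lang.String
    val v: AnyRef = rec.get("s")
    assert(v.isInstanceOf[CharSequence] && !v.isInstanceOf[String]) // hence `case str: CharSequence`
  }
}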
@@ -20,7 +20,7 @@ import com.thatdot.quine.app.ingest2.sources.S3Source.s3Source
import com.thatdot.quine.app.ingest2.sources.StandardInputSource.stdInSource
import com.thatdot.quine.app.ingest2.sources._
import com.thatdot.quine.app.routes.{IngestMeter, IngestMetered}
import com.thatdot.quine.app.serialization.ProtobufSchemaCache
import com.thatdot.quine.app.serialization.{AvroSchemaCache, ProtobufSchemaCache}
import com.thatdot.quine.app.v2api.endpoints.V2IngestEntities.{
CsvIngestFormat,
IngestFormat,
@@ -353,15 +353,23 @@ object DecodedSource extends LazySafeLogging {
import com.thatdot.quine.app.v2api.endpoints.V2IngestEntities._

//V2 configuration
def apply(src: FramedSource, format: IngestFormat)(implicit protobufCache: ProtobufSchemaCache): DecodedSource =
def apply(src: FramedSource, format: IngestFormat)(implicit
protobufCache: ProtobufSchemaCache,
avroCache: AvroSchemaCache,
): DecodedSource =
src.toDecoded(FrameDecoder(format))

def apply(
name: String,
config: IngestConfiguration,
meter: IngestMeter,
system: ActorSystem,
)(implicit protobufCache: ProtobufSchemaCache, ec: ExecutionContext, logConfig: LogConfig): DecodedSource =
)(implicit
protobufCache: ProtobufSchemaCache,
avroCache: AvroSchemaCache,
ec: ExecutionContext,
logConfig: LogConfig,
): DecodedSource =
config.source match {
case FileIngest(path, mode, maximumLineSize, startOffset, limit, charset, recordDecoders) =>
FileSource.decodedSourceFromFileStream(
@@ -129,8 +129,8 @@ object FileSource extends LazyLogging {
).decodedSource

// TODO Avro, Protobuf, Raw, Drop not supported on file types since there is no way to delimit them:
case V2IngestEntities.ProtobufIngestFormat(_, _) | V2IngestEntities.RawIngestFormat |
V2IngestEntities.DropFormat =>
case V2IngestEntities.AvroIngestFormat(_) | V2IngestEntities.ProtobufIngestFormat(_, _) |
V2IngestEntities.RawIngestFormat | V2IngestEntities.DropFormat =>
throw new UnsupportedOperationException(s"Ingest format $format not supported on file-like sources")

}
@@ -7,7 +7,7 @@ import org.apache.pekko.util.Timeout
import com.thatdot.quine.app.NamespaceNotFoundException
import com.thatdot.quine.app.ingest.QuineIngestSource
import com.thatdot.quine.app.ingest2.source.{DecodedSource, QuineValueIngestQuery}
import com.thatdot.quine.app.serialization.ProtobufSchemaCache
import com.thatdot.quine.app.serialization.{AvroSchemaCache, ProtobufSchemaCache}
import com.thatdot.quine.app.v2api.endpoints.V2IngestEntities.{IngestConfiguration => V2IngestConfiguration}
import com.thatdot.quine.graph.{CypherOpsGraph, MemberIdx, NamespaceId, defaultNamespaceId, namespaceToString}
import com.thatdot.quine.routes._
@@ -121,7 +121,11 @@ trait IngestStreamState {
metrics: IngestMetrics,
meter: IngestMeter,
graph: CypherOpsGraph,
)(implicit protobufCache: ProtobufSchemaCache, logConfig: LogConfig): Try[QuineIngestSource] =
)(implicit
protobufCache: ProtobufSchemaCache,
avroCache: AvroSchemaCache,
logConfig: LogConfig,
): Try[QuineIngestSource] =
ingestStreams.get(intoNamespace) match {
// TODO Note for review comparison: v1 version fails silently here.
// TODO Also, shouldn't this just add the namespace if it's not found?
@@ -138,6 +142,7 @@
//TODO should return ValidatedNel[IngestName, DecodedSource]
val decodedSource: DecodedSource = DecodedSource.apply(name, settings, meter, graph.system)(
protobufCache,
avroCache,
graph.materializer.executionContext,
logConfig,
)
@@ -0,0 +1,51 @@
package com.thatdot.quine.app.serialization

import java.net.URL

import scala.concurrent.{ExecutionContext, Future, blocking}
import scala.util.Using

import com.github.blemale.scaffeine.{AsyncLoadingCache, Scaffeine}
import org.apache.avro.Schema

import com.thatdot.quine.app.serialization.AvroSchemaError.{InvalidAvroSchema, UnreachableAvroSchema}
import com.thatdot.quine.util.ComputeAndBlockingExecutionContext

/** Caches Avro schemas, keyed by the URL they were parsed from.
  */
trait AvroSchemaCache {
def getSchema(schemaUrl: URL): Future[Schema]

}
object AvroSchemaCache {
class AsyncLoading(val ecs: ComputeAndBlockingExecutionContext) extends AvroSchemaCache {
private val avroSchemaCache: AsyncLoadingCache[URL, Schema] =
Scaffeine()
.maximumSize(5)
.buildAsyncFuture { schemaUrl =>
// NB if this Future fails (with an error), the cache will not store the schema.
// This allows the user to retry the schema resolution after updating their environment
resolveSchema(schemaUrl)(ecs.blockingDispatcherEC)
}

/** Invalidate the schema for the given URL. This will cause the next call to [[avroSchemaCache.get]]
  * to re-fetch and re-parse the schema. This may be desirable when, for example, decoding fails against a
  * stale schema (so that the user can update the schema file at that URL and retry).
  */
def flush(uri: URL): Unit =
avroSchemaCache.put(uri, Future.successful(null))
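// (Caffeine removes an entry whose future completes with null, so the `put` above acts as an
// invalidation rather than caching a null schema.)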

def getSchema(schemaUrl: URL): Future[Schema] =
avroSchemaCache.get(schemaUrl)

private[this] def resolveSchema(uri: URL)(blockingEc: ExecutionContext): Future[Schema] =
  Future(blocking {
    // Use a fresh Parser per schema: Schema.Parser caches named types, so sharing one instance
    // across unrelated schemas can fail with "Can't redefine" parse errors.
    val parser = new Schema.Parser()
    Using.resource(uri.openStream())(parser.parse)
  })(blockingEc).recoverWith {
    case e: org.apache.avro.SchemaParseException => Future.failed(new InvalidAvroSchema(uri, e))
    case e: java.io.IOException => Future.failed(new UnreachableAvroSchema(uri, e))
  }(blockingEc)
}
}
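For context, a minimal usage sketch: the cache is constructed as in QuineApp (from the graph's dispatchers), and the schema URL here is hypothetical. Since failed futures are not cached, a bad URL or malformed schema can simply be retried once the environment is fixed:

import java.net.URL
import scala.concurrent.Future
import org.apache.avro.Schema

def loadUserSchema(cache: AvroSchemaCache): Future[Schema] =
  cache.getSchema(new URL("file:///etc/quine/schemas/user.avsc"))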
@@ -61,7 +61,8 @@ object ProtobufSchemaCache {
* to re-parse the schema. This may be desirable when, for example, a message type lookup fails, even if the
* schema lookup succeeds (so that the user can update their schema file to include the missing type).
*/
def flush(uri: URL): Unit = parsedDescriptorCache.put(uri, Future.successful(null))
def flush(uri: URL): Unit =
parsedDescriptorCache.put(uri, Future.successful(null))

def getSchema(schemaUrl: URL): Future[DynamicSchema] =
parsedDescriptorCache.get(schemaUrl)
@@ -108,5 +109,4 @@ object ProtobufSchemaCache {
new AmbiguousMessageType(messageType, messagesFoundByShortName)
}
}

}
@@ -3,10 +3,21 @@ package com.thatdot.quine.app.serialization
import java.net.URL

sealed trait ProtobufSchemaError extends IllegalArgumentException
sealed trait AvroSchemaError extends IllegalArgumentException
sealed trait ProtobufSchemaMessageTypeException extends ProtobufSchemaError {
def typeName: String
}

object AvroSchemaError {
class UnreachableAvroSchema(val fileUri: URL, cause: java.io.IOException)
extends IllegalArgumentException(s"Unreachable avro schema file: $fileUri", cause)
with AvroSchemaError
class InvalidAvroSchema(val fileUri: URL, cause: Throwable)
extends IllegalArgumentException(s"Invalid avro schema file: $fileUri", cause)
with AvroSchemaError

}

object ProtobufSchemaError {
class UnreachableProtobufSchema(val fileUri: URL, cause: java.io.IOException)
extends IllegalArgumentException(s"Unreachable protobuf schema file: $fileUri", cause)
@@ -222,6 +222,14 @@ object V2IngestEntities {
typeName: String,
) extends IngestFormat

@title("Avro format")
case class AvroIngestFormat(
@description(
"URL (or local filename) of the file to load to parse the avro schema.",
)
schemaUrl: String,
) extends IngestFormat
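For illustration, constructing this format directly (the schema path is hypothetical; per `filenameOrUrl` in AvroDecoder, a bare local filename is accepted as well as a URL):

val format: V2IngestEntities.IngestFormat =
  V2IngestEntities.AvroIngestFormat(schemaUrl = "file:///etc/quine/schemas/user.avsc")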

case object RawIngestFormat extends IngestFormat

case object DropFormat extends IngestFormat