-
Good discussion, thanks. On balance my own disposition is that the "hybrid" approach as discussed above is the right path. One other wrench to throw in is that the discovery workflow should be able to introspect topics mapped through the schema registry to AVRO schemas, and from there produce equivalent JSON Schemas. This also is something that's not 100% coupled to Kafka, since there are other uses of AVRO (afaik) which might want this. Probably this is also an invocation subcommand of the Also, a pragmatic rationale for doing more in the parser is that working with JSON documents is IMO easier and more efficient as compared to Go.
My understanding is that records under Kafka+AVRO are exclusively single documents. Am I wrong here? If I'm not, a reasonable strategy is to track the number of input documents (as delimited AVRO) vs output documents (JSONL). FWIW I'm less concerned with the really, really weird edge cases of crazy things that people do with Kafka. There's always an "out" here of taking the Kafka connector and forking / salting it to taste.
-
Thanks for the great write-up. My own disposition is that the hybrid approach seems best, but I'm a little hesitant about implementing it that way right off the bat. How easy would it be to support most of the common use cases using a Kafka-specific library like schema_registry_converter? It kinda seems like it might be really easy to get off the ground with that, and then switch to a hybrid approach later if we still see a reason to. The benefit being that we could delay introducing a framed format for parser stdin/out. I don't really have an opinion on whether that would be the best approach right now, but I'd like to hear @saterus's take on it, since you've been thinking about how all these pieces fit together. Are you leaning toward one direction or another?
-
That's a nice feature. That would be really slick if we can do it. I'll toss that wrench back to you: Kafka allows you to configure the schema rules on a per topic, per record type, or per topic-record type. Using many schemas per topic is fully documented and encouraged for certain use cases. Using the Confluent serializers, Messages embed the schema id at the beginning of the payload. Now, that said, the default behavior is to have a single schema per topic, nudging users in the direction of treating a topic like a table with a single schema. The default subject-naming strategy uses This is probably going to be a common theme for this stuff. There's a completely reasonable 80% path, and the other 20% is going to be a long road. There are quite a lot of schema/dataformat/versioning features that all interact here.
That makes sense to me. Avro schemas are defined in JSON, but they themselves are not a JSON schema definition. I could also see wanting to take a Protobuf definition (also supported by Schema Registry) and produce a JSON schema definition from that.
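To make the Avro-to-JSON-Schema idea a bit more concrete, here is a minimal sketch in Python of what such a translation could look like. The function name `avro_to_json_schema` and the mapping table are hypothetical, and several choices are assumptions rather than settled behavior: `bytes` is mapped to a string, unions become `anyOf`, a field counts as required when it has no default, and named types (enums, fixed, references to other records) are not handled at all.

```python
from typing import Any, Dict

# Avro primitive type -> JSON Schema fragment (hypothetical mapping).
PRIMITIVES: Dict[str, Dict[str, Any]] = {
    "null": {"type": "null"},
    "boolean": {"type": "boolean"},
    "int": {"type": "integer"},
    "long": {"type": "integer"},
    "float": {"type": "number"},
    "double": {"type": "number"},
    "string": {"type": "string"},
    "bytes": {"type": "string"},  # assumption: bytes surface as encoded strings
}


def avro_to_json_schema(avro: Any) -> Dict[str, Any]:
    """Translate a parsed Avro schema (str, list, or dict) to a JSON Schema."""
    if isinstance(avro, str):
        if avro not in PRIMITIVES:
            # Enums, fixed, and references to named types are out of scope here.
            raise ValueError(f"unsupported Avro type: {avro}")
        return dict(PRIMITIVES[avro])
    if isinstance(avro, list):  # an Avro union, e.g. ["null", "string"]
        return {"anyOf": [avro_to_json_schema(branch) for branch in avro]}
    kind = avro["type"]
    if kind == "record":
        fields = avro["fields"]
        return {
            "type": "object",
            "properties": {f["name"]: avro_to_json_schema(f["type"]) for f in fields},
            # Assumption: a field without a default must be present.
            "required": [f["name"] for f in fields if "default" not in f],
        }
    if kind == "array":
        return {"type": "array", "items": avro_to_json_schema(avro["items"])}
    if kind == "map":
        return {"type": "object", "additionalProperties": avro_to_json_schema(avro["values"])}
    return avro_to_json_schema(kind)  # wrapped form such as {"type": "string"}


user = {
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "id", "type": "long"},
        {"name": "email", "type": ["null", "string"], "default": None},
    ],
}
json_schema = avro_to_json_schema(user)
```

A Protobuf-to-JSON-Schema translation would need its own mapping, but could plausibly sit behind the same kind of entry point.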
With some further research, I think you're correct. At least when considering the Confluent Avro serializer. I'm less sure about what people may do with Protobuf or JSON encoded messages.
Yeah, I hear that. I think "weird edge cases" were what we had in mind when thinking about multiple documents per Message, so I think we'll shelve this concern.
Yeah, it looks like we could use
Yeah, I think it probably mostly depends on whether we think the
Maybe? I was thinking more about the "schema-per-message" problem I mentioned above. Framing might give us a place to include a schema definition. Might be simpler than trying to keep a
-
This is a bit of a braindump from a few conversations I had with Phil on Friday. I don't think any of these things are particularly time sensitive. I'm mostly just trying to give context to everyone else about a few things on our mind.
Background
Kafka Topics are streams of individual Messages published by a Producer and read by many Consumers. To Kafka, each Message payload is an opaque blob of bytes. Kafka attaches its own metadata so as to ensure reliable delivery, but it is not opinionated about the payload content. Producers are free to serialize this data however they wish, but Consumers must be able to deserialize them.
Given this freedom, Messages do not need to represent a single record. A single Message payload could include a tabular or compound document. Connectors must output valid JSON documents to adhere to the Airbyte Spec. This puts the Connector in the position of needing to deserialize the Message payload and chunk it into one or more output documents.
Because every organization using Kafka may have its own way of serializing Messages, the source-kafka connector will need to be flexible in how it deserializes the Message payload. Avro appears to be the most popular data format within the Kafka user community, but it is not the only one. Fivetran supports Avro, Protobuf, and JSON.
Confluent also offers their Schema Registry product as a way to help manage Message schemas. This is a REST API that can store and look up schemas and has strong support within the Kafka ecosystem. It can be accessed from Confluent's cloud platform or self-hosted, so we're likely to encounter it sooner or later.
That said, theoretically any format could be used. We will inevitably find a user who is sticking gzipped CSV data into a Kafka Message. We'll want to come up with a solution that allows for this flexibility, even if we're not optimizing for it or building support for it now.
Schema Registry
Kafka Messages serialized using the Schema Registry begin with a header byte sequence that includes the Schema ID. This is used to look up the schema in Schema Registry, which is then used to deserialize the Avro document.
The takeaway here is that we'll need to inspect the payload to even use the Schema Registry. Luckily, we'll have to be given connection info for the Schema Registry beforehand, so we won't need to guess at whether to use it.
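As a rough illustration of that inspection step, here is a minimal Python sketch that splits the header from the rest of the payload. It assumes the layout used by the Confluent serializers: a single zero "magic" byte followed by a 4-byte big-endian schema ID. The function name `split_confluent_header` is hypothetical, and payloads produced by other serializers would need different handling.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0  # assumption: Confluent serializers start every payload with 0x00
HEADER_LEN = 5  # 1 magic byte + 4-byte big-endian schema ID


def split_confluent_header(payload: bytes) -> Tuple[int, bytes]:
    """Return (schema_id, body) for a Schema Registry framed Message payload."""
    if len(payload) < HEADER_LEN:
        raise ValueError("payload too short to contain a schema header")
    magic, schema_id = struct.unpack(">BI", payload[:HEADER_LEN])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    # Everything after the header is the serialized document itself.
    return schema_id, payload[HEADER_LEN:]


schema_id, body = split_confluent_header(b"\x00\x00\x00\x00\x2a" + b"avro-bytes")
```

Here `schema_id` is 42 and `body` is the remaining `b"avro-bytes"`, which would then be handed to whatever does the Avro decoding.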
Parser
The estuary/connectors/parser project was designed to help manage a lot of these same concerns. S3 files come in a wide variety of data formats, which we need to parse into a meaningful JSON document stream. Ideally, we'd be able to use the Parser with Kafka, but there are currently a few complications.
Avro Support
We'll need a way to parse Avro documents. We know we'll want support for this; it's just a matter of prioritization and details about the behavior. Avro documents are not self-describing, so parsing requires a schema. We'll need a way to configure the document schema the Parser is using.
Parsing Options
This gives us a few options for how adding Avro/SchemaRegistry support to the Parser and/or source-kafka Connector might work:
Entirely within the Connector
The Connector will fetch/cache schemas using the Schema Registry, and deserialize Message payloads without invoking the Parser.
Pros:
Cons:
Entirely within the Parser
The Connector does not attempt to inspect any payloads at all. The Parser will fetch/cache schemas using the Schema Registry and deserialize the payload.
Pros:
source-kafka Connector.
Cons:
Hybrid: Pass the Schema to the Parser
The Connector will fetch/cache the schemas using the Schema Registry. It then passes the schema to the Parser and the Parser is responsible for deserializing the payload.
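A rough sketch of the Connector's side of this split, in Python for brevity: schemas are cached by ID so that each one is fetched at most once. The `SchemaCache` class and the injected `fetch` callable are hypothetical; in practice `fetch` would wrap the Schema Registry lookup, and how the schema is then handed to the Parser is left open here.

```python
from typing import Callable, Dict, List


class SchemaCache:
    """Caches schema definitions by ID so each is fetched at most once."""

    def __init__(self, fetch: Callable[[int], str]) -> None:
        self._fetch = fetch  # hypothetical: wraps the Schema Registry lookup
        self._schemas: Dict[int, str] = {}

    def get(self, schema_id: int) -> str:
        if schema_id not in self._schemas:
            self._schemas[schema_id] = self._fetch(schema_id)
        return self._schemas[schema_id]


# Stand-in for the registry so the sketch runs without a network.
calls: List[int] = []


def fake_fetch(schema_id: int) -> str:
    calls.append(schema_id)
    return '{"type": "string"}'


cache = SchemaCache(fake_fetch)
cache.get(7)
cache.get(7)  # served from the cache; fake_fetch is not called again
```

Whether the cache needs eviction or invalidation depends on how long the Connector runs and how many schemas it sees, which this sketch does not address.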
Pros:
Cons:
Many Parser Processes
We currently feed the Parser individual files from S3. We spawn a separate Parser process for each file and we know we're finished parsing that file when the Parser process exits. This works well as these files tend to be large(ish) tabular data files and we're generally batch processing them.
For Kafka's never-ending stream of relatively small Messages, we may not want to launch a new Parser process for each Message. It would be more efficient to spawn the Parser process once, feed Message payloads into its stdin and read JSON documents from stdout. This eliminates the need to create a new Parser process each time we receive a Message.
The complication this adds is knowing when a single input Message has finished being parsed and exactly how many output documents it produced.
A simple addition to the parser would be to add framing headers to the input/output stream. This could be used by the Parser to communicate when a single input Message/frame has finished emitting documents. It's probably easiest to make the framing protocol a cli flag so the parser can still be used without it in simple cases.
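One possible shape for such a framing protocol, sketched in Python: every frame is a 4-byte big-endian length followed by that many bytes. This header layout is purely an assumption for illustration, not a decided format. On the output side, one frame per input Message would carry that Message's JSONL documents, so the reader knows both when a Message is finished and how many documents it produced.

```python
import io
import struct
from typing import BinaryIO, Iterator, List

# Hypothetical frame header: 4-byte big-endian body length.
LEN = struct.Struct(">I")


def write_frame(stream: BinaryIO, body: bytes) -> None:
    """Write one length-prefixed frame."""
    stream.write(LEN.pack(len(body)))
    stream.write(body)


def read_frames(stream: BinaryIO) -> Iterator[bytes]:
    """Yield frame bodies until the stream ends cleanly on a frame boundary."""
    while True:
        header = stream.read(LEN.size)
        if not header:
            return  # clean end of stream
        if len(header) < LEN.size:
            raise EOFError("truncated frame header")
        (length,) = LEN.unpack(header)
        body = stream.read(length)
        if len(body) < length:
            raise EOFError("truncated frame body")
        yield body


# Simulated Parser output: one frame per input Message.
out = io.BytesIO()
write_frame(out, b'{"id": 1}\n{"id": 2}\n')  # a Message that parsed into two documents
write_frame(out, b'{"id": 3}\n')  # a Message that parsed into one document
out.seek(0)
docs_per_message: List[int] = [len(frame.splitlines()) for frame in read_frames(out)]
```

The same header could frame Message payloads on the Parser's stdin. A length prefix is only one option; a delimiter or end-of-frame marker would also work, at the cost of escaping rules.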
Notes:
Header Byte Sequence
I couldn't find this information in the Schema Registry documentation, but it was quick to find within the schema_registry_converter source code.
https://github.com/gklijs/schema_registry_converter/blob/95f48fbea5f3c6ed5fd3c937689517313eeb1702/src/schema_registry_common.rs#L191-L205