-
Notifications
You must be signed in to change notification settings - Fork 103
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Schema types with references are hard to work with. #203
Comments
Haven't tested it, but a rough example that should describe how to work with referenced types today: // example.ts
import { Kafka } from 'kafkajs';
import { initializeSend, EventType, send } from './send';
const EventTypes: { [key: string]: EventType } = {
Example: {
refs: ['com.company.rick@1', 'com.company.morty@3'],
subject: 'com.company.tv_show_references@4',
topic: 'com.company.random',
}
};
async function main() {
const kafka = new Kafka({ /* ... */ });
const producer = kafka.producer();
await producer.connect();
await initializeSend(producer, { host: 'confluent host', /* ... */ });
const meta = await send(EventTypes.Example, { "payload": "here" });
console.log(meta);
}
main(); // send.ts
import { SchemaRegistry, SchemaType } from '@kafkajs/confluent-schema-registry';
import { SchemaRegistryAPIClientArgs } from "@kafkajs/confluent-schema-registry/dist/api";
import { Schema, Type } from 'avsc';
import { randomUUID } from 'crypto';
import type { Producer, RecordMetadata } from 'kafkajs';
let producer: Producer;
let registry: SchemaRegistry;
const subjectMap: {[key: string]: number} = {};
const typeMap: {[key: string]: Type} = {};
export type EventType = { topic: string, subject: string, refs: string[] };
export function initializeSend(clientProducer: Producer, registryOpts: SchemaRegistryAPIClientArgs) {
producer = clientProducer;
registry = new SchemaRegistry(registryOpts, {
[SchemaType.AVRO]: { registry: typeMap }
});
}
export async function send(event: EventType, ...payloads: object[]): Promise<RecordMetadata[]> {
// Load referenced types if we haven't already
for (const subject of event.refs) {
if (!typeMap[subject]) {
const id = await getRegistryId(subject);
const schema = await registry.getSchema(id) as Schema;
typeMap[subject] = Type.forSchema(schema, { registry: typeMap });
}
}
const [name] = event.subject.split('@');
const id = await getRegistryId(event.subject);
return producer.send({
topic: event.topic,
messages: await Promise.all(payloads.map(async (payload) => ({
key: randomUUID(),
headers: { subject: name },
value: await registry.encode(id, payload),
}))),
});
}
async function getRegistryId(subject: string): Promise<number> {
if (!subjectMap[subject]) {
const [name, version = 'latest'] = subject.split('@');
subjectMap[subject] = version === 'latest'
? await registry.getLatestSchemaId(name)
: await registry.getRegistryId(name, +version);
}
return subjectMap[subject];
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This is a very useful library, except there are strong usability concerns with subjects that have referenced types.
What I want to do
com.company.tv_show_references (version 4, now at 10) references:
com.company.rick
@ version 1com.company.morty
@ version 3 (now at version 6)This will not work of course
Only
encode(registryId, payload)
exists which needs a bunch of configuration to work with referenced subjects, which sucks, because I don't really care about registry ids, only the subject and version I configured in confluent cloud. The website has literally no examples on how to do this, leading to a lot of research into learning the underlyingavsc
library.I first have to lookup the registry id for the subject I mentioned (getRegistryId). The registry would be the perfect place to keep a subject@version -> registryId mapping internally. As a developer, I just want to send the message.
Registry exposes no means of getting the registry ids for the subject's referenced types. This is problematic because in the web interface, the subject references are tied to explicit versions. Not that I necessarily want to deal with the registry ids for referenced types, I'd rather do the example above and have it do all the work for me. Again, the registry would be a great place for it to store subject@version -> registry id (and vice-versa) mapping.
I should not need to touch the
avsc
package directly to deal with subjects with references. Right now I have to have a hard coded list of referenced types and version for each subject that I have to pray doesn't diverge from what is deployed to confluent cloud, get the schema id for the type (kept in a subject -> registryId map), fetch the schema, then add it to the registry mapping passed to the registry during construction, after usingavsc
sType.forSchema
so that registry.encode could work for me.Also annoyingly, it's hard to know what subject was used when consuming messages decoded from confluent cloud unless I explicitly add headers for subject, which is annoying because part of decode has the registry id (which can be mapped back to the subject name), but doesn't pass it along to eachMessage as part of the payload.
The text was updated successfully, but these errors were encountered: