Hi there!
We are currently migrating several of our applications from kafkajs to @platformatic/kafka.
While it's going great for the most part, we're struggling with how to handle a specific behaviour related to AVRO deserialization and `fallbackMode`.
Take the following example, where we have a consumer consuming from a newly created topic (implementation taken from the AVRO example, adapted to use confluent-schema-registry):
```typescript
import { SchemaRegistry } from "@kafkajs/confluent-schema-registry";
import { Type, type Schema } from "avsc";
import {
  Consumer,
  MessagesStreamFallbackModes,
  MessagesStreamModes,
  UserError,
} from "@platformatic/kafka";

const localSchemas: Record<string, Type> = {};

let srInstance: SchemaRegistry;
const getSchemaRegistry = () => {
  if (!srInstance) {
    srInstance = new SchemaRegistry({
      host: "http://localhost:8081",
    });
  }
  return srInstance;
};

const fetchSchema = async (schemaId: number): Promise<void> => {
  console.log(`Fetching schema with ID ${schemaId} from schema registry`);
  const schemaRegistry = getSchemaRegistry();
  const schema = (await schemaRegistry.getSchema(schemaId)) as Schema;
  localSchemas[schemaId] = Type.forSchema(schema);
};

function registryDeserializer(buffer?: Buffer): object | null {
  if (!buffer) {
    return null;
  }

  // Confluent wire format: magic byte at offset 0, 4-byte schema ID at offset 1.
  const schemaId = buffer.readInt32BE(1);

  if (!localSchemas[schemaId]) {
    throw new UserError(`Schema with ID ${schemaId} not found.`, {
      missingSchema: schemaId,
    });
  }

  return localSchemas[schemaId].fromBuffer(buffer.subarray(5));
}

const consumeMessages = async (abortSignal: AbortSignal) => {
  const consumer = new Consumer({
    groupId: "group-id",
    clientId: "client-id",
    bootstrapBrokers: ["localhost:9092"],
    deserializers: {
      value: registryDeserializer,
    },
  });

  abortSignal.onabort = function () {
    consumer.close(true);
  };

  while (true) {
    if (abortSignal.aborted) {
      break;
    }

    const stream = await consumer.consume({
      topics: ["topic-name"],
      maxWaitTime: 500,
      mode: MessagesStreamModes.COMMITTED,
      fallbackMode: MessagesStreamFallbackModes.LATEST,
    });

    try {
      for await (const message of stream) {
        console.log(message.value);
      }
    } catch (error) {
      if (typeof error.cause?.missingSchema !== "number") {
        throw error;
      }
      await fetchSchema(error.cause.missingSchema);
    }
  }
};

async function main() {
  const abortController = new AbortController();
  consumeMessages(abortController.signal).catch((error) => {
    console.error("Error in consumeMessages:", error);
    process.exit(1);
  });
}

await main();
```

The issue is as follows:
- The consumer connects and starts consuming messages.
- As the first message is produced to the topic, the deserializer throws an error and the schema associated with the message is fetched from the schema registry.
- The stream resumes, now in fallback mode (as far as I can tell). The previously produced message will never be retried unless `fallbackMode` is set to `EARLIEST`.
This issue would become especially apparent to us in a distributed system with dynamic/random consumer group IDs: since a new consumer group is created every time an application/pod is restarted, all applications would effectively run in fallback mode, as the first consumed message will always result in a schema fetch / deserialization failure.
(We often use randomly assigned group IDs to ensure all instances of our application receive a message.)
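For illustration, those random group IDs are generated roughly like this (a sketch; the `app-` prefix and helper name are made up for this example):

```typescript
import { randomUUID } from "node:crypto";

// Each instance gets its own consumer group, so every instance receives
// every message (broadcast semantics instead of load balancing).
function makeGroupId(prefix: string): string {
  return `${prefix}-${randomUUID()}`;
}

const groupId = makeGroupId("app");
console.log(groupId); // e.g. app-3b241101-e2bb-4255-8caf-4136c566a962
```

Because the group is new on every restart, there are no committed offsets, so the consumer always starts from `fallbackMode`.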
Is there a built-in way to handle this?
Using `onCorruptedMessage` and manually committing seems like something that could work, but I would like to retain the autocommit functionality if possible.
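For reference, here is a standalone sketch of the Confluent wire format that the deserializer above assumes (magic byte `0x00`, 4-byte big-endian schema ID, then the Avro payload); the encode helper is only for illustration:

```typescript
// Confluent wire format: [magic byte 0x00][4-byte big-endian schema ID][payload]
function encodeConfluentFrame(schemaId: number, payload: Buffer): Buffer {
  const header = Buffer.alloc(5);
  header.writeUInt8(0, 0);          // magic byte
  header.writeInt32BE(schemaId, 1); // schema registry ID
  return Buffer.concat([header, payload]);
}

function decodeConfluentFrame(buffer: Buffer): { schemaId: number; payload: Buffer } {
  if (buffer.length < 5 || buffer.readUInt8(0) !== 0) {
    throw new Error("Not a Confluent-framed message");
  }
  return { schemaId: buffer.readInt32BE(1), payload: buffer.subarray(5) };
}

const frame = encodeConfluentFrame(42, Buffer.from("avro-bytes"));
const { schemaId, payload } = decodeConfluentFrame(frame);
console.log(schemaId, payload.toString()); // 42 avro-bytes
```

This is why the very first message on a fresh consumer always fails deserialization: the schema ID in the frame is not yet in the local cache, and the fetch only happens after the stream has already errored.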