From 6f8701bf165dd52081a5d9cf1bc6eb25527887ce Mon Sep 17 00:00:00 2001 From: ivan-klass Date: Fri, 23 Jul 2021 13:05:39 +0700 Subject: [PATCH 1/6] Support reader schema when decoding avro --- docs/introduction.md | 6 ++++++ src/@types.ts | 5 ++++- src/SchemaRegistry.newApi.spec.ts | 27 ++++++++++++++++----------- src/SchemaRegistry.ts | 19 ++++++++++++++++--- 4 files changed, 42 insertions(+), 15 deletions(-) diff --git a/docs/introduction.md b/docs/introduction.md index 1d95909..ee2f567 100755 --- a/docs/introduction.md +++ b/docs/introduction.md @@ -43,4 +43,10 @@ const encodedPayload = await registry.encode(id, payload) // Decode the payload const decodedPayload = await registry.decode(encodedPayload) + +// Decode with resolving to a reader schema (avro-only) +// Note: avsc opts, if needed, should be set to readerSchema +const avsc = require('avsc') +const readerSchema = avsc.Type.forSchema(/* schema, opts */) +const resolvedPayload = await registry.decode(encodedPayload, readerSchema) ``` \ No newline at end of file diff --git a/src/@types.ts b/src/@types.ts index 944418f..b5eaab1 100644 --- a/src/@types.ts +++ b/src/@types.ts @@ -48,7 +48,10 @@ export interface RawAvroSchema { fields: any[] } -export interface AvroSchema extends Schema, RawAvroSchema {} +export interface AvroSchema extends Schema, RawAvroSchema { + createResolver(writerSchema: AvroSchema): Resolver + equals(other: Schema): Boolean +} export interface ConfluentSubject { name: string diff --git a/src/SchemaRegistry.newApi.spec.ts b/src/SchemaRegistry.newApi.spec.ts index 75f1d9a..965b783 100644 --- a/src/SchemaRegistry.newApi.spec.ts +++ b/src/SchemaRegistry.newApi.spec.ts @@ -1,15 +1,9 @@ -import { v4 as uuid } from 'uuid' +import {v4 as uuid} from 'uuid' import SchemaRegistry from './SchemaRegistry' -import { - ConfluentSubject, - ConfluentSchema, - SchemaType, - AvroConfluentSchema, - JsonConfluentSchema, -} from './@types' -import API, { SchemaRegistryAPIClient } from './api' -import { COMPATIBILITY, DEFAULT_API_CLIENT_ID } from './constants' +import {AvroSchema, ConfluentSchema, ConfluentSubject, SchemaType,} from './@types' +import API, {SchemaRegistryAPIClient} from './api' +import {COMPATIBILITY, DEFAULT_API_CLIENT_ID} from './constants' import encodedAnotherPersonV2Avro from '../fixtures/avro/encodedAnotherPersonV2' import encodedAnotherPersonV2Json from '../fixtures/json/encodedAnotherPersonV2' import encodedAnotherPersonV2Proto from '../fixtures/proto/encodedAnotherPersonV2' @@ -18,6 +12,7 @@ import wrongMagicByte from '../fixtures/wrongMagicByte' import Ajv2020 from 'ajv8/dist/2020' import Ajv from 'ajv' import { ConfluentSchemaRegistryValidationError } from './errors' +import {Type} from "avsc"; const REGISTRY_HOST = 'http://localhost:8982' const schemaRegistryAPIClientArgs = { host: REGISTRY_HOST } @@ -365,7 +360,17 @@ describe('SchemaRegistry - new Api', () => { await schemaRegistry.decode(buffer) expect(schemaRegistry.cache.getSchema(registryId)).toBeTruthy() - }) + }); + + (type == SchemaType.AVRO ? it : it.skip)( + 'uses reader schema if specified (avro-only)', async () => { + const writerBuffer = Buffer.from(await schemaRegistry.encode(registryId, payload)) + const readerSchema = Type.forSchema(JSON.parse(schemaStringsByType[type].v2)) as any as AvroSchema + await expect(schemaRegistry.decode(writerBuffer, readerSchema)).resolves.toHaveProperty( + 'city', + 'Stockholm' + ) + }) it('creates a single origin request for a schema cache-miss', async () => { const buffer = Buffer.from(await schemaRegistry.encode(registryId, payload)) diff --git a/src/SchemaRegistry.ts b/src/SchemaRegistry.ts index 7a93437..7c76bc0 100644 --- a/src/SchemaRegistry.ts +++ b/src/SchemaRegistry.ts @@ -200,7 +200,7 @@ export default class SchemaRegistry { return paths } - public async decode(buffer: Buffer): Promise { + public async decode(buffer: Buffer, readerSchema?: AvroSchema): Promise { if (!Buffer.isBuffer(buffer)) { throw new ConfluentSchemaRegistryArgumentError('Invalid buffer') } @@ -214,8 +214,21 @@ export default class SchemaRegistry { ) } - const schema = await this.getSchema(registryId) - return schema.fromBuffer(payload) + const writerSchema = await this.getSchema(registryId) + if (readerSchema) { + if (readerSchema.equals(writerSchema)){ + /* Even when schemas are considered equal by `avsc`, + * they still aren't interchangeable: + * provided `readerSchema` may have different `opts` (e.g. logicalTypes / unionWrap flags) + * see https://github.com/mtth/avsc/issues/362 */ + return readerSchema.fromBuffer(payload) + } else { + // decode using a resolver from writer type into reader type + return readerSchema.fromBuffer(payload, readerSchema.createResolver(writerSchema as AvroSchema)) + } + } else { + return writerSchema.fromBuffer(payload) + } } public async getRegistryId(subject: string, version: number | string): Promise { From 718a6dde3018fde5a40311c22442bba896901f11 Mon Sep 17 00:00:00 2001 From: ivan-klass Date: Fri, 23 Jul 2021 14:09:36 +0700 Subject: [PATCH 2/6] Adjust formatting, better type annotations --- src/@types.ts | 5 +---- src/SchemaRegistry.newApi.spec.ts | 22 ++++++++++++++-------- src/SchemaRegistry.ts | 7 ++++--- 3 files changed, 19 insertions(+), 15 deletions(-) diff --git a/src/@types.ts b/src/@types.ts index b5eaab1..944418f 100644 --- a/src/@types.ts +++ b/src/@types.ts @@ -48,10 +48,7 @@ export interface RawAvroSchema { fields: any[] } -export interface AvroSchema extends Schema, RawAvroSchema { - createResolver(writerSchema: AvroSchema): Resolver - equals(other: Schema): Boolean -} +export interface AvroSchema extends Schema, RawAvroSchema {} export interface ConfluentSubject { name: string diff --git a/src/SchemaRegistry.newApi.spec.ts b/src/SchemaRegistry.newApi.spec.ts index 965b783..e088167 100644 --- a/src/SchemaRegistry.newApi.spec.ts +++ b/src/SchemaRegistry.newApi.spec.ts @@ -1,9 +1,16 @@ -import {v4 as uuid} from 'uuid' +import { Type } from 'avsc' +import { v4 as uuid } from 'uuid' import SchemaRegistry from './SchemaRegistry' -import {AvroSchema, ConfluentSchema, ConfluentSubject, SchemaType,} from './@types' -import API, {SchemaRegistryAPIClient} from './api' -import {COMPATIBILITY, DEFAULT_API_CLIENT_ID} from './constants' +import { + ConfluentSubject, + ConfluentSchema, + SchemaType, + AvroConfluentSchema, + JsonConfluentSchema, +} from './@types' +import API, { SchemaRegistryAPIClient } from './api' +import { COMPATIBILITY, DEFAULT_API_CLIENT_ID } from './constants' import encodedAnotherPersonV2Avro from '../fixtures/avro/encodedAnotherPersonV2' import encodedAnotherPersonV2Json from '../fixtures/json/encodedAnotherPersonV2' import encodedAnotherPersonV2Proto from '../fixtures/proto/encodedAnotherPersonV2' @@ -12,7 +19,6 @@ import wrongMagicByte from '../fixtures/wrongMagicByte' import Ajv2020 from 'ajv8/dist/2020' import Ajv from 'ajv' import { ConfluentSchemaRegistryValidationError } from './errors' -import {Type} from "avsc"; const REGISTRY_HOST = 'http://localhost:8982' const schemaRegistryAPIClientArgs = { host: REGISTRY_HOST } @@ -365,10 +371,10 @@ describe('SchemaRegistry - new Api', () => { (type == SchemaType.AVRO ? it : it.skip)( 'uses reader schema if specified (avro-only)', async () => { const writerBuffer = Buffer.from(await schemaRegistry.encode(registryId, payload)) - const readerSchema = Type.forSchema(JSON.parse(schemaStringsByType[type].v2)) as any as AvroSchema + const readerSchema = Type.forSchema(JSON.parse(schemaStringsByType[type].v2)) await expect(schemaRegistry.decode(writerBuffer, readerSchema)).resolves.toHaveProperty( - 'city', - 'Stockholm' + 'city', + 'Stockholm' ) }) diff --git a/src/SchemaRegistry.ts b/src/SchemaRegistry.ts index 7c76bc0..210310a 100644 --- a/src/SchemaRegistry.ts +++ b/src/SchemaRegistry.ts @@ -1,3 +1,4 @@ +import { Type } from 'avsc' import { Response } from 'mappersmith' import { encode, MAGIC_BYTE } from './wireEncoder' @@ -200,7 +201,7 @@ export default class SchemaRegistry { return paths } - public async decode(buffer: Buffer, readerSchema?: AvroSchema): Promise { + public async decode(buffer: Buffer, readerSchema?: Type): Promise { if (!Buffer.isBuffer(buffer)) { throw new ConfluentSchemaRegistryArgumentError('Invalid buffer') } @@ -216,7 +217,7 @@ export default class SchemaRegistry { const writerSchema = await this.getSchema(registryId) if (readerSchema) { - if (readerSchema.equals(writerSchema)){ + if (readerSchema.equals(writerSchema as Type)){ /* Even when schemas are considered equal by `avsc`, * they still aren't interchangeable: * provided `readerSchema` may have different `opts` (e.g. logicalTypes / unionWrap flags) @@ -224,7 +225,7 @@ export default class SchemaRegistry { return readerSchema.fromBuffer(payload) } else { // decode using a resolver from writer type into reader type - return readerSchema.fromBuffer(payload, readerSchema.createResolver(writerSchema as AvroSchema)) + return readerSchema.fromBuffer(payload, readerSchema.createResolver(writerSchema as Type)) } } else { return writerSchema.fromBuffer(payload) From 442872e96d67d8bb0f3948e1939ade00ef41884c Mon Sep 17 00:00:00 2001 From: Tommy Brunn Date: Thu, 4 Nov 2021 08:56:11 +0100 Subject: [PATCH 3/6] Add per schema type decoding options --- docs/usage.md | 23 +++++++++++++++ src/@types.ts | 9 ++++-- src/AvroHelper.ts | 15 ++++++++-- src/SchemaRegistry.newApi.spec.ts | 46 +++++++++++++++++------------ src/SchemaRegistry.ts | 49 +++++++++++++++++++++++-------- src/cache.ts | 12 ++++---- 6 files changed, 112 insertions(+), 42 deletions(-) diff --git a/docs/usage.md b/docs/usage.md index 9751aef..e56f2cc 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -188,6 +188,29 @@ const payload = await registry.decode(buffer) // { full_name: 'John Doe' } ``` +`registry.decode` has an optional second `options` argument with options +specific to each schema type. + +### Avro + +With Avro you can specify a specific reader schema to use to decode the +message, rather than using the schema registered in the registry. This can +be useful if you need a projection that is different from the writer schema, +or if you want to decode a message with a different version than was +used to encode the message. + +```js +import avro from 'avsc' +import { readAVSCAsync } from '@kafkajs/confluent-schema-registry' + +const rawSchema = await readAVSCAsync('path/to/protocol.avdl') +const readerSchema = avro.Type.forSchema(rawSchema) + +const payload = await registry.decode(buffer, { + [SchemaType.AVRO]: { readerSchema } +}) +``` + ## Configuration ### Retry diff --git a/src/@types.ts b/src/@types.ts index 944418f..dc6c8bc 100644 --- a/src/@types.ts +++ b/src/@types.ts @@ -1,4 +1,4 @@ -import { Resolver, ForSchemaOptions } from 'avsc' +import { Resolver, ForSchemaOptions, Type } from 'avsc' import { ValidateFunction } from './JsonSchema' import Ajv from 'ajv' @@ -48,7 +48,10 @@ export interface RawAvroSchema { fields: any[] } -export interface AvroSchema extends Schema, RawAvroSchema {} +export interface AvroSchema + extends Schema, + RawAvroSchema, + Pick {} export interface ConfluentSubject { name: string @@ -56,7 +59,7 @@ export interface ConfluentSubject { export interface AvroConfluentSchema { type: SchemaType.AVRO - schema: string + schema: string | RawAvroSchema } export interface ProtoConfluentSchema { diff --git a/src/AvroHelper.ts b/src/AvroHelper.ts index 6e713f5..af35ba5 100644 --- a/src/AvroHelper.ts +++ b/src/AvroHelper.ts @@ -11,11 +11,15 @@ import avro from 'avsc' export default class AvroHelper implements SchemaHelper { private getRawAvroSchema(schema: ConfluentSchema): RawAvroSchema { - return JSON.parse(schema.schema) as RawAvroSchema + return (typeof schema.schema === 'string' + ? JSON.parse(schema.schema) + : schema.schema) as RawAvroSchema } - public getAvroSchema(schema: ConfluentSchema, opts?: AvroOptions) { - const rawSchema: RawAvroSchema = this.getRawAvroSchema(schema) + public getAvroSchema(schema: ConfluentSchema | RawAvroSchema, opts?: AvroOptions) { + const rawSchema: RawAvroSchema = this.isRawAvroSchema(schema) + ? schema + : this.getRawAvroSchema(schema) // @ts-ignore TODO: Fix typings for Schema... const avroSchema: AvroSchema = avro.Type.forSchema(rawSchema, opts) return avroSchema @@ -44,4 +48,9 @@ export default class AvroHelper implements SchemaHelper { } return subject } + + private isRawAvroSchema(schema: ConfluentSchema | RawAvroSchema): schema is RawAvroSchema { + const asRawAvroSchema = schema as RawAvroSchema + return asRawAvroSchema.name != null && asRawAvroSchema.type != null + } } diff --git a/src/SchemaRegistry.newApi.spec.ts b/src/SchemaRegistry.newApi.spec.ts index e088167..0138844 100644 --- a/src/SchemaRegistry.newApi.spec.ts +++ b/src/SchemaRegistry.newApi.spec.ts @@ -2,13 +2,7 @@ import { Type } from 'avsc' import { v4 as uuid } from 'uuid' import SchemaRegistry from './SchemaRegistry' -import { - ConfluentSubject, - ConfluentSchema, - SchemaType, - AvroConfluentSchema, - JsonConfluentSchema, -} from './@types' +import { ConfluentSubject, ConfluentSchema, SchemaType } from './@types' import API, { SchemaRegistryAPIClient } from './api' import { COMPATIBILITY, DEFAULT_API_CLIENT_ID } from './constants' import encodedAnotherPersonV2Avro from '../fixtures/avro/encodedAnotherPersonV2' @@ -366,17 +360,7 @@ describe('SchemaRegistry - new Api', () => { await schemaRegistry.decode(buffer) expect(schemaRegistry.cache.getSchema(registryId)).toBeTruthy() - }); - - (type == SchemaType.AVRO ? it : it.skip)( - 'uses reader schema if specified (avro-only)', async () => { - const writerBuffer = Buffer.from(await schemaRegistry.encode(registryId, payload)) - const readerSchema = Type.forSchema(JSON.parse(schemaStringsByType[type].v2)) - await expect(schemaRegistry.decode(writerBuffer, readerSchema)).resolves.toHaveProperty( - 'city', - 'Stockholm' - ) - }) + }) it('creates a single origin request for a schema cache-miss', async () => { const buffer = Buffer.from(await schemaRegistry.encode(registryId, payload)) @@ -609,4 +593,30 @@ describe('SchemaRegistry - new Api', () => { ) }) }) + + describe('Avro tests', () => { + it('uses reader schema if specified (avro-only)', async () => { + const subject: ConfluentSubject = { + name: [SchemaType.AVRO, 'com.org.domain.fixtures', 'AnotherPerson'].join('.'), + } + const schema: ConfluentSchema = { + type: SchemaType.AVRO, + schema: schemaStringsByType[SchemaType.AVRO].v1, + } + const registryId = (await schemaRegistry.register(schema, { subject: subject.name })).id + const writerBuffer = Buffer.from(await schemaRegistry.encode(registryId, payload)) + const readerSchema = JSON.parse(schemaStringsByType[SchemaType.AVRO].v2) + + await expect( + schemaRegistry.decode(writerBuffer, { [SchemaType.AVRO]: { readerSchema } }), + ).resolves.toHaveProperty('city', 'Stockholm') + + const registeredReaderSchema = await schemaRegistry.getSchema(registryId) + await expect( + schemaRegistry.decode(writerBuffer, { + [SchemaType.AVRO]: { readerSchema: registeredReaderSchema }, + }), + ) + }) + }) }) diff --git a/src/SchemaRegistry.ts b/src/SchemaRegistry.ts index 210310a..bc7bb3e 100644 --- a/src/SchemaRegistry.ts +++ b/src/SchemaRegistry.ts @@ -38,6 +38,13 @@ interface Opts { subject: string } +interface AvroDecodeOptions { + readerSchema?: RawAvroSchema | AvroSchema | Schema +} +interface DecodeOptions { + [SchemaType.AVRO]?: AvroDecodeOptions +} + const DEFAULT_OPTS = { compatibility: COMPATIBILITY.BACKWARD, separator: DEFAULT_SEPERATOR, @@ -144,16 +151,18 @@ export default class SchemaRegistry { const registeredSchema: RegisteredSchema = response.data() this.cache.setLatestRegistryId(subject.name, registeredSchema.id) - this.cache.setSchema(registeredSchema.id, schemaInstance) + this.cache.setSchema(registeredSchema.id, confluentSchema.type, schemaInstance) return registeredSchema } - public async getSchema(registryId: number): Promise { - const schema = this.cache.getSchema(registryId) + private async _getSchema( + registryId: number, + ): Promise<{ type: SchemaType; schema: Schema | AvroSchema }> { + const cacheEntry = this.cache.getSchema(registryId) - if (schema) { - return schema + if (cacheEntry) { + return cacheEntry } const response = await this.getSchemaOriginRequest(registryId) @@ -170,7 +179,11 @@ export default class SchemaRegistry { schema: rawSchema, } const schemaInstance = schemaFromConfluentSchema(confluentSchema, this.options) - return this.cache.setSchema(registryId, schemaInstance) + return this.cache.setSchema(registryId, schemaType, schemaInstance) + } + + public async getSchema(registryId: number): Promise { + return await (await this._getSchema(registryId)).schema } public async encode(registryId: number, payload: any): Promise { @@ -180,7 +193,7 @@ export default class SchemaRegistry { ) } - const schema = await this.getSchema(registryId) + const { schema } = await this._getSchema(registryId) try { const serializedPayload = schema.toBuffer(payload) return encode(registryId, serializedPayload) @@ -201,7 +214,7 @@ export default class SchemaRegistry { return paths } - public async decode(buffer: Buffer, readerSchema?: Type): Promise { + public async decode(buffer: Buffer, options?: DecodeOptions): Promise { if (!Buffer.isBuffer(buffer)) { throw new ConfluentSchemaRegistryArgumentError('Invalid buffer') } @@ -215,9 +228,19 @@ export default class SchemaRegistry { ) } - const writerSchema = await this.getSchema(registryId) - if (readerSchema) { - if (readerSchema.equals(writerSchema as Type)){ + const { type, schema: writerSchema } = await this._getSchema(registryId) + + let rawReaderSchema + switch (type) { + case SchemaType.AVRO: + rawReaderSchema = options?.[SchemaType.AVRO]?.readerSchema as RawAvroSchema | AvroSchema + } + if (rawReaderSchema) { + const readerSchema = schemaFromConfluentSchema( + { type: SchemaType.AVRO, schema: rawReaderSchema }, + this.options, + ) as AvroSchema + if (readerSchema.equals(writerSchema as Type)) { /* Even when schemas are considered equal by `avsc`, * they still aren't interchangeable: * provided `readerSchema` may have different `opts` (e.g. logicalTypes / unionWrap flags) @@ -227,9 +250,9 @@ export default class SchemaRegistry { // decode using a resolver from writer type into reader type return readerSchema.fromBuffer(payload, readerSchema.createResolver(writerSchema as Type)) } - } else { - return writerSchema.fromBuffer(payload) } + + return writerSchema.fromBuffer(payload) } public async getRegistryId(subject: string, version: number | string): Promise { diff --git a/src/cache.ts b/src/cache.ts index 40b22c1..c2cee77 100644 --- a/src/cache.ts +++ b/src/cache.ts @@ -1,8 +1,10 @@ -import { AvroSchema, Schema } from './@types' +import { AvroSchema, Schema, SchemaType } from './@types' + +type CacheEntry = { type: SchemaType; schema: Schema | AvroSchema } export default class Cache { registryIdBySubject: { [key: string]: number } - schemasByRegistryId: { [key: string]: Schema } + schemasByRegistryId: { [key: string]: CacheEntry } constructor() { this.registryIdBySubject = {} @@ -17,10 +19,10 @@ export default class Cache { return this.registryIdBySubject[subject] } - getSchema = (registryId: number): Schema | AvroSchema => this.schemasByRegistryId[registryId] + getSchema = (registryId: number): CacheEntry | undefined => this.schemasByRegistryId[registryId] - setSchema = (registryId: number, schema: Schema) => { - this.schemasByRegistryId[registryId] = schema + setSchema = (registryId: number, type: SchemaType, schema: Schema): CacheEntry => { + this.schemasByRegistryId[registryId] = { type, schema } return this.schemasByRegistryId[registryId] } From 2138fc9ceaf37b4fdff3f1240866a58c1ae73264 Mon Sep 17 00:00:00 2001 From: Tommy Brunn Date: Thu, 4 Nov 2021 10:32:13 +0100 Subject: [PATCH 4/6] Fix type discrimination between schema types --- src/JsonSchema.ts | 6 +++--- src/ProtoSchema.ts | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/JsonSchema.ts b/src/JsonSchema.ts index 8085e0c..a2acc35 100644 --- a/src/JsonSchema.ts +++ b/src/JsonSchema.ts @@ -1,4 +1,4 @@ -import { Schema, JsonOptions, ConfluentSchema } from './@types' +import { Schema, JsonOptions, JsonConfluentSchema } from './@types' import Ajv from 'ajv' import { ConfluentSchemaRegistryValidationError } from './errors' @@ -24,11 +24,11 @@ export interface ValidateFunction { export default class JsonSchema implements Schema { private validate: ValidateFunction - constructor(schema: ConfluentSchema, opts?: JsonOptions) { + constructor(schema: JsonConfluentSchema, opts?: JsonOptions) { this.validate = this.getJsonSchema(schema, opts) } - private getJsonSchema(schema: ConfluentSchema, opts?: JsonOptions) { + private getJsonSchema(schema: JsonConfluentSchema, opts?: JsonOptions) { const ajv = opts?.ajvInstance ?? new Ajv(opts) const validate = ajv.compile(JSON.parse(schema.schema)) return validate diff --git a/src/ProtoSchema.ts b/src/ProtoSchema.ts index 8583c71..d69c2d7 100644 --- a/src/ProtoSchema.ts +++ b/src/ProtoSchema.ts @@ -1,4 +1,4 @@ -import { Schema, ConfluentSchema, ProtoOptions } from './@types' +import { Schema, ProtoOptions, ProtoConfluentSchema } from './@types' import protobuf from 'protobufjs' import { IParserResult, ReflectionObject, Namespace, Type } from 'protobufjs/light' import { @@ -9,7 +9,7 @@ import { export default class ProtoSchema implements Schema { private message: Type - constructor(schema: ConfluentSchema, opts?: ProtoOptions) { + constructor(schema: ProtoConfluentSchema, opts?: ProtoOptions) { const parsedMessage = protobuf.parse(schema.schema) const root = parsedMessage.root this.message = root.lookupType(this.getTypeName(parsedMessage, opts)) From 7376ac6e9728d8af57fb43af256afa4230cac14e Mon Sep 17 00:00:00 2001 From: Tommy Brunn Date: Thu, 4 Nov 2021 10:33:40 +0100 Subject: [PATCH 5/6] Remove outdated docs --- docs/introduction.md | 6 ------ 1 file changed, 6 deletions(-) diff --git a/docs/introduction.md b/docs/introduction.md index ee2f567..1d95909 100755 --- a/docs/introduction.md +++ b/docs/introduction.md @@ -43,10 +43,4 @@ const encodedPayload = await registry.encode(id, payload) // Decode the payload const decodedPayload = await registry.decode(encodedPayload) - -// Decode with resolving to a reader schema (avro-only) -// Note: avsc opts, if needed, should be set to readerSchema -const avsc = require('avsc') -const readerSchema = avsc.Type.forSchema(/* schema, opts */) -const resolvedPayload = await registry.decode(encodedPayload, readerSchema) ``` \ No newline at end of file From fd327e596cd9edc52f799025b3d18e68f8a3c19a Mon Sep 17 00:00:00 2001 From: Tommy Brunn Date: Thu, 4 Nov 2021 10:34:19 +0100 Subject: [PATCH 6/6] Update docs --- docs/usage.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/docs/usage.md b/docs/usage.md index e56f2cc..1028e03 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -203,8 +203,7 @@ used to encode the message. import avro from 'avsc' import { readAVSCAsync } from '@kafkajs/confluent-schema-registry' -const rawSchema = await readAVSCAsync('path/to/protocol.avdl') -const readerSchema = avro.Type.forSchema(rawSchema) +const readerSchema = await readAVSCAsync('path/to/protocol.avdl') const payload = await registry.decode(buffer, { [SchemaType.AVRO]: { readerSchema }