Skip to content

Commit

Permalink
Merge pull request #166 from kafkajs/avro-schema-resolution
Browse files Browse the repository at this point in the history
Support reader schema when decoding avro
  • Loading branch information
Nevon authored Nov 22, 2021
2 parents d282f9f + fd327e5 commit 8ebd46b
Show file tree
Hide file tree
Showing 8 changed files with 127 additions and 33 deletions.
22 changes: 22 additions & 0 deletions docs/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,28 @@ const payload = await registry.decode(buffer)
// { full_name: 'John Doe' }
```

`registry.decode` has an optional second `options` argument with options
specific to each schema type.

### Avro

With Avro you can specify a specific reader schema to use to decode the
message, rather than using the schema registered in the registry. This can
be useful if you need a projection that is different from the writer schema,
or if you want to decode a message with a different version than was
used to encode the message.

```js
import avro from 'avsc'
import { readAVSCAsync } from '@kafkajs/confluent-schema-registry'

const readerSchema = await readAVSCAsync('path/to/protocol.avdl')

const payload = await registry.decode(buffer, {
[SchemaType.AVRO]: { readerSchema }
})
```

## Configuration

### Retry
Expand Down
9 changes: 6 additions & 3 deletions src/@types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Resolver, ForSchemaOptions } from 'avsc'
import { Resolver, ForSchemaOptions, Type } from 'avsc'
import { ValidateFunction } from './JsonSchema'
import Ajv from 'ajv'

Expand Down Expand Up @@ -48,15 +48,18 @@ export interface RawAvroSchema {
fields: any[]
}

export interface AvroSchema extends Schema, RawAvroSchema {}
export interface AvroSchema
extends Schema,
RawAvroSchema,
Pick<Type, 'equals' | 'createResolver'> {}

export interface ConfluentSubject {
name: string
}

export interface AvroConfluentSchema {
type: SchemaType.AVRO
schema: string
schema: string | RawAvroSchema
}

export interface ProtoConfluentSchema {
Expand Down
15 changes: 12 additions & 3 deletions src/AvroHelper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,15 @@ import avro from 'avsc'

export default class AvroHelper implements SchemaHelper {
private getRawAvroSchema(schema: ConfluentSchema): RawAvroSchema {
return JSON.parse(schema.schema) as RawAvroSchema
return (typeof schema.schema === 'string'
? JSON.parse(schema.schema)
: schema.schema) as RawAvroSchema
}

public getAvroSchema(schema: ConfluentSchema, opts?: AvroOptions) {
const rawSchema: RawAvroSchema = this.getRawAvroSchema(schema)
public getAvroSchema(schema: ConfluentSchema | RawAvroSchema, opts?: AvroOptions) {
const rawSchema: RawAvroSchema = this.isRawAvroSchema(schema)
? schema
: this.getRawAvroSchema(schema)
// @ts-ignore TODO: Fix typings for Schema...
const avroSchema: AvroSchema = avro.Type.forSchema(rawSchema, opts)
return avroSchema
Expand Down Expand Up @@ -44,4 +48,9 @@ export default class AvroHelper implements SchemaHelper {
}
return subject
}

private isRawAvroSchema(schema: ConfluentSchema | RawAvroSchema): schema is RawAvroSchema {
const asRawAvroSchema = schema as RawAvroSchema
return asRawAvroSchema.name != null && asRawAvroSchema.type != null
}
}
6 changes: 3 additions & 3 deletions src/JsonSchema.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Schema, JsonOptions, ConfluentSchema } from './@types'
import { Schema, JsonOptions, JsonConfluentSchema } from './@types'
import Ajv from 'ajv'
import { ConfluentSchemaRegistryValidationError } from './errors'

Expand All @@ -24,11 +24,11 @@ export interface ValidateFunction {
export default class JsonSchema implements Schema {
private validate: ValidateFunction

constructor(schema: ConfluentSchema, opts?: JsonOptions) {
constructor(schema: JsonConfluentSchema, opts?: JsonOptions) {
this.validate = this.getJsonSchema(schema, opts)
}

private getJsonSchema(schema: ConfluentSchema, opts?: JsonOptions) {
private getJsonSchema(schema: JsonConfluentSchema, opts?: JsonOptions) {
const ajv = opts?.ajvInstance ?? new Ajv(opts)
const validate = ajv.compile(JSON.parse(schema.schema))
return validate
Expand Down
4 changes: 2 additions & 2 deletions src/ProtoSchema.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Schema, ConfluentSchema, ProtoOptions } from './@types'
import { Schema, ProtoOptions, ProtoConfluentSchema } from './@types'
import protobuf from 'protobufjs'
import { IParserResult, ReflectionObject, Namespace, Type } from 'protobufjs/light'
import {
Expand All @@ -9,7 +9,7 @@ import {
export default class ProtoSchema implements Schema {
private message: Type

constructor(schema: ConfluentSchema, opts?: ProtoOptions) {
constructor(schema: ProtoConfluentSchema, opts?: ProtoOptions) {
const parsedMessage = protobuf.parse(schema.schema)
const root = parsedMessage.root
this.message = root.lookupType(this.getTypeName(parsedMessage, opts))
Expand Down
35 changes: 28 additions & 7 deletions src/SchemaRegistry.newApi.spec.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
import { Type } from 'avsc'
import { v4 as uuid } from 'uuid'

import SchemaRegistry from './SchemaRegistry'
import {
ConfluentSubject,
ConfluentSchema,
SchemaType,
AvroConfluentSchema,
JsonConfluentSchema,
} from './@types'
import { ConfluentSubject, ConfluentSchema, SchemaType } from './@types'
import API, { SchemaRegistryAPIClient } from './api'
import { COMPATIBILITY, DEFAULT_API_CLIENT_ID } from './constants'
import encodedAnotherPersonV2Avro from '../fixtures/avro/encodedAnotherPersonV2'
Expand Down Expand Up @@ -598,4 +593,30 @@ describe('SchemaRegistry - new Api', () => {
)
})
})

describe('Avro tests', () => {
it('uses reader schema if specified (avro-only)', async () => {
const subject: ConfluentSubject = {
name: [SchemaType.AVRO, 'com.org.domain.fixtures', 'AnotherPerson'].join('.'),
}
const schema: ConfluentSchema = {
type: SchemaType.AVRO,
schema: schemaStringsByType[SchemaType.AVRO].v1,
}
const registryId = (await schemaRegistry.register(schema, { subject: subject.name })).id
const writerBuffer = Buffer.from(await schemaRegistry.encode(registryId, payload))
const readerSchema = JSON.parse(schemaStringsByType[SchemaType.AVRO].v2)

await expect(
schemaRegistry.decode(writerBuffer, { [SchemaType.AVRO]: { readerSchema } }),
).resolves.toHaveProperty('city', 'Stockholm')

const registeredReaderSchema = await schemaRegistry.getSchema(registryId)
await expect(
schemaRegistry.decode(writerBuffer, {
[SchemaType.AVRO]: { readerSchema: registeredReaderSchema },
}),
)
})
})
})
57 changes: 47 additions & 10 deletions src/SchemaRegistry.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Type } from 'avsc'
import { Response } from 'mappersmith'

import { encode, MAGIC_BYTE } from './wireEncoder'
Expand Down Expand Up @@ -37,6 +38,13 @@ interface Opts {
subject: string
}

interface AvroDecodeOptions {
readerSchema?: RawAvroSchema | AvroSchema | Schema
}
interface DecodeOptions {
[SchemaType.AVRO]?: AvroDecodeOptions
}

const DEFAULT_OPTS = {
compatibility: COMPATIBILITY.BACKWARD,
separator: DEFAULT_SEPERATOR,
Expand Down Expand Up @@ -143,16 +151,18 @@ export default class SchemaRegistry {

const registeredSchema: RegisteredSchema = response.data()
this.cache.setLatestRegistryId(subject.name, registeredSchema.id)
this.cache.setSchema(registeredSchema.id, schemaInstance)
this.cache.setSchema(registeredSchema.id, confluentSchema.type, schemaInstance)

return registeredSchema
}

public async getSchema(registryId: number): Promise<Schema | AvroSchema> {
const schema = this.cache.getSchema(registryId)
private async _getSchema(
registryId: number,
): Promise<{ type: SchemaType; schema: Schema | AvroSchema }> {
const cacheEntry = this.cache.getSchema(registryId)

if (schema) {
return schema
if (cacheEntry) {
return cacheEntry
}

const response = await this.getSchemaOriginRequest(registryId)
Expand All @@ -169,7 +179,11 @@ export default class SchemaRegistry {
schema: rawSchema,
}
const schemaInstance = schemaFromConfluentSchema(confluentSchema, this.options)
return this.cache.setSchema(registryId, schemaInstance)
return this.cache.setSchema(registryId, schemaType, schemaInstance)
}

public async getSchema(registryId: number): Promise<Schema | AvroSchema> {
return await (await this._getSchema(registryId)).schema
}

public async encode(registryId: number, payload: any): Promise<Buffer> {
Expand All @@ -179,7 +193,7 @@ export default class SchemaRegistry {
)
}

const schema = await this.getSchema(registryId)
const { schema } = await this._getSchema(registryId)
try {
const serializedPayload = schema.toBuffer(payload)
return encode(registryId, serializedPayload)
Expand All @@ -200,7 +214,7 @@ export default class SchemaRegistry {
return paths
}

public async decode(buffer: Buffer): Promise<any> {
public async decode(buffer: Buffer, options?: DecodeOptions): Promise<any> {
if (!Buffer.isBuffer(buffer)) {
throw new ConfluentSchemaRegistryArgumentError('Invalid buffer')
}
Expand All @@ -214,8 +228,31 @@ export default class SchemaRegistry {
)
}

const schema = await this.getSchema(registryId)
return schema.fromBuffer(payload)
const { type, schema: writerSchema } = await this._getSchema(registryId)

let rawReaderSchema
switch (type) {
case SchemaType.AVRO:
rawReaderSchema = options?.[SchemaType.AVRO]?.readerSchema as RawAvroSchema | AvroSchema
}
if (rawReaderSchema) {
const readerSchema = schemaFromConfluentSchema(
{ type: SchemaType.AVRO, schema: rawReaderSchema },
this.options,
) as AvroSchema
if (readerSchema.equals(writerSchema as Type)) {
/* Even when schemas are considered equal by `avsc`,
* they still aren't interchangeable:
* provided `readerSchema` may have different `opts` (e.g. logicalTypes / unionWrap flags)
* see https://github.com/mtth/avsc/issues/362 */
return readerSchema.fromBuffer(payload)
} else {
// decode using a resolver from writer type into reader type
return readerSchema.fromBuffer(payload, readerSchema.createResolver(writerSchema as Type))
}
}

return writerSchema.fromBuffer(payload)
}

public async getRegistryId(subject: string, version: number | string): Promise<number> {
Expand Down
12 changes: 7 additions & 5 deletions src/cache.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import { AvroSchema, Schema } from './@types'
import { AvroSchema, Schema, SchemaType } from './@types'

type CacheEntry = { type: SchemaType; schema: Schema | AvroSchema }

export default class Cache {
registryIdBySubject: { [key: string]: number }
schemasByRegistryId: { [key: string]: Schema }
schemasByRegistryId: { [key: string]: CacheEntry }

constructor() {
this.registryIdBySubject = {}
Expand All @@ -17,10 +19,10 @@ export default class Cache {
return this.registryIdBySubject[subject]
}

getSchema = (registryId: number): Schema | AvroSchema => this.schemasByRegistryId[registryId]
getSchema = (registryId: number): CacheEntry | undefined => this.schemasByRegistryId[registryId]

setSchema = (registryId: number, schema: Schema) => {
this.schemasByRegistryId[registryId] = schema
setSchema = (registryId: number, type: SchemaType, schema: Schema): CacheEntry => {
this.schemasByRegistryId[registryId] = { type, schema }

return this.schemasByRegistryId[registryId]
}
Expand Down

0 comments on commit 8ebd46b

Please sign in to comment.