diff --git a/src/@types.ts b/src/@types.ts index d744ed2..53604c7 100644 --- a/src/@types.ts +++ b/src/@types.ts @@ -10,6 +10,14 @@ export interface Schema extends RawSchema { isValid: (payload: object, opts: { errorHook: (path: any) => void }) => void // FIXME: } +export interface SchemaReference { + name: string + subject: string + version: number | string +} + +export type SchemaRef = Omit + declare global { // eslint-disable-next-line @typescript-eslint/no-namespace namespace jest { diff --git a/src/SchemaRegistry.ts b/src/SchemaRegistry.ts index d5f69f7..cdb54d4 100644 --- a/src/SchemaRegistry.ts +++ b/src/SchemaRegistry.ts @@ -1,4 +1,6 @@ +/* eslint-disable prettier/prettier */ import { Response } from 'mappersmith' +import { types } from 'avsc' import { encode, MAGIC_BYTE } from './encoder' import decode from './decoder' @@ -14,7 +16,7 @@ import { ConfluentSchemaRegistryArgumentError, ConfluentSchemaRegistryCompatibilityError, } from './errors' -import { Schema, RawSchema } from './@types' +import { RawSchema, Schema, SchemaReference } from './@types' interface RegisteredSchema { id: number @@ -102,10 +104,21 @@ export default class SchemaRegistry { } const response = await this.getSchemaOriginRequest(registryId) - const foundSchema: { schema: string } = response.data() + const foundSchema: { schema: string; references?: Array } = response.data() const rawSchema: RawSchema = JSON.parse(foundSchema.schema) - - return this.cache.setSchema(registryId, rawSchema) + let logicalTypes: Record types.LogicalType> | undefined + if (foundSchema.references) { + logicalTypes = Object.fromEntries( + await Promise.all( + foundSchema.references.map(async ({ name, subject, version }) => { + const schemaType = await this.getSchema(await this.getRegistryId(subject, version)) + + return [ name, schemaType ]; + }), + ), + ) + } + return this.cache.setSchema(registryId, rawSchema, logicalTypes) } public async encode(registryId: number, jsonPayload: any): Promise { @@ -140,8 +153,13 @@ export default class SchemaRegistry { } public async getRegistryId(subject: string, version: number | string): Promise { + const cached = this.cache.getRegistryIdBySchemaRef({subject, version}); + if (cached) { + return cached; + } const response = await this.api.Subject.version({ subject, version }) const { id }: { id: number } = response.data() + this.cache.setRegistryIdBySchemaRef({subject, version}, id); return id } diff --git a/src/cache.ts b/src/cache.ts index 8059902..f83faa4 100644 --- a/src/cache.ts +++ b/src/cache.ts @@ -1,18 +1,31 @@ -import avro, { ForSchemaOptions } from 'avsc' +import avro, { ForSchemaOptions, types } from 'avsc' -import { Schema, RawSchema } from './@types' +import { RawSchema, Schema, SchemaRef } from './@types' export default class Cache { registryIdBySubject: { [key: string]: number } schemasByRegistryId: { [key: string]: Schema } + registryIdBySchemaRef: { [key: string]: number } forSchemaOptions?: Partial constructor(forSchemaOptions?: Partial) { this.registryIdBySubject = {} this.schemasByRegistryId = {} + this.registryIdBySchemaRef = {} this.forSchemaOptions = forSchemaOptions } + private schemaKeyGen = ({ subject, version }: SchemaRef): string => `${subject}:${version}` + + getRegistryIdBySchemaRef = (schema: SchemaRef): number => + this.registryIdBySchemaRef[this.schemaKeyGen(schema)] + + setRegistryIdBySchemaRef = (schema: SchemaRef, registryId: number) => { + this.registryIdBySchemaRef[this.schemaKeyGen(schema)] = registryId + + return this.registryIdBySchemaRef[this.schemaKeyGen(schema)] + } + getLatestRegistryId = (subject: string): number | undefined => this.registryIdBySubject[subject] setLatestRegistryId = (subject: string, id: number): number => { @@ -23,9 +36,34 @@ export default class Cache { getSchema = (registryId: number): Schema => this.schemasByRegistryId[registryId] - setSchema = (registryId: number, schema: RawSchema) => { + setSchema = ( + registryId: number, + schema: RawSchema, + logicalTypesExtra: Record types.LogicalType> = {}, + ) => { // @ts-ignore TODO: Fix typings for Schema... - this.schemasByRegistryId[registryId] = avro.Type.forSchema(schema, this.forSchemaOptions) + this.schemasByRegistryId[registryId] = avro.Type.forSchema(schema, { + ...this.forSchemaOptions, + typeHook: + this.forSchemaOptions?.typeHook || + function(attr, opts) { + if (typeof attr == 'string') { + if (attr in opts.logicalTypes) { + return (opts.logicalTypes[attr] as unknown) as avro.Type + } + // if we map this as 'namespace.type'. + const qualifiedName = `${opts.namespace}.${attr}` + if (qualifiedName in opts.logicalTypes) { + return (opts.logicalTypes[qualifiedName] as unknown) as avro.Type + } + } + return (undefined as unknown) as avro.Type + }, + logicalTypes: { + ...this.forSchemaOptions?.logicalTypes, + ...logicalTypesExtra, + }, + }) return this.schemasByRegistryId[registryId] }