From bb75685778b61ed53bc49de280fb20cf18014dab Mon Sep 17 00:00:00 2001 From: Johan Brinch Date: Thu, 8 Jul 2021 15:37:21 +0200 Subject: [PATCH 1/9] Specify shell type --- bin/avdlToAVSC.sh | 2 ++ 1 file changed, 2 insertions(+) diff --git a/bin/avdlToAVSC.sh b/bin/avdlToAVSC.sh index 0af6303..2873bb3 100755 --- a/bin/avdlToAVSC.sh +++ b/bin/avdlToAVSC.sh @@ -1,3 +1,5 @@ +#!/bin/sh + avdl_path=$1 avsc_name=$2 From 7c8e714088e5082e44b2a8294358e63b7dfbcba3 Mon Sep 17 00:00:00 2001 From: Johan Brinch Date: Thu, 8 Jul 2021 16:03:15 +0200 Subject: [PATCH 2/9] Extend schema registration with Protocol Buffer references --- src/@types.ts | 1 + src/AvroHelper.ts | 7 ++++ src/JsonHelper.ts | 7 ++++ src/ProtoHelper.ts | 30 ++++++++++++--- src/SchemaRegistry.newApi.spec.ts | 62 +++++++++++++++++++++++++++++++ src/SchemaRegistry.ts | 39 ++++++++++++++++++- src/api/index.ts | 5 +++ 7 files changed, 145 insertions(+), 6 deletions(-) diff --git a/src/@types.ts b/src/@types.ts index 7d122ae..be2c13f 100644 --- a/src/@types.ts +++ b/src/@types.ts @@ -11,6 +11,7 @@ export enum SchemaType { export interface SchemaHelper { validate(schema: Schema): void getSubject(confluentSchema: ConfluentSchema, schema: Schema, separator: string): ConfluentSubject + referencedSchemas(schema: string): Promise } export type AvroOptions = Partial diff --git a/src/AvroHelper.ts b/src/AvroHelper.ts index 6e713f5..295b57c 100644 --- a/src/AvroHelper.ts +++ b/src/AvroHelper.ts @@ -44,4 +44,11 @@ export default class AvroHelper implements SchemaHelper { } return subject } + + public async referencedSchemas( + // eslint-disable-next-line @typescript-eslint/no-unused-vars + _schema: string, + ): Promise { + return [] + } } diff --git a/src/JsonHelper.ts b/src/JsonHelper.ts index 4a4b1cd..07bb634 100644 --- a/src/JsonHelper.ts +++ b/src/JsonHelper.ts @@ -14,4 +14,11 @@ export default class JsonHelper implements SchemaHelper { ): ConfluentSubject { throw new ConfluentSchemaRegistryError('not implemented yet') } + + public async referencedSchemas( + // eslint-disable-next-line @typescript-eslint/no-unused-vars + _schema: string, + ): Promise { + return [] + } } diff --git a/src/ProtoHelper.ts b/src/ProtoHelper.ts index d71124f..d2c5004 100644 --- a/src/ProtoHelper.ts +++ b/src/ProtoHelper.ts @@ -1,17 +1,37 @@ -// @ts-nocheck import { Schema, SchemaHelper, ConfluentSubject, ConfluentSchema } from './@types' import { ConfluentSchemaRegistryError } from './errors' +import { parse } from 'protobufjs' export default class ProtoHelper implements SchemaHelper { - public validate(schema: Schema): void { + // eslint-disable-next-line @typescript-eslint/no-unused-vars + public validate(_schema: Schema): void { return } public getSubject( - confluentSchema: ConfluentSchema, - schema: Schema, - separator: string, + // eslint-disable-next-line @typescript-eslint/no-unused-vars + _confluentSchema: ConfluentSchema, + // eslint-disable-next-line @typescript-eslint/no-unused-vars + _schema: Schema, + // eslint-disable-next-line @typescript-eslint/no-unused-vars + _separator: string, ): ConfluentSubject { throw new ConfluentSchemaRegistryError('not implemented yet') } + + /** + * Get the schemas referenced by the provided schema. + * + * @param schema The schema to find references for + * @param fetchSchema A helper function that can fetch the schema definition given + * the import path (reference name) + * + * @returns A map from Schema Registry subject to ConfluentSchema. The map describes + * each imported/referenced schema that is used by the given schema. + */ + public async referencedSchemas(schema: string): Promise { + const parsed = parse(schema) + const out: string[] = [] + return out.concat(parsed.imports || []).concat(parsed.weakImports || []) + } } diff --git a/src/SchemaRegistry.newApi.spec.ts b/src/SchemaRegistry.newApi.spec.ts index 4534e5f..dbd37c0 100644 --- a/src/SchemaRegistry.newApi.spec.ts +++ b/src/SchemaRegistry.newApi.spec.ts @@ -15,6 +15,7 @@ import encodedAnotherPersonV2Json from '../fixtures/json/encodedAnotherPersonV2' import encodedAnotherPersonV2Proto from '../fixtures/proto/encodedAnotherPersonV2' import encodedNestedV2Proto from '../fixtures/proto/encodedNestedV2' import wrongMagicByte from '../fixtures/wrongMagicByte' +import ProtoSchema from './ProtoSchema' const REGISTRY_HOST = 'http://localhost:8982' const schemaRegistryAPIClientArgs = { host: REGISTRY_HOST } @@ -456,6 +457,32 @@ describe('SchemaRegistry - new Api', () => { v3Opts = { [SchemaType.PROTOBUF]: { messageName: 'AnotherPerson' } }, type = SchemaType.PROTOBUF + const protoV3 = ` + syntax = "proto3"; + package com.org.domain.fixtures; + message AnotherPerson { + string city = 2 [default = "Stockholm"]; + } + ` + + const protoImportsV3 = ` + syntax = "proto3"; + package com.org.domain.fixtures; + import "referenced.proto"; + message AnotherPerson { + string city = 1 [default = "Stockholm"]; + ReferencedMessage referenced = 2; + } + ` + + const protoReferencedMessage = ` + syntax = "proto3"; + package com.org.domain.fixtures; + message ReferencedMessage { + string something = 1; + } + ` + beforeAll(() => { schemaRegistry = new SchemaRegistry(schemaRegistryArgs, v3Opts) }) @@ -544,6 +571,41 @@ describe('SchemaRegistry - new Api', () => { }) }) + it('register and encode protocol buffer v3 schema', async () => { + const confluentSchemaV3: ConfluentSchema = { + type, + schema: protoV3, + } + + const schema1 = await schemaRegistry.register(confluentSchemaV3, { + subject: `${type}_test_protoV3-value`, + }) + + await schemaRegistry.encode(schema1.id, payload) + }) + + it('register and encode protocol buffer v3 schema with import', async () => { + const confluentSchemaV3: ConfluentSchema = { + type, + schema: protoImportsV3, + } + + async function fetchSchema(referenceName: string): Promise { + // eslint-disable-next-line no-console + console.log(`fetching schema for: ${referenceName}`) + return { type: SchemaType.PROTOBUF, schema: protoReferencedMessage } + } + + /*const schema1 = */ + await schemaRegistry.register(confluentSchemaV3, { + subject: `${type}_test_protoImportsV3-value`, + fetchSchema, + }) + + // TODO: fix encode, so it understands imported types + // await schemaRegistry.encode(schema1.id, payload) + }) + it('decodes', async () => { const confluentSchemaV4: ConfluentSchema = { type, diff --git a/src/SchemaRegistry.ts b/src/SchemaRegistry.ts index 3f31ffd..345e664 100644 --- a/src/SchemaRegistry.ts +++ b/src/SchemaRegistry.ts @@ -20,6 +20,7 @@ import { ConfluentSubject, SchemaRegistryAPIClientOptions, AvroConfluentSchema, + SchemaHelper, } from './@types' import { helperTypeFromSchemaType, @@ -34,6 +35,7 @@ interface RegisteredSchema { interface Opts { compatibility?: COMPATIBILITY separator?: string + fetchSchema?: (referenceName: string) => Promise subject: string } @@ -81,6 +83,35 @@ export default class SchemaRegistry { return confluentSchema } + async registerReferences( + opts: Omit & { subject?: string }, + helper: SchemaHelper, + confluentSchema: ConfluentSchema, + ): Promise<{ subject: string; name: string; version: number }[]> { + const fetchSchema = opts.fetchSchema + if (!fetchSchema) { + return [] + } + + const subjects = await helper.referencedSchemas(confluentSchema.schema) + + return await Promise.all( + subjects.map(async subject => { + const schema = await fetchSchema(subject) + const registeredSchema = await this.register(schema, { ...opts, subject }) + const response = await this.api.Schema.versions({ id: registeredSchema.id }) + const allVersions: [{ subject: string; version: number }] = response.data() + + const subjectVersion = allVersions + .filter(it => it.subject === subject) + .map(it => it.version) + .reduce((a, b) => Math.max(a, b)) + + return { subject, name: subject, version: subjectVersion } + }), + ) + } + public async register( schema: Exclude, userOpts: Opts, @@ -97,7 +128,8 @@ export default class SchemaRegistry { schema: RawAvroSchema | ConfluentSchema, userOpts?: Opts, ): Promise { - const { compatibility, separator } = { ...DEFAULT_OPTS, ...userOpts } + const opts = { ...DEFAULT_OPTS, ...userOpts } + const { compatibility, separator } = opts const confluentSchema: ConfluentSchema = this.getConfluentSchema(schema) @@ -105,6 +137,10 @@ export default class SchemaRegistry { const schemaInstance = schemaFromConfluentSchema(confluentSchema, this.options) helper.validate(schemaInstance) + // If this schema depends on other schemas, we need to make sure those are registered first. + // Later, we pass their IDs to Schema Registry as our 'references'. + const references = await this.registerReferences(opts, helper, confluentSchema) + let subject: ConfluentSubject if (userOpts?.subject) { subject = { @@ -138,6 +174,7 @@ export default class SchemaRegistry { body: { schemaType: confluentSchema.type, schema: confluentSchema.schema, + references, }, }) diff --git a/src/api/index.ts b/src/api/index.ts index 87fd65e..a23297e 100644 --- a/src/api/index.ts +++ b/src/api/index.ts @@ -28,6 +28,7 @@ export interface SchemaRegistryAPIClientArgs { export type SchemaRegistryAPIClient = Client<{ Schema: { find: (_: any) => any + versions: (_: any) => any } Subject: { all: (_: any) => any @@ -65,6 +66,10 @@ export default ({ method: 'get', path: '/schemas/ids/{id}', }, + versions: { + method: 'get', + path: '/schemas/ids/{id}/versions', + }, }, Subject: { all: { From c66fc55ed882f387738dba8cadbf074c71ad4f71 Mon Sep 17 00:00:00 2001 From: Johan Brinch Date: Fri, 9 Jul 2021 09:03:25 +0200 Subject: [PATCH 3/9] Support for references in getSchema --- src/ProtoSchema.ts | 15 ++++++++--- src/SchemaRegistry.newApi.spec.ts | 24 ++++++++++++++--- src/SchemaRegistry.ts | 44 +++++++++++++++++++++++++++---- src/schemaTypeResolver.ts | 3 ++- 4 files changed, 73 insertions(+), 13 deletions(-) diff --git a/src/ProtoSchema.ts b/src/ProtoSchema.ts index 8583c71..944426c 100644 --- a/src/ProtoSchema.ts +++ b/src/ProtoSchema.ts @@ -8,11 +8,20 @@ import { export default class ProtoSchema implements Schema { private message: Type + private root: protobuf.Root - constructor(schema: ConfluentSchema, opts?: ProtoOptions) { + constructor(schema: ConfluentSchema, opts?: ProtoOptions, references?: Schema[]) { const parsedMessage = protobuf.parse(schema.schema) - const root = parsedMessage.root - this.message = root.lookupType(this.getTypeName(parsedMessage, opts)) + this.root = parsedMessage.root + + if (references) { + const schemas = references as ProtoSchema[] + schemas.forEach(reference => { + this.root.add(reference.root) + }) + } + + this.message = this.root.lookupType(this.getTypeName(parsedMessage, opts)) } private getNestedTypeName(parent: { [k: string]: ReflectionObject } | undefined): string { diff --git a/src/SchemaRegistry.newApi.spec.ts b/src/SchemaRegistry.newApi.spec.ts index dbd37c0..f70383c 100644 --- a/src/SchemaRegistry.newApi.spec.ts +++ b/src/SchemaRegistry.newApi.spec.ts @@ -596,14 +596,30 @@ describe('SchemaRegistry - new Api', () => { return { type: SchemaType.PROTOBUF, schema: protoReferencedMessage } } - /*const schema1 = */ - await schemaRegistry.register(confluentSchemaV3, { + const schema1 = await schemaRegistry.register(confluentSchemaV3, { subject: `${type}_test_protoImportsV3-value`, fetchSchema, }) - // TODO: fix encode, so it understands imported types - // await schemaRegistry.encode(schema1.id, payload) + // Check that we can encode with the cached version from the register() call + const payload = { referenced: { something: 'imported-schema' } } + const encoded1 = await schemaRegistry.encode(schema1.id, payload) + const decoded1 = await schemaRegistry.decode(encoded1) + + // Clear the cache and try again to exercise getSchema() + schemaRegistry.cache.clear() + const encoded2 = await schemaRegistry.encode(schema1.id, payload) + schemaRegistry.cache.clear() + const decoded2 = await schemaRegistry.decode(encoded2) + + expect(encoded1).toEqual(encoded2) + expect(decoded1).toEqual(decoded2) + + // Check the default value + expect(decoded1.city).toEqual('Stockholm') + + // Check the value in the field defined in imported schema + expect(decoded1.referenced.something).toEqual('imported-schema') }) it('decodes', async () => { diff --git a/src/SchemaRegistry.ts b/src/SchemaRegistry.ts index 345e664..8e55855 100644 --- a/src/SchemaRegistry.ts +++ b/src/SchemaRegistry.ts @@ -132,14 +132,23 @@ export default class SchemaRegistry { const { compatibility, separator } = opts const confluentSchema: ConfluentSchema = this.getConfluentSchema(schema) - const helper = helperTypeFromSchemaType(confluentSchema.type) - const schemaInstance = schemaFromConfluentSchema(confluentSchema, this.options) - helper.validate(schemaInstance) // If this schema depends on other schemas, we need to make sure those are registered first. // Later, we pass their IDs to Schema Registry as our 'references'. const references = await this.registerReferences(opts, helper, confluentSchema) + const referenceSchemas = await Promise.all( + references.map(async reference => { + return this.getSchemaForSubject(reference.subject, reference.version) + }), + ) + + const schemaInstance = schemaFromConfluentSchema( + confluentSchema, + this.options, + referenceSchemas, + ) + helper.validate(schemaInstance) let subject: ConfluentSubject if (userOpts?.subject) { @@ -185,6 +194,15 @@ export default class SchemaRegistry { return registeredSchema } + public async getSchemaForSubject(subject: string, version: number): Promise { + const response = await this.api.Subject.version({ + subject, + version, + }) + const schema: { id: number } = response.data() + return await this.getSchema(schema.id) + } + public async getSchema(registryId: number): Promise { const schema = this.cache.getSchema(registryId) @@ -193,7 +211,11 @@ export default class SchemaRegistry { } const response = await this.getSchemaOriginRequest(registryId) - const foundSchema: { schema: string; schemaType: string } = response.data() + const foundSchema: { + schema: string + schemaType: string + references?: { name: string; subject: string; version: number }[] + } = response.data() const rawSchema = foundSchema.schema const schemaType = schemaTypeFromString(foundSchema.schemaType) @@ -201,11 +223,23 @@ export default class SchemaRegistry { throw new ConfluentSchemaRegistryError(`Unknown schema type ${foundSchema.schemaType}`) } + const referenceSchemas = await Promise.all( + (foundSchema.references || []).map(async reference => { + return this.getSchemaForSubject(reference.subject, reference.version) + }), + ) + const confluentSchema: ConfluentSchema = { type: schemaType, schema: rawSchema, } - const schemaInstance = schemaFromConfluentSchema(confluentSchema, this.options) + + const schemaInstance = schemaFromConfluentSchema( + confluentSchema, + this.options, + referenceSchemas, + ) + return this.cache.setSchema(registryId, schemaInstance) } diff --git a/src/schemaTypeResolver.ts b/src/schemaTypeResolver.ts index 1661690..dc44456 100644 --- a/src/schemaTypeResolver.ts +++ b/src/schemaTypeResolver.ts @@ -65,6 +65,7 @@ export const helperTypeFromSchemaType = ( export const schemaFromConfluentSchema = ( confluentSchema: ConfluentSchema, options?: SchemaRegistryAPIClientOptions, + referenceSchemas?: Schema[], ): Schema | AvroSchema => { try { let schema: Schema @@ -87,7 +88,7 @@ export const schemaFromConfluentSchema = ( } case SchemaType.PROTOBUF: { const opts: ProtoOptions | undefined = (options as ProtocolOptions)?.[SchemaType.PROTOBUF] - schema = new ProtoSchema(confluentSchema, opts) + schema = new ProtoSchema(confluentSchema, opts, referenceSchemas) break } default: From 06b0a13db48c91992436d8f49d39c2ed1345d5c7 Mon Sep 17 00:00:00 2001 From: Johan Brinch Date: Fri, 9 Jul 2021 10:45:23 +0200 Subject: [PATCH 4/9] Fix outdated doc string --- src/ProtoHelper.ts | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/src/ProtoHelper.ts b/src/ProtoHelper.ts index d2c5004..8e2089e 100644 --- a/src/ProtoHelper.ts +++ b/src/ProtoHelper.ts @@ -23,11 +23,8 @@ export default class ProtoHelper implements SchemaHelper { * Get the schemas referenced by the provided schema. * * @param schema The schema to find references for - * @param fetchSchema A helper function that can fetch the schema definition given - * the import path (reference name) * - * @returns A map from Schema Registry subject to ConfluentSchema. The map describes - * each imported/referenced schema that is used by the given schema. + * @returns A list of imported/referenced schemas that is used by the given schema. */ public async referencedSchemas(schema: string): Promise { const parsed = parse(schema) From 4ddb0368890dd996dd2e6c9340bdb99a078fd89a Mon Sep 17 00:00:00 2001 From: Johan Brinch Date: Fri, 9 Jul 2021 10:46:18 +0200 Subject: [PATCH 5/9] Remove debug log --- src/SchemaRegistry.newApi.spec.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/SchemaRegistry.newApi.spec.ts b/src/SchemaRegistry.newApi.spec.ts index f70383c..7a9182b 100644 --- a/src/SchemaRegistry.newApi.spec.ts +++ b/src/SchemaRegistry.newApi.spec.ts @@ -591,8 +591,6 @@ describe('SchemaRegistry - new Api', () => { } async function fetchSchema(referenceName: string): Promise { - // eslint-disable-next-line no-console - console.log(`fetching schema for: ${referenceName}`) return { type: SchemaType.PROTOBUF, schema: protoReferencedMessage } } From 68cc9eb8dc2a9c883df6583b48c0975c1661b9fe Mon Sep 17 00:00:00 2001 From: Johan Brinch Date: Fri, 9 Jul 2021 11:55:53 +0200 Subject: [PATCH 6/9] Extend import test with an extra layer --- src/SchemaRegistry.newApi.spec.ts | 39 +++++++++++++++++++++---------- 1 file changed, 27 insertions(+), 12 deletions(-) diff --git a/src/SchemaRegistry.newApi.spec.ts b/src/SchemaRegistry.newApi.spec.ts index 7a9182b..28d8101 100644 --- a/src/SchemaRegistry.newApi.spec.ts +++ b/src/SchemaRegistry.newApi.spec.ts @@ -465,21 +465,30 @@ describe('SchemaRegistry - new Api', () => { } ` - const protoImportsV3 = ` + const protoContact = ` syntax = "proto3"; package com.org.domain.fixtures; - import "referenced.proto"; - message AnotherPerson { + message ContactMessage { + string email = 1; + } + ` + + const protoEmployee = ` + syntax = "proto3"; + package com.org.domain.fixtures; + import "Contact.proto"; + message Employee { string city = 1 [default = "Stockholm"]; - ReferencedMessage referenced = 2; + ContactMessage contact = 2; } ` - const protoReferencedMessage = ` + const protoCompany = ` syntax = "proto3"; package com.org.domain.fixtures; - message ReferencedMessage { - string something = 1; + import "Employee.proto"; + message Company { + Employee employee = 1; } ` @@ -587,11 +596,17 @@ describe('SchemaRegistry - new Api', () => { it('register and encode protocol buffer v3 schema with import', async () => { const confluentSchemaV3: ConfluentSchema = { type, - schema: protoImportsV3, + schema: protoCompany, } async function fetchSchema(referenceName: string): Promise { - return { type: SchemaType.PROTOBUF, schema: protoReferencedMessage } + if (referenceName === 'Contact.proto') { + return { type: SchemaType.PROTOBUF, schema: protoContact } + } + if (referenceName === 'Employee.proto') { + return { type: SchemaType.PROTOBUF, schema: protoEmployee } + } + throw `unknown reference ${referenceName}` } const schema1 = await schemaRegistry.register(confluentSchemaV3, { @@ -600,7 +615,7 @@ describe('SchemaRegistry - new Api', () => { }) // Check that we can encode with the cached version from the register() call - const payload = { referenced: { something: 'imported-schema' } } + const payload = { employee: { contact: { email: 'example@example.com' } } } const encoded1 = await schemaRegistry.encode(schema1.id, payload) const decoded1 = await schemaRegistry.decode(encoded1) @@ -614,10 +629,10 @@ describe('SchemaRegistry - new Api', () => { expect(decoded1).toEqual(decoded2) // Check the default value - expect(decoded1.city).toEqual('Stockholm') + expect(decoded1.employee.city).toEqual('Stockholm') // Check the value in the field defined in imported schema - expect(decoded1.referenced.something).toEqual('imported-schema') + expect(decoded1.employee.contact.email).toEqual('example@example.com') }) it('decodes', async () => { From a47dd570420dc67c78edd0722c5d2449a8fad775 Mon Sep 17 00:00:00 2001 From: Johan Brinch Date: Mon, 12 Jul 2021 10:21:03 +0200 Subject: [PATCH 7/9] Remove duplicate method and use existing instead --- src/SchemaRegistry.ts | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/src/SchemaRegistry.ts b/src/SchemaRegistry.ts index 8e55855..c518611 100644 --- a/src/SchemaRegistry.ts +++ b/src/SchemaRegistry.ts @@ -139,7 +139,7 @@ export default class SchemaRegistry { const references = await this.registerReferences(opts, helper, confluentSchema) const referenceSchemas = await Promise.all( references.map(async reference => { - return this.getSchemaForSubject(reference.subject, reference.version) + return this.getSchema(await this.getRegistryId(reference.subject, reference.version)) }), ) @@ -194,15 +194,6 @@ export default class SchemaRegistry { return registeredSchema } - public async getSchemaForSubject(subject: string, version: number): Promise { - const response = await this.api.Subject.version({ - subject, - version, - }) - const schema: { id: number } = response.data() - return await this.getSchema(schema.id) - } - public async getSchema(registryId: number): Promise { const schema = this.cache.getSchema(registryId) @@ -225,7 +216,7 @@ export default class SchemaRegistry { const referenceSchemas = await Promise.all( (foundSchema.references || []).map(async reference => { - return this.getSchemaForSubject(reference.subject, reference.version) + return this.getSchema(await this.getRegistryId(reference.subject, reference.version)) }), ) From 6ca2f0db568bd35eb1d19a85d64dd9f8e2d976de Mon Sep 17 00:00:00 2001 From: Johan Brinch Date: Tue, 31 Aug 2021 15:33:01 +0200 Subject: [PATCH 8/9] test commit From d7cb8f6821818984f45484862696cd66d041dee0 Mon Sep 17 00:00:00 2001 From: Johan Brinch Date: Fri, 3 Sep 2021 14:32:50 +0200 Subject: [PATCH 9/9] Avoid polluting imported schema cache entry --- src/ProtoSchema.ts | 5 ++- src/SchemaRegistry.newApi.spec.ts | 70 ++++++++++++++++++++++++++++++- 2 files changed, 72 insertions(+), 3 deletions(-) diff --git a/src/ProtoSchema.ts b/src/ProtoSchema.ts index 944426c..2aa64f8 100644 --- a/src/ProtoSchema.ts +++ b/src/ProtoSchema.ts @@ -17,7 +17,10 @@ export default class ProtoSchema implements Schema { if (references) { const schemas = references as ProtoSchema[] schemas.forEach(reference => { - this.root.add(reference.root) + // root.add() takes ownership over its input argument. + // add() can modify and extend the input schema, so we have to clone the reference to avoid polluting it. + const copy = reference.root.toJSON().nested + this.root.addJSON(copy) }) } diff --git a/src/SchemaRegistry.newApi.spec.ts b/src/SchemaRegistry.newApi.spec.ts index 28d8101..b94df02 100644 --- a/src/SchemaRegistry.newApi.spec.ts +++ b/src/SchemaRegistry.newApi.spec.ts @@ -483,14 +483,24 @@ describe('SchemaRegistry - new Api', () => { } ` + const protoOffice = ` + syntax = "proto3"; + package com.org.domain.fixtures; + message Office { + string address = 1; + } + ` + const protoCompany = ` syntax = "proto3"; package com.org.domain.fixtures; import "Employee.proto"; + import "Office.proto"; message Company { Employee employee = 1; + Office office = 2; } - ` + ` beforeAll(() => { schemaRegistry = new SchemaRegistry(schemaRegistryArgs, v3Opts) @@ -606,6 +616,9 @@ describe('SchemaRegistry - new Api', () => { if (referenceName === 'Employee.proto') { return { type: SchemaType.PROTOBUF, schema: protoEmployee } } + if (referenceName === 'Office.proto') { + return { type: SchemaType.PROTOBUF, schema: protoOffice } + } throw `unknown reference ${referenceName}` } @@ -615,7 +628,11 @@ describe('SchemaRegistry - new Api', () => { }) // Check that we can encode with the cached version from the register() call - const payload = { employee: { contact: { email: 'example@example.com' } } } + const payload = { + employee: { contact: { email: 'example@example.com' } }, + office: { address: 'Stockholm' }, + } + const encoded1 = await schemaRegistry.encode(schema1.id, payload) const decoded1 = await schemaRegistry.decode(encoded1) @@ -635,6 +652,55 @@ describe('SchemaRegistry - new Api', () => { expect(decoded1.employee.contact.email).toEqual('example@example.com') }) + it('no in-place changes to cached entry of imported schema', async () => { + const officeSchema: ConfluentSchema = { + type, + schema: protoOffice, + } + + const companySchema: ConfluentSchema = { + type, + schema: protoCompany, + } + + async function fetchSchema(referenceName: string): Promise { + if (referenceName === 'Contact.proto') { + return { type: SchemaType.PROTOBUF, schema: protoContact } + } + if (referenceName === 'Employee.proto') { + return { type: SchemaType.PROTOBUF, schema: protoEmployee } + } + if (referenceName === 'Office.proto') { + return { type: SchemaType.PROTOBUF, schema: protoOffice } + } + throw `unknown reference ${referenceName}` + } + + // Register a schema with no dependencies and store it before registering anything else. + const registeredOfficeSchemaId = ( + await schemaRegistry.register(officeSchema, { + subject: `${type}_test_officeSchema-value`, + }) + ).id + + const registeredOfficeSchemaBefore = JSON.stringify( + await schemaRegistry.getSchema(registeredOfficeSchemaId), + ) + + // Register a new schema that imports our already registered Office schema. + await schemaRegistry.register(companySchema, { + subject: `${type}_test_companySchema-value`, + fetchSchema, + }) + + // Check that registering this new schema did not modify the original imported schema. + const registeredOfficeSchemaAfter = JSON.stringify( + await schemaRegistry.getSchema(registeredOfficeSchemaId), + ) + + expect(registeredOfficeSchemaBefore).toEqual(registeredOfficeSchemaAfter) + }) + it('decodes', async () => { const confluentSchemaV4: ConfluentSchema = { type,