Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement support for Protocol Buffer references (imports) #135

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bin/avdlToAVSC.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#!/bin/sh

avdl_path=$1
avsc_name=$2

Expand Down
1 change: 1 addition & 0 deletions src/@types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export enum SchemaType {
/**
 * Contract for the per-schema-type helpers. In the code visible here it is
 * implemented by AvroHelper, JsonHelper and ProtoHelper.
 */
export interface SchemaHelper {
// Checks a schema instance and returns nothing.
// NOTE(review): presumably signals an invalid schema by throwing — confirm against the implementations.
validate(schema: Schema): void
// Produces the ConfluentSubject for a schema.
// NOTE(review): `separator` presumably joins the parts of the subject name — confirm.
getSubject(confluentSchema: ConfluentSchema, schema: Schema, separator: string): ConfluentSubject
// Resolves to the names of the schemas referenced by the raw schema text.
// The Avro and JSON helpers always resolve to an empty list; the Protobuf helper
// resolves to the imports and weak imports found by the parser.
referencedSchemas(schema: string): Promise<string[]>
}

export type AvroOptions = Partial<ForSchemaOptions>
Expand Down
7 changes: 7 additions & 0 deletions src/AvroHelper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,11 @@ export default class AvroHelper implements SchemaHelper {
}
return subject
}

/**
 * Report the schemas referenced by the given schema.
 *
 * This helper does not resolve references: the argument is ignored and the
 * returned promise always resolves to an empty list.
 *
 * @param _schema Raw schema text (unused)
 *
 * @returns An empty list
 */
public async referencedSchemas(
  // eslint-disable-next-line @typescript-eslint/no-unused-vars
  _schema: string,
): Promise<string[]> {
  const noReferences: string[] = []
  return noReferences
}
}
7 changes: 7 additions & 0 deletions src/JsonHelper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,11 @@ export default class JsonHelper implements SchemaHelper {
): ConfluentSubject {
throw new ConfluentSchemaRegistryError('not implemented yet')
}

/**
 * Report the schemas referenced by the given schema.
 *
 * No reference lookup is performed here: the argument is ignored and the
 * returned promise always resolves to an empty list.
 *
 * @param _schema Raw schema text (unused)
 *
 * @returns An empty list
 */
public async referencedSchemas(
  // eslint-disable-next-line @typescript-eslint/no-unused-vars
  _schema: string,
): Promise<string[]> {
  const references: string[] = []
  return references
}
}
27 changes: 22 additions & 5 deletions src/ProtoHelper.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,34 @@
// @ts-nocheck
import { Schema, SchemaHelper, ConfluentSubject, ConfluentSchema } from './@types'
import { ConfluentSchemaRegistryError } from './errors'
import { parse } from 'protobufjs'

export default class ProtoHelper implements SchemaHelper {
public validate(schema: Schema): void {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
public validate(_schema: Schema): void {
return
}

public getSubject(
confluentSchema: ConfluentSchema,
schema: Schema,
separator: string,
// eslint-disable-next-line @typescript-eslint/no-unused-vars
_confluentSchema: ConfluentSchema,
// eslint-disable-next-line @typescript-eslint/no-unused-vars
_schema: Schema,
// eslint-disable-next-line @typescript-eslint/no-unused-vars
_separator: string,
): ConfluentSubject {
throw new ConfluentSchemaRegistryError('not implemented yet')
}

/**
 * Collect the names of the schemas imported by a Protocol Buffer schema.
 *
 * @param schema Protocol Buffer schema source text to parse
 *
 * @returns The regular imports followed by the weak imports reported by the
 *          parser; an empty list when the schema has neither.
 */
public async referencedSchemas(schema: string): Promise<string[]> {
  const { imports, weakImports } = parse(schema)
  // Either list may be absent on the parser result, so fall back to an empty one.
  return [...(imports || []), ...(weakImports || [])]
}
}
15 changes: 12 additions & 3 deletions src/ProtoSchema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,20 @@ import {

export default class ProtoSchema implements Schema {
private message: Type
private root: protobuf.Root

constructor(schema: ConfluentSchema, opts?: ProtoOptions) {
constructor(schema: ConfluentSchema, opts?: ProtoOptions, references?: Schema[]) {
const parsedMessage = protobuf.parse(schema.schema)
const root = parsedMessage.root
this.message = root.lookupType(this.getTypeName(parsedMessage, opts))
this.root = parsedMessage.root

if (references) {
const schemas = references as ProtoSchema[]
schemas.forEach(reference => {
this.root.add(reference.root)
brinchj marked this conversation as resolved.
Show resolved Hide resolved
})
}

this.message = this.root.lookupType(this.getTypeName(parsedMessage, opts))
}

private getNestedTypeName(parent: { [k: string]: ReflectionObject } | undefined): string {
Expand Down
91 changes: 91 additions & 0 deletions src/SchemaRegistry.newApi.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import encodedAnotherPersonV2Json from '../fixtures/json/encodedAnotherPersonV2'
import encodedAnotherPersonV2Proto from '../fixtures/proto/encodedAnotherPersonV2'
import encodedNestedV2Proto from '../fixtures/proto/encodedNestedV2'
import wrongMagicByte from '../fixtures/wrongMagicByte'
import ProtoSchema from './ProtoSchema'

const REGISTRY_HOST = 'http://localhost:8982'
const schemaRegistryAPIClientArgs = { host: REGISTRY_HOST }
Expand Down Expand Up @@ -456,6 +457,41 @@ describe('SchemaRegistry - new Api', () => {
v3Opts = { [SchemaType.PROTOBUF]: { messageName: 'AnotherPerson' } },
type = SchemaType.PROTOBUF

const protoV3 = `
syntax = "proto3";
package com.org.domain.fixtures;
message AnotherPerson {
string city = 2 [default = "Stockholm"];
}
`

const protoContact = `
syntax = "proto3";
package com.org.domain.fixtures;
message ContactMessage {
string email = 1;
}
`

const protoEmployee = `
syntax = "proto3";
package com.org.domain.fixtures;
import "Contact.proto";
message Employee {
string city = 1 [default = "Stockholm"];
ContactMessage contact = 2;
}
`

const protoCompany = `
syntax = "proto3";
package com.org.domain.fixtures;
import "Employee.proto";
message Company {
Employee employee = 1;
}
`

beforeAll(() => {
schemaRegistry = new SchemaRegistry(schemaRegistryArgs, v3Opts)
})
Expand Down Expand Up @@ -544,6 +580,61 @@ describe('SchemaRegistry - new Api', () => {
})
})

it('register and encode protocol buffer v3 schema', async () => {
const confluentSchemaV3: ConfluentSchema = {
type,
schema: protoV3,
}

const schema1 = await schemaRegistry.register(confluentSchemaV3, {
subject: `${type}_test_protoV3-value`,
})

await schemaRegistry.encode(schema1.id, payload)
})

it('register and encode protocol buffer v3 schema with import', async () => {
const confluentSchemaV3: ConfluentSchema = {
type,
schema: protoCompany,
}

/**
 * Test stub that maps an imported .proto file name to its in-memory schema.
 *
 * @param referenceName File name as it appears in an `import` statement
 *
 * @returns The Protobuf ConfluentSchema registered under that name
 *
 * @throws Error when the name is not one of the fixtures known to this test
 */
async function fetchSchema(referenceName: string): Promise<ConfluentSchema> {
  if (referenceName === 'Contact.proto') {
    return { type: SchemaType.PROTOBUF, schema: protoContact }
  }
  if (referenceName === 'Employee.proto') {
    return { type: SchemaType.PROTOBUF, schema: protoEmployee }
  }
  // Throw a real Error (not a bare string) so the failure carries a stack trace.
  throw new Error(`unknown reference ${referenceName}`)
}

const schema1 = await schemaRegistry.register(confluentSchemaV3, {
subject: `${type}_test_protoImportsV3-value`,
fetchSchema,
})

// Check that we can encode with the cached version from the register() call
const payload = { employee: { contact: { email: '[email protected]' } } }
const encoded1 = await schemaRegistry.encode(schema1.id, payload)
const decoded1 = await schemaRegistry.decode(encoded1)

// Clear the cache and try again to exercise getSchema()
schemaRegistry.cache.clear()
const encoded2 = await schemaRegistry.encode(schema1.id, payload)
schemaRegistry.cache.clear()
const decoded2 = await schemaRegistry.decode(encoded2)

expect(encoded1).toEqual(encoded2)
expect(decoded1).toEqual(decoded2)

// Check the default value
expect(decoded1.employee.city).toEqual('Stockholm')

// Check the value in the field defined in imported schema
expect(decoded1.employee.contact.email).toEqual('[email protected]')
})

it('decodes', async () => {
const confluentSchemaV4: ConfluentSchema = {
type,
Expand Down
81 changes: 76 additions & 5 deletions src/SchemaRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import {
ConfluentSubject,
SchemaRegistryAPIClientOptions,
AvroConfluentSchema,
SchemaHelper,
} from './@types'
import {
helperTypeFromSchemaType,
Expand All @@ -34,6 +35,7 @@ interface RegisteredSchema {
interface Opts {
compatibility?: COMPATIBILITY
separator?: string
fetchSchema?: (referenceName: string) => Promise<ConfluentSchema>
subject: string
}

Expand Down Expand Up @@ -81,6 +83,35 @@ export default class SchemaRegistry {
return confluentSchema
}

/**
 * Register every schema referenced by `confluentSchema` and describe each one
 * as a `{ subject, name, version }` reference entry.
 *
 * Nothing is registered when `opts.fetchSchema` is not provided; the result is
 * then an empty list. Otherwise each name reported by
 * `helper.referencedSchemas` is fetched through `opts.fetchSchema`, registered
 * with that name as its subject, and looked up to find the highest version
 * listed for that subject.
 *
 * @param opts Registration options; `fetchSchema` resolves a reference name to its schema
 * @param helper Helper used to extract the reference names from the schema text
 * @param confluentSchema The schema whose references should be registered
 *
 * @returns One entry per referenced schema, using the reference name as both `subject` and `name`
 *
 * @throws ConfluentSchemaRegistryError when the registry lists no version of a
 *         registered reference under its subject
 */
async registerReferences(
  opts: Omit<Opts, 'subject'> & { subject?: string },
  helper: SchemaHelper,
  confluentSchema: ConfluentSchema,
): Promise<{ subject: string; name: string; version: number }[]> {
  const { fetchSchema } = opts
  if (!fetchSchema) {
    return []
  }

  const subjects = await helper.referencedSchemas(confluentSchema.schema)

  return await Promise.all(
    subjects.map(async subject => {
      const schema = await fetchSchema(subject)
      // The reference name doubles as the subject the referenced schema is registered under.
      const registeredSchema = await this.register(schema, { ...opts, subject })
      const response = await this.api.Schema.versions({ id: registeredSchema.id })
      const allVersions: { subject: string; version: number }[] = response.data()

      const versions = allVersions.filter(it => it.subject === subject).map(it => it.version)

      // Fail with a descriptive error instead of the opaque TypeError that an
      // unseeded reduce() raises on an empty list.
      if (versions.length === 0) {
        throw new ConfluentSchemaRegistryError(
          `Schema id ${registeredSchema.id} has no version registered under subject "${subject}"`,
        )
      }

      return { subject, name: subject, version: Math.max(...versions) }
    }),
  )
}

public async register(
schema: Exclude<ConfluentSchema, AvroConfluentSchema>,
userOpts: Opts,
Expand All @@ -97,12 +128,26 @@ export default class SchemaRegistry {
schema: RawAvroSchema | ConfluentSchema,
userOpts?: Opts,
): Promise<RegisteredSchema> {
const { compatibility, separator } = { ...DEFAULT_OPTS, ...userOpts }
const opts = { ...DEFAULT_OPTS, ...userOpts }
const { compatibility, separator } = opts

const confluentSchema: ConfluentSchema = this.getConfluentSchema(schema)

const helper = helperTypeFromSchemaType(confluentSchema.type)
const schemaInstance = schemaFromConfluentSchema(confluentSchema, this.options)

// If this schema depends on other schemas, we need to make sure those are registered first.
// Later, we pass their IDs to Schema Registry as our 'references'.
const references = await this.registerReferences(opts, helper, confluentSchema)
const referenceSchemas = await Promise.all(
references.map(async reference => {
return this.getSchemaForSubject(reference.subject, reference.version)
}),
)

const schemaInstance = schemaFromConfluentSchema(
confluentSchema,
this.options,
referenceSchemas,
)
helper.validate(schemaInstance)

let subject: ConfluentSubject
Expand Down Expand Up @@ -138,6 +183,7 @@ export default class SchemaRegistry {
body: {
schemaType: confluentSchema.type,
schema: confluentSchema.schema,
references,
},
})

Expand All @@ -148,6 +194,15 @@ export default class SchemaRegistry {
return registeredSchema
}

/**
 * Load the schema stored under a given subject and version.
 *
 * The subject/version pair is first looked up to get a registry id, which is
 * then passed to `getSchema`.
 *
 * @param subject Subject the schema is registered under
 * @param version Version of the schema within that subject
 *
 * @returns The schema instance returned by `getSchema` for the resolved id
 */
public async getSchemaForSubject(subject: string, version: number): Promise<Schema | AvroSchema> {
  const versionResponse = await this.api.Subject.version({ subject, version })
  const { id }: { id: number } = versionResponse.data()
  const resolved = await this.getSchema(id)
  return resolved
}

public async getSchema(registryId: number): Promise<Schema | AvroSchema> {
const schema = this.cache.getSchema(registryId)

Expand All @@ -156,19 +211,35 @@ export default class SchemaRegistry {
}

const response = await this.getSchemaOriginRequest(registryId)
const foundSchema: { schema: string; schemaType: string } = response.data()
const foundSchema: {
schema: string
schemaType: string
references?: { name: string; subject: string; version: number }[]
} = response.data()
const rawSchema = foundSchema.schema
const schemaType = schemaTypeFromString(foundSchema.schemaType)

if (schemaType === SchemaType.UNKNOWN) {
throw new ConfluentSchemaRegistryError(`Unknown schema type ${foundSchema.schemaType}`)
}

const referenceSchemas = await Promise.all(
(foundSchema.references || []).map(async reference => {
return this.getSchemaForSubject(reference.subject, reference.version)
}),
)

const confluentSchema: ConfluentSchema = {
type: schemaType,
schema: rawSchema,
}
const schemaInstance = schemaFromConfluentSchema(confluentSchema, this.options)

const schemaInstance = schemaFromConfluentSchema(
confluentSchema,
this.options,
referenceSchemas,
)

return this.cache.setSchema(registryId, schemaInstance)
}

Expand Down
5 changes: 5 additions & 0 deletions src/api/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ export interface SchemaRegistryAPIClientArgs {
export type SchemaRegistryAPIClient = Client<{
Schema: {
find: (_: any) => any
versions: (_: any) => any
}
Subject: {
all: (_: any) => any
Expand Down Expand Up @@ -65,6 +66,10 @@ export default ({
method: 'get',
path: '/schemas/ids/{id}',
},
versions: {
method: 'get',
path: '/schemas/ids/{id}/versions',
},
},
Subject: {
all: {
Expand Down
3 changes: 2 additions & 1 deletion src/schemaTypeResolver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ export const helperTypeFromSchemaType = (
export const schemaFromConfluentSchema = (
confluentSchema: ConfluentSchema,
options?: SchemaRegistryAPIClientOptions,
referenceSchemas?: Schema[],
): Schema | AvroSchema => {
try {
let schema: Schema
Expand All @@ -87,7 +88,7 @@ export const schemaFromConfluentSchema = (
}
case SchemaType.PROTOBUF: {
const opts: ProtoOptions | undefined = (options as ProtocolOptions)?.[SchemaType.PROTOBUF]
schema = new ProtoSchema(confluentSchema, opts)
schema = new ProtoSchema(confluentSchema, opts, referenceSchemas)
break
}
default:
Expand Down