Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement support for Protocol Buffer references (imports) #135

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions bin/avdlToAVSC.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#!/bin/sh

avdl_path=$1
avsc_name=$2

Expand Down
1 change: 1 addition & 0 deletions src/@types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export enum SchemaType {
export interface SchemaHelper {
validate(schema: Schema): void
getSubject(confluentSchema: ConfluentSchema, schema: Schema, separator: string): ConfluentSubject
referencedSchemas(schema: string): Promise<string[]>
}

export type AvroOptions = Partial<ForSchemaOptions>
Expand Down
7 changes: 7 additions & 0 deletions src/AvroHelper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,11 @@ export default class AvroHelper implements SchemaHelper {
}
return subject
}

public async referencedSchemas(
// eslint-disable-next-line @typescript-eslint/no-unused-vars
_schema: string,
): Promise<string[]> {
return []
}
}
7 changes: 7 additions & 0 deletions src/JsonHelper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,11 @@ export default class JsonHelper implements SchemaHelper {
): ConfluentSubject {
throw new ConfluentSchemaRegistryError('not implemented yet')
}

public async referencedSchemas(
// eslint-disable-next-line @typescript-eslint/no-unused-vars
_schema: string,
): Promise<string[]> {
return []
}
}
27 changes: 22 additions & 5 deletions src/ProtoHelper.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,34 @@
// @ts-nocheck
import { Schema, SchemaHelper, ConfluentSubject, ConfluentSchema } from './@types'
import { ConfluentSchemaRegistryError } from './errors'
import { parse } from 'protobufjs'

export default class ProtoHelper implements SchemaHelper {
public validate(schema: Schema): void {
// eslint-disable-next-line @typescript-eslint/no-unused-vars
public validate(_schema: Schema): void {
return
}

public getSubject(
confluentSchema: ConfluentSchema,
schema: Schema,
separator: string,
// eslint-disable-next-line @typescript-eslint/no-unused-vars
_confluentSchema: ConfluentSchema,
// eslint-disable-next-line @typescript-eslint/no-unused-vars
_schema: Schema,
// eslint-disable-next-line @typescript-eslint/no-unused-vars
_separator: string,
): ConfluentSubject {
throw new ConfluentSchemaRegistryError('not implemented yet')
}

/**
* Get the schemas referenced by the provided schema.
*
* @param schema The schema to find references for
*
* @returns A list of imported/referenced schemas that is used by the given schema.
*/
public async referencedSchemas(schema: string): Promise<string[]> {
const parsed = parse(schema)
const out: string[] = []
return out.concat(parsed.imports || []).concat(parsed.weakImports || [])
}
}
18 changes: 15 additions & 3 deletions src/ProtoSchema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,23 @@ import {

export default class ProtoSchema implements Schema {
private message: Type
private root: protobuf.Root

constructor(schema: ConfluentSchema, opts?: ProtoOptions) {
constructor(schema: ConfluentSchema, opts?: ProtoOptions, references?: Schema[]) {
const parsedMessage = protobuf.parse(schema.schema)
const root = parsedMessage.root
this.message = root.lookupType(this.getTypeName(parsedMessage, opts))
this.root = parsedMessage.root

if (references) {
const schemas = references as ProtoSchema[]
schemas.forEach(reference => {
// root.add() takes ownership over its input argument.
// add() can modify and extend the input schema, so we have to clone the reference to avoid polluting it.
const copy = reference.root.toJSON().nested
this.root.addJSON(copy)
})
}

this.message = this.root.lookupType(this.getTypeName(parsedMessage, opts))
}

private getNestedTypeName(parent: { [k: string]: ReflectionObject } | undefined): string {
Expand Down
157 changes: 157 additions & 0 deletions src/SchemaRegistry.newApi.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import encodedAnotherPersonV2Json from '../fixtures/json/encodedAnotherPersonV2'
import encodedAnotherPersonV2Proto from '../fixtures/proto/encodedAnotherPersonV2'
import encodedNestedV2Proto from '../fixtures/proto/encodedNestedV2'
import wrongMagicByte from '../fixtures/wrongMagicByte'
import ProtoSchema from './ProtoSchema'

const REGISTRY_HOST = 'http://localhost:8982'
const schemaRegistryAPIClientArgs = { host: REGISTRY_HOST }
Expand Down Expand Up @@ -456,6 +457,51 @@ describe('SchemaRegistry - new Api', () => {
v3Opts = { [SchemaType.PROTOBUF]: { messageName: 'AnotherPerson' } },
type = SchemaType.PROTOBUF

const protoV3 = `
syntax = "proto3";
package com.org.domain.fixtures;
message AnotherPerson {
string city = 2 [default = "Stockholm"];
}
`

const protoContact = `
syntax = "proto3";
package com.org.domain.fixtures;
message ContactMessage {
string email = 1;
}
`

const protoEmployee = `
syntax = "proto3";
package com.org.domain.fixtures;
import "Contact.proto";
message Employee {
string city = 1 [default = "Stockholm"];
ContactMessage contact = 2;
}
`

const protoOffice = `
syntax = "proto3";
package com.org.domain.fixtures;
message Office {
string address = 1;
}
`

const protoCompany = `
syntax = "proto3";
package com.org.domain.fixtures;
import "Employee.proto";
import "Office.proto";
message Company {
Employee employee = 1;
Office office = 2;
}
`

beforeAll(() => {
schemaRegistry = new SchemaRegistry(schemaRegistryArgs, v3Opts)
})
Expand Down Expand Up @@ -544,6 +590,117 @@ describe('SchemaRegistry - new Api', () => {
})
})

it('register and encode protocol buffer v3 schema', async () => {
const confluentSchemaV3: ConfluentSchema = {
type,
schema: protoV3,
}

const schema1 = await schemaRegistry.register(confluentSchemaV3, {
subject: `${type}_test_protoV3-value`,
})

await schemaRegistry.encode(schema1.id, payload)
})

it('register and encode protocol buffer v3 schema with import', async () => {
const confluentSchemaV3: ConfluentSchema = {
type,
schema: protoCompany,
}

async function fetchSchema(referenceName: string): Promise<ConfluentSchema> {
if (referenceName === 'Contact.proto') {
return { type: SchemaType.PROTOBUF, schema: protoContact }
}
if (referenceName === 'Employee.proto') {
return { type: SchemaType.PROTOBUF, schema: protoEmployee }
}
if (referenceName === 'Office.proto') {
return { type: SchemaType.PROTOBUF, schema: protoOffice }
}
throw `unknown reference ${referenceName}`
}

const schema1 = await schemaRegistry.register(confluentSchemaV3, {
subject: `${type}_test_protoImportsV3-value`,
fetchSchema,
})

// Check that we can encode with the cached version from the register() call
const payload = {
employee: { contact: { email: '[email protected]' } },
office: { address: 'Stockholm' },
}

const encoded1 = await schemaRegistry.encode(schema1.id, payload)
const decoded1 = await schemaRegistry.decode(encoded1)

// Clear the cache and try again to exercise getSchema()
schemaRegistry.cache.clear()
const encoded2 = await schemaRegistry.encode(schema1.id, payload)
schemaRegistry.cache.clear()
const decoded2 = await schemaRegistry.decode(encoded2)

expect(encoded1).toEqual(encoded2)
expect(decoded1).toEqual(decoded2)

// Check the default value
expect(decoded1.employee.city).toEqual('Stockholm')

// Check the value in the field defined in imported schema
expect(decoded1.employee.contact.email).toEqual('[email protected]')
})

it('no in-place changes to cached entry of imported schema', async () => {
const officeSchema: ConfluentSchema = {
type,
schema: protoOffice,
}

const companySchema: ConfluentSchema = {
type,
schema: protoCompany,
}

async function fetchSchema(referenceName: string): Promise<ConfluentSchema> {
if (referenceName === 'Contact.proto') {
return { type: SchemaType.PROTOBUF, schema: protoContact }
}
if (referenceName === 'Employee.proto') {
return { type: SchemaType.PROTOBUF, schema: protoEmployee }
}
if (referenceName === 'Office.proto') {
return { type: SchemaType.PROTOBUF, schema: protoOffice }
}
throw `unknown reference ${referenceName}`
}

// Register a schema with no dependencies and store it before registering anything else.
const registeredOfficeSchemaId = (
await schemaRegistry.register(officeSchema, {
subject: `${type}_test_officeSchema-value`,
})
).id

const registeredOfficeSchemaBefore = JSON.stringify(
await schemaRegistry.getSchema(registeredOfficeSchemaId),
)

// Register a new schema that imports our already registered Office schema.
await schemaRegistry.register(companySchema, {
subject: `${type}_test_companySchema-value`,
fetchSchema,
})

// Check that registering this new schema did not modify the original imported schema.
const registeredOfficeSchemaAfter = JSON.stringify(
await schemaRegistry.getSchema(registeredOfficeSchemaId),
)

expect(registeredOfficeSchemaBefore).toEqual(registeredOfficeSchemaAfter)
})

it('decodes', async () => {
const confluentSchemaV4: ConfluentSchema = {
type,
Expand Down
Loading