Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support schema references for Avro, Protocol Buffer, and JSON schema #197

Merged
merged 38 commits into from
Oct 4, 2022
Merged
Show file tree
Hide file tree
Changes from 36 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
ff6a6d7
multi-references-for-protobuf
Mar 10, 2022
6ee47b0
cleanup
Mar 10, 2022
cbf3986
refactoring
Mar 11, 2022
15f5999
refactor
Mar 11, 2022
19a1028
add comment
Mar 11, 2022
064cfb6
bump package.json
Mar 11, 2022
3943fca
rename updateOptionsWithSchemaReferences
Mar 11, 2022
b4aefa4
make requests parallel
Mar 11, 2022
662b0ee
rename
Mar 11, 2022
4ec1311
rename
Mar 11, 2022
54c6218
rename
Mar 11, 2022
00c1aaa
remove /* eslint-disable no-console */
Mar 11, 2022
97ceaeb
add nested references for json
Mar 11, 2022
43c09a5
Add multi schema references for protobuf and json schema
Mar 14, 2022
d2bab57
add documentation for nested references
Mar 14, 2022
21d9198
Merge pull request #2 from debitoor/add-json-protobuf-documentation
Mar 14, 2022
bc01ead
add-nested-references-for-avro
Mar 15, 2022
25715b4
Merge pull request #3 from debitoor/add-nested-references-for-avro
Mar 15, 2022
50cb210
improve-doc
Mar 15, 2022
be3c063
Merge pull request #4 from debitoor/improve-doc
Mar 15, 2022
254577e
cleanup-ts
Mar 15, 2022
eaef078
Merge pull request #5 from debitoor/cleanup-ts
Mar 15, 2022
183be7f
refactor-referredSchemas
Mar 16, 2022
54fccd1
Merge pull request #6 from debitoor/refactor-referredSchemas
Mar 16, 2022
90b41e0
efactoring-and-improve-examples
Mar 17, 2022
fabb6f8
Merge pull request #7 from debitoor/refactoring-and-improve-examples
Mar 17, 2022
1b4f9a0
Merge branch 'master' into master
Mar 17, 2022
a3db4e0
enable-typeHook-for-referred-schemas
Mar 22, 2022
1a3dad2
Merge pull request #8 from debitoor/enable-typeHook-for-referred-schemas
Mar 22, 2022
8a34858
Merge remote-tracking branch 'upstream/master'
Apr 8, 2022
3c5fefe
fix-review-comments-2
Sep 30, 2022
f3c3bdf
review comments
Oct 3, 2022
d94bcc4
review comments
Oct 3, 2022
3d2e0bb
Merge pull request #9 from debitoor/fix-review-comments-2
bifrost Oct 3, 2022
d0c9ab9
minor-refactoring
Oct 4, 2022
c9406c0
Merge pull request #10 from debitoor/minor-refactoring
bifrost Oct 4, 2022
e193664
extend-ajvInstance
Oct 4, 2022
0b2d05c
Merge pull request #11 from debitoor/extend-ajvInstance
bifrost Oct 4, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions docs/schema-avro.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
---
id: schema-avro
title: Example Avro Schemas
sidebar_label: Example Avro Schemas
---

## Schema with references to other schemas

You might want to split the Avro definition into several schemas one for each type.

```json
{
"type" : "record",
"namespace" : "test",
"name" : "A",
"fields" : [
{ "name" : "id" , "type" : "int" },
{ "name" : "b" , "type" : "test.B" }
]
}
```

```json
{
"type" : "record",
"namespace" : "test",
"name" : "B",
"fields" : [
{ "name" : "id" , "type" : "int" }
]
}
```

To register schemas with references, the schemas have to be registered in reverse order. The schema that references another schema has to be registered after the schema it references. In this example B has to be registered before A. Furthermore, when registering A, a list of references have to be provided. A reference consist of:

* `name` - the fully qualified name of the referenced schema. Example: `test.B`
* `subject` - the subject the schema is registered under in the registry
* `version` - the version of the schema you want to use

The library will handle an arbitrary number of nested levels.

```js
const schemaA = {
type: 'record',
namespace: 'test',
name: 'A',
fields: [
{ name: 'id', type: 'int' },
{ name: 'b', type: 'test.B' },
],
}

const schemaB = {
type: 'record',
namespace: 'test',
name: 'B',
fields: [{ name: 'id', type: 'int' }],
}

await schemaRegistry.register(
{ type: SchemaType.AVRO, schema: JSON.stringify(schemaB) },
{ subject: 'Avro:B' },
)

const response = await schemaRegistry.api.Subject.latestVersion({ subject: 'Avro:B' })
const { version } = JSON.parse(response.responseData)

const { id } = await schemaRegistry.register(
{
type: SchemaType.AVRO,
schema: JSON.stringify(schemaA),
references: [
{
name: 'test.B',
subject: 'Avro:B',
version,
},
],
},
{ subject: 'Avro:A' },
)

const obj = { id: 1, b: { id: 2 } }

const buffer = await schemaRegistry.encode(id, obj)
const decodedObj = await schemaRegistry.decode(buffer)
```
85 changes: 85 additions & 0 deletions docs/schema-json.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
---
id: schema-json
title: Example JSON Schemas
sidebar_label: Example JSON Schemas
---

## Schema with references to other schemas

You might want to split the JSON definition into several schemas one for each type.

```JSON
{
"$id": "https://example.com/schemas/A",
"type": "object",
"properties": {
"id": { "type": "number" },
"b": { "$ref": "https://example.com/schemas/B" }
}
}
```

```JSON
{
"$id": "https://example.com/schemas/B",
"type": "object",
"properties": {
"id": { "type": "number" }
}
}
```

To register schemas with references, the schemas have to be registered in reverse order. The schema that references another schema has to be registered after the schema it references. In this example B has to be registered before A. Furthermore, when registering A, a list of references have to be provided. A reference consist of:

* `name` - A URL matching the `$ref` from the schema
* `subject` - the subject the schema is registered under in the registry
* `version` - the version of the schema you want to use

The library will handle an arbitrary number of nested levels.

```js
const schemaA = {
$id: 'https://example.com/schemas/A',
type: 'object',
properties: {
id: { type: 'number' },
b: { $ref: 'https://example.com/schemas/B' },
},
}

const schemaB = {
$id: 'https://example.com/schemas/B',
type: 'object',
properties: {
id: { type: 'number' },
},
}

await schemaRegistry.register(
{ type: SchemaType.JSON, schema: JSON.stringify(schemaB) },
{ subject: 'JSON:B' },
)

const response = await schemaRegistry.api.Subject.latestVersion({ subject: 'JSON:B' })
const { version } = JSON.parse(response.responseData)

const { id } = await schemaRegistry.register(
{
type: SchemaType.JSON,
schema: JSON.stringify(schemaA),
references: [
{
name: 'https://example.com/schemas/B',
subject: 'JSON:B',
version,
},
],
},
{ subject: 'JSON:A' },
)

const obj = { id: 1, b: { id: 2 } }

const buffer = await schemaRegistry.encode(id, obj)
const decodedObj = await schemaRegistry.decode(buffer)
```
85 changes: 85 additions & 0 deletions docs/schema-protobuf.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
---
id: schema-protobuf
title: Example Protobuf Schemas
sidebar_label: Example Protobuf Schemas
---

## Schema with references to other schemas

You might want to split the Protobuf definition into several schemas, one for each type.

```protobuf
syntax = "proto3";
package test;
import "test/B.proto";

message A {
int32 id = 1;
B b = 2;
}
```

```protobuf
syntax = "proto3";
package test;

message B {
int32 id = 1;
}
```

To register schemas with references, the schemas have to be registered in reverse order. The schema that references another schema has to be registered after the schema it references. In this example B has to be registered before A. Furthermore, when registering A, a list of references have to be provided. A reference consist of:

* `name` - String matching the import statement. For example: `test/B.proto`
* `subject` - the subject the schema is registered under in the registry
* `version` - the version of the schema you want to use

The library will handle an arbitrary number of nested levels.

```js
const schemaA = `
syntax = "proto3";
package test;
import "test/B.proto";

message A {
int32 id = 1;
B b = 2;
}`

const schemaB = `
syntax = "proto3";
package test;

message B {
int32 id = 1;
}`

await schemaRegistry.register(
{ type: SchemaType.PROTOBUF, schema: schemaB },
{ subject: 'Proto:B' },
)

const response = await schemaRegistry.api.Subject.latestVersion({ subject: 'Proto:B' })
const { version } = JSON.parse(response.responseData)

const { id } = await schemaRegistry.register(
{
type: SchemaType.PROTOBUF,
schema: schemaA,
references: [
{
name: 'test/B.proto',
subject: 'Proto:B',
version,
},
],
},
{ subject: 'Proto:A' },
)

const obj = { id: 1, b: { id: 2 } }

const buffer = await schemaRegistry.encode(id, obj)
const decodedObj = await schemaRegistry.decode(buffer)
```
27 changes: 23 additions & 4 deletions src/@types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,26 @@ export enum SchemaType {
PROTOBUF = 'PROTOBUF',
UNKNOWN = 'UNKNOWN',
}

export interface SchemaHelper {
validate(schema: Schema): void
getSubject(confluentSchema: ConfluentSchema, schema: Schema, separator: string): ConfluentSubject
toConfluentSchema(data: SchemaResponse): ConfluentSchema
updateOptionsFromSchemaReferences(
referencedSchemas: ConfluentSchema[],
options?: ProtocolOptions,
): ProtocolOptions
}

export type AvroOptions = Partial<ForSchemaOptions>
export type AvroOptions = Partial<ForSchemaOptions> & {
referencedSchemas?: AvroConfluentSchema[]
}
export type JsonOptions = ConstructorParameters<typeof Ajv>[0] & {
ajvInstance?: {
compile: (schema: any) => ValidateFunction
}
referencedSchemas?: JsonConfluentSchema[]
}
export type ProtoOptions = { messageName: string }
export type ProtoOptions = { messageName?: string; referencedSchemas?: ProtoConfluentSchema[] }

export interface LegacyOptions {
forSchemaOptions?: AvroOptions
Expand Down Expand Up @@ -60,16 +67,28 @@ export interface ConfluentSubject {
export interface AvroConfluentSchema {
type: SchemaType.AVRO
schema: string | RawAvroSchema
references?: SchemaReference[]
}

export type SchemaReference = {
name: string
subject: string
version: number
}
export interface ProtoConfluentSchema {
type: SchemaType.PROTOBUF
schema: string
references?: SchemaReference[]
}

export interface JsonConfluentSchema {
type: SchemaType.JSON
schema: string
references?: SchemaReference[]
}
export interface SchemaResponse {
schema: string
schemaType: string
references?: SchemaReference[]
}

export type ConfluentSchema = AvroConfluentSchema | ProtoConfluentSchema | JsonConfluentSchema
Expand Down
41 changes: 38 additions & 3 deletions src/AvroHelper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@ import {
ConfluentSchema,
SchemaHelper,
ConfluentSubject,
ProtocolOptions,
AvroConfluentSchema,
} from './@types'
import { ConfluentSchemaRegistryArgumentError } from './errors'
import avro from 'avsc'
import avro, { ForSchemaOptions, Schema, Type } from 'avsc'
import { SchemaResponse, SchemaType } from './@types'

type TypeHook = (schema: Schema, opts: ForSchemaOptions) => Type
export default class AvroHelper implements SchemaHelper {
private getRawAvroSchema(schema: ConfluentSchema): RawAvroSchema {
return (typeof schema.schema === 'string'
Expand All @@ -21,7 +25,27 @@ export default class AvroHelper implements SchemaHelper {
? schema
: this.getRawAvroSchema(schema)
// @ts-ignore TODO: Fix typings for Schema...
const avroSchema: AvroSchema = avro.Type.forSchema(rawSchema, opts)

const addReferencedSchemas = (userHook?: TypeHook): TypeHook => (
schema: avro.Schema,
opts: ForSchemaOptions,
) => {
const avroOpts = opts as AvroOptions
avroOpts?.referencedSchemas?.forEach(subSchema => {
const rawSubSchema = this.getRawAvroSchema(subSchema)
avroOpts.typeHook = userHook
avro.Type.forSchema(rawSubSchema, avroOpts)
})
if (userHook) {
return userHook(schema, opts)
}
}

const avroSchema = avro.Type.forSchema(rawSchema, {
...opts,
typeHook: addReferencedSchemas(opts?.typeHook),
})

return avroSchema
}

Expand All @@ -32,7 +56,7 @@ export default class AvroHelper implements SchemaHelper {
}

public getSubject(
schema: ConfluentSchema,
schema: AvroConfluentSchema,
// @ts-ignore
avroSchema: AvroSchema,
separator: string,
Expand All @@ -53,4 +77,15 @@ export default class AvroHelper implements SchemaHelper {
const asRawAvroSchema = schema as RawAvroSchema
return asRawAvroSchema.name != null && asRawAvroSchema.type != null
}

public toConfluentSchema(data: SchemaResponse): AvroConfluentSchema {
return { type: SchemaType.AVRO, schema: data.schema, references: data.references }
}

updateOptionsFromSchemaReferences(
referencedSchemas: AvroConfluentSchema[],
options: ProtocolOptions = {},
): ProtocolOptions {
return { ...options, [SchemaType.AVRO]: { ...options[SchemaType.AVRO], referencedSchemas } }
}
}
Loading