Skip to content

Commit 52ce610

Browse files
committed
confluent: add default_schema_id field to the schema_registry_decode processor
1 parent 327cd86 commit 52ce610

File tree

4 files changed

+107
-2
lines changed

4 files changed

+107
-2
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ All notable changes to this project will be documented in this file.
55

66
## 4.64.0 - TBD
77

8+
### Added
9+
10+
- Added `default_schema_id` field to the `schema_registry_decode` processor. (@mmatczuk)
11+
812
### Changed
913

1014
- (google_cloud_storage) Field `bucket` can now be interpolated (@rockwotj)

docs/modules/components/pages/processors/schema_registry_decode.adoc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ schema_registry_decode:
4848
emit_default_values: false
4949
cache_duration: 10m
5050
url: "" # No default (required)
51+
default_schema_id: 0 # No default (optional)
5152
```
5253
5354
--
@@ -86,6 +87,7 @@ schema_registry_decode:
8687
emit_default_values: false
8788
cache_duration: 10m
8889
url: "" # No default (required)
90+
default_schema_id: 0 # No default (optional)
8991
oauth:
9092
enabled: false
9193
consumer_key: ""
@@ -338,6 +340,14 @@ The base URL of the schema registry service.
338340
*Type*: `string`
339341
340342
343+
=== `default_schema_id`
344+
345+
If set, this schema ID will be used when a message's schema header cannot be read (ErrBadHeader). If not set, schema header errors will be returned.
346+
347+
348+
*Type*: `int`
349+
350+
341351
=== `oauth`
342352
343353
Allows you to specify open authentication via OAuth version 1.

internal/impl/confluent/processor_schema_registry_decode.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,10 @@ root = this.apply("debeziumTimestampToAvroTimestamp")
179179
Description("The duration after which a schema is considered stale and will be removed from the cache.").
180180
Default("10m").Example("1h").Example("5m"),
181181
).
182-
Field(service.NewURLField("url").Description("The base URL of the schema registry service."))
182+
Field(service.NewURLField("url").Description("The base URL of the schema registry service.")).
183+
Field(service.NewIntField("default_schema_id").
184+
Description("If set, this schema ID will be used when a message's schema header cannot be read (ErrBadHeader). If not set, schema header errors will be returned.").
185+
Optional())
183186

184187
for _, f := range service.NewHTTPRequestAuthSignerFields() {
185188
spec = spec.Field(f.Version("4.7.0"))
@@ -212,6 +215,7 @@ type decodingConfig struct {
212215
emitUnpopulated bool
213216
emitDefaultValues bool
214217
}
218+
defaultSchemaID int
215219
}
216220

217221
type schemaRegistryDecoder struct {
@@ -287,6 +291,14 @@ func newSchemaRegistryDecoderFromConfig(conf *service.ParsedConfig, mgr *service
287291
if err != nil {
288292
return nil, err
289293
}
294+
295+
if conf.Contains("default_schema_id") {
296+
cfg.defaultSchemaID, err = conf.FieldInt("default_schema_id")
297+
if err != nil {
298+
return nil, err
299+
}
300+
}
301+
290302
cacheDuration, err := conf.FieldDuration("cache_duration")
291303
if err != nil {
292304
return nil, err
@@ -335,7 +347,11 @@ func (s *schemaRegistryDecoder) Process(_ context.Context, msg *service.Message)
335347

336348
var ch franz_sr.ConfluentHeader
337349
id, remaining, err := ch.DecodeID(b)
338-
if err != nil {
350+
if errors.Is(err, franz_sr.ErrBadHeader) && s.cfg.defaultSchemaID != 0 {
351+
// Use default schema ID when header cannot be read
352+
id = s.cfg.defaultSchemaID
353+
remaining = b
354+
} else if err != nil {
339355
return nil, err
340356
}
341357

internal/impl/confluent/processor_schema_registry_decode_test.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -680,6 +680,81 @@ func TestSchemaRegistryDecodeProtobuf(t *testing.T) {
680680
decoder.cacheMut.Unlock()
681681
}
682682

683+
func TestSchemaRegistryDecodeWithDefaultSchemaID(t *testing.T) {
684+
urlStr := runSchemaRegistryServer(t, func(path string) ([]byte, error) {
685+
if path == "/schemas/ids/3" {
686+
return mustJBytes(t, map[string]any{
687+
"schema": testSchema,
688+
}), nil
689+
}
690+
return nil, nil
691+
})
692+
693+
tests := []struct {
694+
name string
695+
input string
696+
output string
697+
defaultID int
698+
errContains string
699+
}{
700+
{
701+
name: "error when no default schema is set",
702+
input: "\x06foo\x02\x02\x06foo\x06bar\x02\x0edancing", // Invalid header
703+
errContains: "5 byte header for value is missing or does not have 0 magic byte",
704+
},
705+
{
706+
name: "different error doesn't use default schema",
707+
input: "\x00\x00\x00\x00\x09", // Valid header but non-existent schema
708+
defaultID: 3,
709+
errContains: "schema 9 not found by registry: not found",
710+
},
711+
{
712+
name: "no header uses default schema",
713+
input: "\x06foo\x02\x02\x06foo\x06bar\x02\x0edancing", // No valid header at all
714+
output: `{"Address":{"my.namespace.com.address":{"City":{"string":"foo"},"State":"bar"}},"MaybeHobby":{"string":"dancing"},"Name":"foo"}`,
715+
defaultID: 3,
716+
},
717+
}
718+
719+
for _, test := range tests {
720+
test := test
721+
t.Run(test.name, func(t *testing.T) {
722+
cfg := decodingConfig{}
723+
cfg.avro.rawUnions = false
724+
if test.defaultID != 0 {
725+
cfg.defaultSchemaID = test.defaultID
726+
}
727+
728+
decoder, err := newSchemaRegistryDecoder(urlStr, noopReqSign, nil, cfg, schemaStaleAfter, service.MockResources())
729+
require.NoError(t, err)
730+
defer func() {
731+
require.NoError(t, decoder.Close(t.Context()))
732+
}()
733+
734+
outMsgs, err := decoder.Process(t.Context(), service.NewMessage([]byte(test.input)))
735+
if test.errContains != "" {
736+
require.Error(t, err)
737+
assert.Contains(t, err.Error(), test.errContains)
738+
} else {
739+
require.NoError(t, err)
740+
require.Len(t, outMsgs, 1)
741+
742+
b, err := outMsgs[0].AsBytes()
743+
require.NoError(t, err)
744+
745+
jdopts := jsondiff.DefaultJSONOptions()
746+
diff, explanation := jsondiff.Compare(b, []byte(test.output), &jdopts)
747+
assert.JSONEq(t, test.output, string(b))
748+
assert.Equalf(t, jsondiff.FullMatch.String(), diff.String(), "%s: %s", test.name, explanation)
749+
750+
v, ok := outMsgs[0].MetaGetMut("schema_id")
751+
assert.True(t, ok)
752+
assert.Equal(t, test.defaultID, v)
753+
}
754+
})
755+
}
756+
}
757+
683758
func TestSchemaRegistryDecodeJson(t *testing.T) {
684759
returnedSchema3 := false
685760
urlStr := runSchemaRegistryServer(t, func(path string) ([]byte, error) {

0 commit comments

Comments
 (0)