From f7ee8b52d04fde10c3a7b7bdb767df46570e39b3 Mon Sep 17 00:00:00 2001 From: Jacob Bandes-Storch Date: Tue, 28 Feb 2023 14:42:28 -0800 Subject: [PATCH] fix bad mutation in merge (#839) **Public-Facing Changes** Fixed a bug where `mcap merge` would incorrectly change some channels' schemas. Fixed a bug where `mcap merge` would crash on some input files with schemaless channels (schemaId=0). **Description** Fixes #837 --- go/cli/mcap/cmd/merge.go | 15 +++-- go/cli/mcap/cmd/merge_test.go | 106 ++++++++++++++++++++++++++++++++++ 2 files changed, 116 insertions(+), 5 deletions(-) diff --git a/go/cli/mcap/cmd/merge.go b/go/cli/mcap/cmd/merge.go index 7584f93956..4eea0ecc75 100644 --- a/go/cli/mcap/cmd/merge.go +++ b/go/cli/mcap/cmd/merge.go @@ -187,7 +187,7 @@ func (m *mcapMerger) mergeInputs(w io.Writer, inputs []io.Reader) error { return fmt.Errorf("failed to read first message on input %d: %w", inputID, err) } if schema != nil { - schema.ID, err = m.addSchema(writer, inputID, schema) + _, err = m.addSchema(writer, inputID, schema) if err != nil { return fmt.Errorf("failed to add initial schema for input %d: %w", inputID, err) } @@ -231,10 +231,15 @@ func (m *mcapMerger) mergeInputs(w io.Writer, inputs []io.Reader) error { var ok bool newMessage.ChannelID, ok = m.outputChannelID(msg.InputID, newChannel.ID) if !ok { - _, ok := m.outputSchemaID(msg.InputID, newSchema.ID) - if !ok { - // if the schema is unknown, add it to the output - m.addSchema(writer, msg.InputID, newSchema) + if newSchema != nil { + _, ok := m.outputSchemaID(msg.InputID, newSchema.ID) + if !ok { + // if the schema is unknown, add it to the output + _, err := m.addSchema(writer, msg.InputID, newSchema) + if err != nil { + return fmt.Errorf("failed to add schema: %w", err) + } + } } newMessage.ChannelID, err = m.addChannel(writer, msg.InputID, newChannel) if err != nil { diff --git a/go/cli/mcap/cmd/merge_test.go b/go/cli/mcap/cmd/merge_test.go index edf294b1da..a34c08ea6e 100644 --- a/go/cli/mcap/cmd/merge_test.go +++ b/go/cli/mcap/cmd/merge_test.go @@ -70,6 +70,63 @@ func TestMCAPMerging(t *testing.T) { } } +func TestChannelsWithSameSchema(t *testing.T) { + buf := &bytes.Buffer{} + writer, err := mcap.NewWriter(buf, &mcap.WriterOptions{ + Chunked: true, + }) + assert.Nil(t, err) + assert.Nil(t, writer.WriteHeader(&mcap.Header{Profile: "testprofile"})) + + assert.Nil(t, writer.WriteSchema(&mcap.Schema{ + ID: 1, + Name: "foo", + })) + assert.Nil(t, writer.WriteSchema(&mcap.Schema{ + ID: 2, + Name: "bar", + })) + assert.Nil(t, writer.WriteChannel(&mcap.Channel{ + ID: 1, + SchemaID: 2, + Topic: "/bar1", + })) + assert.Nil(t, writer.WriteChannel(&mcap.Channel{ + ID: 2, + SchemaID: 2, + Topic: "/bar2", + })) + assert.Nil(t, writer.WriteChannel(&mcap.Channel{ + ID: 3, + SchemaID: 1, + Topic: "/foo", + })) + assert.Nil(t, writer.WriteMessage(&mcap.Message{ + ChannelID: 1, + })) + assert.Nil(t, writer.WriteMessage(&mcap.Message{ + ChannelID: 2, + })) + assert.Nil(t, writer.WriteMessage(&mcap.Message{ + ChannelID: 3, + })) + assert.Nil(t, writer.Close()) + merger := newMCAPMerger(mergeOpts{ + chunked: true, + }) + output := &bytes.Buffer{} + assert.Nil(t, merger.mergeInputs(output, []io.Reader{buf})) + reader, err := mcap.NewReader(bytes.NewReader(output.Bytes())) + assert.Nil(t, err) + info, err := reader.Info() + assert.Nil(t, err) + + assert.NotNil(t, info.Schemas) + assert.Equal(t, 2, len(info.Schemas)) + assert.Equal(t, info.Schemas[1].Name, "bar") + assert.Equal(t, info.Schemas[2].Name, "foo") +} + func TestMultiChannelInput(t *testing.T) { buf1 := &bytes.Buffer{} buf2 := &bytes.Buffer{} @@ -126,3 +183,52 @@ func TestSchemalessChannelInput(t *testing.T) { assert.Equal(t, 100, schemaIDs[0]) assert.Equal(t, 100, schemaIDs[1]) } + +func TestMultipleSchemalessChannelSingleInput(t *testing.T) { + buf := &bytes.Buffer{} + writer, err := mcap.NewWriter(buf, &mcap.WriterOptions{ + Chunked: true, + }) + assert.Nil(t, err) + assert.Nil(t, writer.WriteHeader(&mcap.Header{Profile: "testprofile"})) + + assert.Nil(t, writer.WriteChannel(&mcap.Channel{ + ID: 1, + SchemaID: 0, + Topic: "/foo", + })) + assert.Nil(t, writer.WriteChannel(&mcap.Channel{ + ID: 2, + SchemaID: 0, + Topic: "/bar", + })) + assert.Nil(t, writer.WriteMessage(&mcap.Message{ + ChannelID: 1, + })) + assert.Nil(t, writer.WriteMessage(&mcap.Message{ + ChannelID: 2, + })) + assert.Nil(t, writer.Close()) + + merger := newMCAPMerger(mergeOpts{}) + output := &bytes.Buffer{} + assert.Nil(t, merger.mergeInputs(output, []io.Reader{buf})) + + // output should now be a well-formed mcap + reader, err := mcap.NewReader(output) + assert.Nil(t, err) + assert.Equal(t, reader.Header().Profile, "testprofile") + it, err := reader.Messages(readopts.UsingIndex(false)) + assert.Nil(t, err) + messages := make(map[string]int) + schemaIDs := make(map[uint16]int) + err = mcap.Range(it, func(schema *mcap.Schema, channel *mcap.Channel, message *mcap.Message) error { + messages[channel.Topic]++ + schemaIDs[channel.SchemaID]++ + return nil + }) + assert.Nil(t, err) + assert.Equal(t, 1, messages["/foo"]) + assert.Equal(t, 1, messages["/bar"]) + assert.Equal(t, 2, schemaIDs[0]) +}