Skip to content

Commit

Permalink
fix bad mutation in merge (#839)
Browse files Browse the repository at this point in the history
**Public-Facing Changes**
Fixed a bug where `mcap merge` would incorrectly change some channels' schemas.
Fixed a bug where `mcap merge` would crash on some input files with schemaless channels (schemaId=0).

**Description**
Fixes #837
  • Loading branch information
jtbandes authored Feb 28, 2023
1 parent edd144f commit f7ee8b5
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 5 deletions.
15 changes: 10 additions & 5 deletions go/cli/mcap/cmd/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (m *mcapMerger) mergeInputs(w io.Writer, inputs []io.Reader) error {
return fmt.Errorf("failed to read first message on input %d: %w", inputID, err)
}
if schema != nil {
schema.ID, err = m.addSchema(writer, inputID, schema)
_, err = m.addSchema(writer, inputID, schema)
if err != nil {
return fmt.Errorf("failed to add initial schema for input %d: %w", inputID, err)
}
Expand Down Expand Up @@ -231,10 +231,15 @@ func (m *mcapMerger) mergeInputs(w io.Writer, inputs []io.Reader) error {
var ok bool
newMessage.ChannelID, ok = m.outputChannelID(msg.InputID, newChannel.ID)
if !ok {
_, ok := m.outputSchemaID(msg.InputID, newSchema.ID)
if !ok {
// if the schema is unknown, add it to the output
m.addSchema(writer, msg.InputID, newSchema)
if newSchema != nil {
_, ok := m.outputSchemaID(msg.InputID, newSchema.ID)
if !ok {
// if the schema is unknown, add it to the output
_, err := m.addSchema(writer, msg.InputID, newSchema)
if err != nil {
return fmt.Errorf("failed to add schema: %w", err)
}
}
}
newMessage.ChannelID, err = m.addChannel(writer, msg.InputID, newChannel)
if err != nil {
Expand Down
106 changes: 106 additions & 0 deletions go/cli/mcap/cmd/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,63 @@ func TestMCAPMerging(t *testing.T) {
}
}

// TestChannelsWithSameSchema merges a single input in which two channels
// ("/bar1", "/bar2") share schema "bar" and a third channel ("/foo") uses
// schema "foo". It verifies that the merged output contains exactly two
// schemas and that each keeps its own name, guarding against the merge
// changing a channel's schema.
func TestChannelsWithSameSchema(t *testing.T) {
	buf := &bytes.Buffer{}
	writer, err := mcap.NewWriter(buf, &mcap.WriterOptions{
		Chunked: true,
	})
	// Stop here on failure: writer would be unusable in everything below.
	if !assert.NoError(t, err) {
		return
	}
	assert.NoError(t, writer.WriteHeader(&mcap.Header{Profile: "testprofile"}))

	assert.NoError(t, writer.WriteSchema(&mcap.Schema{
		ID:   1,
		Name: "foo",
	}))
	assert.NoError(t, writer.WriteSchema(&mcap.Schema{
		ID:   2,
		Name: "bar",
	}))
	// Channels 1 and 2 both reference schema 2 ("bar").
	assert.NoError(t, writer.WriteChannel(&mcap.Channel{
		ID:       1,
		SchemaID: 2,
		Topic:    "/bar1",
	}))
	assert.NoError(t, writer.WriteChannel(&mcap.Channel{
		ID:       2,
		SchemaID: 2,
		Topic:    "/bar2",
	}))
	assert.NoError(t, writer.WriteChannel(&mcap.Channel{
		ID:       3,
		SchemaID: 1,
		Topic:    "/foo",
	}))
	assert.NoError(t, writer.WriteMessage(&mcap.Message{
		ChannelID: 1,
	}))
	assert.NoError(t, writer.WriteMessage(&mcap.Message{
		ChannelID: 2,
	}))
	assert.NoError(t, writer.WriteMessage(&mcap.Message{
		ChannelID: 3,
	}))
	assert.NoError(t, writer.Close())

	merger := newMCAPMerger(mergeOpts{
		chunked: true,
	})
	output := &bytes.Buffer{}
	assert.NoError(t, merger.mergeInputs(output, []io.Reader{buf}))

	reader, err := mcap.NewReader(bytes.NewReader(output.Bytes()))
	if !assert.NoError(t, err) {
		return
	}
	info, err := reader.Info()
	if !assert.NoError(t, err) {
		return
	}

	assert.NotNil(t, info.Schemas)
	// Only index into the schema map once its size is confirmed.
	if !assert.Len(t, info.Schemas, 2) {
		return
	}
	// The expected output IDs differ from the input IDs ("bar" is 1, "foo" is
	// 2); presumably the merger numbers schemas in the order it first
	// encounters them.
	assert.Equal(t, "bar", info.Schemas[1].Name)
	assert.Equal(t, "foo", info.Schemas[2].Name)
}

func TestMultiChannelInput(t *testing.T) {
buf1 := &bytes.Buffer{}
buf2 := &bytes.Buffer{}
Expand Down Expand Up @@ -126,3 +183,52 @@ func TestSchemalessChannelInput(t *testing.T) {
assert.Equal(t, 100, schemaIDs[0])
assert.Equal(t, 100, schemaIDs[1])
}

// TestMultipleSchemalessChannelSingleInput merges a single input containing
// two channels that have no schema (SchemaID 0). It verifies that the merge
// succeeds, that the output is a readable MCAP with the original profile, and
// that both messages come through on channels that still have SchemaID 0.
func TestMultipleSchemalessChannelSingleInput(t *testing.T) {
	buf := &bytes.Buffer{}
	writer, err := mcap.NewWriter(buf, &mcap.WriterOptions{
		Chunked: true,
	})
	// Stop here on failure: writer would be unusable in everything below.
	if !assert.NoError(t, err) {
		return
	}
	assert.NoError(t, writer.WriteHeader(&mcap.Header{Profile: "testprofile"}))

	assert.NoError(t, writer.WriteChannel(&mcap.Channel{
		ID:       1,
		SchemaID: 0,
		Topic:    "/foo",
	}))
	assert.NoError(t, writer.WriteChannel(&mcap.Channel{
		ID:       2,
		SchemaID: 0,
		Topic:    "/bar",
	}))
	assert.NoError(t, writer.WriteMessage(&mcap.Message{
		ChannelID: 1,
	}))
	assert.NoError(t, writer.WriteMessage(&mcap.Message{
		ChannelID: 2,
	}))
	assert.NoError(t, writer.Close())

	merger := newMCAPMerger(mergeOpts{})
	output := &bytes.Buffer{}
	assert.NoError(t, merger.mergeInputs(output, []io.Reader{buf}))

	// output should now be a well-formed mcap
	reader, err := mcap.NewReader(output)
	if !assert.NoError(t, err) {
		return
	}
	assert.Equal(t, "testprofile", reader.Header().Profile)
	it, err := reader.Messages(readopts.UsingIndex(false))
	if !assert.NoError(t, err) {
		return
	}

	// Count messages per topic and per channel schema ID.
	messages := make(map[string]int)
	schemaIDs := make(map[uint16]int)
	err = mcap.Range(it, func(schema *mcap.Schema, channel *mcap.Channel, message *mcap.Message) error {
		messages[channel.Topic]++
		schemaIDs[channel.SchemaID]++
		return nil
	})
	assert.NoError(t, err)
	assert.Equal(t, 1, messages["/foo"])
	assert.Equal(t, 1, messages["/bar"])
	// Both messages must have been seen on schemaless channels.
	assert.Equal(t, 2, schemaIDs[0])
}

0 comments on commit f7ee8b5

Please sign in to comment.