Skip to content

Commit

Permalink
Add subcommand for sorting a file (#1009)
Browse files Browse the repository at this point in the history
Adds a "sort" subcommand to the mcap CLI tool. This will rewrite the
messages into a new file, physically sorted on time.
  • Loading branch information
Wyatt Alt authored Nov 3, 2023
1 parent 3a38aea commit f9e4a6d
Show file tree
Hide file tree
Showing 5 changed files with 325 additions and 1 deletion.
196 changes: 196 additions & 0 deletions go/cli/mcap/cmd/sort.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
package cmd

import (
"errors"
"fmt"
"io"
"os"

"github.com/foxglove/mcap/go/mcap"
"github.com/spf13/cobra"
)

// Command-line options for the sort subcommand. Each variable is bound to a
// persistent flag on sortCmd in init.
var (
// sortOutputFile is the path given with --output-file / -o (required).
sortOutputFile string
// sortChunkSize is the --chunk-size value passed to the writer as ChunkSize.
sortChunkSize int64
// sortCompression is the --compression value, converted to an
// mcap.CompressionFormat when the writer is created.
sortCompression string
// sortIncludeCRC is the --include-crc value passed to the writer as IncludeCRC.
sortIncludeCRC bool
// sortChunked is the --chunked value passed to the writer as Chunked.
sortChunked bool
)

// errUnindexedFile wraps an error encountered while reading a file's index,
// so callers can distinguish "the input is not usable as an indexed file"
// from other failures.
type errUnindexedFile struct {
	err error
}

// Error reports the message of the wrapped error unchanged.
func (e errUnindexedFile) Error() string {
	return e.err.Error()
}

// Is reports whether tgt is an errUnindexedFile value, regardless of the
// error it wraps. This lets errors.Is(err, errUnindexedFile{}) match any
// instance of the type.
func (e errUnindexedFile) Is(tgt error) bool {
	switch tgt.(type) {
	case errUnindexedFile:
		return true
	default:
		return false
	}
}

// fileHasNoMessages reports whether the MCAP data in r contains no messages.
// It rewinds r to the start, opens a fresh reader on it, and attempts to read
// a single message in file order without using the index. Hitting io.EOF on
// that first read means the file is empty; any other error is returned.
func fileHasNoMessages(r io.ReadSeeker) (bool, error) {
	if _, err := r.Seek(0, io.SeekStart); err != nil {
		return false, err
	}

	reader, err := mcap.NewReader(r)
	if err != nil {
		return false, err
	}
	defer reader.Close()

	messages, err := reader.Messages(mcap.UsingIndex(false), mcap.InOrder(mcap.FileOrder))
	if err != nil {
		return false, err
	}

	// Only the outcome of the first read matters; the message is discarded.
	_, _, _, readErr := messages.Next(nil)
	switch {
	case readErr == nil:
		return false, nil
	case errors.Is(readErr, io.EOF):
		return true, nil
	default:
		return false, readErr
	}
}

// sortFile reads the indexed MCAP file in r and writes a copy to w with the
// messages physically ordered by log time. Writer behaviour (chunking,
// compression, chunk size, CRCs) is taken from the package-level sort flags.
//
// Attachments and metadata are copied first, in index order, followed by the
// messages. Each schema and channel is written once, immediately before the
// first message that needs it.
//
// An errUnindexedFile is returned when the file's summary information cannot
// be read, or when the file contains messages but no chunk indexes. All other
// failures are returned wrapped with a description of the failing step.
func sortFile(w io.Writer, r io.ReadSeeker) error {
	reader, err := mcap.NewReader(r)
	if err != nil {
		return fmt.Errorf("failed to create reader: %w", err)
	}
	defer reader.Close()

	writer, err := mcap.NewWriter(w, &mcap.WriterOptions{
		Chunked:     sortChunked,
		Compression: mcap.CompressionFormat(sortCompression),
		ChunkSize:   sortChunkSize,
		IncludeCRC:  sortIncludeCRC,
	})
	if err != nil {
		return fmt.Errorf("failed to create writer: %w", err)
	}

	info, err := reader.Info()
	if err != nil {
		return errUnindexedFile{err}
	}

	isEmpty, err := fileHasNoMessages(r)
	if err != nil {
		return fmt.Errorf("failed to check if file is empty: %w", err)
	}

	// A file with no chunk indexes is acceptable only if it has no messages;
	// otherwise the indexed read below could not find them.
	if len(info.ChunkIndexes) == 0 && !isEmpty {
		return errUnindexedFile{errors.New("no chunk index records")}
	}

	if err := writer.WriteHeader(info.Header); err != nil {
		return fmt.Errorf("failed to write header: %w", err)
	}

	// Handle the attachments and metadata first; physical location in the
	// file is irrelevant but order is preserved.
	for _, index := range info.AttachmentIndexes {
		attReader, err := reader.GetAttachmentReader(index.Offset)
		if err != nil {
			return fmt.Errorf("failed to read attachment: %w", err)
		}
		err = writer.WriteAttachment(&mcap.Attachment{
			Name:       index.Name,
			MediaType:  index.MediaType,
			CreateTime: index.CreateTime,
			LogTime:    index.LogTime,
			DataSize:   index.DataSize,
			Data:       attReader.Data(),
		})
		if err != nil {
			return fmt.Errorf("failed to write attachment: %w", err)
		}
	}
	for _, index := range info.MetadataIndexes {
		metadata, err := reader.GetMetadata(index.Offset)
		if err != nil {
			return fmt.Errorf("failed to read metadata: %w", err)
		}
		if err := writer.WriteMetadata(metadata); err != nil {
			return fmt.Errorf("failed to write metadata: %w", err)
		}
	}

	it, err := reader.Messages(mcap.UsingIndex(true), mcap.InOrder(mcap.LogTimeOrder))
	if err != nil {
		return fmt.Errorf("failed to read messages: %w", err)
	}

	// Track the IDs already emitted so that each schema and channel record is
	// written once rather than once per message.
	writtenSchemas := make(map[uint16]struct{})
	writtenChannels := make(map[uint16]struct{})
	for {
		schema, channel, message, err := it.Next(nil)
		if err != nil {
			if errors.Is(err, io.EOF) {
				break
			}
			// Any other error must stop the copy: the returned records are
			// not usable and continuing would dereference nil values.
			return fmt.Errorf("failed to read message: %w", err)
		}

		// The schema is nil-checked because the iterator may return a
		// message without one.
		if schema != nil {
			if _, ok := writtenSchemas[schema.ID]; !ok {
				if err := writer.WriteSchema(schema); err != nil {
					return fmt.Errorf("failed to write schema: %w", err)
				}
				writtenSchemas[schema.ID] = struct{}{}
			}
		}
		if _, ok := writtenChannels[channel.ID]; !ok {
			if err := writer.WriteChannel(channel); err != nil {
				return fmt.Errorf("failed to write channel: %w", err)
			}
			writtenChannels[channel.ID] = struct{}{}
		}
		if err := writer.WriteMessage(message); err != nil {
			return fmt.Errorf("failed to write message: %w", err)
		}
	}

	return writer.Close()
}

// sortCmd implements the "sort" subcommand: it opens the single input file
// named on the command line, creates the file named by --output-file, and
// runs sortFile between them. Failures are reported through die (defined
// elsewhere in this package; presumably it prints the message and exits).
var sortCmd = &cobra.Command{
	Use:   "sort [file] -o output.mcap",
	Short: "Read an MCAP file and write the messages out physically sorted on log time",
	Run: func(cmd *cobra.Command, args []string) {
		if len(args) != 1 {
			die("supply a file")
		}
		f, err := os.Open(args[0])
		if err != nil {
			die("failed to open file: %s", err)
		}
		defer f.Close()

		output, err := os.Create(sortOutputFile)
		if err != nil {
			die("failed to open output: %s", err)
		}
		err = sortFile(output, f)
		if err != nil {
			// Index problems get a dedicated hint, since the usual remedy is
			// to rebuild the file rather than retry.
			if errors.Is(err, errUnindexedFile{}) {
				die("Error reading file index: %s. "+
					"You may need to run `mcap recover` if the file is corrupt or not chunk indexed.", err)
			}
			die("failed to sort file: %s", err)
		}
		// Close explicitly and check the result: a write error surfaced at
		// close time would otherwise go unreported and leave a bad output.
		if err := output.Close(); err != nil {
			die("failed to close output: %s", err)
		}
	},
}

// init registers the sort subcommand on the root command and declares its
// flags. The output-file flag is marked required.
func init() {
	rootCmd.AddCommand(sortCmd)

	flags := sortCmd.PersistentFlags()
	flags.StringVarP(&sortOutputFile, "output-file", "o", "", "output file")
	flags.Int64VarP(&sortChunkSize, "chunk-size", "", 4*1024*1024, "chunk size")
	flags.StringVarP(&sortCompression, "compression", "", "zstd", "chunk compression algorithm")
	flags.BoolVarP(&sortIncludeCRC, "include-crc", "", true, "include chunk CRCs in output")
	flags.BoolVarP(&sortChunked, "chunked", "", true, "create an indexed and chunk-compressed output")

	if err := sortCmd.MarkPersistentFlagRequired("output-file"); err != nil {
		die("failed to mark flag required: %s", err)
	}
}
74 changes: 74 additions & 0 deletions go/cli/mcap/cmd/sort_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package cmd

import (
"bytes"
"testing"

"github.com/foxglove/mcap/go/mcap"
"github.com/stretchr/testify/assert"
)

// TestSortFile writes a chunked MCAP whose three messages are in descending
// log-time order across two channels (one with a schema, one without), runs
// sortFile on it, and verifies that the output holds all three messages in
// ascending log-time order when read back physically (without the index).
func TestSortFile(t *testing.T) {
	buf := &bytes.Buffer{}
	writer, err := mcap.NewWriter(buf, &mcap.WriterOptions{
		Chunked: true,
	})
	assert.Nil(t, err)
	assert.Nil(t, writer.WriteHeader(&mcap.Header{}))
	assert.Nil(t, writer.WriteSchema(&mcap.Schema{
		ID:       1,
		Name:     "foo",
		Encoding: "ros1",
		Data:     []byte{},
	}))
	assert.Nil(t, writer.WriteChannel(&mcap.Channel{
		ID:              0,
		SchemaID:        1,
		Topic:           "/foo",
		MessageEncoding: "ros1msg",
	}))
	assert.Nil(t, writer.WriteChannel(&mcap.Channel{
		ID:              2,
		SchemaID:        0,
		Topic:           "/bar",
		MessageEncoding: "ros1msg",
	}))
	assert.Nil(t, writer.WriteMessage(&mcap.Message{
		ChannelID:   0,
		Sequence:    0,
		LogTime:     100,
		PublishTime: 0,
		Data:        []byte{},
	}))
	assert.Nil(t, writer.WriteMessage(&mcap.Message{
		ChannelID:   0,
		Sequence:    0,
		LogTime:     50,
		PublishTime: 0,
		Data:        []byte{},
	}))
	assert.Nil(t, writer.WriteMessage(&mcap.Message{
		ChannelID:   2,
		Sequence:    0,
		LogTime:     25,
		PublishTime: 0,
		Data:        []byte{},
	}))
	assert.Nil(t, writer.Close())

	// sort the file
	reader := bytes.NewReader(buf.Bytes())
	w := &bytes.Buffer{}
	assert.Nil(t, sortFile(w, reader))

	// verify it is now sorted: read without the index so that the physical
	// order of the records is what gets observed.
	r, err := mcap.NewReader(bytes.NewReader(w.Bytes()))
	assert.Nil(t, err)

	it, err := r.Messages(mcap.UsingIndex(false))
	assert.Nil(t, err)

	for _, want := range []int{25, 50, 100} {
		_, _, msg, err := it.Next(nil)
		if !assert.Nil(t, err) {
			// msg is not usable after a failed read; stop instead of
			// dereferencing it.
			return
		}
		assert.Equal(t, want, int(msg.LogTime))
	}
}
14 changes: 14 additions & 0 deletions go/mcap/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,20 @@ func (r *Reader) Info() (*Info, error) {
return info, nil
}

// GetAttachmentReader positions the underlying stream at the attachment
// record found at offset and returns a reader for it. The returned reader
// must be consumed before the base reader is used again.
func (r *Reader) GetAttachmentReader(offset uint64) (*AttachmentReader, error) {
	// Number of bytes skipped at the start of the record before parsing.
	// NOTE(review): presumably the 1-byte opcode plus the 8-byte record
	// length that frame an MCAP record; confirm against the spec.
	const recordPrefixLen = 9

	if _, err := r.rs.Seek(int64(offset+recordPrefixLen), io.SeekStart); err != nil {
		return nil, err
	}

	attachment, err := parseAttachmentReader(r.rs, true)
	if err != nil {
		return nil, err
	}
	return attachment, nil
}

func (r *Reader) GetMetadata(offset uint64) (*Metadata, error) {
_, err := r.rs.Seek(int64(offset), io.SeekStart)
if err != nil {
Expand Down
40 changes: 40 additions & 0 deletions go/mcap/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,46 @@ func TestReadingMetadata(t *testing.T) {
assert.Equal(t, expectedMetadata, metadata)
}

func TestGetAttachmentReader(t *testing.T) {
buf := &bytes.Buffer{}
writer, err := NewWriter(buf, &WriterOptions{
Chunked: true,
ChunkSize: 1024,
Compression: "",
})
assert.Nil(t, err)
assert.Nil(t, writer.WriteHeader(&Header{}))
assert.Nil(t, writer.WriteAttachment(&Attachment{
LogTime: 10,
CreateTime: 1000,
Name: "foo",
MediaType: "text",
DataSize: 3,
Data: bytes.NewReader([]byte{'a', 'b', 'c'}),
}))
assert.Nil(t, writer.Close())

reader, err := NewReader(bytes.NewReader(buf.Bytes()))
assert.Nil(t, err)

info, err := reader.Info()
assert.Nil(t, err)
assert.Equal(t, 1, len(info.AttachmentIndexes))
idx := info.AttachmentIndexes[0]
ar, err := reader.GetAttachmentReader(idx.Offset)
assert.Nil(t, err)

assert.Equal(t, "foo", ar.Name)
assert.Equal(t, "text", ar.MediaType)
assert.Equal(t, 3, int(ar.DataSize))
assert.Equal(t, 10, int(ar.LogTime))
assert.Equal(t, 1000, int(ar.CreateTime))

data, err := io.ReadAll(ar.Data())
assert.Nil(t, err)
assert.Equal(t, []byte{'a', 'b', 'c'}, data)
}

func TestReadingMessageOrderWithOverlappingChunks(t *testing.T) {
buf := &bytes.Buffer{}
// write an MCAP with two chunks, where in each chunk all messages have ascending timestamps,
Expand Down
2 changes: 1 addition & 1 deletion go/mcap/version.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package mcap

// Version of the MCAP library.
var Version = "v1.0.4"
var Version = "v1.1.0"

0 comments on commit f9e4a6d

Please sign in to comment.