Skip to content

Commit

Permalink
CLI: JSON output option for cat (#289)
Browse files Browse the repository at this point in the history
Adds a ros1msg JSON transcoder to the ros utilities package, and calls
it from the CLI's cat command when the --json switch is supplied.
  • Loading branch information
wkalt authored Mar 14, 2022
1 parent d49fc09 commit 5a3c884
Show file tree
Hide file tree
Showing 11 changed files with 1,545 additions and 37 deletions.
3 changes: 3 additions & 0 deletions go/cli/mcap/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,6 @@ lint:

test:
go test ./...

bench:
make -C cmd bench
96 changes: 89 additions & 7 deletions go/cli/mcap/cmd/cat.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,79 @@
package cmd

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"math"
"os"
"strconv"
"strings"

"github.com/foxglove/mcap/go/cli/mcap/utils"
"github.com/foxglove/mcap/go/cli/mcap/utils/ros"
"github.com/foxglove/mcap/go/mcap"
"github.com/spf13/cobra"
)

var (
topics string
start int64
end int64
topics string
start int64
end int64
formatJSON bool
)

func printMessages(ctx context.Context, w io.Writer, it mcap.MessageIterator) error {
type DecimalTime uint64

func digits(n uint64) int {
if n == 0 {
return 1
}
count := 0
for n != 0 {
n = n / 10
count++
}
return count
}

func (t DecimalTime) MarshalJSON() ([]byte, error) {
seconds := uint64(t) / 1e9
nanoseconds := uint64(t) % 1e9
requiredLength := digits(seconds) + 1 + 9
buf := make([]byte, 0, requiredLength)
buf = strconv.AppendInt(buf, int64(seconds), 10)
buf = append(buf, '.')
for i := 0; i < 9-digits(nanoseconds); i++ {
buf = append(buf, '0')
}
buf = strconv.AppendInt(buf, int64(nanoseconds), 10)
return buf, nil
}

type Message struct {
Topic string `json:"topic"`
Sequence uint32 `json:"sequence"`
LogTime DecimalTime `json:"log_time"`
PublishTime DecimalTime `json:"publish_time"`
Data json.RawMessage `json:"data"`
}

func printMessages(
ctx context.Context,
w io.Writer,
it mcap.MessageIterator,
formatJSON bool,
) error {
msg := &bytes.Buffer{}
msgReader := &bytes.Reader{}
buf := make([]byte, 1024*1024)
transcoders := make(map[uint16]*ros.JSONTranscoder)
encoder := json.NewEncoder(w)
target := Message{}
for {
schema, channel, message, err := it.Next(buf)
if err != nil {
Expand All @@ -31,7 +82,37 @@ func printMessages(ctx context.Context, w io.Writer, it mcap.MessageIterator) er
}
log.Fatalf("Failed to read next message: %s", err)
}
fmt.Fprintf(w, "%d %s [%s] %v...\n", message.LogTime, channel.Topic, schema.Name, message.Data[:10])
if !formatJSON {
fmt.Fprintf(w, "%d %s [%s] %v...\n", message.LogTime, channel.Topic, schema.Name, message.Data[:10])
continue
}
if schema.Encoding != "ros1msg" {
return fmt.Errorf("JSON output only supported for ros1msg schemas")
}
transcoder, ok := transcoders[channel.SchemaID]
if !ok {
packageName := strings.Split(schema.Name, "/")[0]
transcoder, err = ros.NewJSONTranscoder(packageName, schema.Data)
if err != nil {
return fmt.Errorf("failed to build transcoder for %s: %w", channel.Topic, err)
}
transcoders[channel.SchemaID] = transcoder
}
msgReader.Reset(message.Data)
err = transcoder.Transcode(msg, msgReader)
if err != nil {
return fmt.Errorf("failed to transcode %s record on %s: %w", schema.Name, channel.Topic, err)
}
target.Topic = channel.Topic
target.Sequence = message.Sequence
target.LogTime = DecimalTime(message.LogTime)
target.PublishTime = DecimalTime(message.PublishTime)
target.Data = msg.Bytes()
err = encoder.Encode(target)
if err != nil {
return fmt.Errorf("failed to write encoded message")
}
msg.Reset()
}
return nil
}
Expand All @@ -57,7 +138,7 @@ var catCmd = &cobra.Command{
if err != nil {
log.Fatalf("Failed to read messages: %s", err)
}
err = printMessages(ctx, os.Stdout, it)
err = printMessages(ctx, os.Stdout, it, formatJSON)
if err != nil {
log.Fatalf("Failed to print messages: %s", err)
}
Expand All @@ -79,7 +160,7 @@ var catCmd = &cobra.Command{
if err != nil {
return fmt.Errorf("failed to read messages: %w", err)
}
err = printMessages(ctx, os.Stdout, it)
err = printMessages(ctx, os.Stdout, it, formatJSON)
if err != nil {
return fmt.Errorf("failed to print messages: %w", err)
}
Expand All @@ -97,4 +178,5 @@ func init() {
catCmd.PersistentFlags().Int64VarP(&start, "start-secs", "", 0, "start time")
catCmd.PersistentFlags().Int64VarP(&end, "end-secs", "", math.MaxInt64, "end time")
catCmd.PersistentFlags().StringVarP(&topics, "topics", "", "", "comma-separated list of topics")
catCmd.PersistentFlags().BoolVarP(&formatJSON, "json", "", false, "print messages as JSON")
}
45 changes: 45 additions & 0 deletions go/cli/mcap/cmd/cat_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package cmd

import (
"bytes"
"context"
"io"
"io/ioutil"
"math"
"testing"

"github.com/foxglove/mcap/go/mcap"
"github.com/stretchr/testify/assert"
)

func BenchmarkCat(b *testing.B) {
ctx := context.Background()
cases := []struct {
assertion string
inputfile string
formatJSON bool
}{
{
"demo.bag",
"../../../../testdata/mcap/demo.mcap",
true,
},
}
for _, c := range cases {
input, err := ioutil.ReadFile(c.inputfile)
assert.Nil(b, err)
w := io.Discard
r := bytes.NewReader(input)
b.Run(c.assertion, func(b *testing.B) {
for i := 0; i < b.N; i++ {
reader, err := mcap.NewReader(r)
assert.Nil(b, err)
it, err := reader.Messages(0, math.MaxInt64, []string{}, true)
assert.Nil(b, err)
err = printMessages(ctx, w, it, c.formatJSON)
assert.Nil(b, err)
r.Reset(input)
}
})
}
}
Loading

0 comments on commit 5a3c884

Please sign in to comment.