Skip to content

Commit

Permalink
Add option to dump fragment data on error (#168)
Browse files Browse the repository at this point in the history
  • Loading branch information
at-wat authored Apr 2, 2021
1 parent 130c8c4 commit bd2dcb1
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 18 deletions.
18 changes: 18 additions & 0 deletions provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ type PutMediaOptions struct {
onError func(error)
retryCount int
retryIntervalBase time.Duration
fragmentHeadDumpLen int
}

type PutMediaOption func(*PutMediaOptions)
Expand Down Expand Up @@ -397,6 +398,14 @@ func (p *Provider) putMedia(conn *connection, chResp chan FragmentEvent, opts *P
if err = p.putMediaRaw(&nopCloser{bytes.NewReader(backup.Bytes())}, chResp, opts); err == nil {
break
}
if fe, ok := err.(*FragmentEvent); ok && opts.fragmentHeadDumpLen > 0 {
bb := backup.Bytes()
if len(bb) > opts.fragmentHeadDumpLen {
fe.fragmentHead = bb[:opts.fragmentHeadDumpLen]
} else {
fe.fragmentHead = bb
}
}
interval *= 2
}
}
Expand Down Expand Up @@ -490,6 +499,15 @@ func WithConnectionTimeout(timeout time.Duration) PutMediaOption {
}
}

// WithFragmentHeadDumpLen sets fragment data head dump length embedded to the FragmentEvent error message.
// Data dump is enabled only if PutMediaRetry is enabled.
// Set zero to disable.
func WithFragmentHeadDumpLen(n int) PutMediaOption {
return func(p *PutMediaOptions) {
p.fragmentHeadDumpLen = n
}
}

func WithHttpClient(client http.Client) PutMediaOption {
return func(p *PutMediaOptions) {
p.httpClient = client
Expand Down
151 changes: 135 additions & 16 deletions provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
package kinesisvideomanager_test

import (
"bytes"
"context"
"errors"
"fmt"
"net"
"net/http"
"reflect"
"sync"
"sync/atomic"
"testing"
"time"

Expand All @@ -39,9 +41,7 @@ var testData = [][]byte{{0x01, 0x02}}
func TestProvider(t *testing.T) {
var mu sync.Mutex

retryOpts := []kvm.PutMediaOption{
kvm.WithPutMediaRetry(2, 100*time.Millisecond),
}
retryOpt := kvm.WithPutMediaRetry(2, 100*time.Millisecond)

fragmentEventFmt := `{"EventType":"ERROR","FragmentTimecode":%d,"FragmentNumber":"91343852333754009371412493862204112772176002064","ErrorId":5000,"ErrorCode":"DUMMY_ERROR"}`

Expand All @@ -67,6 +67,7 @@ func TestProvider(t *testing.T) {
mockServerOpts func(*testing.T, map[uint64]bool, *bool, func()) []kvsm.KinesisVideoServerOption
putMediaOpts []kvm.PutMediaOption
expected []kvsm.FragmentTest
errCheck func(*testing.T, int, error) bool
}{
"NoError": {
mockServerOpts: func(*testing.T, map[uint64]bool, *bool, func()) []kvsm.KinesisVideoServerOption { return nil },
Expand All @@ -91,7 +92,7 @@ func TestProvider(t *testing.T) {
}),
}
},
putMediaOpts: retryOpts,
putMediaOpts: []kvm.PutMediaOption{retryOpt},
expected: []kvsm.FragmentTest{expected0, expected1},
},
"DelayedHTTPErrorRetry": {
Expand All @@ -114,7 +115,7 @@ func TestProvider(t *testing.T) {
}),
}
},
putMediaOpts: retryOpts,
putMediaOpts: []kvm.PutMediaOption{retryOpt},
expected: []kvsm.FragmentTest{expected0, expected1},
},
"KinesisErrorRetry": {
Expand All @@ -135,9 +136,121 @@ func TestProvider(t *testing.T) {
}),
}
},
putMediaOpts: retryOpts,
putMediaOpts: []kvm.PutMediaOption{retryOpt},
expected: []kvsm.FragmentTest{expected0, expected0, expected1, expected1},
},
"KinesisFailDumpShort": {
mockServerOpts: func(t *testing.T, dropped map[uint64]bool, _ *bool, _ func()) []kvsm.KinesisVideoServerOption {
return []kvsm.KinesisVideoServerOption{
kvsm.WithPutMediaHook(func(timecode uint64, f *kvsm.FragmentTest, w http.ResponseWriter) bool {
if _, err := w.Write([]byte(fmt.Sprintf(fragmentEventFmt, timecode))); err != nil {
t.Error(err)
}
t.Logf("Kinesis error injected: timecode=%d", timecode)
return false
}),
}
},
putMediaOpts: []kvm.PutMediaOption{
retryOpt,
kvm.WithFragmentHeadDumpLen(17),
kvm.WithSegmentUID([]byte{0x00, 0x01, 0x02, 0x03}),
},
expected: []kvsm.FragmentTest{},
errCheck: func(t *testing.T, cnt int, err error) bool {
if err == nil {
t.Error("Expected error")
return false
}
if cnt != 2 {
// Skip first fragment.
return false
}
fe, ok := err.(*kvm.FragmentEvent)
if !ok {
t.Errorf("Expected FragmentEvent, got %T", err)
return false
}
expectedDump := []byte{
0x1a, 0x45, 0xdf, 0xa3, 0xa4, // EBML
0x42, 0x86, 0x81, 0x01, // EBMLVersion
0x42, 0xf7, 0x81, 0x01, // ReadVersion
0x42, 0xf2, 0x81, 0x04, // MaxIDLength
}
if dump := fe.Dump(); !bytes.Equal(expectedDump, dump) {
t.Errorf("Expected dump:\n%v\ngot:\n%v", expectedDump, dump)
}
return true
},
},
"KinesisFailDump": {
mockServerOpts: func(t *testing.T, dropped map[uint64]bool, _ *bool, _ func()) []kvsm.KinesisVideoServerOption {
return []kvsm.KinesisVideoServerOption{
kvsm.WithPutMediaHook(func(timecode uint64, f *kvsm.FragmentTest, w http.ResponseWriter) bool {
if _, err := w.Write([]byte(fmt.Sprintf(fragmentEventFmt, timecode))); err != nil {
t.Error(err)
}
t.Logf("Kinesis error injected: timecode=%d", timecode)
return false
}),
}
},
putMediaOpts: []kvm.PutMediaOption{
retryOpt,
kvm.WithFragmentHeadDumpLen(512),
kvm.WithSegmentUID([]byte{0x00, 0x01, 0x02, 0x03}),
},
expected: []kvsm.FragmentTest{},
errCheck: func(t *testing.T, cnt int, err error) bool {
if err == nil {
t.Error("Expected error")
return false
}
if cnt != 2 {
// Skip first fragment.
return false
}
fe, ok := err.(*kvm.FragmentEvent)
if !ok {
t.Errorf("Expected FragmentEvent, got %T", err)
return false
}
expectedDump := []byte{
0x1a, 0x45, 0xdf, 0xa3, 0xa4, // EBML
0x42, 0x86, 0x81, 0x01, // EBMLVersion
0x42, 0xf7, 0x81, 0x01, // ReadVersion
0x42, 0xf2, 0x81, 0x04, // MaxIDLength
0x42, 0xf3, 0x81, 0x08, // MaxSizeLength
0x42, 0x82, 0x89, 0x6d, 0x61, 0x74, 0x72, 0x6f, 0x73, 0x6b, 0x61, 0x00, // DocType
0x42, 0x87, 0x81, 0x02, // DocTypeVersion
0x42, 0x85, 0x81, 0x02, // DocTypeReadVersion
0x18, 0x53, 0x80, 0x67, 0x01, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, // Segment
0x15, 0x49, 0xa9, 0x66, 0xf2, // Info
0x2a, 0xd7, 0xb1, 0x83, 0x0f, 0x42, 0x40, // TimecodeScale
0x73, 0xa4, 0x84, 0x00, 0x01, 0x02, 0x03, // SegmentUID
0x73, 0x84, 0x81, 0x00, // SegmentFilename
0x7b, 0xa9, 0x9d, 0x6b, 0x69, 0x6e, 0x65, 0x73, 0x69, 0x73, 0x76, 0x69, 0x64, 0x65, 0x6f, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x50, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x00, // Title
0x4d, 0x80, 0x9d, 0x6b, 0x69, 0x6e, 0x65, 0x73, 0x69, 0x73, 0x76, 0x69, 0x64, 0x65, 0x6f, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x50, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x00, // MuxingApp
0x57, 0x41, 0x9d, 0x6b, 0x69, 0x6e, 0x65, 0x73, 0x69, 0x73, 0x76, 0x69, 0x64, 0x65, 0x6f, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x50, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x00, // WritingApp
0x16, 0x54, 0xae, 0x6b, 0x80, // Tracks
0x1f, 0x43, 0xb6, 0x75, 0x01, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, // Cluster
0xe7, 0x82, 0x00, 0x00, // Timecode
0xa3, 0x86, 0x81, 0x00, 0x00, 0x00, 0x01, 0x02, // SimpleBlock
0xa3, 0x86, 0x81, 0x00, 0x01, 0x00, 0x01, 0x02, // SimpleBlock
0x12, 0x54, 0xc3, 0x67, 0x97, // Tags
0x73, 0x73, 0x94, // Tag
0x67, 0xc8, 0x91, // SimpleTag
0x45, 0xa3, 0x89, 0x54, 0x45, 0x53, 0x54, 0x5f, 0x54, 0x41, 0x47, 0x00, // TagName
0x44, 0x87, 0x82, 0x32, 0x00, // TagString
}
dump := fe.Dump()
dump[191], dump[192] = 0, 0 // clear Timecode
if !bytes.Equal(expectedDump, dump) {
t.Errorf("Expected dump:\n%v\ngot:\n%v", expectedDump, dump)
}
return true
},
},
"DelayedKinesisErrorRetry": {
mockServerOpts: func(t *testing.T, dropped map[uint64]bool, _ *bool, _ func()) []kvsm.KinesisVideoServerOption {
return []kvsm.KinesisVideoServerOption{
Expand All @@ -157,7 +270,7 @@ func TestProvider(t *testing.T) {
}),
}
},
putMediaOpts: retryOpts,
putMediaOpts: []kvm.PutMediaOption{retryOpt},
expected: []kvsm.FragmentTest{expected0, expected0, expected1, expected1},
},
"DisconnectRetry": {
Expand All @@ -176,7 +289,7 @@ func TestProvider(t *testing.T) {
}),
}
},
putMediaOpts: retryOpts,
putMediaOpts: []kvm.PutMediaOption{retryOpt},
expected: []kvsm.FragmentTest{expected0, expected1},
},
"DelayedDisconnectRetry": {
Expand All @@ -196,7 +309,7 @@ func TestProvider(t *testing.T) {
}),
}
},
putMediaOpts: retryOpts,
putMediaOpts: []kvm.PutMediaOption{retryOpt},
expected: []kvsm.FragmentTest{expected0, expected1},
},
}
Expand All @@ -205,7 +318,6 @@ func TestProvider(t *testing.T) {
testCase := testCase
t.Run(name, func(t *testing.T) {
var wg sync.WaitGroup
defer wg.Wait()

dropped := make(map[uint64]bool)
var disconnected bool
Expand Down Expand Up @@ -239,6 +351,7 @@ func TestProvider(t *testing.T) {
Timecode: tc,
Block: newBlock(0),
}
time.Sleep(10 * time.Millisecond)
}
}()

Expand All @@ -262,25 +375,31 @@ func TestProvider(t *testing.T) {
}
}()

cnt := 0
var err error
var cntErr, cntTag uint32
var skipBelow uint32
opts := []kvm.PutMediaOption{
kvm.WithFragmentTimecodeType(kvm.FragmentTimecodeTypeRelative),
kvm.WithProducerStartTimestamp(startTimestamp),
kvm.WithTags(func() []kvm.SimpleTag {
cnt++
cnt := atomic.AddUint32(&cntTag, 1)
return []kvm.SimpleTag{
{TagName: "TEST_TAG", TagString: fmt.Sprintf("%d", cnt)},
}
}),
kvm.OnError(func(e error) {
err = e
if testCase.errCheck != nil {
cnt := atomic.AddUint32(&cntErr, 1)
if testCase.errCheck(t, int(cnt), e) {
atomic.StoreUint32(&skipBelow, 1)
}
}
}),
}
opts = append(opts, testCase.putMediaOpts...)
pro.PutMedia(ch, chResp, opts...)
if err != nil {
t.Fatalf("Failed to run PutMedia: %v", err)
wg.Wait()
if skipBelow == 1 {
return
}

<-ctx.Done()
Expand Down
15 changes: 13 additions & 2 deletions putresp.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package kinesisvideomanager

import (
"encoding/base64"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -52,6 +53,8 @@ type FragmentEvent struct {
FragmentNumber string // 158-bit number, handle as string
ErrorId ErrorID
ErrorCode string

fragmentHead []byte
}

func (e *FragmentEvent) IsError() bool {
Expand All @@ -62,11 +65,19 @@ func (e *FragmentEvent) Error() string {
if e.EventType != "ERROR" {
panic("non-error FragmentEvent is used as error")
}
return fmt.Sprintf(`fragment event error: { Timecode: %d, FragmentNumber: %s, ErrorId: %d, ErrorCode: "%s" }`,
e.FragmentTimecode, e.FragmentNumber, e.ErrorId, e.ErrorCode,
var dump string
if len(e.fragmentHead) > 0 {
dump = `, Data: "` + base64.RawStdEncoding.EncodeToString(e.fragmentHead) + `"`
}
return fmt.Sprintf(`fragment event error: { Timecode: %d, FragmentNumber: %s, ErrorId: %d, ErrorCode: "%s"%s }`,
e.FragmentTimecode, e.FragmentNumber, e.ErrorId, e.ErrorCode, dump,
)
}

func (e *FragmentEvent) Dump() []byte {
return e.fragmentHead
}

func parseFragmentEvent(r io.Reader) ([]FragmentEvent, error) {
dec := json.NewDecoder(r)
var ret []FragmentEvent
Expand Down
7 changes: 7 additions & 0 deletions putresp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ func TestFragmentEvent(t *testing.T) {
if s := fe[0].Error(); s != expected {
t.Errorf("Expected error string:\n%s\ngot:\n%s", expected, s)
}

fe[0].fragmentHead = []byte("test")

expected2 := `fragment event error: { Timecode: 12345, FragmentNumber: 91343852333754009371412493862204112772176002064, ErrorId: 5000, ErrorCode: "DUMMY_ERROR", Data: "dGVzdA" }`
if s := fe[0].Error(); s != expected2 {
t.Errorf("Expected error string:\n%s\ngot:\n%s", expected2, s)
}
})
t.Run("ParseError", func(t *testing.T) {
_, err := parseFragmentEvent(strings.NewReader("{"))
Expand Down

0 comments on commit bd2dcb1

Please sign in to comment.