From bd2dcb1bdf5fff70ac3dcba9216a8bcad513c42a Mon Sep 17 00:00:00 2001 From: Atsushi Watanabe Date: Fri, 2 Apr 2021 13:29:07 +0900 Subject: [PATCH] Add option to dump fragment data on error (#168) --- provider.go | 18 ++++++ provider_test.go | 151 ++++++++++++++++++++++++++++++++++++++++++----- putresp.go | 15 ++++- putresp_test.go | 7 +++ 4 files changed, 173 insertions(+), 18 deletions(-) diff --git a/provider.go b/provider.go index 9b81a94..a654b76 100644 --- a/provider.go +++ b/provider.go @@ -97,6 +97,7 @@ type PutMediaOptions struct { onError func(error) retryCount int retryIntervalBase time.Duration + fragmentHeadDumpLen int } type PutMediaOption func(*PutMediaOptions) @@ -397,6 +398,14 @@ func (p *Provider) putMedia(conn *connection, chResp chan FragmentEvent, opts *P if err = p.putMediaRaw(&nopCloser{bytes.NewReader(backup.Bytes())}, chResp, opts); err == nil { break } + if fe, ok := err.(*FragmentEvent); ok && opts.fragmentHeadDumpLen > 0 { + bb := backup.Bytes() + if len(bb) > opts.fragmentHeadDumpLen { + fe.fragmentHead = bb[:opts.fragmentHeadDumpLen] + } else { + fe.fragmentHead = bb + } + } interval *= 2 } } @@ -490,6 +499,15 @@ func WithConnectionTimeout(timeout time.Duration) PutMediaOption { } } +// WithFragmentHeadDumpLen sets fragment data head dump length embedded to the FragmentEvent error message. +// Data dump is enabled only if PutMediaRetry is enabled. +// Set zero to disable. +func WithFragmentHeadDumpLen(n int) PutMediaOption { + return func(p *PutMediaOptions) { + p.fragmentHeadDumpLen = n + } +} + func WithHttpClient(client http.Client) PutMediaOption { return func(p *PutMediaOptions) { p.httpClient = client diff --git a/provider_test.go b/provider_test.go index 9a96475..e769df0 100644 --- a/provider_test.go +++ b/provider_test.go @@ -15,6 +15,7 @@ package kinesisvideomanager_test import ( + "bytes" "context" "errors" "fmt" @@ -22,6 +23,7 @@ import ( "net/http" "reflect" "sync" + "sync/atomic" "testing" "time" @@ -39,9 +41,7 @@ var testData = [][]byte{{0x01, 0x02}} func TestProvider(t *testing.T) { var mu sync.Mutex - retryOpts := []kvm.PutMediaOption{ - kvm.WithPutMediaRetry(2, 100*time.Millisecond), - } + retryOpt := kvm.WithPutMediaRetry(2, 100*time.Millisecond) fragmentEventFmt := `{"EventType":"ERROR","FragmentTimecode":%d,"FragmentNumber":"91343852333754009371412493862204112772176002064","ErrorId":5000,"ErrorCode":"DUMMY_ERROR"}` @@ -67,6 +67,7 @@ func TestProvider(t *testing.T) { mockServerOpts func(*testing.T, map[uint64]bool, *bool, func()) []kvsm.KinesisVideoServerOption putMediaOpts []kvm.PutMediaOption expected []kvsm.FragmentTest + errCheck func(*testing.T, int, error) bool }{ "NoError": { mockServerOpts: func(*testing.T, map[uint64]bool, *bool, func()) []kvsm.KinesisVideoServerOption { return nil }, @@ -91,7 +92,7 @@ func TestProvider(t *testing.T) { }), } }, - putMediaOpts: retryOpts, + putMediaOpts: []kvm.PutMediaOption{retryOpt}, expected: []kvsm.FragmentTest{expected0, expected1}, }, "DelayedHTTPErrorRetry": { @@ -114,7 +115,7 @@ func TestProvider(t *testing.T) { }), } }, - putMediaOpts: retryOpts, + putMediaOpts: []kvm.PutMediaOption{retryOpt}, expected: []kvsm.FragmentTest{expected0, expected1}, }, "KinesisErrorRetry": { @@ -135,9 +136,121 @@ func TestProvider(t *testing.T) { }), } }, - putMediaOpts: retryOpts, + putMediaOpts: []kvm.PutMediaOption{retryOpt}, expected: []kvsm.FragmentTest{expected0, expected0, expected1, expected1}, }, + "KinesisFailDumpShort": { + mockServerOpts: func(t *testing.T, dropped map[uint64]bool, _ *bool, _ func()) []kvsm.KinesisVideoServerOption { + return []kvsm.KinesisVideoServerOption{ + kvsm.WithPutMediaHook(func(timecode uint64, f *kvsm.FragmentTest, w http.ResponseWriter) bool { + if _, err := w.Write([]byte(fmt.Sprintf(fragmentEventFmt, timecode))); err != nil { + t.Error(err) + } + t.Logf("Kinesis error injected: timecode=%d", timecode) + return false + }), + } + }, + putMediaOpts: []kvm.PutMediaOption{ + retryOpt, + kvm.WithFragmentHeadDumpLen(17), + kvm.WithSegmentUID([]byte{0x00, 0x01, 0x02, 0x03}), + }, + expected: []kvsm.FragmentTest{}, + errCheck: func(t *testing.T, cnt int, err error) bool { + if err == nil { + t.Error("Expected error") + return false + } + if cnt != 2 { + // Skip first fragment. + return false + } + fe, ok := err.(*kvm.FragmentEvent) + if !ok { + t.Errorf("Expected FragmentEvent, got %T", err) + return false + } + expectedDump := []byte{ + 0x1a, 0x45, 0xdf, 0xa3, 0xa4, // EBML + 0x42, 0x86, 0x81, 0x01, // EBMLVersion + 0x42, 0xf7, 0x81, 0x01, // ReadVersion + 0x42, 0xf2, 0x81, 0x04, // MaxIDLength + } + if dump := fe.Dump(); !bytes.Equal(expectedDump, dump) { + t.Errorf("Expected dump:\n%v\ngot:\n%v", expectedDump, dump) + } + return true + }, + }, + "KinesisFailDump": { + mockServerOpts: func(t *testing.T, dropped map[uint64]bool, _ *bool, _ func()) []kvsm.KinesisVideoServerOption { + return []kvsm.KinesisVideoServerOption{ + kvsm.WithPutMediaHook(func(timecode uint64, f *kvsm.FragmentTest, w http.ResponseWriter) bool { + if _, err := w.Write([]byte(fmt.Sprintf(fragmentEventFmt, timecode))); err != nil { + t.Error(err) + } + t.Logf("Kinesis error injected: timecode=%d", timecode) + return false + }), + } + }, + putMediaOpts: []kvm.PutMediaOption{ + retryOpt, + kvm.WithFragmentHeadDumpLen(512), + kvm.WithSegmentUID([]byte{0x00, 0x01, 0x02, 0x03}), + }, + expected: []kvsm.FragmentTest{}, + errCheck: func(t *testing.T, cnt int, err error) bool { + if err == nil { + t.Error("Expected error") + return false + } + if cnt != 2 { + // Skip first fragment. + return false + } + fe, ok := err.(*kvm.FragmentEvent) + if !ok { + t.Errorf("Expected FragmentEvent, got %T", err) + return false + } + expectedDump := []byte{ + 0x1a, 0x45, 0xdf, 0xa3, 0xa4, // EBML + 0x42, 0x86, 0x81, 0x01, // EBMLVersion + 0x42, 0xf7, 0x81, 0x01, // ReadVersion + 0x42, 0xf2, 0x81, 0x04, // MaxIDLength + 0x42, 0xf3, 0x81, 0x08, // MaxSizeLength + 0x42, 0x82, 0x89, 0x6d, 0x61, 0x74, 0x72, 0x6f, 0x73, 0x6b, 0x61, 0x00, // DocType + 0x42, 0x87, 0x81, 0x02, // DocTypeVersion + 0x42, 0x85, 0x81, 0x02, // DocTypeReadVersion + 0x18, 0x53, 0x80, 0x67, 0x01, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, // Segment + 0x15, 0x49, 0xa9, 0x66, 0xf2, // Info + 0x2a, 0xd7, 0xb1, 0x83, 0x0f, 0x42, 0x40, // TimecodeScale + 0x73, 0xa4, 0x84, 0x00, 0x01, 0x02, 0x03, // SegmentUID + 0x73, 0x84, 0x81, 0x00, // SegmentFilename + 0x7b, 0xa9, 0x9d, 0x6b, 0x69, 0x6e, 0x65, 0x73, 0x69, 0x73, 0x76, 0x69, 0x64, 0x65, 0x6f, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x50, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x00, // Title + 0x4d, 0x80, 0x9d, 0x6b, 0x69, 0x6e, 0x65, 0x73, 0x69, 0x73, 0x76, 0x69, 0x64, 0x65, 0x6f, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x50, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x00, // MuxingApp + 0x57, 0x41, 0x9d, 0x6b, 0x69, 0x6e, 0x65, 0x73, 0x69, 0x73, 0x76, 0x69, 0x64, 0x65, 0x6f, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x2e, 0x50, 0x72, 0x6f, 0x76, 0x69, 0x64, 0x65, 0x72, 0x00, // WritingApp + 0x16, 0x54, 0xae, 0x6b, 0x80, // Tracks + 0x1f, 0x43, 0xb6, 0x75, 0x01, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, // Cluster + 0xe7, 0x82, 0x00, 0x00, // Timecode + 0xa3, 0x86, 0x81, 0x00, 0x00, 0x00, 0x01, 0x02, // SimpleBlock + 0xa3, 0x86, 0x81, 0x00, 0x01, 0x00, 0x01, 0x02, // SimpleBlock + 0x12, 0x54, 0xc3, 0x67, 0x97, // Tags + 0x73, 0x73, 0x94, // Tag + 0x67, 0xc8, 0x91, // SimpleTag + 0x45, 0xa3, 0x89, 0x54, 0x45, 0x53, 0x54, 0x5f, 0x54, 0x41, 0x47, 0x00, // TagName + 0x44, 0x87, 0x82, 0x32, 0x00, // TagString + } + dump := fe.Dump() + dump[191], dump[192] = 0, 0 // clear Timecode + if !bytes.Equal(expectedDump, dump) { + t.Errorf("Expected dump:\n%v\ngot:\n%v", expectedDump, dump) + } + return true + }, + }, "DelayedKinesisErrorRetry": { mockServerOpts: func(t *testing.T, dropped map[uint64]bool, _ *bool, _ func()) []kvsm.KinesisVideoServerOption { return []kvsm.KinesisVideoServerOption{ @@ -157,7 +270,7 @@ func TestProvider(t *testing.T) { }), } }, - putMediaOpts: retryOpts, + putMediaOpts: []kvm.PutMediaOption{retryOpt}, expected: []kvsm.FragmentTest{expected0, expected0, expected1, expected1}, }, "DisconnectRetry": { @@ -176,7 +289,7 @@ func TestProvider(t *testing.T) { }), } }, - putMediaOpts: retryOpts, + putMediaOpts: []kvm.PutMediaOption{retryOpt}, expected: []kvsm.FragmentTest{expected0, expected1}, }, "DelayedDisconnectRetry": { @@ -196,7 +309,7 @@ func TestProvider(t *testing.T) { }), } }, - putMediaOpts: retryOpts, + putMediaOpts: []kvm.PutMediaOption{retryOpt}, expected: []kvsm.FragmentTest{expected0, expected1}, }, } @@ -205,7 +318,6 @@ func TestProvider(t *testing.T) { testCase := testCase t.Run(name, func(t *testing.T) { var wg sync.WaitGroup - defer wg.Wait() dropped := make(map[uint64]bool) var disconnected bool @@ -239,6 +351,7 @@ func TestProvider(t *testing.T) { Timecode: tc, Block: newBlock(0), } + time.Sleep(10 * time.Millisecond) } }() @@ -262,25 +375,31 @@ func TestProvider(t *testing.T) { } }() - cnt := 0 - var err error + var cntErr, cntTag uint32 + var skipBelow uint32 opts := []kvm.PutMediaOption{ kvm.WithFragmentTimecodeType(kvm.FragmentTimecodeTypeRelative), kvm.WithProducerStartTimestamp(startTimestamp), kvm.WithTags(func() []kvm.SimpleTag { - cnt++ + cnt := atomic.AddUint32(&cntTag, 1) return []kvm.SimpleTag{ {TagName: "TEST_TAG", TagString: fmt.Sprintf("%d", cnt)}, } }), kvm.OnError(func(e error) { - err = e + if testCase.errCheck != nil { + cnt := atomic.AddUint32(&cntErr, 1) + if testCase.errCheck(t, int(cnt), e) { + atomic.StoreUint32(&skipBelow, 1) + } + } }), } opts = append(opts, testCase.putMediaOpts...) pro.PutMedia(ch, chResp, opts...) - if err != nil { - t.Fatalf("Failed to run PutMedia: %v", err) + wg.Wait() + if skipBelow == 1 { + return } <-ctx.Done() diff --git a/putresp.go b/putresp.go index 140eeb4..9f5c5ab 100644 --- a/putresp.go +++ b/putresp.go @@ -15,6 +15,7 @@ package kinesisvideomanager import ( + "encoding/base64" "encoding/json" "fmt" "io" @@ -52,6 +53,8 @@ type FragmentEvent struct { FragmentNumber string // 158-bit number, handle as string ErrorId ErrorID ErrorCode string + + fragmentHead []byte } func (e *FragmentEvent) IsError() bool { @@ -62,11 +65,19 @@ func (e *FragmentEvent) Error() string { if e.EventType != "ERROR" { panic("non-error FragmentEvent is used as error") } - return fmt.Sprintf(`fragment event error: { Timecode: %d, FragmentNumber: %s, ErrorId: %d, ErrorCode: "%s" }`, - e.FragmentTimecode, e.FragmentNumber, e.ErrorId, e.ErrorCode, + var dump string + if len(e.fragmentHead) > 0 { + dump = `, Data: "` + base64.RawStdEncoding.EncodeToString(e.fragmentHead) + `"` + } + return fmt.Sprintf(`fragment event error: { Timecode: %d, FragmentNumber: %s, ErrorId: %d, ErrorCode: "%s"%s }`, + e.FragmentTimecode, e.FragmentNumber, e.ErrorId, e.ErrorCode, dump, ) } +func (e *FragmentEvent) Dump() []byte { + return e.fragmentHead +} + func parseFragmentEvent(r io.Reader) ([]FragmentEvent, error) { dec := json.NewDecoder(r) var ret []FragmentEvent diff --git a/putresp_test.go b/putresp_test.go index 90ec087..1424ecf 100644 --- a/putresp_test.go +++ b/putresp_test.go @@ -36,6 +36,13 @@ func TestFragmentEvent(t *testing.T) { if s := fe[0].Error(); s != expected { t.Errorf("Expected error string:\n%s\ngot:\n%s", expected, s) } + + fe[0].fragmentHead = []byte("test") + + expected2 := `fragment event error: { Timecode: 12345, FragmentNumber: 91343852333754009371412493862204112772176002064, ErrorId: 5000, ErrorCode: "DUMMY_ERROR", Data: "dGVzdA" }` + if s := fe[0].Error(); s != expected2 { + t.Errorf("Expected error string:\n%s\ngot:\n%s", expected2, s) + } }) t.Run("ParseError", func(t *testing.T) { _, err := parseFragmentEvent(strings.NewReader("{"))