Skip to content
This repository was archived by the owner on Jun 2, 2022. It is now read-only.

Commit 90a1f33

Browse files
authored
Merge pull request #722 from MikaelSmith/fix-tail-buffering
Handle buffering incomplete lines in Stream output
2 parents 25ca265 + 75dd112 commit 90a1f33

File tree

2 files changed

+116
-5
lines changed

2 files changed

+116
-5
lines changed

cmd/tail.go

+41-5
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
package cmd
22

33
import (
4+
"bytes"
45
"io"
56
"io/ioutil"
67
"os"
78
"os/exec"
8-
"strings"
99
"time"
1010

1111
"github.com/Benchkram/errz"
@@ -35,15 +35,48 @@ type line struct {
3535

3636
type lineWriter struct {
3737
name string
38+
buf bytes.Buffer
3839
out chan line
3940
}
4041

41-
func (w lineWriter) Write(b []byte) (int, error) {
42-
s := strings.TrimSuffix(string(b), "\n")
43-
w.out <- line{Line: tail.Line{Text: s, Time: time.Now()}, source: w.name}
42+
func (w *lineWriter) Write(b []byte) (int, error) {
43+
// Buffer lines, then submit all completed lines to the output channel. For incomplete lines
44+
// we just return the number of bytes written. Call Finish() when done writing to ensure any
45+
// final line without line endings are also written to the output channel.
46+
w.buf.Write(b)
47+
i := bytes.LastIndexAny(w.buf.Bytes(), "\r\n")
48+
if i == -1 {
49+
// Incomplete line, so just return.
50+
return len(b), nil
51+
}
52+
53+
// Completed line. Remove line endings from the buffer and text (in case of \r\n) then submit it.
54+
text := w.buf.Next(i)
55+
56+
// Consume \r or \n. Note that the Buffer takes care of re-using space when we catch up.
57+
crOrLf, err := w.buf.ReadByte()
58+
if err != nil {
59+
// Impossible because the next character was already found to be a \r or \n.
60+
panic(err)
61+
}
62+
63+
// If the last character was \n, we could have had \r\n. We want just the line without line
64+
// endings so check if the previous character was \r and if so remove it.
65+
if last := len(text) - 1; last >= 0 && crOrLf == '\n' && text[last] == '\r' {
66+
text = text[:last]
67+
}
68+
69+
w.out <- line{Line: tail.Line{Text: string(text), Time: time.Now()}, source: w.name}
4470
return len(b), nil
4571
}
4672

73+
func (w *lineWriter) Finish() {
74+
if w.buf.Len() > 0 {
75+
// Write remainder because it didn't end in a newline.
76+
w.out <- line{Line: tail.Line{Text: w.buf.String(), Time: time.Now()}, source: w.name}
77+
}
78+
}
79+
4780
// Streams output via API to aggregator channel.
4881
// Returns nil if streaming's not supported on this path.
4982
func tailStream(conn client.Client, agg chan line, path string) io.Closer {
@@ -63,9 +96,12 @@ func tailStream(conn client.Client, agg chan line, path string) io.Closer {
6396

6497
// Start copying the stream to the aggregate channel
6598
go func() {
66-
_, err := io.Copy(lineWriter{name: path, out: agg}, stream)
99+
lw := lineWriter{name: path, out: agg}
100+
_, err := io.Copy(&lw, stream)
67101
if err != nil {
68102
agg <- line{Line: tail.Line{Time: time.Now(), Err: err}, source: path}
103+
} else {
104+
lw.Finish()
69105
}
70106
}()
71107
return stream

cmd/tail_test.go

+75
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package cmd
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/stretchr/testify/assert"
8+
)
9+
10+
func TestLineWriter(t *testing.T) {
11+
// Setup a writer with test message and output channel. Also mark time before any messages.
12+
const msg = "a complete line"
13+
out := make(chan line, 1)
14+
lw := lineWriter{name: "mine", out: out}
15+
16+
// Write a message, mark after it's written but before we finish to verify Write produced it.
17+
start := time.Now()
18+
validWrite(t, &lw, msg+"\n")
19+
mark := time.Now()
20+
lw.Finish()
21+
assertLine(t, out, "mine", msg, start, mark)
22+
23+
start = time.Now()
24+
validWrite(t, &lw, msg+"\r")
25+
mark = time.Now()
26+
lw.Finish()
27+
assertLine(t, out, "mine", msg, start, mark)
28+
29+
// Classic Windows endings, e.g. CRLF
30+
start = time.Now()
31+
validWrite(t, &lw, msg+"\r\n")
32+
mark = time.Now()
33+
lw.Finish()
34+
assertLine(t, out, "mine", msg, start, mark)
35+
36+
// Test message split over multiple writes
37+
start = time.Now()
38+
split := len(msg) / 2
39+
validWrite(t, &lw, msg[:split])
40+
validWrite(t, &lw, msg[split:])
41+
validWrite(t, &lw, "\r\n")
42+
mark = time.Now()
43+
lw.Finish()
44+
assertLine(t, out, "mine", msg, start, mark)
45+
46+
// Test multiple lines, with no newline on last one
47+
start = time.Now()
48+
validWrite(t, &lw, msg)
49+
validWrite(t, &lw, "\r")
50+
assertLine(t, out, "mine", msg, start, time.Now())
51+
start = time.Now()
52+
validWrite(t, &lw, msg)
53+
validWrite(t, &lw, "\n")
54+
assertLine(t, out, "mine", msg, start, time.Now())
55+
start = time.Now()
56+
validWrite(t, &lw, msg)
57+
lw.Finish()
58+
assertLine(t, out, "mine", msg, start, time.Now())
59+
}
60+
61+
func validWrite(t *testing.T, lw *lineWriter, msg string) {
62+
n, err := lw.Write([]byte(msg))
63+
assert.NoError(t, err)
64+
assert.Equal(t, len(msg), n)
65+
}
66+
67+
func assertLine(t *testing.T, out <-chan line, source, msg string, before, after time.Time) {
68+
ln, ok := <-out
69+
assert.True(t, ok)
70+
assert.NoError(t, ln.Err)
71+
assert.Equal(t, "mine", ln.source)
72+
assert.Equal(t, msg, ln.Text)
73+
assert.True(t, before.Before(ln.Time))
74+
assert.True(t, after.After(ln.Time))
75+
}

0 commit comments

Comments
 (0)