Skip to content

Commit 3effe48

Browse files
committed
Add awslogs driver for Amazon CloudWatch Logs
Signed-off-by: Samuel Karp <[email protected]>
1 parent 8543336 commit 3effe48

File tree

15 files changed

+1072
-7
lines changed

15 files changed

+1072
-7
lines changed

contrib/completion/bash/docker

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,7 @@ __docker_log_driver_options() {
279279
local gelf_options="gelf-address gelf-tag"
280280
local json_file_options="max-file max-size"
281281
local syslog_options="syslog-address syslog-facility syslog-tag"
282+
local awslogs_options="awslogs-region awslogs-group awslogs-stream"
282283

283284
case $(__docker_value_of_option --log-driver) in
284285
'')
@@ -296,6 +297,9 @@ __docker_log_driver_options() {
296297
syslog)
297298
COMPREPLY=( $( compgen -W "$syslog_options" -S = -- "$cur" ) )
298299
;;
300+
awslogs)
301+
COMPREPLY=( $( compgen -W "$awslogs_options" -S = -- "$cur" ) )
302+
;;
299303
*)
300304
return
301305
;;

contrib/completion/zsh/_docker

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ __docker_subcommand() {
238238
"($help)--ipc=-[IPC namespace to use]:IPC namespace: "
239239
"($help)*--link=-[Add link to another container]:link:->link"
240240
"($help)*"{-l,--label=-}"[Set meta data on a container]:label: "
241-
"($help)--log-driver=-[Default driver for container logs]:Logging driver:(json-file syslog journald gelf fluentd none)"
241+
"($help)--log-driver=-[Default driver for container logs]:Logging driver:(json-file syslog journald gelf fluentd awslogs none)"
242242
"($help)*--log-opt=-[Log driver specific options]:log driver options: "
243243
"($help)*--lxc-conf=-[Add custom lxc options]:lxc options: "
244244
"($help)--mac-address=-[Container MAC address]:MAC address: "
@@ -617,7 +617,7 @@ _docker() {
617617
"($help)--ipv6[Enable IPv6 networking]" \
618618
"($help -l --log-level)"{-l,--log-level=-}"[Set the logging level]:level:(debug info warn error fatal)" \
619619
"($help)*--label=-[Set key=value labels to the daemon]:label: " \
620-
"($help)--log-driver=-[Default driver for container logs]:Logging driver:(json-file syslog journald gelf fluentd none)" \
620+
"($help)--log-driver=-[Default driver for container logs]:Logging driver:(json-file syslog journald gelf fluentd awslogs none)" \
621621
"($help)*--log-opt=-[Log driver specific options]:log driver options: " \
622622
"($help)--mtu=-[Set the containers network MTU]:mtu:(0 576 1420 1500 9000)" \
623623
"($help -p --pidfile)"{-p,--pidfile=-}"[Path to use for daemon PID file]:PID file:_files" \

daemon/logdrivers_linux.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package daemon
33
import (
44
// Importing packages here only to make sure their init gets called and
55
// therefore they register themselves to the logdriver factory.
6+
_ "github.com/docker/docker/daemon/logger/awslogs"
67
_ "github.com/docker/docker/daemon/logger/fluentd"
78
_ "github.com/docker/docker/daemon/logger/gelf"
89
_ "github.com/docker/docker/daemon/logger/journald"

daemon/logdrivers_windows.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,6 @@ package daemon
33
import (
44
// Importing packages here only to make sure their init gets called and
55
// therefore they register themselves to the logdriver factory.
6+
_ "github.com/docker/docker/daemon/logger/awslogs"
67
_ "github.com/docker/docker/daemon/logger/jsonfilelog"
78
)
Lines changed: 326 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,326 @@
1+
// Package awslogs provides the logdriver for forwarding container logs to Amazon CloudWatch Logs
2+
package awslogs
3+
4+
import (
5+
"fmt"
6+
"os"
7+
"sort"
8+
"strings"
9+
"sync"
10+
"time"
11+
12+
"github.com/aws/aws-sdk-go/aws"
13+
"github.com/aws/aws-sdk-go/aws/awserr"
14+
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
15+
"github.com/docker/docker/daemon/logger"
16+
"github.com/docker/docker/vendor/src/github.com/Sirupsen/logrus"
17+
)
18+
19+
const (
20+
name = "awslogs"
21+
regionKey = "awslogs-region"
22+
regionEnvKey = "AWS_REGION"
23+
logGroupKey = "awslogs-group"
24+
logStreamKey = "awslogs-stream"
25+
batchPublishFrequency = 5 * time.Second
26+
27+
// See: http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
28+
perEventBytes = 26
29+
maximumBytesPerPut = 1048576
30+
maximumLogEventsPerPut = 10000
31+
32+
// See: http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/cloudwatch_limits.html
33+
maximumBytesPerEvent = 262144 - perEventBytes
34+
35+
resourceAlreadyExistsCode = "ResourceAlreadyExistsException"
36+
dataAlreadyAcceptedCode = "DataAlreadyAcceptedException"
37+
invalidSequenceTokenCode = "InvalidSequenceTokenException"
38+
)
39+
40+
type logStream struct {
41+
logStreamName string
42+
logGroupName string
43+
client api
44+
messages chan *logger.Message
45+
lock sync.RWMutex
46+
closed bool
47+
sequenceToken *string
48+
}
49+
50+
type api interface {
51+
CreateLogStream(*cloudwatchlogs.CreateLogStreamInput) (*cloudwatchlogs.CreateLogStreamOutput, error)
52+
PutLogEvents(*cloudwatchlogs.PutLogEventsInput) (*cloudwatchlogs.PutLogEventsOutput, error)
53+
}
54+
55+
type byTimestamp []*cloudwatchlogs.InputLogEvent
56+
57+
// init registers the awslogs driver and sets the default region, if provided
58+
func init() {
59+
if os.Getenv(regionEnvKey) != "" {
60+
aws.DefaultConfig.Region = aws.String(os.Getenv(regionEnvKey))
61+
}
62+
if err := logger.RegisterLogDriver(name, New); err != nil {
63+
logrus.Fatal(err)
64+
}
65+
if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {
66+
logrus.Fatal(err)
67+
}
68+
}
69+
70+
// New creates an awslogs logger using the configuration passed in on the
71+
// context. Supported context configuration variables are awslogs-region,
72+
// awslogs-group, and awslogs-stream. When available, configuration is
73+
// also taken from environment variables AWS_REGION, AWS_ACCESS_KEY_ID,
74+
// AWS_SECRET_ACCESS_KEY, the shared credentials file (~/.aws/credentials), and
75+
// the EC2 Instance Metadata Service.
76+
func New(ctx logger.Context) (logger.Logger, error) {
77+
logGroupName := ctx.Config[logGroupKey]
78+
logStreamName := ctx.ContainerID
79+
if ctx.Config[logStreamKey] != "" {
80+
logStreamName = ctx.Config[logStreamKey]
81+
}
82+
config := aws.DefaultConfig
83+
if ctx.Config[regionKey] != "" {
84+
config = aws.DefaultConfig.Merge(&aws.Config{
85+
Region: aws.String(ctx.Config[regionKey]),
86+
})
87+
}
88+
containerStream := &logStream{
89+
logStreamName: logStreamName,
90+
logGroupName: logGroupName,
91+
client: cloudwatchlogs.New(config),
92+
messages: make(chan *logger.Message, 4096),
93+
}
94+
err := containerStream.create()
95+
if err != nil {
96+
return nil, err
97+
}
98+
go containerStream.collectBatch()
99+
100+
return containerStream, nil
101+
}
102+
103+
// Name returns the name of the awslogs logging driver
104+
func (l *logStream) Name() string {
105+
return name
106+
}
107+
108+
// Log submits messages for logging by an instance of the awslogs logging driver
109+
func (l *logStream) Log(msg *logger.Message) error {
110+
l.lock.RLock()
111+
defer l.lock.RUnlock()
112+
if !l.closed {
113+
l.messages <- msg
114+
}
115+
return nil
116+
}
117+
118+
// Close closes the instance of the awslogs logging driver
119+
func (l *logStream) Close() error {
120+
l.lock.Lock()
121+
defer l.lock.Unlock()
122+
if !l.closed {
123+
close(l.messages)
124+
}
125+
l.closed = true
126+
return nil
127+
}
128+
129+
// create creates a log stream for the instance of the awslogs logging driver
130+
func (l *logStream) create() error {
131+
input := &cloudwatchlogs.CreateLogStreamInput{
132+
LogGroupName: aws.String(l.logGroupName),
133+
LogStreamName: aws.String(l.logStreamName),
134+
}
135+
136+
_, err := l.client.CreateLogStream(input)
137+
138+
if err != nil {
139+
if awsErr, ok := err.(awserr.Error); ok {
140+
fields := logrus.Fields{
141+
"errorCode": awsErr.Code(),
142+
"message": awsErr.Message(),
143+
"origError": awsErr.OrigErr(),
144+
"logGroupName": l.logGroupName,
145+
"logStreamName": l.logStreamName,
146+
}
147+
if awsErr.Code() == resourceAlreadyExistsCode {
148+
// Allow creation to succeed
149+
logrus.WithFields(fields).Info("Log stream already exists")
150+
return nil
151+
}
152+
logrus.WithFields(fields).Error("Failed to create log stream")
153+
}
154+
}
155+
return err
156+
}
157+
158+
// newTicker is used for time-based batching. newTicker is a variable such
159+
// that the implementation can be swapped out for unit tests.
160+
var newTicker = func(freq time.Duration) *time.Ticker {
161+
return time.NewTicker(freq)
162+
}
163+
164+
// collectBatch executes as a goroutine to perform batching of log events for
165+
// submission to the log stream. Batching is performed on time- and size-
166+
// bases. Time-based batching occurs at a 5 second interval (defined in the
167+
// batchPublishFrequency const). Size-based batching is performed on the
168+
// maximum number of events per batch (defined in maximumLogEventsPerPut) and
169+
// the maximum number of total bytes in a batch (defined in
170+
// maximumBytesPerPut). Log messages are split by the maximum bytes per event
171+
// (defined in maximumBytesPerEvent). There is a fixed per-event byte overhead
172+
// (defined in perEventBytes) which is accounted for in split- and batch-
173+
// calculations.
174+
func (l *logStream) collectBatch() {
175+
timer := newTicker(batchPublishFrequency)
176+
var events []*cloudwatchlogs.InputLogEvent
177+
bytes := 0
178+
for {
179+
select {
180+
case <-timer.C:
181+
l.publishBatch(events)
182+
events = events[:0]
183+
bytes = 0
184+
case msg, more := <-l.messages:
185+
if !more {
186+
l.publishBatch(events)
187+
return
188+
}
189+
unprocessedLine := msg.Line
190+
for len(unprocessedLine) > 0 {
191+
// Split line length so it does not exceed the maximum
192+
lineBytes := len(unprocessedLine)
193+
if lineBytes > maximumBytesPerEvent {
194+
lineBytes = maximumBytesPerEvent
195+
}
196+
line := unprocessedLine[:lineBytes]
197+
unprocessedLine = unprocessedLine[lineBytes:]
198+
if (len(events) >= maximumLogEventsPerPut) || (bytes+lineBytes+perEventBytes > maximumBytesPerPut) {
199+
// Publish an existing batch if it's already over the maximum number of events or if adding this
200+
// event would push it over the maximum number of total bytes.
201+
l.publishBatch(events)
202+
events = events[:0]
203+
bytes = 0
204+
}
205+
events = append(events, &cloudwatchlogs.InputLogEvent{
206+
Message: aws.String(string(line)),
207+
Timestamp: aws.Int64(msg.Timestamp.UnixNano() / int64(time.Millisecond)),
208+
})
209+
bytes += (lineBytes + perEventBytes)
210+
}
211+
}
212+
}
213+
}
214+
215+
// publishBatch calls PutLogEvents for a given set of InputLogEvents,
216+
// accounting for sequencing requirements (each request must reference the
217+
// sequence token returned by the previous request).
218+
func (l *logStream) publishBatch(events []*cloudwatchlogs.InputLogEvent) {
219+
if len(events) == 0 {
220+
return
221+
}
222+
223+
sort.Sort(byTimestamp(events))
224+
225+
nextSequenceToken, err := l.putLogEvents(events, l.sequenceToken)
226+
227+
if err != nil {
228+
if awsErr, ok := err.(awserr.Error); ok {
229+
if awsErr.Code() == dataAlreadyAcceptedCode {
230+
// already submitted, just grab the correct sequence token
231+
parts := strings.Split(awsErr.Message(), " ")
232+
nextSequenceToken = &parts[len(parts)-1]
233+
logrus.WithFields(logrus.Fields{
234+
"errorCode": awsErr.Code(),
235+
"message": awsErr.Message(),
236+
"logGroupName": l.logGroupName,
237+
"logStreamName": l.logStreamName,
238+
}).Info("Data already accepted, ignoring error")
239+
err = nil
240+
} else if awsErr.Code() == invalidSequenceTokenCode {
241+
// sequence code is bad, grab the correct one and retry
242+
parts := strings.Split(awsErr.Message(), " ")
243+
token := parts[len(parts)-1]
244+
nextSequenceToken, err = l.putLogEvents(events, &token)
245+
}
246+
}
247+
}
248+
if err != nil {
249+
logrus.Error(err)
250+
} else {
251+
l.sequenceToken = nextSequenceToken
252+
}
253+
}
254+
255+
// putLogEvents wraps the PutLogEvents API
256+
func (l *logStream) putLogEvents(events []*cloudwatchlogs.InputLogEvent, sequenceToken *string) (*string, error) {
257+
input := &cloudwatchlogs.PutLogEventsInput{
258+
LogEvents: events,
259+
SequenceToken: sequenceToken,
260+
LogGroupName: aws.String(l.logGroupName),
261+
LogStreamName: aws.String(l.logStreamName),
262+
}
263+
resp, err := l.client.PutLogEvents(input)
264+
if err != nil {
265+
if awsErr, ok := err.(awserr.Error); ok {
266+
logrus.WithFields(logrus.Fields{
267+
"errorCode": awsErr.Code(),
268+
"message": awsErr.Message(),
269+
"origError": awsErr.OrigErr(),
270+
"logGroupName": l.logGroupName,
271+
"logStreamName": l.logStreamName,
272+
}).Error("Failed to put log events")
273+
}
274+
return nil, err
275+
}
276+
return resp.NextSequenceToken, nil
277+
}
278+
279+
// ValidateLogOpt looks for awslogs-specific log options awslogs-region,
280+
// awslogs-group, and awslogs-stream
281+
func ValidateLogOpt(cfg map[string]string) error {
282+
for key := range cfg {
283+
switch key {
284+
case logGroupKey:
285+
case logStreamKey:
286+
case regionKey:
287+
default:
288+
return fmt.Errorf("unknown log opt '%s' for %s log driver", key, name)
289+
}
290+
}
291+
if cfg[logGroupKey] == "" {
292+
return fmt.Errorf("must specify a value for log opt '%s'", logGroupKey)
293+
}
294+
if cfg[regionKey] == "" && os.Getenv(regionEnvKey) == "" {
295+
return fmt.Errorf(
296+
"must specify a value for environment variable '%s' or log opt '%s'",
297+
regionEnvKey,
298+
regionKey)
299+
}
300+
return nil
301+
}
302+
303+
// Len returns the length of a byTimestamp slice. Len is required by the
304+
// sort.Interface interface.
305+
func (slice byTimestamp) Len() int {
306+
return len(slice)
307+
}
308+
309+
// Less compares two values in a byTimestamp slice by Timestamp. Less is
310+
// required by the sort.Interface interface.
311+
func (slice byTimestamp) Less(i, j int) bool {
312+
iTimestamp, jTimestamp := int64(0), int64(0)
313+
if slice != nil && slice[i].Timestamp != nil {
314+
iTimestamp = *slice[i].Timestamp
315+
}
316+
if slice != nil && slice[j].Timestamp != nil {
317+
jTimestamp = *slice[j].Timestamp
318+
}
319+
return iTimestamp < jTimestamp
320+
}
321+
322+
// Swap swaps two values in a byTimestamp slice with each other. Swap is
323+
// required by the sort.Interface interface.
324+
func (slice byTimestamp) Swap(i, j int) {
325+
slice[i], slice[j] = slice[j], slice[i]
326+
}

0 commit comments

Comments
 (0)