11package metric
22
33import (
4+ "runtime"
45 "sync"
56 "sync/atomic"
6- "runtime"
77)
88
99// An almost lock-free FIFO buffer
1010// Locks are only used when flushing
1111
12- const bufferMaskBits = 8 // determines the size of the buffer
13- const bufferSize = uint64 (1 ) << bufferMaskBits
14- const bufferMask = bufferSize - 1
12+ const bufferMaskBits = 8 // determines the size of the buffer
13+ const bufferSize = uint64 (1 ) << bufferMaskBits
14+ const bufferMask = bufferSize - 1
1515
16- const indexStart = 0
16+ const indexStart = 0
1717
1818type event struct {
1919 seq uint64
2020 val uint64 // union of 64-bit numeric values including float64
21- mu sync.Mutex
22- cv * sync.Cond
21+ mu sync.Mutex
22+ cv * sync.Cond
2323}
2424
2525type dequeueFunc func (f Sink , val uint64 )
2626
2727// A generic stream of values which all have to be propagated to the sink.
2828type eventStream struct {
29- widx uint64 // index of next free slot
30- ridx uint64 // index of next unread slot
29+ widx uint64 // index of next free slot
30+ ridx uint64 // index of next unread slot
3131 slots [bufferSize ]event
3232
3333 flusher Flusher
3434
3535 dequeue dequeueFunc
3636
37- name string
37+ name string
3838 mtype int
3939}
4040
41-
4241func (c * Client ) newEventStream (name string , mtype int , dqf dequeueFunc ) * eventStream {
43- e := & eventStream {name : name , mtype : mtype , dequeue : dqf , widx : indexStart , ridx : indexStart }
42+ e := & eventStream {name : name , mtype : mtype , dequeue : dqf , widx : indexStart , ridx : indexStart }
4443
4544 // make sure first slot is not valid from the start due to zero-value
4645 // and set all sequences to their "old" value
4746 for i := range e .slots {
48- e .slots [i ].seq = uint64 (i )- bufferSize
47+ e .slots [i ].seq = uint64 (i ) - bufferSize
4948 e .slots [i ].cv = sync .NewCond (& (e .slots [i ].mu ))
5049 }
5150 return e
@@ -56,7 +55,7 @@ func (e *eventStream) SetFlusher(f Flusher) {
5655}
5756
5857// Flush as much as possible
59- func (e * eventStream ) Flush (f Sink ) {
58+ func (e * eventStream ) Flush (f Sink ) {
6059
6160 var idx uint64
6261
@@ -80,20 +79,19 @@ func (e* eventStream) Flush(f Sink) {
8079 atomic .StoreUint64 (& (e .ridx ), ridx )
8180}
8281
83-
8482func (e * eventStream ) Name () string {
8583 return e .name
8684}
8785
8886func (e * eventStream ) MeterType () int {
89- return e .mtype
87+ return e .mtype
9088}
9189
9290func (e * eventStream ) Enqueue (val uint64 ) {
9391
94- var ridx uint64
95- var widx uint64
96- var idx uint64
92+ var ridx uint64
93+ var widx uint64
94+ var idx uint64
9795
9896 // First get a slot
9997 widx = atomic .AddUint64 (& (e .widx ), 1 )
@@ -113,7 +111,7 @@ func (e *eventStream) Enqueue(val uint64) {
113111 // We have not catched up
114112 e .slots [idx ].val = val
115113 // mark the slot written
116- oldmark := atomic .SwapUint64 (& (e .slots [idx ].seq ),widx )
114+ oldmark := atomic .SwapUint64 (& (e .slots [idx ].seq ), widx )
117115
118116 // test to see if someone was waiting for that mark
119117 if oldmark != widx - bufferSize {
@@ -135,16 +133,16 @@ func (e *eventStream) Enqueue(val uint64) {
135133 // we need to make an atomic operation which:
136134 // 1) Decides whether to go to sleep and wait for our slot to be ready.
137135 // 2) Informs the writer of the slot that we want to be woken.
138-
136+
139137 // The slot we are waiting for have sequence 1 buffersize back from rdix,
140138 // if it's still not ready
141139
142- oldmark := ridx - bufferSize
140+ oldmark := ridx - bufferSize
143141
144142 idx2 := ridx & bufferMask // from here on we look at the stale read index.
145143 e .slots [idx2 ].mu .Lock ()
146144 // Try skew the mark to indicate we're waiting
147- mustwait := atomic .CompareAndSwapUint64 (& (e .slots [idx2 ].seq ),oldmark ,oldmark + 1 )
145+ mustwait := atomic .CompareAndSwapUint64 (& (e .slots [idx2 ].seq ), oldmark , oldmark + 1 )
148146 if mustwait {
149147 // We have now at the same time determined that the slot is not ready
150148 // and set it to indicate that who ever writes it must signal us.
@@ -159,12 +157,12 @@ func (e *eventStream) Enqueue(val uint64) {
159157 if actualmark == oldmark + 1 {
160158 // skewed - join the waiters.
161159 e .slots [idx2 ].cv .Wait ()
162- } else { // stuff can happen fast
160+ } else { // stuff can happen fast
163161 // The slot was actually up to date - so advance the reader
164162 e .flusher .FlushMeter (e )
165163 }
166164 }
167165 e .slots [idx2 ].mu .Unlock ()
168-
166+
169167 }
170168}
0 commit comments