@@ -32,14 +32,17 @@ type patchArgs struct {
3232}
3333
3434func patchLinearizableOperations (operations []porcupine.Operation , reports []report.ClientReport , persistedRequests []model.EtcdRequest ) []porcupine.Operation {
35- putRevision , delRevision := watchRevisions (reports )
35+ putRevision := watchRevisions (reports )
3636 persistedPutCount := countPersistedPuts (persistedRequests )
3737 clientPutCount := countClientPuts (reports )
3838
3939 persistedDeleteCount := countPersistedDeletes (persistedRequests )
4040 clientDeleteCount := countClientDeletes (reports )
4141
42- putReturnTime , delReturnTime := uniqueOperationReturnTime (operations , persistedRequests , clientPutCount , clientDeleteCount )
42+ persistedCompactCount := countPersistedCompacts (persistedRequests )
43+ clientCompactCount := countClientCompacts (reports )
44+
45+ putReturnTime , delReturnTime , compactReturnTime := uniqueOperationReturnTime (operations , persistedRequests , clientPutCount , clientDeleteCount , clientCompactCount )
4346
4447 putArgs := make (map [model.PutOptions ]patchArgs )
4548 for opts , c := range clientPutCount {
@@ -56,18 +59,24 @@ func patchLinearizableOperations(operations []porcupine.Operation, reports []rep
5659 clientCount : c ,
5760 persistedCount : persistedDeleteCount [opts ],
5861 returnTime : delReturnTime [opts ],
59- revision : delRevision [opts ],
62+ }
63+ }
64+ compactArgs := make (map [model.CompactOptions ]patchArgs )
65+ for opts , c := range clientCompactCount {
66+ compactArgs [opts ] = patchArgs {
67+ clientCount : c ,
68+ persistedCount : persistedCompactCount [opts ],
69+ returnTime : compactReturnTime [opts ],
6070 }
6171 }
6272
6373 return patchOperations (
64- operations , putArgs , delArgs ,
74+ operations , putArgs , delArgs , compactArgs ,
6575 )
6676}
6777
68- func watchRevisions (reports []report.ClientReport ) (map [model.PutOptions ]int64 , map [model. DeleteOptions ] int64 ) {
78+ func watchRevisions (reports []report.ClientReport ) (map [model.PutOptions ]int64 ) {
6979 putRevisions := map [model.PutOptions ]int64 {}
70- delRevisions := map [model.DeleteOptions ]int64 {}
7180
7281 for _ , client := range reports {
7382 for _ , watch := range client .Watch {
@@ -79,27 +88,40 @@ func watchRevisions(reports []report.ClientReport) (map[model.PutOptions]int64,
7988 kv := model.PutOptions {Key : event .Key , Value : event .Value }
8089 putRevisions [kv ] = event .Revision
8190 case model .DeleteOperation :
82- // Delete events are also created by leaseRevoke rquest.
91+ // Don't track delete revisions - they can be triggered by leaseRevoke
92+ // So we can't reliably attribute them to explicit delete operations
93+ case model .CompactOperation :
8394 default :
8495 panic (fmt .Sprintf ("unknown event type %q" , event .Type ))
8596 }
8697 }
8798 }
8899 }
89100 }
90- return putRevisions , delRevisions
101+ return putRevisions
91102}
92103
93104func patchOperations (
94105 operations []porcupine.Operation ,
95106 putArgs map [model.PutOptions ]patchArgs ,
96107 delArgs map [model.DeleteOptions ]patchArgs ,
108+ compactArgs map [model.CompactOptions ]patchArgs ,
97109) []porcupine.Operation {
98110 newOperations := make ([]porcupine.Operation , 0 , len (operations ))
99111
100112 for _ , op := range operations {
101113 request := op .Input .(model.EtcdRequest )
102114 resp := op .Output .(model.MaybeEtcdResponse )
115+
116+ if request .Type == model .Compact {
117+ kv := model.CompactOptions {Revision : request .Compact .Revision }
118+ if arg , ok := compactArgs [kv ]; ok && arg .clientCount == 1 && arg .returnTime > 0 {
119+ op .Return = min (op .Return , arg .returnTime - 1 )
120+ }
121+ newOperations = append (newOperations , op )
122+ continue
123+ }
124+
103125 if resp .Error == "" || request .Type != model .Txn {
104126 // Cannot patch those requests.
105127 newOperations = append (newOperations , op )
@@ -150,9 +172,13 @@ func patchOperations(
150172 panic (fmt .Sprintf ("unknown operation type %q" , etcdOp .Type ))
151173 }
152174 }
175+ if request .Type == model .Compact {
176+ newOperations = append (newOperations , op )
177+ continue
178+ }
179+
153180 if isUniqueTxn (request .Txn , putArgs , delArgs ) {
154181 if ! persisted {
155- // Remove non persisted operations
156182 continue
157183 }
158184 if txnRevision != 0 {
@@ -198,16 +224,28 @@ func hasUniqueWriteOperation(ops []model.EtcdOperation, putArgs map[model.PutOpt
198224 return true
199225 }
200226 case model .RangeOperation :
227+ case model .CompactOperation :
201228 default :
202229 panic (fmt .Sprintf ("unknown operation type %q" , operation .Type ))
203230 }
204231 }
205232 return false
206233}
207234
208- func uniqueOperationReturnTime (allOperations []porcupine.Operation , persistedRequests []model.EtcdRequest , clientPutCount map [model.PutOptions ]int64 , clientDeleteCount map [model.DeleteOptions ]int64 ) (map [model.PutOptions ]int64 , map [model.DeleteOptions ]int64 ) {
235+ func uniqueOperationReturnTime (
236+ allOperations []porcupine.Operation ,
237+ persistedRequests []model.EtcdRequest ,
238+ clientPutCount map [model.PutOptions ]int64 ,
239+ clientDeleteCount map [model.DeleteOptions ]int64 ,
240+ clientCompactCount map [model.CompactOptions ]int64 ,
241+ ) (
242+ map [model.PutOptions ]int64 ,
243+ map [model.DeleteOptions ]int64 ,
244+ map [model.CompactOptions ]int64 ,
245+ ) {
209246 putTimes := map [model.PutOptions ]int64 {}
210247 delTimes := map [model.DeleteOptions ]int64 {}
248+ compactTimes := map [model.CompactOptions ]int64 {}
211249 var lastReturnTime int64
212250 for _ , op := range allOperations {
213251 request := op .Input .(model.EtcdRequest )
@@ -237,6 +275,13 @@ func uniqueOperationReturnTime(allOperations []porcupine.Operation, persistedReq
237275 case model .LeaseGrant :
238276 case model .LeaseRevoke :
239277 case model .Compact :
278+ kv := model.CompactOptions {Revision : request .Compact .Revision }
279+ if clientCompactCount [kv ] > 1 {
280+ continue
281+ }
282+ if returnTime , ok := compactTimes [kv ]; ! ok || returnTime > op .Return {
283+ compactTimes [kv ] = op .Return
284+ }
240285 case model .Defragment :
241286 default :
242287 panic (fmt .Sprintf ("Unknown request type: %q" , request .Type ))
@@ -278,11 +323,19 @@ func uniqueOperationReturnTime(allOperations []porcupine.Operation, persistedReq
278323 case model .LeaseGrant :
279324 case model .LeaseRevoke :
280325 case model .Compact :
326+ kv := model.CompactOptions {Revision : request .Compact .Revision }
327+ if clientCompactCount [kv ] > 1 {
328+ continue
329+ }
330+ if returnTime , ok := compactTimes [kv ]; ok {
331+ lastReturnTime = min (returnTime , lastReturnTime )
332+ compactTimes [kv ] = lastReturnTime
333+ }
281334 default :
282335 panic (fmt .Sprintf ("Unknown request type: %q" , request .Type ))
283336 }
284337 }
285- return putTimes , delTimes
338+ return putTimes , delTimes , compactTimes
286339}
287340
288341func countClientPuts (reports []report.ClientReport ) map [model.PutOptions ]int64 {
@@ -313,6 +366,7 @@ func countPuts(counter map[model.PutOptions]int64, request model.EtcdRequest) {
313366 kv := model.PutOptions {Key : operation .Put .Key , Value : operation .Put .Value }
314367 counter [kv ]++
315368 case model .DeleteOperation :
369+ case model .CompactOperation :
316370 case model .RangeOperation :
317371 default :
318372 panic (fmt .Sprintf ("unknown operation type %q" , operation .Type ))
@@ -357,3 +411,32 @@ func countDeletes(counter map[model.DeleteOptions]int64, request model.EtcdReque
357411 }
358412 }
359413}
414+
415+ func countClientCompacts (reports []report.ClientReport ) map [model.CompactOptions ]int64 {
416+ counter := map [model.CompactOptions ]int64 {}
417+ for _ , client := range reports {
418+ for _ , op := range client .KeyValue {
419+ request := op .Input .(model.EtcdRequest )
420+ countCompacts (counter , request )
421+ }
422+ }
423+ return counter
424+ }
425+
426+ func countPersistedCompacts (requests []model.EtcdRequest ) map [model.CompactOptions ]int64 {
427+ counter := map [model.CompactOptions ]int64 {}
428+ for _ , req := range requests {
429+ countCompacts (counter , req )
430+ }
431+ return counter
432+ }
433+
434+ func countCompacts (counter map [model.CompactOptions ]int64 , request model.EtcdRequest ) {
435+ switch request .Type {
436+ case model .Compact :
437+ opts := model.CompactOptions {
438+ Revision : request .Compact .Revision ,
439+ }
440+ counter [opts ]++
441+ }
442+ }
0 commit comments