Skip to content

Commit

Permalink
Compressed histogram V2 support (#31)
Browse files Browse the repository at this point in the history
* [wip] Wip on V2 compressed encoding. Currently with differences on decode->encode histograms from other implementations

* [wip] Wip on V2 compressed encoding. Currently with differences on decode->encode histograms from other implementations

* [fix] Fix per PR review signature of getIntegerToDoubleValueConversionRatio() float64

* [add] Added Dump Load Merge test
  • Loading branch information
filipecosta90 authored Sep 27, 2020
1 parent 185cb5a commit 311d303
Show file tree
Hide file tree
Showing 13 changed files with 1,414 additions and 3 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
.vscode/
.idea/
.DS_Store

coverage.txt

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ fmt:
$(GOFMT) ./...

test: get fmt
$(GOTEST) -race -covermode=atomic ./...
$(GOTEST) -count=1 ./...

coverage: get test
$(GOTEST) -race -coverprofile=coverage.txt -covermode=atomic .
Expand Down
7 changes: 7 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1 +1,8 @@
module github.com/HdrHistogram/hdrhistogram-go

go 1.14

require (
github.com/google/go-cmp v0.5.2
github.com/stretchr/testify v1.6.1
)
15 changes: 15 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/go-cmp v0.5.2 h1:X2ev0eStA3AbceY54o37/0PQ/UWqKEiiO2dKL5OPaFM=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
16 changes: 14 additions & 2 deletions hdr.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ func (h *Histogram) ByteSize() int {
return 6*8 + 5*4 + len(h.counts)*8
}

func (h *Histogram) getNormalizingIndexOffset() int32 {
return 1
}

// Merge merges the data stored in the given histogram with the receiver,
// returning the number of recorded values which had to be dropped.
func (h *Histogram) Merge(from *Histogram) (dropped int64) {
Expand Down Expand Up @@ -232,12 +236,16 @@ func (h *Histogram) RecordValues(v, n int64) error {
if idx < 0 || int(h.countsLen) <= idx {
return fmt.Errorf("value %d is too large to be recorded", v)
}
h.counts[idx] += n
h.totalCount += n
h.setCountAtIndex(idx, n)

return nil
}

func (h *Histogram) setCountAtIndex(idx int, n int64) {
h.counts[idx] += n
h.totalCount += n
}

// ValueAtQuantile returns the recorded value at the given quantile (0..100).
func (h *Histogram) ValueAtQuantile(q float64) int64 {
if q > 100 {
Expand Down Expand Up @@ -455,6 +463,10 @@ func (h *Histogram) countsIndexFor(v int64) int {
return int(h.countsIndex(bucketIdx, subBucketIdx))
}

func (h *Histogram) getIntegerToDoubleValueConversionRatio() float64 {
return 1.0
}

type iterator struct {
h *Histogram
bucketIdx, subBucketIdx int32
Expand Down
280 changes: 280 additions & 0 deletions hdr_encoding.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
//Histograms are encoded using the HdrHistogram V2 format which is based on an adapted ZigZag LEB128 encoding where:
//consecutive zero counters are encoded as a negative number representing the count of consecutive zeros
//non zero counter values are encoded as a positive number
//An empty histogram (all zeros counters) is encoded in exactly 48 bytes regardless of the counter size.
//A typical histogram (2 digits precision 1 usec to 1 day range) can be encoded in less than the typical MTU size of 1500 bytes.
package hdrhistogram

import (
"bytes"
"compress/zlib"
"encoding/base64"
"encoding/binary"
"errors"
"fmt"
"io/ioutil"
)

const (
V2EncodingCookieBase int32 = 0x1c849303
V2CompressedEncodingCookieBase int32 = 0x1c849304
encodingCookie int32 = V2EncodingCookieBase | 0x10
compressedEncodingCookie int32 = V2CompressedEncodingCookieBase | 0x10

ENCODING_HEADER_SIZE = 40
)

// Encode returns a snapshot view of the Histogram.
// The snapshot is compact binary representations of the state of the histogram.
// They are intended to be used for archival or transmission to other systems for further analysis.
func (h *Histogram) Encode(version int32) (buffer []byte, err error) {
switch version {
case V2CompressedEncodingCookieBase:
buffer, err = h.dumpV2CompressedEncoding()
default:
err = fmt.Errorf("The provided enconding version %d is not supported.", version)
}
return
}

// Decode returns a new Histogram by decoding it from a String containing
// a base64 encoded compressed histogram representation.
func Decode(encoded []byte) (rh *Histogram, err error) {
var decoded []byte
decoded, err = base64.StdEncoding.DecodeString(string(encoded))
if err != nil {
return
}
rbuf := bytes.NewBuffer(decoded[0:8])
r32 := make([]int32, 2)
err = binary.Read(rbuf, binary.BigEndian, &r32)
if err != nil {
return
}
Cookie := r32[0] & ^0xf0
lengthOfCompressedContents := r32[1]
if Cookie != V2CompressedEncodingCookieBase {
err = fmt.Errorf("Encoding not supported, only V2 is supported. Got %d want %d", Cookie, V2CompressedEncodingCookieBase)
return
}
decodeLengthOfCompressedContents := int32(len(decoded[8:]))
if lengthOfCompressedContents > decodeLengthOfCompressedContents {
err = fmt.Errorf("The compressed contents buffer is smaller than the lengthOfCompressedContents. Got %d want %d", decodeLengthOfCompressedContents, lengthOfCompressedContents)
return
}
rh, err = decodeCompressedFormat(decoded[8:8+lengthOfCompressedContents], ENCODING_HEADER_SIZE)
return
}

// internal method to encode an histogram in V2 Compressed format
func (h *Histogram) dumpV2CompressedEncoding() (outBuffer []byte, err error) {
// final buffer
buf := new(bytes.Buffer)
err = binary.Write(buf, binary.BigEndian, compressedEncodingCookie)

toCompress, err := h.encodeIntoByteBuffer()
if err != nil {
return
}
uncompressedBytes := toCompress.Bytes()

var b bytes.Buffer
w, err := zlib.NewWriterLevel(&b, zlib.BestCompression)
if err != nil {
return
}
w.Write(uncompressedBytes)
w.Close()

// LengthOfCompressedContents
compressedContents := b.Bytes()
err = binary.Write(buf, binary.BigEndian, int32(len(compressedContents)))
err = binary.Write(buf, binary.BigEndian, compressedContents)
outBuffer = []byte(base64.StdEncoding.EncodeToString(buf.Bytes()))
return
}

func (h *Histogram) encodeIntoByteBuffer() (*bytes.Buffer, error) {

countsBytes, err := h.fillBufferFromCountsArray()
if err != nil {
return nil, err
}

toCompress := new(bytes.Buffer)
err = binary.Write(toCompress, binary.BigEndian, encodingCookie) // 0-3
if err != nil {
return nil, err
}
err = binary.Write(toCompress, binary.BigEndian, int32(len(countsBytes))) // 3-7
if err != nil {
return nil, err
}
err = binary.Write(toCompress, binary.BigEndian, h.getNormalizingIndexOffset()) // 8-11
if err != nil {
return nil, err
}
err = binary.Write(toCompress, binary.BigEndian, int32(h.significantFigures)) // 12-15
if err != nil {
return nil, err
}
err = binary.Write(toCompress, binary.BigEndian, h.lowestTrackableValue) // 16-23
if err != nil {
return nil, err
}
err = binary.Write(toCompress, binary.BigEndian, h.highestTrackableValue) // 24-31
if err != nil {
return nil, err
}
err = binary.Write(toCompress, binary.BigEndian, h.getIntegerToDoubleValueConversionRatio()) // 32-39
if err != nil {
return nil, err
}
err = binary.Write(toCompress, binary.BigEndian, countsBytes)
if err != nil {
return nil, err
}
return toCompress, err
}

func decodeCompressedFormat(compressedContents []byte, headerSize int) (rh *Histogram, err error) {
b := bytes.NewReader(compressedContents)
z, err := zlib.NewReader(b)
if err != nil {
return
}
defer z.Close()
decompressedSlice, err := ioutil.ReadAll(z)
if err != nil {
return
}
decompressedSliceLen := int32(len(decompressedSlice))
cookie, PayloadLength, _, NumberOfSignificantValueDigits, LowestTrackableValue, HighestTrackableValue, _, err := decodeDeCompressedHeaderFormat(decompressedSlice[0:headerSize])
if err != nil {
return
}
if cookie != V2EncodingCookieBase {
err = fmt.Errorf("Encoding not supported, only V2 is supported. Got %d want %d", cookie, V2EncodingCookieBase)
return
}
actualPayloadLen := decompressedSliceLen - int32(headerSize)
if PayloadLength != actualPayloadLen {
err = errors.New(fmt.Sprintf("PayloadLength should have the same size of the actual payload. Got %d want %d", actualPayloadLen, PayloadLength))
return
}
rh = New(LowestTrackableValue, HighestTrackableValue, int(NumberOfSignificantValueDigits))
payload := decompressedSlice[headerSize:]
fillCountsArrayFromSourceBuffer(payload, rh)
return rh, err
}

func fillCountsArrayFromSourceBuffer(payload []byte, rh *Histogram) {
var payloadSlicePos = 0
var dstIndex int64 = 0
for payloadSlicePos < len(payload) {
var zerosCount int64 = 0
count, n := zig_zag_decode_i64(payload[payloadSlicePos:])
payloadSlicePos += n
if count < 0 {
zerosCount = -count
dstIndex += zerosCount
} else {
rh.setCountAtIndex(int(dstIndex), count)
dstIndex += 1
}
}
}

func (rh *Histogram) fillBufferFromCountsArray() (buffer []byte, err error) {
buf := new(bytes.Buffer)
// V2 encoding format uses a ZigZag LEB128-64b9B encoded long. Positive values are counts,
// while negative values indicate a repeat zero counts.
var countsLimit int32 = int32(rh.countsIndexFor(rh.Max()) + 1)
var srcIndex int32 = 0
for srcIndex < countsLimit {
count := rh.counts[srcIndex]
srcIndex++

var zeros int64 = 0
// check for contiguous zeros
if count == 0 {
zeros = 1
for srcIndex < countsLimit && 0 == rh.counts[srcIndex] {
zeros++
srcIndex++
}
}
if zeros > 1 {
err = binary.Write(buf, binary.BigEndian, zig_zag_encode_i64(-zeros))
} else {
err = binary.Write(buf, binary.BigEndian, zig_zag_encode_i64(count))
}
}
buffer = buf.Bytes()
return
}

// Read an LEB128 ZigZag encoded long value from the given buffer
func zig_zag_decode_i64(buf []byte) (signedValue int64, n int) {
var value uint64 = 0
for shift := uint(0); ; shift += 7 {
if n >= len(buf) {
return 0, 0
}
b := uint64(buf[n])
n++
value |= (b & 0x7f) << shift
if (b & 0x80) == 0 {
break
}
}
signedValue = int64((value >> 1) ^ -(value & 1))
return
}

// Writes a int64_t value to the given buffer in LEB128 ZigZag encoded format
// ZigZag encoding maps signed integers to unsigned integers so that numbers with a small
// absolute value (for instance, -1) have a small varint encoded value too.
// It does this in a way that "zig-zags" back and forth through the positive and negative integers,
// so that -1 is encoded as 1, 1 is encoded as 2, -2 is encoded as 3, and so on.
func zig_zag_encode_i64(signedValue int64) (buffer []byte) {
buffer = make([]byte, 0)
var value uint64 = uint64((signedValue << 1) ^ (signedValue >> 63))
for {
c := byte(value & 0x7f)
value >>= 7
if value != 0 {
c |= 0x80
}
buffer = append(buffer, c)
if c&0x80 == 0 {
break
}
}
return
}

func decodeDeCompressedHeaderFormat(decoded []byte) (Cookie int32, PayloadLength int32, NormalizingIndexOffSet int32, NumberOfSignificantValueDigits int32, LowestTrackableValue int64, HighestTrackableValue int64, IntegerToDoubleConversionRatio float64, err error) {
rbuf := bytes.NewBuffer(decoded[0:40])
r32 := make([]int32, 4)
r64 := make([]int64, 2)
err = binary.Read(rbuf, binary.BigEndian, &r32)
if err != nil {
return
}
err = binary.Read(rbuf, binary.BigEndian, &r64)
if err != nil {
return
}
err = binary.Read(rbuf, binary.BigEndian, &IntegerToDoubleConversionRatio)
if err != nil {
return
}
Cookie = r32[0] & ^0xf0
PayloadLength = r32[1]
NormalizingIndexOffSet = r32[2]
NumberOfSignificantValueDigits = r32[3]
LowestTrackableValue = r64[0]
HighestTrackableValue = r64[1]
return
}
Loading

0 comments on commit 311d303

Please sign in to comment.