Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support_flow_sample_expanded_and_counter_sample_expanded #170

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion sflow/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ import (
const (
// DataFlowSample defines packet flow sampling
DataFlowSample = 1
DataFlowSampleExpanded = 3

// DataCounterSample defines counter sampling
DataCounterSample = 2
DataCounterSampleExpanded = 4
)

// SFDecoder represents sFlow decoder
Expand Down Expand Up @@ -120,7 +122,19 @@ func (d *SFDecoder) SFDecode() (*SFDatagram, error) {
}
datagram.Samples = append(datagram.Samples, d)
case DataCounterSample:
d, err := decodeFlowCounter(d.reader)
d, err := decodeFlowCounter(d.reader, false)
if err != nil {
return datagram, err
}
datagram.Counters = append(datagram.Counters, d)
case DataFlowSampleExpanded:
d, err := decodeFlowSampleExpand(d.reader)
if err != nil {
return datagram, err
}
datagram.Samples = append(datagram.Samples, d)
case DataCounterSampleExpanded:
d, err := decodeFlowCounter(d.reader, true)
if err != nil {
return datagram, err
}
Expand Down
34 changes: 22 additions & 12 deletions sflow/flow_counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,21 +146,21 @@ type ProcessorCounters struct {
// CounterSample represents the periodic sampling or polling of counters associated with a Data Source
type CounterSample struct {
SequenceNo uint32
SourceIDType byte
SourceIDType uint32
SourceIDIdx uint32
RecordsNo uint32
Records map[string]Record
}

func decodeFlowCounter(r io.ReadSeeker) (*CounterSample, error) {
func decodeFlowCounter(r io.ReadSeeker, expanded bool) (*CounterSample, error) {
var (
cs = new(CounterSample)
rTypeFormat uint32
rTypeLength uint32
err error
)

if err = cs.unmarshal(r); err != nil {
if err = cs.unmarshal(r, expanded); err != nil {
return nil, err
}

Expand Down Expand Up @@ -441,23 +441,33 @@ func (pc *ProcessorCounters) unmarshal(r io.Reader) error {
return nil
}

func (cs *CounterSample) unmarshal(r io.Reader) error {
func (cs *CounterSample) unmarshal(r io.Reader, expanded bool) error {

var err error

if err = read(r, &cs.SequenceNo); err != nil {
return err
}

if err = read(r, &cs.SourceIDType); err != nil {
return err
}

buf := make([]byte, 3)
if err = read(r, &buf); err != nil {
return err
if expanded {
if err = read(r, &cs.SourceIDType); err != nil {
return err
}
if err = read(r, &cs.SourceIDIdx); err != nil {
return err
}
} else {
buf := make([]byte, 1)
if err = read(r, &buf); err != nil {
return err
}
cs.SourceIDType = uint32(buf[0])
buf = make([]byte, 3)
if err = read(r, &buf); err != nil {
return err
}
cs.SourceIDIdx = uint32(buf[2]) | uint32(buf[1])<<8 | uint32(buf[0])<<16
}
cs.SourceIDIdx = uint32(buf[2]) | uint32(buf[1])<<8 | uint32(buf[0])<<16

err = read(r, &cs.RecordsNo)

Expand Down
132 changes: 128 additions & 4 deletions sflow/flow_sample.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ const (
// FlowSample represents single flow sample
type FlowSample struct {
SequenceNo uint32 // Incremented with each flow sample
SourceID byte // sfSourceID
SourceID uint32 // sfSourceID
SamplingRate uint32 // sfPacketSamplingRate
SamplePool uint32 // Total number of packets that could have been sampled
Drops uint32 // Number of times a packet was dropped due to lack of resources
Expand All @@ -54,6 +54,29 @@ type FlowSample struct {
Records map[string]Record
}

type sflowDataSourceExpand struct{
sourceIdType uint32; /* sFlowDataSource type */
sourceIdIndex uint32; /* sFlowDataSource index */
}

type interfaceExpand struct{
format uint32; /* interface format */
value uint32; /* interface value */
}

// FlowSampleExpand represents single flow sample expand
type FlowSampleExpand struct {
SequenceNo uint32 // Incremented with each flow sample
SourceID sflowDataSourceExpand // sfSourceID
SamplingRate uint32 // sfPacketSamplingRate
SamplePool uint32 // Total number of packets that could have been sampled
Drops uint32 // Number of times a packet was dropped due to lack of resources
Input interfaceExpand // SNMP ifIndex of input interface
Output interfaceExpand // SNMP ifIndex of input interface
RecordsNo uint32 // Number of records to follow
Records map[string]Record
}

// SampledHeader represents sampled header
type SampledHeader struct {
Protocol uint32 // (enum SFLHeader_protocol)
Expand Down Expand Up @@ -89,11 +112,13 @@ func (fs *FlowSample) unmarshal(r io.ReadSeeker) error {
return err
}

if err = read(r, &fs.SourceID); err != nil {
return err
buf := make([]byte, 1)
if err = read(r, &buf); err != nil {
return err
}
fs.SourceID = uint32(buf[0])
r.Seek(3, 1) // skip unused bytes

r.Seek(3, 1) // skip counter sample decoding

if err = read(r, &fs.SamplingRate); err != nil {
return err
Expand All @@ -120,6 +145,54 @@ func (fs *FlowSample) unmarshal(r io.ReadSeeker) error {
return err
}

func (fs *FlowSampleExpand) unmarshalExpand(r io.ReadSeeker) error {
var err error

if err = read(r, &fs.SequenceNo); err != nil {
return err
}

if err = read(r, &fs.SourceID.sourceIdType); err != nil {
return err
}

if err = read(r, &fs.SourceID.sourceIdType); err != nil {
return err
}

if err = read(r, &fs.SamplingRate); err != nil {
return err
}

if err = read(r, &fs.SamplePool); err != nil {
return err
}

if err = read(r, &fs.Drops); err != nil {
return err
}

if err = read(r, &fs.Input.format); err != nil {
return err
}

if err = read(r, &fs.Input.value); err != nil {
return err
}

if err = read(r, &fs.Output.format); err != nil {
return err
}

if err = read(r, &fs.Output.value); err != nil {
return err
}

err = read(r, &fs.RecordsNo)

return err
}

func (sh *SampledHeader) unmarshal(r io.Reader) error {
var err error

Expand Down Expand Up @@ -248,6 +321,57 @@ func decodeFlowSample(r io.ReadSeeker) (*FlowSample, error) {
return fs, nil
}

func decodeFlowSampleExpand(r io.ReadSeeker) (*FlowSampleExpand, error) {
var (
fs = new(FlowSampleExpand)
rTypeFormat uint32
rTypeLength uint32
err error
)

if err = fs.unmarshalExpand(r); err != nil {
return nil, err
}

fs.Records = make(map[string]Record)

for i := uint32(0); i < fs.RecordsNo; i++ {
if err = read(r, &rTypeFormat); err != nil {
return nil, err
}
if err = read(r, &rTypeLength); err != nil {
return nil, err
}

switch rTypeFormat {
case SFDataRawHeader:
d, err := decodeSampledHeader(r)
if err != nil {
return fs, err
}
fs.Records["RawHeader"] = d
case SFDataExtSwitch:
d, err := decodeExtSwitchData(r)
if err != nil {
return fs, err
}

fs.Records["ExtSwitch"] = d
case SFDataExtRouter:
d, err := decodeExtRouterData(r, rTypeLength)
if err != nil {
return fs, err
}

fs.Records["ExtRouter"] = d
default:
r.Seek(int64(rTypeLength), 1)
}
}

return fs, nil
}

func decodeSampledHeader(r io.Reader) (*packet.Packet, error) {
var (
h = new(SampledHeader)
Expand Down