Skip to content

Commit

Permalink
Merge pull request #15 from negbie/master
Browse files Browse the repository at this point in the history
Add rtp to pcap output
  • Loading branch information
negbie authored Dec 7, 2017
2 parents fd28bee + f2fc38e commit 403a28a
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 76 deletions.
86 changes: 48 additions & 38 deletions decoder/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package decoder

import (
"fmt"
"net"
"os"
"runtime/debug"

"github.com/coocood/freecache"
"github.com/google/gopacket"
Expand All @@ -22,6 +22,7 @@ type Decoder struct {
dupCount int
dnsCount int
ip4Count int
ip6Count int
rtcpCount int
rtcpFailCount int
tcpCount int
Expand All @@ -38,14 +39,14 @@ type Decoder struct {

type Packet struct {
Host string
HEPType byte
Tsec uint32
Tmsec uint32
Vlan uint16
Version uint8
Protocol uint8
SrcIP uint32
DstIP uint32
ProtoType uint8
SrcIP net.IP
DstIP net.IP
SrcPort uint16
DstPort uint16
CorrelationID []byte
Expand All @@ -71,22 +72,15 @@ func NewDecoder(datalink layers.LinkType) *Decoder {
cSIP := freecache.NewCache(20 * 1024 * 1024) // 20MB
cSDP := freecache.NewCache(20 * 1024 * 1024) // 20MB
cRTCP := freecache.NewCache(60 * 1024 * 1024) // 60MB
debug.SetGCPercent(20)
//debug.SetGCPercent(20)

d := &Decoder{
Host: host,
LayerType: lt,
defragger: ip4defrag.NewIPv4Defragmenter(),
fragCount: 0,
dupCount: 0,
ip4Count: 0,
udpCount: 0,
tcpCount: 0,
dnsCount: 0,
unknownCount: 0,
SIPCache: cSIP,
SDPCache: cSDP,
RTCPCache: cRTCP,
Host: host,
LayerType: lt,
defragger: ip4defrag.NewIPv4Defragmenter(),
SIPCache: cSIP,
SDPCache: cSDP,
RTCPCache: cRTCP,
}
go d.flushFragments()
go d.printStats()
Expand Down Expand Up @@ -126,23 +120,20 @@ func (d *Decoder) Process(data []byte, ci *gopacket.CaptureInfo) (*Packet, error
pkt.Vlan = dot1q.VLANIdentifier
}

if ipLayer := packet.Layer(layers.LayerTypeIPv4); ipLayer != nil {
ip4, ok := ipLayer.(*layers.IPv4)
if ipv4Layer := packet.Layer(layers.LayerTypeIPv4); ipv4Layer != nil {
ip4, ok := ipv4Layer.(*layers.IPv4)
ip4Len := ip4.Length
if !ok {
return nil, nil
}

pkt.Version = ip4.Version
pkt.Protocol = uint8(ip4.Protocol)
pkt.SrcIP = ip2int(ip4.SrcIP)
pkt.DstIP = ip2int(ip4.DstIP)
pkt.ProtoType = 1
pkt.SrcIP = ip4.SrcIP
pkt.DstIP = ip4.DstIP
d.ip4Count++

if config.Cfg.Mode == "SIP" || config.Cfg.Mode == "SIPRTCP" {
pkt.HEPType = 1
}

d.FlowSrcIP = ip4.SrcIP.String()
d.FlowDstIP = ip4.DstIP.String()

Expand All @@ -162,8 +153,8 @@ func (d *Decoder) Process(data []byte, ci *gopacket.CaptureInfo) (*Packet, error

pkt.Version = ip4New.Version
pkt.Protocol = uint8(ip4New.Protocol)
pkt.SrcIP = ip2int(ip4New.SrcIP)
pkt.DstIP = ip2int(ip4New.DstIP)
pkt.SrcIP = ip4New.SrcIP
pkt.DstIP = ip4New.DstIP

pb, ok := packet.(gopacket.PacketBuilder)
if !ok {
Expand All @@ -174,6 +165,20 @@ func (d *Decoder) Process(data []byte, ci *gopacket.CaptureInfo) (*Packet, error
}
}

if ipv6Layer := packet.Layer(layers.LayerTypeIPv6); ipv6Layer != nil {
ip6, ok := ipv6Layer.(*layers.IPv6)
if !ok {
return nil, nil
}

pkt.Version = ip6.Version
pkt.Protocol = uint8(ip6.NextHeader)
pkt.ProtoType = 1
pkt.SrcIP = ip6.SrcIP
pkt.DstIP = ip6.DstIP
d.ip6Count++
}

if udpLayer := packet.Layer(layers.LayerTypeUDP); udpLayer != nil {
udp, ok := udpLayer.(*layers.UDP)
if !ok {
Expand All @@ -188,18 +193,23 @@ func (d *Decoder) Process(data []byte, ci *gopacket.CaptureInfo) (*Packet, error
d.FlowSrcPort = fmt.Sprintf("%d", udp.SrcPort)
d.FlowDstPort = fmt.Sprintf("%d", udp.DstPort)

if config.Cfg.Mode == "SIPRTCP" {
if config.Cfg.Mode == "SIPRTCP" || config.Cfg.Mode == "SIPRTP" {
d.cacheSDPIPPort(udp.Payload)
if (udp.Payload[0]&0xc0)>>6 == 2 && udp.SrcPort%2 != 0 && udp.DstPort%2 != 0 && (udp.Payload[1] == 200 || udp.Payload[1] == 201) {
pkt.Payload, pkt.CorrelationID, pkt.HEPType = d.correlateRTCP(udp.Payload)
if pkt.Payload == nil {
d.rtcpFailCount++
if (udp.Payload[0]&0xc0)>>6 == 2 {
if udp.SrcPort%2 != 0 && udp.DstPort%2 != 0 && (udp.Payload[1] == 200 || udp.Payload[1] == 201) {
pkt.Payload, pkt.CorrelationID, pkt.ProtoType = d.correlateRTCP(udp.Payload)
if pkt.Payload == nil {
d.rtcpFailCount++
} else {
d.rtcpCount++
}
} else {
d.rtcpCount++
logp.Debug("rtp", "\n%v", packet)
pkt.Payload = nil
return nil, nil
}
}
}

} else if tcpLayer := packet.Layer(layers.LayerTypeTCP); tcpLayer != nil {
tcp, ok := tcpLayer.(*layers.TCP)
if !ok {
Expand All @@ -211,7 +221,7 @@ func (d *Decoder) Process(data []byte, ci *gopacket.CaptureInfo) (*Packet, error
pkt.Payload = tcp.Payload
d.tcpCount++

if config.Cfg.Mode == "SIPRTCP" {
if config.Cfg.Mode == "SIPRTCP" || config.Cfg.Mode == "SIPRTP" {
d.cacheSDPIPPort(tcp.Payload)
}
}
Expand All @@ -224,13 +234,13 @@ func (d *Decoder) Process(data []byte, ci *gopacket.CaptureInfo) (*Packet, error
}
d.dnsCount++
pkt.Payload = protos.ParseDNS(dns)
pkt.HEPType = 53
pkt.ProtoType = 53
}

if config.Cfg.Mode == "TLS" {
if appLayer := packet.ApplicationLayer(); appLayer != nil {
pkt.Payload = protos.NewTLS(appLayer.Payload())
pkt.HEPType = 100
pkt.ProtoType = 100

}
}
Expand Down
18 changes: 9 additions & 9 deletions decoder/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ func fastHash(s []byte) (h uint64) {
return
}

func ip2int(ip net.IP) uint32 {
func IP2int(ip net.IP) uint32 {
if len(ip) == 16 {
return binary.BigEndian.Uint32(ip[12:16])
}
return binary.BigEndian.Uint32(ip)
}

func int2ip(nn uint32) net.IP {
func Int2IP(nn uint32) net.IP {
ip := make(net.IP, 4)
binary.BigEndian.PutUint32(ip, nn)
return ip
Expand All @@ -46,12 +46,12 @@ func (d *Decoder) flushFragments() {
func (p *Packet) MarshalJSON() ([]byte, error) {
return json.Marshal(&struct {
Host string
HEPType byte
Tsec uint32
Tmsec uint32
Vlan uint16
Version uint8
Protocol uint8
ProtoType uint8
SrcIP net.IP
DstIP net.IP
SrcPort uint16
Expand All @@ -60,14 +60,14 @@ func (p *Packet) MarshalJSON() ([]byte, error) {
Payload string
}{
Host: p.Host,
HEPType: p.HEPType,
Tsec: p.Tsec,
Tmsec: p.Tmsec,
Vlan: p.Vlan,
Version: p.Version,
Protocol: p.Protocol,
SrcIP: int2ip(p.SrcIP),
DstIP: int2ip(p.DstIP),
ProtoType: p.ProtoType,
SrcIP: p.SrcIP,
DstIP: p.DstIP,
SrcPort: p.SrcPort,
DstPort: p.DstPort,
CorrelationID: string(p.CorrelationID),
Expand All @@ -76,9 +76,9 @@ func (p *Packet) MarshalJSON() ([]byte, error) {
}

func (d *Decoder) printPacketStats() {
logp.Info("Packets since last minute IPv4: %d, UDP: %d, RTCP: %d, RTCPFail: %d, TCP: %d, DNS: %d, duplicate: %d, fragments: %d, unknown: %d",
d.ip4Count, d.udpCount, d.rtcpCount, d.rtcpFailCount, d.tcpCount, d.dnsCount, d.dupCount, d.fragCount, d.unknownCount)
d.ip4Count, d.udpCount, d.rtcpCount, d.rtcpFailCount, d.tcpCount, d.dnsCount, d.dupCount, d.fragCount, d.unknownCount = 0, 0, 0, 0, 0, 0, 0, 0, 0
logp.Info("Packets since last minute IPv4: %d, IPv6: %d, UDP: %d, TCP: %d, RTCP: %d, RTCPFail: %d, DNS: %d, duplicate: %d, fragments: %d, unknown: %d",
d.ip4Count, d.ip6Count, d.udpCount, d.tcpCount, d.rtcpCount, d.rtcpFailCount, d.dnsCount, d.dupCount, d.fragCount, d.unknownCount)
d.ip4Count, d.ip6Count, d.udpCount, d.tcpCount, d.rtcpCount, d.rtcpFailCount, d.dnsCount, d.dupCount, d.fragCount, d.unknownCount = 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
}

func (d *Decoder) printSIPCacheStats() {
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"github.com/negbie/heplify/sniffer"
)

const version = "heplify 0.95"
const version = "heplify 0.96"

func parseFlags() {

Expand Down
54 changes: 28 additions & 26 deletions publish/hep.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type HepOutputer struct {
func NewHepOutputer(serverAddr string) (*HepOutputer, error) {
ho := &HepOutputer{
addr: serverAddr,
hepQueue: make(chan []byte, 10000),
hepQueue: make(chan []byte, 20000),
}
err := ho.Init()
if err != nil {
Expand Down Expand Up @@ -134,29 +134,38 @@ func makeChunck(chunckVen uint16, chunckType uint16, h *decoder.Packet) []byte {
// Chunk IP protocol family (0x02=IPv4, 0x0a=IPv6)
case 0x0001:
chunck = make([]byte, 6+1)
chunck[6] = 0x02
//chunck[6] = h.Version
if h.Version == 4 {
chunck[6] = 0x02
} else if h.Version == 6 {
chunck[6] = 0x0a
} else {
chunck[6] = 0x02
}

// Chunk IP protocol ID (0x11=UDP)
// Chunk IP protocol ID (0x06=TCP, 0x11=UDP)
case 0x0002:
chunck = make([]byte, 6+1)
chunck[6] = h.Protocol

// Chunk IPv4 source address
case 0x0003:
chunck = make([]byte, 6+4)
binary.BigEndian.PutUint32(chunck[6:], h.SrcIP)
chunck = make([]byte, 6+len(h.SrcIP))
copy(chunck[6:], h.SrcIP)

// Chunk IPv4 destination address
case 0x0004:
chunck = make([]byte, 6+4)
binary.BigEndian.PutUint32(chunck[6:], h.DstIP)
chunck = make([]byte, 6+len(h.DstIP))
copy(chunck[6:], h.DstIP)

// Chunk IPv6 source address
// case 0x0005:
case 0x0005:
chunck = make([]byte, 6+len(h.SrcIP))
copy(chunck[6:], h.SrcIP)

// Chunk IPv6 destination address
// case 0x0006:
case 0x0006:
chunck = make([]byte, 6+len(h.DstIP))
copy(chunck[6:], h.DstIP)

// Chunk protocol source port
case 0x0007:
Expand All @@ -178,22 +187,10 @@ func makeChunck(chunckVen uint16, chunckType uint16, h *decoder.Packet) []byte {
chunck = make([]byte, 6+4)
binary.BigEndian.PutUint32(chunck[6:], h.Tmsec)

// Chunk protocol type (SIP/H323/RTP/MGCP/M2UA)
// Chunk protocol type (DNS, LOG, RTCP, SIP)
case 0x000b:
chunck = make([]byte, 6+1)
switch h.HEPType {
case 1:
chunck[6] = 1 // SIP
case 5:
chunck[6] = 5 // RTCP
case 53:
chunck[6] = 53 // DNS
case 100:
chunck[6] = 100 // LOG
default:
chunck[6] = 66 // Unknown

}
chunck[6] = h.ProtoType

// Chunk capture agent ID
case 0x000c:
Expand Down Expand Up @@ -244,8 +241,13 @@ func newHEPChuncks(h *decoder.Packet) []byte {

buf.Write(makeChunck(0x0000, 0x0001, h))
buf.Write(makeChunck(0x0000, 0x0002, h))
buf.Write(makeChunck(0x0000, 0x0003, h))
buf.Write(makeChunck(0x0000, 0x0004, h))
if h.Version == 4 {
buf.Write(makeChunck(0x0000, 0x0003, h))
buf.Write(makeChunck(0x0000, 0x0004, h))
} else if h.Version == 6 {
buf.Write(makeChunck(0x0000, 0x0005, h))
buf.Write(makeChunck(0x0000, 0x0006, h))
}
buf.Write(makeChunck(0x0000, 0x0007, h))
buf.Write(makeChunck(0x0000, 0x0008, h))
buf.Write(makeChunck(0x0000, 0x0009, h))
Expand Down
4 changes: 2 additions & 2 deletions sniffer/sniffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ func (sniffer *SnifferSetup) setFromConfig(cfg *config.InterfacesConfig) error {
sniffer.filter = "greater 256 and portrange " + sniffer.config.PortRange + " or ip[6:2] & 0x1fff != 0"
case "SIPDNS":
sniffer.filter = "(greater 256 and portrange " + sniffer.config.PortRange + " or ip[6:2] & 0x1fff != 0) or (greater 32 and ip and dst port 53)"
case "RTCP":
sniffer.filter = "(ip and ip[6] & 0x2 = 0 and ip[6:2] & 0x1fff = 0 and udp and udp[8] & 0xc0 = 0x80 and udp[9] >= 0xc8 && udp[9] <= 0xcc)"
case "SIPRTP":
sniffer.filter = "(greater 256 and portrange " + sniffer.config.PortRange + " or ip[6:2] & 0x1fff != 0) or (ip and ip[6] & 0x2 = 0 and ip[6:2] & 0x1fff = 0 and udp and udp[8] & 0xc0 = 0x80)"
case "SIPRTCP":
sniffer.filter = "(greater 256 and portrange " + sniffer.config.PortRange + " or ip[6:2] & 0x1fff != 0) or (ip and ip[6] & 0x2 = 0 and ip[6:2] & 0x1fff = 0 and udp and udp[8] & 0xc0 = 0x80 and udp[9] >= 0xc8 && udp[9] <= 0xcc)"
case "SIPLOG":
Expand Down

0 comments on commit 403a28a

Please sign in to comment.