Skip to content

Commit

Permalink
feat(pcapng): Read interface name from pcapng file and optimize inter…
Browse files Browse the repository at this point in the history
…face handling when writing (#165)

* chore(output/pcapng): remove needless dummy interfaces to decrease file size

* feat(pcapng): read interface name from pcapng file and tidy pcapng file

* fix test
  • Loading branch information
mozillazg authored Oct 19, 2024
1 parent 2cb31ff commit 119581c
Show file tree
Hide file tree
Showing 30 changed files with 134 additions and 89 deletions.
16 changes: 8 additions & 8 deletions bpf/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,13 +217,13 @@ func (b *BPF) PullNewNetDeviceEvents(ctx context.Context, chanSize int) (<-chan
go func() {
defer close(ch)
defer reader.Close()
b.handleNetNetDeviceEvents(ctx, reader, ch)
b.handleNewNetDeviceEvents(ctx, reader, ch)
}()

return ch, nil
}

func (b *BPF) handleNetNetDeviceEvents(ctx context.Context, reader *perf.Reader, ch chan<- BpfNewNetdeviceEventT) {
func (b *BPF) handleNewNetDeviceEvents(ctx context.Context, reader *perf.Reader, ch chan<- BpfNewNetdeviceEventT) {
for {
select {
case <-ctx.Done():
Expand All @@ -240,12 +240,12 @@ func (b *BPF) handleNetNetDeviceEvents(ctx context.Context, reader *perf.Reader,
log.Infof("got EOF error: %s", err)
continue
}
log.Errorf("read go tls keylog event failed: %s", err)
log.Errorf("read new net device event failed: %s", err)
continue
}
event, err := parseNewNetDeviceEvent(record.RawSample)
if err != nil {
log.Errorf("parse go tls keylog event failed: %s", err)
log.Errorf("parse new net device event failed: %s", err)
} else {
ch <- *event
dev := event.Dev
Expand Down Expand Up @@ -307,12 +307,12 @@ func (b *BPF) handleNetDeviceChangeEvents(ctx context.Context, reader *perf.Read
log.Infof("got EOF error: %s", err)
continue
}
log.Errorf("read go tls keylog event failed: %s", err)
log.Errorf("read net device change event failed: %s", err)
continue
}
event, err := parseNetDeviceChangeEvent(record.RawSample)
if err != nil {
log.Errorf("parse go tls keylog event failed: %s", err)
log.Errorf("parse net device change event failed: %s", err)
} else {
ch <- *event
oldDev := event.OldDevice
Expand Down Expand Up @@ -376,12 +376,12 @@ func (b *BPF) handleMountEvents(ctx context.Context, reader *perf.Reader, ch cha
log.Infof("got EOF error: %s", err)
continue
}
log.Errorf("read go tls keylog event failed: %s", err)
log.Errorf("read mount event failed: %s", err)
continue
}
event, err := parseMountEvent(record.RawSample)
if err != nil {
log.Errorf("parse go tls keylog event failed: %s", err)
log.Errorf("parse mount event failed: %s", err)
} else {
ch <- *event
log.Infof("new BpfMountEventT: (source %s, dest %s, fstype, %s)",
Expand Down
24 changes: 8 additions & 16 deletions cmd/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,21 +76,13 @@ func newPcapNgWriter(w io.Writer, pcache *metadata.ProcessCache, opts *Options)
return nil, fmt.Errorf(": %w", err)
}

// to avoid "Interface id 9 not present in section (have only 7 interfaces)"
maxIndex := 0
for _, dev := range devices.Devs() {
if dev.Ifindex > maxIndex {
maxIndex = dev.Ifindex
}
}
interfaces := make([]pcapgo.NgInterface, maxIndex+1)
for _, dev := range devices.Devs() {
interfaces[dev.Ifindex] = metadata.NewNgInterface(dev, opts.pcapFilter)
}
for i, iface := range interfaces {
if iface.Index == 0 {
interfaces[i] = metadata.NewDummyNgInterface(i)
}
interfaceIds := map[string]int{}
interfaces := []pcapgo.NgInterface{metadata.NewDummyNgInterface(0, opts.pcapFilter)}
for i, dev := range devices.Devs() {
index := i + 1
intf := metadata.NewNgInterface(dev, opts.pcapFilter, index)
interfaces = append(interfaces, intf)
interfaceIds[dev.Key()] = index
}

pcapNgWriter, err := pcapgo.NewNgWriterInterface(w, interfaces[0], pcapgo.NgWriterOptions{
Expand All @@ -115,7 +107,7 @@ func newPcapNgWriter(w io.Writer, pcache *metadata.ProcessCache, opts *Options)
return nil, fmt.Errorf("writing pcapNg header: %w", err)
}

wt := writer.NewPcapNGWriter(pcapNgWriter, pcache, interfaces).WithPcapFilter(opts.pcapFilter)
wt := writer.NewPcapNGWriter(pcapNgWriter, pcache, interfaceIds).WithPcapFilter(opts.pcapFilter)
return wt, nil
}

Expand Down
44 changes: 27 additions & 17 deletions internal/metadata/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,21 +112,29 @@ func (d *DeviceCache) GetByIfindex(ifindex int, netNsInode uint32) (types.Device
ns = types.NewNetNsWithInode(netNsInode)
}

devs, ok := d.allLinks[netNsInode]
if !ok {
for _, links := range d.allLinks {
for _, dev := range links {
devs = append(devs, dev)
for inode, links := range d.allLinks {
if netNsInode != inode {
continue
}
for _, dev := range links {
if dev.Index == ifindex {
return types.Device{
Name: dev.Name,
Ifindex: ifindex,
NetNs: ns,
}, true
}
}
}
for _, dev := range devs {
if dev.Index == ifindex {
return types.Device{
Name: dev.Name,
Ifindex: ifindex,
NetNs: ns,
}, true
for _, links := range d.allLinks {
for _, dev := range links {
if dev.Index == ifindex {
return types.Device{
Name: dev.Name,
Ifindex: ifindex,
NetNs: ns,
}, true
}
}
}

Expand Down Expand Up @@ -193,13 +201,14 @@ func (d *DeviceCache) getAllLinks(inode uint32) ([]net.Interface, error) {
return d.allLinks[inode], nil
}

func NewNgInterface(dev types.Device, filter string) pcapgo.NgInterface {
comment := ""
func NewNgInterface(dev types.Device, filter string, index int) pcapgo.NgInterface {
comment := fmt.Sprintf("ifIndex: %d", dev.Ifindex)
if dev.NetNs != nil {
comment = fmt.Sprintf("netNsInode: %d, netNsPath: %s", dev.NetNs.Inode(), dev.NetNs.Path())
comment = fmt.Sprintf("%s, netNsInode: %d, netNsPath: %s",
comment, dev.NetNs.Inode(), dev.NetNs.Path())
}
return pcapgo.NgInterface{
Index: dev.Ifindex,
Index: index,
Name: dev.Name,
Comment: comment,
Filter: filter,
Expand All @@ -210,10 +219,11 @@ func NewNgInterface(dev types.Device, filter string) pcapgo.NgInterface {
}
}

func NewDummyNgInterface(index int) pcapgo.NgInterface {
func NewDummyNgInterface(index int, filter string) pcapgo.NgInterface {
return pcapgo.NgInterface{
Index: index,
Name: fmt.Sprintf("dummy-%d", index),
Filter: filter,
OS: runtime.GOOS,
LinkType: layers.LinkTypeEthernet,
SnapLength: uint32(math.MaxUint16),
Expand Down
13 changes: 13 additions & 0 deletions internal/parser/pcapng.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package parser

import (
"github.com/mozillazg/ptcpdump/internal/types"
"io"

"github.com/gopacket/gopacket/pcapgo"
Expand Down Expand Up @@ -29,12 +30,24 @@ func (p *PcapNGParser) Parse() (*event.Packet, error) {
if err != nil {
return nil, err
}

e, err := event.FromPacket(ci, data)
if err != nil {
return nil, err
}

interf, err := p.r.Interface(ci.InterfaceIndex)
if err == nil {
e.Device = types.Device{
Name: interf.Name,
Ifindex: 0,
NetNs: nil,
}
}

exec, ctx := event.FromPacketOptions(opts)
e.Pid = exec.Pid
p.pcache.AddItemWithContext(exec, ctx)

return e, nil
}
10 changes: 5 additions & 5 deletions internal/types/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func NewInterfaces() *Interfaces {
}

func (i *Interfaces) Add(dev Device) {
k := i.key(dev)
k := dev.Key()
i.devs[k] = dev
}

Expand All @@ -45,10 +45,6 @@ func (i *Interfaces) Devs() []Device {
return devs
}

func (i *Interfaces) key(dev Device) string {
return fmt.Sprintf("%d.%d", dev.NetNs.Inode(), dev.Ifindex)
}

func (i *Interfaces) GetByIfindex(index int) Device {
for _, v := range i.devs {
if v.Ifindex == index {
Expand All @@ -58,6 +54,10 @@ func (i *Interfaces) GetByIfindex(index int) Device {
return Device{}
}

func (d *Device) Key() string {
return fmt.Sprintf("%d.%d", d.NetNs.Inode(), d.Ifindex)
}

func (d *Device) String() string {
return fmt.Sprintf("{Device ifindex: %d, name: %s, ns: %s}", d.Ifindex, d.Name, d.NetNs)
}
3 changes: 3 additions & 0 deletions internal/types/netns.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,8 @@ func (n *NetNs) Path() string {
}

func (n *NetNs) Inode() uint32 {
if n == nil {
return 0
}
return n.inode
}
59 changes: 38 additions & 21 deletions internal/writer/pcapng.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"fmt"
"github.com/mozillazg/ptcpdump/internal/types"
"strings"
"sync"

"github.com/gopacket/gopacket"
Expand All @@ -14,19 +15,19 @@ import (
)

type PcapNGWriter struct {
pw *pcapgo.NgWriter
pcache *metadata.ProcessCache
interfaces []pcapgo.NgInterface
pcapFilter string
pw *pcapgo.NgWriter
pcache *metadata.ProcessCache
interfaceIds map[string]int
pcapFilter string

noBuffer bool
lock sync.Mutex
lock sync.RWMutex
keylogs bytes.Buffer
}

func NewPcapNGWriter(pw *pcapgo.NgWriter, pcache *metadata.ProcessCache,
interfaces []pcapgo.NgInterface) *PcapNGWriter {
return &PcapNGWriter{pw: pw, pcache: pcache, interfaces: interfaces, lock: sync.Mutex{}}
interfaceIds map[string]int) *PcapNGWriter {
return &PcapNGWriter{pw: pw, pcache: pcache, interfaceIds: interfaceIds, lock: sync.RWMutex{}}
}

func (w *PcapNGWriter) Write(e *event.Packet) error {
Expand All @@ -35,7 +36,7 @@ func (w *PcapNGWriter) Write(e *event.Packet) error {
Timestamp: e.Time.Local(),
CaptureLength: payloadLen,
Length: e.Len,
InterfaceIndex: e.Device.Ifindex,
InterfaceIndex: w.getInterfaceIndex(e.Device),
}
p := w.pcache.Get(e.Pid, e.MntNs, e.NetNs, e.CgroupName)

Expand Down Expand Up @@ -109,24 +110,40 @@ func (w *PcapNGWriter) AddDev(dev types.Device) {
w.lock.Lock()
defer w.lock.Unlock()

log.Infof("new dev: %+v, currLen: %d", dev, len(w.interfaces))
if len(w.interfaces) > dev.Ifindex {
log.Infof("new dev: %+v, currLen: %d", dev, len(w.interfaceIds))
key := dev.Key()
if w.interfaceIds[key] > 0 {
return
}

for i := len(w.interfaces); i <= dev.Ifindex; i++ {
var intf pcapgo.NgInterface
if i == dev.Ifindex {
intf = metadata.NewNgInterface(dev, w.pcapFilter)
} else {
intf = metadata.NewDummyNgInterface(i)
}
log.Debugf("add interface: %+v", intf)
if _, err := w.pw.AddInterface(intf); err != nil {
log.Errorf("error adding interface %s: %+v", intf.Name, err)
index := len(w.interfaceIds) + 1
intf := metadata.NewNgInterface(dev, w.pcapFilter, index)
log.Infof("add interface: %+v", intf)

if _, err := w.pw.AddInterface(intf); err != nil {
log.Errorf("error adding interface %s: %+v", intf.Name, err)
}

w.interfaceIds[key] = index
}

func (w *PcapNGWriter) getInterfaceIndex(dev types.Device) int {
w.lock.RLock()
defer w.lock.RUnlock()

log.Infof("interfaceIds: %+v, dev: %+v", w.interfaceIds, dev)

index := w.interfaceIds[dev.Key()]
if index > 0 {
return index
}
suffix := fmt.Sprintf(".%d", dev.Ifindex)
for k, index := range w.interfaceIds {
if strings.HasSuffix(k, suffix) {
return index
}
w.interfaces = append(w.interfaces, intf)
}
return 0
}

func (w *PcapNGWriter) WithPcapFilter(filter string) *PcapNGWriter {
Expand Down
2 changes: 1 addition & 1 deletion testdata/test_arp.sh
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ function test_tcpdump_read() {

function test_ptcpdump_read() {
EXPECT_NAME="${LNAME}.read.expect"
sed 's/ [a-zA-Z0-9_-]\+ \(In\|Out\) / /g' "${LNAME}" > "${EXPECT_NAME}"
sed 's/ \(In\|Out\) / /g' "${LNAME}" > "${EXPECT_NAME}"
timeout 30s ${CMD} -v -r "${FNAME}" > "${RNAME}"
diff "${EXPECT_NAME}" "${RNAME}"
}
Expand Down
6 changes: 4 additions & 2 deletions testdata/test_base.sh
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ function test_tcpdump_read() {

function test_ptcpdump_read() {
EXPECT_NAME="${LNAME}.read.expect"
sed 's/ [a-zA-Z0-9_-]\+ \(In\|Out\) / /g' "${LNAME}" > "${EXPECT_NAME}"
timeout 30s ${CMD} -v -r "${FNAME}" > "${RNAME}"
sed 's/ \(In\|Out\) / /g' "${LNAME}" > "${EXPECT_NAME}"
timeout 30s ${CMD} -v -r "${FNAME}" |tee "${RNAME}"
diff "${EXPECT_NAME}" "${RNAME}"

${CMD} -r ${FNAME}
}

function main() {
Expand Down
2 changes: 1 addition & 1 deletion testdata/test_containerd.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ function test_ptcpdump() {

function test_ptcpdump_read() {
EXPECT_NAME="${LNAME}.read.expect"
sed 's/ [a-zA-Z0-9_-]\+ \(In\|Out\) / /g' "${LNAME}" > "${EXPECT_NAME}"
sed 's/ \(In\|Out\) / /g' "${LNAME}" > "${EXPECT_NAME}"
timeout 30s ${CMD} --oneline -v -r "${FNAME}" > "${RNAME}"
cat "${RNAME}" | grep "> 1.1.1.1.80: Flags .*, args wget -T 10 1.1.1.1.* $cid1"
cat "${RNAME}" | grep "> 1.1.1.1.80: Flags .*, args wget -T 5 1.1.1.1.* $cid2"
Expand Down
2 changes: 1 addition & 1 deletion testdata/test_containerd_container_id_filter.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ function test_ptcpdump() {

function test_ptcpdump_read() {
EXPECT_NAME="${LNAME}.read.expect"
sed 's/ [a-zA-Z0-9_-]\+ \(In\|Out\) / /g' "${LNAME}" > "${EXPECT_NAME}"
sed 's/ \(In\|Out\) / /g' "${LNAME}" > "${EXPECT_NAME}"
timeout 30s ${CMD} --oneline -v -r "${FNAME}" > "${RNAME}"
cat "${RNAME}" | grep "> 1.1.1.1.80: Flags .*, args wget -T 10 1.1.1.1.* $cid1"
}
Expand Down
2 changes: 1 addition & 1 deletion testdata/test_containerd_container_name_filter.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ function test_ptcpdump() {

function test_ptcpdump_read() {
EXPECT_NAME="${LNAME}.read.expect"
sed 's/ [a-zA-Z0-9_-]\+ \(In\|Out\) / /g' "${LNAME}" > "${EXPECT_NAME}"
sed 's/ \(In\|Out\) / /g' "${LNAME}" > "${EXPECT_NAME}"
timeout 30s ${CMD} --oneline -v -r "${FNAME}" > "${RNAME}"
cat "${RNAME}" | grep "> 1.1.1.1.80: Flags .*, args wget -T 10 1.1.1.1.* $cid1"
}
Expand Down
Loading

0 comments on commit 119581c

Please sign in to comment.