Skip to content

Commit

Permalink
Merge branch 'master' of github.com:/philhagen/pycommunityid
Browse files Browse the repository at this point in the history
* 'master' of github.com:/philhagen/pycommunityid:
  use variable for file removal
  add tests for community-id-pcapfilter
  add first attempt at pcap filtering by Community ID
  • Loading branch information
ckreibich committed Nov 29, 2023
2 parents 64a6fa1 + d6f610a commit 7bf48a8
Show file tree
Hide file tree
Showing 5 changed files with 210 additions and 1 deletion.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ The package includes three sample applications:
exercises the package's "low-level" API, using flow tuple values as
you'd encounter them in a typical network monitor.

- [community-id-pcapfilter](https://github.com/corelight/pycommunityid/blob/master/scripts/community-id-pcapfilter),
which iterates over a pcap via dpkt and produces a pcap of
only those packets whose Community IDs have a specific value,
filtering out all others.

- [community-id-tcpdump](https://github.com/corelight/pycommunityid/blob/master/scripts/community-id-tcpdump),
which takes tcpdump output on stdin and
augments it with Community ID values on stdout. This exercises the
Expand Down
183 changes: 183 additions & 0 deletions scripts/community-id-pcapfilter
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
#! /usr/bin/env python
"""
This script filters one or more provided pcap files (not pcapng), creating
an output file containing only packets with a supplied Community ID hash.
The output file's packets retain timestamp and all packet data from the
originally supplied pcap file(s).
This is based heavily on the "community-id-pcap" script in the same
directory and retains all of its limitations and caveats at the time this
script was created.
Currently supported protocols include IP, IPv6, ICMP, ICMPv6, TCP,
UDP, SCTP.
Please note: the protocol parsing implemented in this script relies
on the dpkt module and is somewhat simplistic:
- dpkt seems to struggle with some SCTP packets, for which it fails
to register SCTP even though its header is correctly present.
- The script doesn't try to get nested network layers (IP over IPv6,
IP in IP, etc) right. It expects either IP or IPv6, and it expects
a transport-layer protocol (including the ICMPs here) as the
immediate next layer.
"""
import argparse
import gzip
import sys

import communityid

try:
import dpkt
except ImportError:
print('This needs the dpkt Python module')
sys.exit(1)

from dpkt.ethernet import Ethernet #pylint: disable=import-error
from dpkt.ip import IP #pylint: disable=import-error
from dpkt.ip6 import IP6 #pylint: disable=import-error
from dpkt.icmp import ICMP #pylint: disable=import-error
from dpkt.icmp6 import ICMP6 #pylint: disable=import-error
from dpkt.tcp import TCP #pylint: disable=import-error
from dpkt.udp import UDP #pylint: disable=import-error
from dpkt.sctp import SCTP #pylint: disable=import-error

class PcapFilter(object):
    """
    Copies the packets of one pcap file whose Community ID equals a
    given value into an output writer.

    Arguments:
      commid: object whose calc() method turns a communityid.FlowTuple
          into a Community ID value (main() passes a
          communityid.CommunityID instance).
      pcap: path of the input pcap file; a name ending in '.gz' is read
          through gzip.
      commidfilter: Community ID value a packet must hash to in order
          to be written out.
      outputwriter: object with a writepkt(pktdata, tstamp) method
          (main() passes a dpkt.pcap.Writer).
    """
    def __init__(self, commid, pcap, commidfilter, outputwriter):
        self._commid = commid
        self._pcap = pcap
        self._commidfilter = commidfilter
        self._outputwriter = outputwriter

    def process(self):
        """
        Reads every packet of the input pcap and hands it to
        _process_packet(), which writes matching packets to the output
        writer.
        """
        opener = gzip.open if self._pcap.endswith('.gz') else open

        # The input is only ever read, so open it read-only: 'r+b' would
        # require write permission and fail on read-only capture files.
        with opener(self._pcap, 'rb') as inhdl:
            reader = dpkt.pcap.Reader(inhdl)
            for tstamp, pktdata in reader:
                self._process_packet(tstamp, pktdata, self._outputwriter)

    def _process_packet(self, tstamp, pktdata, outputwriter):
        """
        Computes the Community ID of a single packet and, if it equals
        the configured filter value, writes the unmodified packet data
        with its original timestamp to outputwriter. Packets that are
        neither IPv4 nor IPv6 are skipped silently.
        """
        pkt = self._packet_parse(pktdata)

        if not pkt:
            return

        if IP in pkt:
            saddr = pkt[IP].src
            daddr = pkt[IP].dst
        elif IP6 in pkt:
            saddr = pkt[IP6].src
            daddr = pkt[IP6].dst
        else:
            return

        tpl = None

        if TCP in pkt:
            tpl = communityid.FlowTuple(
                dpkt.ip.IP_PROTO_TCP, saddr, daddr,
                pkt[TCP].sport, pkt[TCP].dport)

        elif UDP in pkt:
            tpl = communityid.FlowTuple(
                dpkt.ip.IP_PROTO_UDP, saddr, daddr,
                pkt[UDP].sport, pkt[UDP].dport)

        elif SCTP in pkt:
            tpl = communityid.FlowTuple(
                dpkt.ip.IP_PROTO_SCTP, saddr, daddr,
                pkt[SCTP].sport, pkt[SCTP].dport)

        elif ICMP in pkt:
            # The ICMP type and code take the place of the port pair.
            tpl = communityid.FlowTuple(
                dpkt.ip.IP_PROTO_ICMP, saddr, daddr,
                pkt[ICMP].type, pkt[ICMP].code)

        elif ICMP6 in pkt:
            tpl = communityid.FlowTuple(
                dpkt.ip.IP_PROTO_ICMP6, saddr, daddr,
                pkt[ICMP6].type, pkt[ICMP6].code)

        if tpl is None:
            # Fallbacks to other IP protocols:
            if IP in pkt:
                tpl = communityid.FlowTuple(pkt[IP].p, saddr, daddr)
            elif IP6 in pkt:
                # The next-header value lives on the IPv6 layer; there is
                # no pkt[IP] entry for an IPv6 packet.
                tpl = communityid.FlowTuple(pkt[IP6].nxt, saddr, daddr)

        if tpl is None:
            return

        res = self._commid.calc(tpl)

        if res == self._commidfilter:
            outputwriter.writepkt(pktdata, tstamp)

    def _packet_parse(self, pktdata):
        """
        Parses the protocols in the given packet data and returns the
        resulting packet (here, as a dict indexed by the protocol layers
        in form of dpkt classes). The dict is empty when the Ethernet
        payload is neither IP nor IPv6.
        """
        layer = Ethernet(pktdata)
        pkt = {}

        if isinstance(layer.data, IP):
            pkt[IP] = layer = layer.data
        elif isinstance(layer.data, IP6):
            # XXX This does not correctly skip IPv6 extension headers
            pkt[IP6] = layer = layer.data
        else:
            return pkt

        if isinstance(layer.data, ICMP):
            pkt[ICMP] = layer.data
        elif isinstance(layer.data, ICMP6):
            pkt[ICMP6] = layer.data
        elif isinstance(layer.data, TCP):
            pkt[TCP] = layer.data
        elif isinstance(layer.data, UDP):
            pkt[UDP] = layer.data
        elif isinstance(layer.data, SCTP):
            pkt[SCTP] = layer.data

        return pkt

def main():
    """
    Command-line entry point: filters the given pcap file(s) down to
    the packets matching one Community ID and writes them to a new pcap.

    Returns the process exit code: 0 on success, 2 when the output file
    already exists (it is never overwritten).
    """
    parser = argparse.ArgumentParser(description='Community ID pcap filtering utility')
    parser.add_argument('pcaps', metavar='PCAP', nargs='+',
                        help='PCAP packet capture files')
    parser.add_argument('--filter', metavar='FILTER', required=True,
                        help='Community ID string in base64 format to filter from input pcap file(s)')
    parser.add_argument('--output', metavar='OUTPUT', required=True,
                        help='Output pcap file to create and place matching packets into')
    parser.add_argument('--seed', type=int, default=0, metavar='NUM',
                        help='Seed value for hash operations')
    args = parser.parse_args()

    commid = communityid.CommunityID(args.seed)

    # if outfile exists, quit ('x' mode makes the open fail rather than
    # overwrite an existing file)
    try:
        outhdl = open(args.output, 'xb')
    except FileExistsError:
        print('Error: output file %s already exists. Exiting.' % (args.output))
        return 2

    # Close the output deterministically, also when an input file fails
    # to parse, so that everything written so far is flushed to disk.
    with outhdl:
        writer = dpkt.pcap.Writer(outhdl)

        for pcap in args.pcaps:
            itr = PcapFilter(commid, pcap, args.filter, writer)
            itr.process()

    return 0

# Run main() only when executed as a script and use its return value as
# the process exit status.
if __name__ == '__main__':
    sys.exit(main())
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
scripts=[
'scripts/community-id',
'scripts/community-id-pcap',
'scripts/community-id-pcapfilter',
'scripts/community-id-tcpdump'],
test_suite="tests.communityid_test",
classifiers=[
Expand Down
22 changes: 21 additions & 1 deletion tests/communityid_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import subprocess
import sys
import unittest
import hashlib

try:
import pylint.epylint
Expand Down Expand Up @@ -402,6 +403,22 @@ def _check_output_community_id_pcap(self, args):
raise
self.skipTest("This test requires dpkt")

def _check_output_community_id_pcapfilter(self, args):
    """
    Runs the community-id-pcapfilter script with the given arguments and
    returns the SHA-256 hex digest of the pcap file it produced.

    The args list must contain "--output" followed by the output file
    name. That file is removed again after it has been read. The test
    is skipped when the script reports that dpkt is unavailable; any
    other script failure is re-raised.
    """
    try:
        args = [self._scriptpath('community-id-pcapfilter')] + args
        # check_output() raises CalledProcessError on a non-zero exit
        # status and captures stdout, which the handler below inspects.
        # A plain subprocess.run() does neither.
        subprocess.check_output(args, env=self.env)
    except subprocess.CalledProcessError as err:
        if err.output.find(b'This needs the dpkt Python module') < 0:
            raise
        self.skipTest("This test requires dpkt")

    outfilename = args[args.index("--output") + 1]
    try:
        with open(outfilename, "rb") as hdl:
            return hashlib.sha256(hdl.read()).hexdigest()
    finally:
        # Always clean up: the script refuses to run if the output file
        # already exists, so a leftover would break the next run.
        os.remove(outfilename)

def test_communityid_pcap(self):
# This only works if we have dpkt
out = self._check_output_community_id_pcap([self._testfilepath('tcp.pcap')])
Expand All @@ -412,6 +429,10 @@ def test_communityid_pcap_json(self):
out = self._check_output_community_id_pcap(['--json', self._testfilepath('tcp.pcap')])
self.assertEqual(out, b'[{"proto": 6, "saddr": "128.232.110.120", "daddr": "66.35.250.204", "sport": 34855, "dport": 80, "communityid": "1:LQU9qZlK+B5F3KDmev6m5PMibrg="}, {"proto": 6, "saddr": "66.35.250.204", "daddr": "128.232.110.120", "sport": 80, "dport": 34855, "communityid": "1:LQU9qZlK+B5F3KDmev6m5PMibrg="}]\n')

def test_communityid_pcapfilter(self):
    """
    Filters tcp_multi.pcap for a single Community ID and compares the
    SHA-256 digest of the resulting pcap against a known-good value.
    """
    wanted_id = '1:p78FQ5Gn8XFgjlKgugj92+uTUDk='
    script_args = [
        '--filter', wanted_id,
        '--output', 'output.pcap',
        self._testfilepath('tcp_multi.pcap'),
    ]
    digest = self._check_output_community_id_pcapfilter(script_args)
    self.assertEqual(
        digest,
        'f46ba2303318c400c257c08a2b70f412fc307694ede788baa96142b118b28a94')

def test_communityid_tcpdump(self):
# This uses subprocess.check_output(..., input=...) which was added in 3.4:
if sys.version_info[0] < 3 or sys.version_info[1] < 4:
Expand All @@ -424,6 +445,5 @@ def test_communityid_tcpdump(self):
first_line = out.decode('ascii').split('\n')[0].strip()
self.assertEqual(first_line, '1071580904.891921 IP 1:LQU9qZlK+B5F3KDmev6m5PMibrg= 128.232.110.120:34855 > 66.35.250.204.80: Flags [S], seq 3201037957, win 5840, options [mss 1460,sackOK,TS val 87269134 ecr 0,nop,wscale 0], length 0')


# Run the test suite when this file is executed directly.
if __name__ == '__main__':
    unittest.main()
Binary file added tests/tcp_multi.pcap
Binary file not shown.

0 comments on commit 7bf48a8

Please sign in to comment.