From 45dae4acd6b9277b546cb4603460b66aaceabd49 Mon Sep 17 00:00:00 2001 From: Nils Weiss Date: Thu, 24 Aug 2017 07:58:53 -0400 Subject: [PATCH] add CAN support --- scapy/layers/can.py | 133 ++++++++++++++++++++++++++++++++++++++++++++ test/can.uts | 91 ++++++++++++++++++++++++++++++ 2 files changed, 224 insertions(+) create mode 100644 scapy/layers/can.py create mode 100644 test/can.uts diff --git a/scapy/layers/can.py b/scapy/layers/can.py new file mode 100644 index 0000000..2d5757a --- /dev/null +++ b/scapy/layers/can.py @@ -0,0 +1,133 @@ +#! /usr/bin/env python + +## This file is part of Scapy +## See http://www.secdev.org/projects/scapy for more informations +## Copyright (C) Nils Weiss +## This program is published under a GPLv2 license + +""" +CANSocket. +""" +from scapy.packet import * +from scapy.fields import * +from scapy.supersocket import SuperSocket +from scapy.sendrecv import sndrcv +from scapy.arch.linux import get_last_packet_timestamp + +############ +## Consts ## +############ +CAN_FRAME_SIZE = 16 +LINKTYPE_CAN_SOCKETCAN = 227 # From pcap spec + + +class CAN(Packet): + name = 'SocketCAN' + fields_desc = [ + XIntField('id', 0), + PadField(FieldLenField('dlc', None, length_of='data', fmt='B'), 4), + PadField(StrLenField('data', '', length_from=lambda pkt: min(pkt.dlc, 8)), 8) + ] + + def extract_padding(self, p): + return '', p + + +class CANSocket(SuperSocket): + desc = "read/write packets at a given CAN interface using PF_CAN sockets" + can_frame_fmt = " 0: + return a[0][1] + else: + return None + + def sniff(self, *args, **kargs): + return sniff(opened_socket=self, *args, **kargs) + + +@conf.commands.register +def srcan(pkt, iface=None, receive_own_messages=False, filter=None, nofilter=0, *args, **kargs): + if not "timeout" in kargs: + kargs["timeout"] = -1 + s = conf.CANSocket(iface, receive_own_messages, filter, nofilter) + a, b = sndrcv(s, pkt, *args, **kargs) + s.close() + return a, b + + +@conf.commands.register +def srcanloop(pkts, *args, **kargs): + """Send a packet at can layer in loop and print the answer each time +srloop(pkts, [prn], [inter], [count], ...) --> None""" + return scapy.sendrecv.__sr_loop(srcan, pkts, *args, **kargs) + + +conf.l2types.register(LINKTYPE_CAN_SOCKETCAN, CAN) +conf.CANiface = "can0" +conf.CANSocket = CANSocket diff --git a/test/can.uts b/test/can.uts new file mode 100644 index 0000000..629e4b9 --- /dev/null +++ b/test/can.uts @@ -0,0 +1,91 @@ +# CAN unit tests +# +# Type the following command to launch start the tests: +# $ sudo bash test/run_tests -t test/can.uts -F + +% CAN unit tests + ++ Configuration of scapy3 + += Load CAN_addon +~ conf command +from scapy.layers.can import CAN, CANSocket, srcan + += Setup string for vcan +~ conf command +bashCommand = "/bin/bash -c 'sudo modprobe vcan; sudo ip link add name vcan0 type vcan; sudo ip link set dev vcan0 up'" + += Load os +~ conf command +import os +import threading +from time import sleep + += Setup vcan0 +~ conf command +0 == os.system(bashCommand) + ++ Basic Packet Tests() += CAN Packet init + +canframe = CAN(id=0x7ff,dlc=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08') +bytes(canframe) == b'\x00\x00\x07\xff\x08\x00\x00\x00\x01\x02\x03\x04\x05\x06\x07\x08' + += DLC greater than 8 +canframe = CAN(id=0x7ff,dlc=9,data=b'\x01\x02\x03\x04\x05\x06\x07\x08') +print(len(canframe.data)) +canframe.dlc = len(canframe.data) +bytes(canframe) == b'\x00\x00\x07\xff\x08\x00\x00\x00\x01\x02\x03\x04\x05\x06\x07\x08' + ++ Basic Socket Tests() += CAN Socket Init + +sock1 = CANSocket(iface="vcan0") + += CAN Socket send recv + +def sender(): + sleep(0.1) + sock2 = CANSocket(iface="vcan0") + sock2.send(CAN(id=0x7ff,dlc=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08')) + +thread = threading.Thread(target=sender) +thread.start() + +rx = sock1.recv() +rx == CAN(id=0x7ff,dlc=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08') + += CAN Socket sr1 + +tx = CAN(id=0x7ff,dlc=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08') +tx.sent_time == 0 +thread = threading.Thread(target=sender) +thread.start() +rx = None +rx = sock1.sr1(tx) +tx == rx +tx.sent_time > rx.time +rx.time > 0 + += srcan + +tx = CAN(id=0x7ff,dlc=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08') +tx.sent_time == 0 +thread = threading.Thread(target=sender) +thread.start() +rx = None +rx = srcan(tx, "vcan0", timeout=1) +rx = rx[0][0][1] +tx == rx +tx.sent_time > rx.time +rx.time > 0 + + ++ PCAP CAN Tests() += Write pcap file + +rx = CAN(id=0x7ff,dlc=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08') +wrpcap('/tmp/scapyPcapTest.pcap', rx, append=False) +readPack = rdpcap('/tmp/scapyPcapTest.pcap', 1) +rx == readPack[0] +