Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add CAN support for Automotive Penetration Testing #189

Merged
merged 1 commit into from
Aug 31, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 133 additions & 0 deletions scapy/layers/can.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
#! /usr/bin/env python

## This file is part of Scapy
## See http://www.secdev.org/projects/scapy for more informations
## Copyright (C) Nils Weiss <[email protected]>
## This program is published under a GPLv2 license

"""
CANSocket.
"""
from scapy.packet import *
from scapy.fields import *
from scapy.supersocket import SuperSocket
from scapy.sendrecv import sndrcv
from scapy.arch.linux import get_last_packet_timestamp

############
## Consts ##
############
CAN_FRAME_SIZE = 16
LINKTYPE_CAN_SOCKETCAN = 227 # From pcap spec


class CAN(Packet):
name = 'SocketCAN'
fields_desc = [
XIntField('id', 0),
PadField(FieldLenField('dlc', None, length_of='data', fmt='B'), 4),
PadField(StrLenField('data', '', length_from=lambda pkt: min(pkt.dlc, 8)), 8)
]

def extract_padding(self, p):
return '', p


class CANSocket(SuperSocket):
desc = "read/write packets at a given CAN interface using PF_CAN sockets"
can_frame_fmt = "<IB3x8s"

def __init__(self, iface=None, receive_own_messages=False, filter=None, nofilter=0):
if iface is None:
iface = conf.CANiface
self.ins = socket.socket(socket.PF_CAN, socket.SOCK_RAW, socket.CAN_RAW)

try:
self.ins.setsockopt(socket.SOL_CAN_RAW, socket.CAN_RAW_RECV_OWN_MSGS,
struct.pack('i', receive_own_messages))
except Exception as e:
Scapy_Exception("Could not receive own messages (%s)", e)

if filter is None or nofilter == 0:
filter = [{
'can_id': 0,
'can_mask': 0
}]

can_filter_fmt = "={}I".format(2 * len(filter))
filter_data = []
for can_filter in filter:
filter_data.append(can_filter['can_id'])
filter_data.append(can_filter['can_mask'])

self.ins.setsockopt(socket.SOL_CAN_RAW,
socket.CAN_RAW_FILTER,
struct.pack(can_filter_fmt, *filter_data)
)

self.ins.bind((iface,))
self.outs = self.ins

def recv(self, x=CAN_FRAME_SIZE):
# Fetching the Arb ID, DLC and Data
try:
pkt, sa_ll = self.ins.recvfrom(x)
except BlockingIOError:
warning('Captured no data, socket in non-blocking mode.')
return None
except socket.timeout:
warning('Captured no data, socket read timed out.')
return None
except OSError:
# something bad happened (e.g. the interface went down)
warning("Captured no data.")
return None

can_id, can_dlc, data = struct.unpack(self.can_frame_fmt, pkt)

q = CAN(id=can_id, dlc=can_dlc, data=data[:can_dlc])
q.time = get_last_packet_timestamp(self.ins)
return q

def send(self, x):
can_dlc = len(x.data)
data = x.data.ljust(8, b'\x00')
sx = struct.pack(self.can_frame_fmt, x.id, can_dlc, data)
if hasattr(x, "sent_time"):
x.sent_time = time.time()
return self.outs.send(sx)

def sr(self, *args, **kargs):
return sndrcv(self, *args, **kargs)

def sr1(self, *args, **kargs):
a, b = sndrcv(self, *args, **kargs)
if len(a) > 0:
return a[0][1]
else:
return None

def sniff(self, *args, **kargs):
return sniff(opened_socket=self, *args, **kargs)


@conf.commands.register
def srcan(pkt, iface=None, receive_own_messages=False, filter=None, nofilter=0, *args, **kargs):
if not "timeout" in kargs:
kargs["timeout"] = -1
s = conf.CANSocket(iface, receive_own_messages, filter, nofilter)
a, b = sndrcv(s, pkt, *args, **kargs)
s.close()
return a, b


@conf.commands.register
def srcanloop(pkts, *args, **kargs):
"""Send a packet at can layer in loop and print the answer each time
srloop(pkts, [prn], [inter], [count], ...) --> None"""
return scapy.sendrecv.__sr_loop(srcan, pkts, *args, **kargs)


conf.l2types.register(LINKTYPE_CAN_SOCKETCAN, CAN)
conf.CANiface = "can0"
conf.CANSocket = CANSocket
91 changes: 91 additions & 0 deletions test/can.uts
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# CAN unit tests
#
# Type the following command to launch start the tests:
# $ sudo bash test/run_tests -t test/can.uts -F

% CAN unit tests

+ Configuration of scapy3

= Load CAN_addon
~ conf command
from scapy.layers.can import CAN, CANSocket, srcan

= Setup string for vcan
~ conf command
bashCommand = "/bin/bash -c 'sudo modprobe vcan; sudo ip link add name vcan0 type vcan; sudo ip link set dev vcan0 up'"

= Load os
~ conf command
import os
import threading
from time import sleep

= Setup vcan0
~ conf command
0 == os.system(bashCommand)

+ Basic Packet Tests()
= CAN Packet init

canframe = CAN(id=0x7ff,dlc=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08')
bytes(canframe) == b'\x00\x00\x07\xff\x08\x00\x00\x00\x01\x02\x03\x04\x05\x06\x07\x08'

= DLC greater than 8
canframe = CAN(id=0x7ff,dlc=9,data=b'\x01\x02\x03\x04\x05\x06\x07\x08')
print(len(canframe.data))
canframe.dlc = len(canframe.data)
bytes(canframe) == b'\x00\x00\x07\xff\x08\x00\x00\x00\x01\x02\x03\x04\x05\x06\x07\x08'

+ Basic Socket Tests()
= CAN Socket Init

sock1 = CANSocket(iface="vcan0")

= CAN Socket send recv

def sender():
sleep(0.1)
sock2 = CANSocket(iface="vcan0")
sock2.send(CAN(id=0x7ff,dlc=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08'))

thread = threading.Thread(target=sender)
thread.start()

rx = sock1.recv()
rx == CAN(id=0x7ff,dlc=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08')

= CAN Socket sr1

tx = CAN(id=0x7ff,dlc=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08')
tx.sent_time == 0
thread = threading.Thread(target=sender)
thread.start()
rx = None
rx = sock1.sr1(tx)
tx == rx
tx.sent_time > rx.time
rx.time > 0

= srcan

tx = CAN(id=0x7ff,dlc=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08')
tx.sent_time == 0
thread = threading.Thread(target=sender)
thread.start()
rx = None
rx = srcan(tx, "vcan0", timeout=1)
rx = rx[0][0][1]
tx == rx
tx.sent_time > rx.time
rx.time > 0


+ PCAP CAN Tests()
= Write pcap file

rx = CAN(id=0x7ff,dlc=8,data=b'\x01\x02\x03\x04\x05\x06\x07\x08')
wrpcap('/tmp/scapyPcapTest.pcap', rx, append=False)
readPack = rdpcap('/tmp/scapyPcapTest.pcap', 1)
rx == readPack[0]