CaringCaribou / caringcaribou

A friendly car security exploration tool for the CAN bus
GNU General Public License v3.0
738 stars 193 forks source link

Add standalone mock ECU module #37

Closed kasperkarlsson closed 8 months ago

kasperkarlsson commented 6 years ago

It would be nice to be able to simply start a mock ECU directly through a separate module for testing, e.g. ./cc.py -i vcan0 mock uds to start a UDS mock ECU on interface vcan0. This could prove useful when playing around, learning new protocols or developing new modules.

polybassa commented 4 years ago

Hello, I'm a contributor of Scapy and working on automotive related features for it. I've implemented a mechanism to simulate the UDS behaviour of an ECU in Scapy. Most of my work is documented here: https://scapy.readthedocs.io/en/latest/layers/automotive.html

How to simulate an ECU can be seen in some of my unit tests. A list of supported responses can be defined https://github.com/secdev/scapy/blob/876d9b86f96df6df532a50303f76fad05d2a0539/test/contrib/automotive/ecu_am.uts#L288-L298 and forwarded to a ECU_am (ECU answering machine) object https://github.com/secdev/scapy/blob/876d9b86f96df6df532a50303f76fad05d2a0539/test/contrib/automotive/ecu_am.uts#L302-L303

I would be happy, if you found some of my work useful.

kasperkarlsson commented 4 years ago

Cool! Thanks a lot @polybassa - I'm sure this will indeed prove useful :+1:

polybassa commented 4 years ago

Thanks for your answer. I would be glad, if you can give me feedback, once you used Scapy.

vk-gst commented 4 years ago

Did someone manage to get the mock ECU working? I get a name error when I try to reference the ECU() object. Is this issue known? This is the bare minimum code that I was trying to run using python:

from scapy.all import *

load_contrib("isotp")
load_contrib("automotive.uds")
load_layer("can")
load_contrib("automotive.ecu")

ecu = ECU()  #NameError: name 'ECU' is not defined
assert ecu.current_session == 1
assert ecu.current_security_level == 0
assert ecu.communication_control == 0

The end result, I want to have a mock ECU with some UDS services available. This mock ECU, I want to scan for the UDS services using CaringCaribou on a virtual CAN interface. Is this possible? How should I proceed with this one?

polybassa commented 4 years ago

Hi, Thanks for your question. Could you provide a full debug output. I can't explain myself, why this is not working.

load_layer("can")
conf.contribs['CAN']['swap-bytes'] = False
import os, threading, six, subprocess, sys
from subprocess import call
from scapy.consts import LINUX

iface0 = "vcan0"
iface1 = "vcan1"

ISOTP_KERNEL_MODULE_AVAILABLE = False
def exit_if_no_isotp_module():
    if not ISOTP_KERNEL_MODULE_AVAILABLE:
        sys.stderr.write("TEST SKIPPED: can-isotp not available" + os.linesep)
        warning("Can't test ISOTP native socket because kernel module is not loaded")
        exit(0)

if 0 != call(["cansend", iface0,  "000#"]):
    # vcan0 is not enabled
    if 0 != call(["sudo", "modprobe", "vcan"]):
        raise Exception("modprobe vcan failed")
    if 0 != call(["sudo", "ip", "link", "add", "name", iface0, "type", "vcan"]):
        print("add %s failed: Maybe it was already up?" % iface0)
    if 0 != call(["sudo", "ip", "link", "set", "dev", iface0, "up"]):
        raise Exception("could not bring up %s" % iface0)

if 0 != call(["cansend", iface0,  "000#"]):
    raise Exception("cansend doesn't work")

if 0 != call(["cansend", iface1,  "000#"]):
    # vcan1 is not enabled
    if 0 != call(["sudo", "modprobe", "vcan"]):
        raise Exception("modprobe vcan failed")
    if 0 != call(["sudo", "ip", "link", "add", "name", iface1, "type", "vcan"]):
        print("add %s failed: Maybe it was already up?" % iface1)
    if 0 != call(["sudo", "ip", "link", "set", "dev", iface1, "up"]):
        raise Exception("could not bring up %s" % iface1)

if 0 != call(["cansend", iface1,  "000#"]):
    raise Exception("cansend doesn't work")

print("CAN should work now")

if six.PY3 and LINUX:
    from scapy.contrib.cansocket_native import *
    new_can_socket = lambda iface: CANSocket(iface)
    new_can_socket0 = lambda: CANSocket(iface0)
    new_can_socket1 = lambda: CANSocket(iface1)

s = new_can_socket(iface0)
s.close()

p1 = subprocess.Popen(['lsmod'], stdout = subprocess.PIPE)
p2 = subprocess.Popen(['grep', '^can_isotp'], stdout = subprocess.PIPE, stdin=p1.stdout)
p1.stdout.close()
if p1.wait() == 0 and p2.wait() == 0 and b"can_isotp" in p2.stdout.read():
    p = subprocess.Popen(["isotpsend", "-s1", "-d0", iface0], stdin = subprocess.PIPE)
    p.communicate(b"01")
    if p.returncode == 0:
        ISOTP_KERNEL_MODULE_AVAILABLE = True

conf.contribs['ISOTP'] = {'use-can-isotp-kernel-module': ISOTP_KERNEL_MODULE_AVAILABLE}
load_contrib("isotp")

from scapy.contrib.isotp import ISOTPNativeSocket
ISOTPSocket = ISOTPNativeSocket
assert ISOTPSocket == ISOTPNativeSocket

load_contrib('automotive.uds')
load_contrib('automotive.ecu')

example_responses = \
    [ECUResponse(session=2, security_level=0, responses=UDS() / UDS_RDBIPR(dataIdentifier=2) / Raw(b"deadbeef1")),
     ECUResponse(session=range(3,5), security_level=0, responses=UDS() / UDS_RDBIPR(dataIdentifier=3) / Raw(b"deadbeef2")),
     ECUResponse(session=[5,6,7], security_level=0, responses=UDS() / UDS_RDBIPR(dataIdentifier=5) / Raw(b"deadbeef3")),
     ECUResponse(session=lambda x: 8 < x <= 10, security_level=0, responses=UDS() / UDS_RDBIPR(dataIdentifier=9) / Raw(b"deadbeef4"))]

success = False

with new_can_socket0() as isocan1, ISOTPSocket(isocan1, sid=0x700, did=0x600, basecls=UDS) as ecu, \
        new_can_socket0() as isocan2, ISOTPSocket(isocan2, sid=0x600, did=0x700, basecls=UDS) as tester:
    answering_machine = ECU_am(supported_responses=example_responses, main_socket=ecu, basecls=UDS)
    sim = threading.Thread(target=answering_machine, kwargs={'timeout': 60, 'stop_filter': lambda p: p.service==0xff})
    sim.start()
    # Simulator is running for 60s, see timeout parameter
    time.sleep(60)
    # Kill SIMULATOR
    tester.send(UDS(service=0xff))
    sim.join(timeout=10)

The ECU simulator is parametrized through a list of ECUResponses. The order of this list is important. As soon as the answering machine finds a response that fits to a request, the response is sent. This means, a more detailed response has to be on top of a general response. This code shows two ISOTPSockets, one acts as ECU socket, the other as Tester socket. You can also create your own Tester socket and send commands via caringcaribou. The kwargs of the ECU_am object are important for the runtime behavior. You can specify timeout, count or stop_filter to determine, when the Simulator should be killed.

polybassa commented 4 years ago

Here is some code from a unit test file, I'm using: https://github.com/secdev/scapy/blob/master/test/contrib/automotive/ecu_am.uts The code in the beginning is to setup the test system. This should run without problems on a system with Linux, python3 and Hartkopps ISOTP Kernel modul loaded (check lsmod)

vk-gst commented 4 years ago

I gave another try to use automotive.ecu from the scapy interface directly and this is the behaviour:

>>> load_contrib('automotive.uds')                                                                                         
INFO: Specify "conf.contribs['UDS'] = {'treat-response-pending-as-answer': True}" to treat a negative response 'requestCorrectlyReceived-ResponsePending' as answer of a request. 
The default value is False.
>>> load_contrib('automotive.ccp')                                                                                         
>>> load_contrib('automotive.ecu')                                                                                         
ERROR: Loading module scapy.layers.automotive.ecu
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/scapy/main.py", line 188, in load_contrib
    importlib.import_module("scapy.contrib." + name)
  File "/usr/lib/python3.6/importlib/__init__.py", line 126, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 994, in _gcd_import
  File "<frozen importlib._bootstrap>", line 971, in _find_and_load
  File "<frozen importlib._bootstrap>", line 953, in _find_and_load_unlocked
ModuleNotFoundError: No module named 'scapy.contrib.automotive.ecu'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/scapy/main.py", line 143, in _load
    mod = importlib.import_module(module)
  File "/usr/lib/python3.6/importlib/__init__.py", line 126, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
  File "<frozen importlib._bootstrap>", line 994, in _gcd_import
  File "<frozen importlib._bootstrap>", line 971, in _find_and_load
  File "<frozen importlib._bootstrap>", line 941, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "<frozen importlib._bootstrap>", line 994, in _gcd_import
  File "<frozen importlib._bootstrap>", line 971, in _find_and_load
  File "<frozen importlib._bootstrap>", line 953, in _find_and_load_unlocked
ModuleNotFoundError: No module named 'scapy.layers.automotive'
>>>         
vk-gst commented 4 years ago

Hi, Thanks for your question. Could you provide a full debug output. I can't explain myself, why this is not working.

load_layer("can")
conf.contribs['CAN']['swap-bytes'] = False
import os, threading, six, subprocess, sys
from subprocess import call
from scapy.consts import LINUX

iface0 = "vcan0"
iface1 = "vcan1"

ISOTP_KERNEL_MODULE_AVAILABLE = False
def exit_if_no_isotp_module():
    if not ISOTP_KERNEL_MODULE_AVAILABLE:
        sys.stderr.write("TEST SKIPPED: can-isotp not available" + os.linesep)
        warning("Can't test ISOTP native socket because kernel module is not loaded")
        exit(0)

if 0 != call(["cansend", iface0,  "000#"]):
    # vcan0 is not enabled
    if 0 != call(["sudo", "modprobe", "vcan"]):
        raise Exception("modprobe vcan failed")
    if 0 != call(["sudo", "ip", "link", "add", "name", iface0, "type", "vcan"]):
        print("add %s failed: Maybe it was already up?" % iface0)
    if 0 != call(["sudo", "ip", "link", "set", "dev", iface0, "up"]):
        raise Exception("could not bring up %s" % iface0)

if 0 != call(["cansend", iface0,  "000#"]):
    raise Exception("cansend doesn't work")

if 0 != call(["cansend", iface1,  "000#"]):
    # vcan1 is not enabled
    if 0 != call(["sudo", "modprobe", "vcan"]):
        raise Exception("modprobe vcan failed")
    if 0 != call(["sudo", "ip", "link", "add", "name", iface1, "type", "vcan"]):
        print("add %s failed: Maybe it was already up?" % iface1)
    if 0 != call(["sudo", "ip", "link", "set", "dev", iface1, "up"]):
        raise Exception("could not bring up %s" % iface1)

if 0 != call(["cansend", iface1,  "000#"]):
    raise Exception("cansend doesn't work")

print("CAN should work now")

if six.PY3 and LINUX:
    from scapy.contrib.cansocket_native import *
    new_can_socket = lambda iface: CANSocket(iface)
    new_can_socket0 = lambda: CANSocket(iface0)
    new_can_socket1 = lambda: CANSocket(iface1)

s = new_can_socket(iface0)
s.close()

p1 = subprocess.Popen(['lsmod'], stdout = subprocess.PIPE)
p2 = subprocess.Popen(['grep', '^can_isotp'], stdout = subprocess.PIPE, stdin=p1.stdout)
p1.stdout.close()
if p1.wait() == 0 and p2.wait() == 0 and b"can_isotp" in p2.stdout.read():
    p = subprocess.Popen(["isotpsend", "-s1", "-d0", iface0], stdin = subprocess.PIPE)
    p.communicate(b"01")
    if p.returncode == 0:
        ISOTP_KERNEL_MODULE_AVAILABLE = True

conf.contribs['ISOTP'] = {'use-can-isotp-kernel-module': ISOTP_KERNEL_MODULE_AVAILABLE}
load_contrib("isotp")

from scapy.contrib.isotp import ISOTPNativeSocket
ISOTPSocket = ISOTPNativeSocket
assert ISOTPSocket == ISOTPNativeSocket

load_contrib('automotive.uds')
load_contrib('automotive.ecu')

example_responses = \
    [ECUResponse(session=2, security_level=0, responses=UDS() / UDS_RDBIPR(dataIdentifier=2) / Raw(b"deadbeef1")),
     ECUResponse(session=range(3,5), security_level=0, responses=UDS() / UDS_RDBIPR(dataIdentifier=3) / Raw(b"deadbeef2")),
     ECUResponse(session=[5,6,7], security_level=0, responses=UDS() / UDS_RDBIPR(dataIdentifier=5) / Raw(b"deadbeef3")),
     ECUResponse(session=lambda x: 8 < x <= 10, security_level=0, responses=UDS() / UDS_RDBIPR(dataIdentifier=9) / Raw(b"deadbeef4"))]

success = False

with new_can_socket0() as isocan1, ISOTPSocket(isocan1, sid=0x700, did=0x600, basecls=UDS) as ecu, \
        new_can_socket0() as isocan2, ISOTPSocket(isocan2, sid=0x600, did=0x700, basecls=UDS) as tester:
    answering_machine = ECU_am(supported_responses=example_responses, main_socket=ecu, basecls=UDS)
    sim = threading.Thread(target=answering_machine, kwargs={'timeout': 60, 'stop_filter': lambda p: p.service==0xff})
    sim.start()
    # Simulator is running for 60s, see timeout parameter
    time.sleep(60)
    # Kill SIMULATOR
    tester.send(UDS(service=0xff))
    sim.join(timeout=10)

The ECU simulator is parametrized through a list of ECUResponses. The order of this list is important. As soon as the answering machine finds a response that fits to a request, the response is sent. This means, a more detailed response has to be on top of a general response. This code shows two ISOTPSockets, one acts as ECU socket, the other as Tester socket. You can also create your own Tester socket and send commands via caringcaribou. The kwargs of the ECU_am object are important for the runtime behavior. You can specify timeout, count or stop_filter to determine, when the Simulator should be killed.

I tried running the above script as well, with can-isotp loaded. Here is the error I get :

CAN should work now
Traceback (most recent call last):
  File "ecu_test.py", line 74, in <module>
    [ECUResponse(session=2, security_level=0, responses=UDS() / UDS_RDBIPR(dataIdentifier=2) / Raw(b"deadbeef1")),
NameError: name 'ECUResponse' is not defined

I think for some reason, my scapy does not load the automotive.ecu package. Any idea?

polybassa commented 4 years ago

Yes, might be, that this code is not yet part of the latest release. Could you try to pull from github and install manually? https://github.com/secdev/scapy.git Maybe you have to remove scapy from your site-packages manually, in order to ensure, the correct version is started.

polybassa commented 4 years ago

Hi, did you had success?

vk-gst commented 4 years ago

@polybassa Not really. I did not proceed with the implementation. I found some alternatives here : https://github.com/zombieCraig/UDSim and https://github.com/zombieCraig/uds-server

I was wanting to ask you as well these things :

  1. I have some diagnostics logs from a real car that is in workshop. There are a couple of requests and responses. Would it be possible to provide these files to the ECU module in scapy and then it can simulate the real ECU. ?

  2. What other features are provided by the ECU module, apart from the UDS functionality?

polybassa commented 4 years ago

Hi,

  1. Yes, this was the original use case of the ECU module. Hope the documentation helps: https://scapy.readthedocs.io/en/latest/layers/automotive.html#ecu-utility-examples A list of supported responses can be used as input for a ECU_am

  2. The ECU module is basically protocol independent. Everything that works for UDS also works for GMLAN

kasperkarlsson commented 8 months ago

Closing this issue, as standalone mock ECUs are more suitable to keep in separate projects.