pylessard / python-udsoncan

Python implementation of UDS (ISO-14229) standard.
MIT License
561 stars 194 forks source link

Read Data by Identifier as an unformatted raw content dump? #152

Closed jmccorm closed 1 year ago

jmccorm commented 1 year ago

I have an existing shell script where you provide the name (address) of a module, and you provide an identifier, and it prints the contents of that location as a series of hexadecimal characters. (No matter what the location, it's encoding, or it's length, it simply provides a raw hex output of it's contents.) For reference, I recently republished it here:

https://old.reddit.com/r/CarHacking/comments/11rgykl/a_gift_from_the_wrangler_community_for_other/

So it's general purpose. You don't have to specify how the data is encoded. You just receive the data, and from there you can work with it as you wish and use other tools to manipulate it into a workable format. (Basically, in the spirit of your standard UNIX tool which does something small and can be chained with other things.)

How can/would I do this with python-udsoncan?

PS: I was so happy to make my first SocketCAN / ISO-TP / UDSonCAN connection to pull the VIN (0xF190) from my BCM. I'm hoping to convert everything we've be doing so far with the Jeep Wrangler JL to take advantage of your modules!

jmccorm commented 1 year ago

Here's my existing Python code...

`#!/usr/bin/python3

import sys import os import time import can import isotp import udsoncan from udsoncan.client import Client from udsoncan.connections import PythonIsoTpConnection

def error(): print("N/A") sys.exit(1)

def initialize(module, identifier): module_info = { "bcm": (0x620, 0x504, "can1"), "sccm": (0x763, 0x4E3, "can1"), "radio": (0x7BF, 0x53F, "can0"), "ipcm": (0x742, 0x4C2, "can1"), "evic": (0x742, 0x4C2, "can1"), "hvac": (0x783, 0x503, "can0"), }

tx_id, rx_id, bus_name = module_info.get(module.lower())
if tx_id is None:
    print(f"INVALID MODULE SPECIFIED: {module}")
    sys.exit(99)

# CONFIGURE UDS WITH DATA IDENTIFIER
config = {
    'data_identifiers': {
        0xF190: udsoncan.AsciiCodec(17)
    }
}

bus = can.interface.Bus(channel=bus_name, bustype='socketcan')

tp_addr = isotp.Address(isotp.AddressingMode.Normal_11bits, txid=tx_id, rxid=rx_id)
isotp_params = {
    'stmin': 0,  # Minimum separation time between frames
    'blocksize': 0,  # Number of frames per block
}
stack = isotp.CanStack(bus=bus, address=tp_addr, params=isotp_params)
conn = PythonIsoTpConnection(stack)
conn.open()
client = Client(conn, request_timeout=8,config=config)

try:
    response = client.read_data_by_identifier_first(identifier)
    if response is not None:
        print("SUCCESS")
        print(f"Req  for Identifier 0x{identifier:x}: {response}")
        print(" ".join(f"{b:02X}" for b in response.service_data.values[0].data))
        return 0
except Exception as e:
    print("UNKNOWN RESPONSE")
    return 99

if len(sys.argv) < 3: print("Usage: script.py MODULE IDENTIFIER") sys.exit(1)

module = sys.argv[1] identifier = int(sys.argv[2], 16) initialize(module, identifier) `

Here's some sample output: /tmp# ./rid.py radio f190 SUCCESS Req for Identifier 0xf190: 1C4HJXFN6MW756530 UNKNOWN RESPONSE /tmp# ./rid.py radio f191 [ConfigError] : Actual data identifier configuration contains no definition for data identifier 0xf191 [ConfigError] : Actual data identifier configuration contains no definition for data identifier 0xf191 UNKNOWN RESPONSE /tmp#

(Ignore the "UNKNOWN RESPONSE".) Basically, what I need is, regardless of DID, for it to print a raw hex dump of the output.

pylessard commented 1 year ago

It's doable, but the end result may not respect the standard. Here's why. The length of the DID data is never transmitted over UDS. It is assumed by the server and the device. It is also possible to read multiple DIDs at once. Therefore, the prior knowledge of the DID size is required to parse the response correctly.

I'm sure you can see the problem right away. Either you pass the DID size as an input parameter of your program, or you read one DID at the time and assume that the device is not giving you extra bytes; in that case, you can print the response payload. The latter is the part that is "non-standard".

Basically, you need a Codec that does 2 things. Pass the data as is (no interpretation) and reads the whole payload.

  1. This example show you how to read a DID. Make a RawCodec class that simply do nothing.

  2. To read the whole payload, there's a non-standard trick. See here. The trick is to raise DidCodec.ReadAllRemainingData in the codec len function.

Cheers

pylessard commented 1 year ago

Another approach, maybe more suited for your use case, would be to not use the client object and interract directly with the service level and the connection (since you seem to want to process the data outside of python).

This example should be useful to you.

Since you want to craft a request but not interpret the resposne (which the client does), you could do something like :

request = services.ReadDataByIdentifier.make_request(0x1234, my_didconfig)
my_connection.send(request)
response = my_connection.wait_frame(timeout=1, exception=False)
if response:
    print("Gotten data : %s" % response.data)

In the above example, didconfig is the same config used by the client. The didconfig is not necessary to craft the request, it is just validated to fail early instead of sending a request and waiting for a response to then fail. I should have made that parameter optional.. you can put any valid configuration that won't affect the request.

You could even be more clever and use the client send_request method. All client method does the same thing. It calls : service.make_request, client.send_request, service.interpret_response + do some sanity checks. Meaning, something like that would work.

request = services.ReadDataByIdentifier.make_request(0x1234, my_didconfig)
try:
    response = client.send_request(request)
    print("Gotten data : %s" % response.data)
except TimeoutException as e:
    print("No response")
except NegativeResponseException as e:
    print("server said no. Payload : " + str(e.response.data))
except InvalidResponseException as e:
    print("Server is drunk. Payload: " + str(e.response.data))
except UnexpectedResponseException:    # Only happen if you use the client methods. `send_request` will never raise this.
    pass

You may have notice the usage of the exception. I wanted to show it to you because you use none in your above code. :) The client raises exception. You can disable them with the configuration, bu you still need to check yourself if the response is valid/negative/unexpected. The response will only be None if you use SuppressPositiveResponse.

TimeoutException cannot be disabled in the client.

Ok, that's enough. Have fun

jmccorm commented 1 year ago

I'll admit, I'm a less than stellar programmer. But at first blush, I think you've given me what I'm looking for... and then gone the extra mile to give me even more! Thank you.

A question, since I have your attention. I'm not at that point just yet, but If I find myself over my head with udsoncan or can-isotp issues, do you (or do your know someone) that'll do minor works-for-hire with these modules? I'm doing some reverse-engineering work on the Wrangler JL as a hobby, but I don't know Python. If I end up reaching a dead-end, I wouldn't mind seeking a higher power with dollars in hand. (Assuming such a statement does not violate GitHub TOS or anything.)

Thanks again, Josh McCormick

pylessard commented 1 year ago

Alright. You can ping me here. I won't mind helping you a bit or answering questions/doing code review. As for the hiring aspect. I'm no against, but look.. I spent many many months writing this codes for free, I can answer questions for free. If you ask for too much, I'll let you know.

This being said, I will kindly ask that you still do an effort on your side. Many people reach out to me without having done their homework. I have very little patience for that.

As for how to use all of this. I suggest you go through the doc from beginning to end. What needs to be understood if that everything works in layer. udsoncan is on top of isotp and this is on top of the CAN layer (python-can, socketcan). Each layer can be configured.

Your application logic comes on top of UDS. I tried to built the udsoncan project in 3 levels

  1. request/response : raw data
  2. Services : Meaningful messages
  3. Client : Meaningful message, data validation and user-friendly shortcuts.

You can hook at the level you want. The client is the nicest.

jmccorm commented 1 year ago

You're a good man, thank you! I'll put in the work with what you've given me so far, and I'll let you know if I need a bit more help.

BTW, I thought you should know, I've had some mixed success using ChatGPT to write code with your modules. I actually used ChatGPT (in the example above) to convert my BASH script into code which at least reads the VIN from my BCM. ChatGPT version 3.5 (and the new 4.0) are both aware of your module. Here's the writing prompt I've been having some success with. Unfortunately, much of the time, ChatGPT gets confused how some of the various parts of your modules fit together.

Anyhow, I thought you'd find this interesting. Here's my writing prompt...

For now on, you will always write to the following standard unless told otherwise:

The target vehicle is a Jeep Wrangler JL which has two CAN buses: can0 (CAN-IHS) and can1 (CAN-C). Its CAN data is big-endian. Please write Python 3.9.2 scripts that use the Python-CAN module version 4.1.0, the can-isotp module version 1.8, and the can-isotp module 1.8 as needed. Configure for a SocketCAN interface and use a Raspberry Pi environment. The CAN data is big-endian and the CAN bus has an 11 bit ID. Use the set_filters() method for setting a CAN filter and set timeout=0.001 for recv() calls unless specified otherwise. Do not specify channels or bitrate unless necessary and avoid using the channel_info attribute. Please ask for clarifications when needed. Always begin Python3 scripts with #!/usr/bin/python3.

Do you understand this directive (YES or NO)?

jmccorm commented 1 year ago

I thought of another way around the problem. Let's keep on using the VIN as an example. A normal (healthy) DID 0xF190 request from my BCM would look like this (candump output):

can1 620 [8] 03 22 F1 90 00 00 00 00 can1 504 [8] 10 14 62 F1 90 31 43 34 can1 620 [8] 30 08 20 00 00 00 00 00 can1 504 [8] 21 48 4A 58 46 4E 36 4D can1 504 [8] 22 57 37 35 36 35 33 30

So I thought, maybe if I keep on using iso-tp but don't use udsoncan for this, it might work out better? So I tried this piece of code...

    bus = can.interface.Bus('can1', bustype='socketcan', can_filters=can_filters)
    address = isotp.Address(isotp.AddressingMode.Normal_11bits, txid=txid, rxid=rxid)
    stack = isotp.CanStack(bus=bus, address=address)

    # Send the UDS Request Data by Identifier message with padding
    uds_request = bytearray([0x22, (identifier >> 8) & 0xFF, identifier & 0xFF ])
    stack.send(uds_request)

But when I did, my CAN bus message looked like this, which my ECU ignored:

can1 620 [4] 03 22 F1 90

So I decided to pad things with zeros... uds_request = bytearray([0x22, (identifier >> 8) & 0xFF, identifier & 0xFF, 0x00, 0x00, 0x00, 0x00])

That produced the following CAN bus message which my ECU also ignored:

can1 620 [8] 07 22 F1 90 00 00 00 00

So I know that I should be telling isotp to always pad my messages so that they're eight bytes long, and I know that there's got to be an option for it, but I'm just not finding it. What am I overlooking here?

I need to be producing THIS, and I want to use ISO-TP so that I can handle the long responses from the ECU:

can1 620 [8] 03 22 F1 90 00 00 00 00

pylessard commented 1 year ago

Yes. If you are interested in the raw payload and read 1 DID at the time, you can skip the UDS layer and hook directly on Isotp layer.

The CanStack accepts an optional Config dict. All here : https://can-isotp.readthedocs.io/en/latest/isotp/implementation.html#parameters

You want tx_data_min_length and tx_padding

pylessard commented 1 year ago

I'm thinking about your use case. You basically want the isotp stack and the UDS logic in 2 separate program and pipe them with bash.

request_vin | isotp_send.py | vin_decode

or more generically :

request_did.py F190 | isotp_send.py | did_decode.py

You can achieve that

# wrangle_did_config.py
from udsoncan import AsciiCodec
didconfig = {
 0xF190 : AsciiCodec(15)
}
# request_did.py

from wrangle_did_config import didconfig
from udsoncan.services import ReadDataByIdentifier
import binascii

# ... Parse stdin

request  =ReadDataByIdentifier.make_request(did_number, didconfig)
payload_ascii = binascii.hexlify(request.get_payload()).decode('ascii')
printf(payload_ascii )
#did_decode.py

from wrangle_did_config import didconfig
from udsoncan.services import ReadDataByIdentifier
import binascii

# ... Parse stdin
response_payload = binascii.unhexlify(input_payload_ascii)
response = udsoncan.Response.from_payload(response_payload)
ReadDataByIdentifier.interpret_response(response, didconfig)
for did in response.service_data.values:
    printf("0x%04x: %s" % (did,  response.service_data.values[did]))    # Need to be convertible to string
#isotp_send.py

from isotp import CanStack
# ... setup you can stack
# parse stdin
data= binascii.unhexlify(input_payload_ascii)
stack.send(data)
while stack.transmitting():
   stack.process()
   time.sleep(stack.sleep_time())

if stack.available():
    payload = app.stack.recv()
    payload_ascii = binascii.hexlify(payload).decode('ascii')
    print(payload_ascii )

I haven't tested this piece of code, but you get the idea. You can even replace the isotp_Send.py by the linux utility isotp_send written by @hartkopp.

hartkopp commented 1 year ago

But when I did, my CAN bus message looked like this, which my ECU ignored:

can1 620 [4] 03 22 F1 90

So I decided to pad things with zeros... uds_request = bytearray([0x22, (identifier >> 8) & 0xFF, identifier & 0xFF, 0x00, 0x00, 0x00, 0x00])

That produced the following CAN bus message which my ECU also ignored:

can1 620 [8] 07 22 F1 90 00 00 00 00

So I know that I should be telling isotp to always pad my messages so that they're eight bytes long, and I know that there's got to be an option for it, but I'm just not finding it. What am I overlooking here?

When you do some padding up to the 8 byte length, the padding is usually done with fill bytes 0xAA or 0x55 to prevent stuff-bits on the CAN bus bitstream layer.

isotpsend has an option -p AA:55 or -p 55:AA for that (don't know the direction right now - please check on your own).

jmccorm commented 1 year ago

Pier,

With a bunch of work, I managed to make an even better Python version of my 'rid' script without the udsoncan package. I've included it below. But, wow, given all the help you're providing and all that good information, I think I'm going to go back and reprogram it with the udsoncan package. Especially because I want to take advantage of all the other services that are available.

Thank you so much for your help... especially the tx_data_min_length and tx_padding tip that got my over my initial hurdles. Despite telling ChatGPT what version of the modules I'm writing for, it sometimes insists on trying to use ll_data_length, tx_dl, rx_dl, and rx_padding. I don't know if it was hallucinating those or not, but I now actively tell it not to try that. Anyhow, here's version 1 of my code without udsoncan. Next version coming will be with it!

#!/usr/bin/python3

# READ DATA BY IDENTIFIER SCRIPT

# DESIGNED FOR THE JEEP WRANGLER JL ON A RASPBERRY PI
# WITH SOCKETCAN.

# Written by Josh McCormick on March 16th, 2023 with
# help from a semi-hallucinating ChatGPT version 4.0
# And thanks to Pier-Yves Lessard for his help, too!

import can
import isotp
import time
import argparse

def send_request_data_by_identifier(can_channel, rxid, txid, identifier,timeout):
    # Set up the ISOTP connection with padding configuration
    can_filters = [{"can_id": rxid, "can_mask": 0x7FF}]
    bus = can.interface.Bus(can_channel, bustype='socketcan', can_filters=can_filters)
    address = isotp.Address(isotp.AddressingMode.Normal_11bits, txid=txid, rxid=rxid)

    # Set up the ISOTP connection parameters
    params = {'tx_data_min_length': 8, 'tx_padding': 0x00}
    stack = isotp.CanStack(bus=bus, address=address, params=params)

    # Send the UDS Request Data by Identifier message with padding
    uds_request = bytearray([0x22, (identifier >> 8) & 0xFF, identifier & 0xFF])
    stack.send(uds_request)

    # Wait for a response
    start_time = time.time()
    response = None
    while time.time() - start_time < timeout:
        stack.process()  # Process any received messages
        response = stack.recv()
        if response is not None:
            break
    bus.shutdown()
    return response

MODULE_INFO = {
    "bcm": ("620", "504", "can1"),   # BODY CONTROL MODULE
    "rf": ("740", "4C0", "can1"),    # RF HUB (UNCONFIRMED)
    "ipcm": ("742", "4C2", "can1"),  # INSTRUMENT PANEL CLUSTER MODULE
    "evic": ("742", "4C2", "can1"),  # EVIC is an alias for IPCM
    "airbag": ("744", "4C4", "can1"),   # OCCUPANT RESTRAINT / AIRBAG MODULE
    "shift": ("749", "4C9", "can1"), # ELECTRONIC SHIFTER MODULE
    "train": ("74B", "4CB", "can1"), # DRIVE TRAIN CONTROL MODULE (UNCONFIRMED)
    "sccm": ("75A", "4DA", "can1"), # STEERING CONTROL MODULE (UNCONFIRMED)
    "unknown1": ("763", "4E3", "can1"),  # UNKNOWN
    "hvac": ("783", "503", "can0"),  # HEATING/VENT/AIR-CONDITIONING MODULE
    # "unknown2": ("792", "512", "can1"),  # UNKNOWN
    "stack": ("7BC", "53C", "can0"),  # INTEGRATED CENTERE STACK MODULE
    # "unknown3": ("7BE", "53E", "can1"),  # UNKNOWN
    "radio": ("7BF", "53F", "can0"), # UCONNECT RADIO MODULE
    "pcm": ("7e0", "7e8", "can1"),   # POWERTRAIN CONTROL MODULE
    "tcm": ("7e1", "7e9", "can1"),   # TRANSMISSION CONTROL MODULE
}

def parse_identifier(identifier):
    if identifier.startswith("0x") or identifier.startswith("0X"):
        base = 16
    elif identifier.startswith("0o") or identifier.startswith("0O"):
        base = 8
    elif identifier.startswith("0b") or identifier.startswith("0B"):
        base = 2
    else:
        base = 16
    try:
        return int(identifier, base)
    except ValueError:
        raise argparse.ArgumentTypeError(f"Invalid identifier value: {identifier}")

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("module", type=str, help="module name, or actual location as txid:rxid:bus", nargs='?')
    parser.add_argument("identifier", type=parse_identifier, help="identifier in hexadecimal (e.g., 0xf190)", nargs='?')
    parser.add_argument("-a", "--ascii", action="store_true", help="prints the payload as an ASCII string")
    parser.add_argument("-t", "--timeout", type=float, default=8, help="response timeout in seconds (default: 8)")
    parser.add_argument("-m", "--modules", action="store_true", help="list available modules and exit")
    args = parser.parse_args()

    if args.modules:
        print("Pre-defined modules:", " ".join(MODULE_INFO.keys()))
        exit(0)

    if not args.module or not args.identifier:
        parser.print_help()
        exit(10)

    module = args.module.lower()
    identifier = args.identifier

    if module in {k.lower(): v for k, v in MODULE_INFO.items()}:
        txid, rxid, can_channel = MODULE_INFO[module]
        txid = int(txid, 16)
        rxid = int(rxid, 16)
    elif ':' in module:
      parts = module.split(':')
      if len(parts) == 3:
          try:
              txid = int(parts[0], 16)
              rxid = int(parts[1], 16)
              can_channel = parts[2]
          except ValueError:
              print("Invalid module format. Please provide a valid module name or format like '0x620:0x504:can1' or '620:504:can1'.")
              print("Pre-defined modules:", " ".join(MODULE_INFO.keys()))
              exit(11)
      else:
          print("Invalid module format. Please provide a valid module name or format like '0x620:0x504:can1' or '620:504:can1'.")
          print("Pre-defined modules:", " ".join(MODULE_INFO.keys()))
          exit(11)
    else:
      print("No such module found. Please provide a valid module name or format like '0x620:0x504:can1' or '620:504:can1'.")
      print("Pre-defined modules:", " ".join(MODULE_INFO.keys()))
      exit(11)

    response = send_request_data_by_identifier(can_channel, rxid, txid, identifier,args.timeout)

    if response is not None:
        reply = response[0:3]
        status = reply[0:1]
        id = reply[1:3]

        # DEBUG LINES TO PRINT THE UDS RESPONSE CODE AND ASSOCIATED DATA ID
        # print("Status: ", ' '.join(f'{byte:02X}' for byte in status))
        # print("DataID: ", ' '.join(f'{byte:02X}' for byte in id))

        # NEGATIVE RESPONSE - REQUESTED DATAID DOES NOT EXIT
        if status == bytearray([0x7F]):
            print("NEGATIVE RESPONSE")
            exit(1)

        # ANY OTHER RESPONSE IS UNKNOWN OR UNEXPECTED, SO WE PRINT IT
        if status != bytearray([0x62]):
            print('UNKNOWN RESPONSE CODE:', ' '.join(f'{byte:02X}' for byte in status))
            exit(2)

        # Check if id matches the supplied identifier
        if (identifier >> 8) == id[0] and (identifier & 0xFF) == id[1]:
            payload = response[3:]

            if args.ascii:
                print(payload.decode('ascii', errors='replace'))
            else:
                print(' '.join(f'{byte:02X}' for byte in payload))
        else:
            print("ERROR: A different Data ID was received.")
            exit(3)

    else:
        print("No response received within",args.timeout,"seconds.")
        exit(9)

if __name__ == "__main__":
    main()
pylessard commented 1 year ago

That's nice :) I do find the AI part interesting (like everybody), but I'm not yet sold to the idea of having it write code. Nothing beats knowledge and riguors to achieve quality

martinjthompson commented 1 year ago

Do try it out - I oscillate between amazed and very disappointed, depending on the task I give it :)

On Fri, 17 Mar 2023 at 13:22, Pier-Yves Lessard @.***> wrote:

That's nice :) I do find the AI part interesting (like everybody), but I'm not yet sold to the idea of having it write code. Nothing beats knowledge and riguors to achieve quality

— Reply to this email directly, view it on GitHub https://github.com/pylessard/python-udsoncan/issues/152#issuecomment-1473836234, or unsubscribe https://github.com/notifications/unsubscribe-auth/AADIII7CDEXO4HS4VNG4QODW4RQPTANCNFSM6AAAAAAV4GUXVE . You are receiving this because you are subscribed to this thread.Message ID: @.***>

-- @.*** https://parallelpoints.com/ http://parallelpoints.com/

jmccorm commented 1 year ago

I think I have it. I'll be working on this some more tomorrow. Yes, I'm only doing a single call now, but I see the value in calling multiple DIDs at the same time, and it's something that I'll want to demonstrate to my community so that they can take advantage of such a powerful feature!

#!/usr/bin/python3
import argparse
import can
import udsoncan
from udsoncan.connections import IsoTPSocketConnection
from udsoncan.client import Client
from udsoncan import DidCodec

class PassThroughCodec(DidCodec):
    def encode(self, data):
        return data

    def decode(self, data):
        return data

    def __len__(self):
        return 0  # Default length; udsoncan will handle variable-length data

BCM_TXID = 0x620
BCM_RXID = 0x504
BCM_CHANNEL = 'can1'

def send_read_data_by_identifier(client, did):
    try:
        response = client.read_data_by_identifier(did)
        return response
    except udsoncan.exceptions.NegativeResponseException as e:
        print("Failed to read data: %s" % e)
        return None
    except Exception as e:
        print("Error occurred: %s" % e)
        return None

parser = argparse.ArgumentParser(description='Read data from an ECU using a CAN
bus and the ISO-TP protocol.')
parser.add_argument('did', type=str, help='the data identifier in hexadecimal')

args = parser.parse_args()

# Remove the '0x' prefix if it exists
if args.did.startswith('0x') or args.did.startswith('0X'):
    args.did = args.did[2:]

# Convert hexadecimal strings to bytes
did = int(args.did, 16)

rxid = BCM_RXID
txid = BCM_TXID
can_channel = BCM_CHANNEL

config = dict(udsoncan.configs.default_client_config)
config['data_identifiers'] = {i: PassThroughCodec() for i in range(0x100, 0x600)
}

conn = IsoTPSocketConnection(can_channel, rxid=rxid, txid=txid)
with Client(conn, request_timeout=5, config=config) as client:
    read_response = send_read_data_by_identifier(client, did)

    if read_response is None:
        print("Failed to read data.")
    else:
        # Print the result as a hexadecimal string
        print('Data: ' + read_response.hex())
pylessard commented 1 year ago

Carefull with the codec len of 0. That's not supported

pylessard commented 1 year ago

Is it still relevant to keep this issue open?

jmccorm commented 1 year ago

Looks like I got tied up. Let's put a pin in it for now. I'm sure I'll be revisiting this topic soon. It will be interesting and useful to query multiple DIDs at once, so I'm sure I'm going to be pulled back this way again soon! Thank you for such useful CAN modules!