kuba2k2 / datastruct

Combination of struct and dataclasses for easy parsing of binary formats
MIT License

How can I use substructures? #2

Open disazoz opened 1 month ago

disazoz commented 1 month ago

How can I use WString as a substructure in CommandReadVariable? Also, is it possible to get the length of the WString inside CommandReadVariable?

def wstring(length: int, *, default: str = ...):
    return adapter(
        encode=lambda value, ctx: value.encode("utf-16le"),
        decode=lambda value, ctx: value.decode("utf-16le"),
    )(field(length, default=default))

@dataclass
class WString(DataStruct):
    variable_length: int = built("H", lambda ctx: len(ctx.variable_name))
    variable_name: str = wstring(lambda ctx: ctx.variable_length * 2)

@dataclass
class CommandReadVariable(DataStruct):
    tag_id: int = field("H", default=1)
    message_length: int = built("H", lambda ctx: len(ctx.variable_name) * 2 + 3)
    message_type: MessageType = const(MessageType.CommandReadVariable)(field("B"))
    variable_name: WString

kuba2k2 commented 1 month ago

I assume you want message_length to represent the length of variable_name without calculating it manually? One option is to call len() on the raw string; calling len() on a structure is not implemented yet (but it could be). To access the raw string, use len(ctx.variable_name.variable_name), since that is the inner property name. Another option is to use .sizeof(), like:

message_length: int = built("H", lambda ctx: ctx.variable_name.sizeof())

It will count the entire subfield's length in bytes. Since it has to "virtually" pack the subfield, it could have slightly worse performance, but it shouldn't be noticeable at all.
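To make the difference between the character count and the packed size concrete, here is a minimal standard-library sketch of the WString wire layout. It does not use datastruct at all; the little-endian byte order and the helper name pack_wstring are assumptions made only for illustration.

```python
import struct


def pack_wstring(text: str) -> bytes:
    """Pack text as a 2-byte length prefix (in UTF-16 code units) plus UTF-16LE data."""
    encoded = text.encode("utf-16le")
    # Each UTF-16 code unit is two bytes, so the prefix is half the byte count.
    return struct.pack("<H", len(encoded) // 2) + encoded


packed = pack_wstring("abc")
char_count = len("abc")   # what len() on the raw string gives
byte_size = len(packed)   # what a packed-size calculation counts
```

Here the raw string has 3 characters, while the packed substructure occupies 8 bytes (2 for the prefix, 6 for the data), so the two approaches produce different values for message_length.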

To use WString as a "substructure" use:

variable_name: WString = subfield()

I guess that MessageType is an enum of some kind and that you'll want to have another subfield depending on the message_type field? You shouldn't need to make separate models for the entire message (like CommandReadVariable), you can use the switch() field for that.

disazoz commented 1 month ago

Thanks a lot. That works.

About the MessageType enum: yes. I'm trying to implement this protocol https://c3.ulsu.tech/protocol/latest and the shape of the messages differs quite a lot depending on the type. I don't want to combine them in one model, though, to keep the models simple to read. Anyway, thanks for the suggestion.

kuba2k2 commented 1 month ago

You can (must) also keep models separate when using switch(). It still makes it cleaner and easier to maintain, because you don't have to repeat tag_id, message_length and message_type. If you create a generic Message model with the switch() field in it, you can put the header fields in there. Then, the switch() field will deserialize the rest of the message as one of the models, based on the message_type. So in your case you would simply set the type for CommandReadVariable to WString.

Something like:

@dataclass
class Message(DataStruct):
  tag_id: int = field("H", default=1)
  message_length: int = built("H", lambda ctx: ctx.body.sizeof())
  message_type: MessageType = field("B")
  body: Union[WString, MessageTypeA, MessageTypeB] = switch(lambda ctx: ctx.message_type)(
    CommandReadVariable=(WString, subfield()),
    CommandSomeType=(MessageTypeA, subfield()),
    CommandSomeOtherType=(MessageTypeB, subfield()),
  )

"Documentation" here: https://github.com/kuba2k2/datastruct?tab=readme-ov-file#switch-fields
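The idea of a shared header followed by a body selected by message_type can also be sketched with only the standard library. This is not how datastruct implements switch(); the enum values, the body layouts, the little-endian byte order, and all helper names below are hypothetical, and message_length is assumed to count only the body bytes.

```python
import struct
from enum import IntEnum


class MessageType(IntEnum):
    # Hypothetical values, not taken from the real protocol.
    READ_VARIABLE = 0
    WRITE_VARIABLE = 1


def parse_read_variable(body: bytes) -> dict:
    # Length prefix in characters, followed by UTF-16LE text.
    (length,) = struct.unpack_from("<H", body, 0)
    return {"variable_name": body[2 : 2 + length * 2].decode("utf-16le")}


def parse_write_variable(body: bytes) -> dict:
    (value,) = struct.unpack_from("<I", body, 0)
    return {"value": value}


# One body parser per message type; the header is parsed only once.
BODY_PARSERS = {
    MessageType.READ_VARIABLE: parse_read_variable,
    MessageType.WRITE_VARIABLE: parse_write_variable,
}

HEADER = struct.Struct("<HHB")  # tag_id, message_length, message_type


def parse_message(data: bytes) -> dict:
    tag_id, message_length, raw_type = HEADER.unpack_from(data, 0)
    message_type = MessageType(raw_type)
    body = data[HEADER.size : HEADER.size + message_length]
    return {
        "tag_id": tag_id,
        "message_type": message_type,
        "body": BODY_PARSERS[message_type](body),
    }


body = struct.pack("<H", 2) + "ab".encode("utf-16le")
raw = HEADER.pack(1, len(body), MessageType.READ_VARIABLE) + body
message = parse_message(raw)
```

Each body model stays separate and small, while the header fields are declared in exactly one place.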

robots commented 2 weeks ago

Hello, I am trying to implement a very similar packet structure (although in a different project). I have hit a problem:

 @dataclass
 class Packet(DataStruct):
     src: bytes = field("8s")
     typ: PacketType = field("B")
     dst: bytes = field("8s")
     length: int = built("B", lambda ctx: len(ctx.body))
     body: Union[PacketMajak, PacketMajakRsp] = switch(lambda ctx: ctx.typ)(
             MAJAK = (PacketMajak, subfield()),
             MAJAK_RSP = (PacketMajakRsp, subfield()),
             )

PacketType is an enum; PacketMajak and PacketMajakRsp are two structures representing the body of the packets. With the "body: Union..." annotation in place I get this error:

  File "/home/robot/devel-galatech/cukrovar_rf_fw/tool/pytool2/datastruct/main.py", line 455, in unpack
    raise e
  File "/home/robot/devel-galatech/cukrovar_rf_fw/tool/pytool2/datastruct/main.py", line 451, in unpack
    return cls(**values)
           ^^^^^^^^^^^^^
  File "<string>", line 8, in __init__
  File "/home/robot/devel-galatech/cukrovar_rf_fw/tool/pytool2/datastruct/main.py", line 48, in __post_init__
    value = field_decode(value, field_type)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/robot/devel-galatech/cukrovar_rf_fw/tool/pytool2/datastruct/utils/fields.py", line 24, in field_decode
    if issubclass(cls, Enum):
       ^^^^^^^^^^^^^^^^^^^^^
TypeError: issubclass() arg 1 must be a class; while unpacking 'Packet()'

Here cls is typing.Union, which is not a class, so issubclass() raises an exception.

When I remove the Union, it correctly parses the selected packet (either one, not both at the same time).
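The failure described above can be reproduced and avoided without datastruct. The following is only a sketch of one possible guard, not the library's actual code or fix; the function name is_enum_class and the enum value are hypothetical.

```python
from enum import Enum, IntEnum
from typing import Union


def is_enum_class(annotation) -> bool:
    # isinstance(..., type) is False for typing constructs such as Union[...],
    # so issubclass() is only reached for real classes.
    return isinstance(annotation, type) and issubclass(annotation, Enum)


class PacketType(IntEnum):
    MAJAK = 1  # hypothetical value


enum_result = is_enum_class(PacketType)
union_result = is_enum_class(Union[int, bytes])
```

With a guard like this, a Union annotation is reported as "not an enum" instead of raising TypeError.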

I have found projects where you use this module, and one of them inspired me to try the following construct: https://github.com/tuya-cloudcutter/bk7231tools/blob/main/bk7231tools/analysis/kvstorage.py#L182

If I rewrite the body as a list with one element:

@dataclass
class Packet(DataStruct):
    src: bytes = field("8s")
    typ: PacketType = field("B")
    dst: bytes = field("8s")
    length: int = built("B", lambda ctx: len(ctx.body))
    body: List[Union[PacketMajak, PacketMajakRsp]] = repeat(count=1)(
            switch(lambda ctx: ctx.typ)(
                MAJAK = (PacketMajak, subfield()),
                MAJAK_RSP = (PacketMajakRsp, subfield()),
            )
        )

It works correctly. Am I missing something obvious? I am using Python 3.12.4.

kuba2k2 commented 2 weeks ago

Hi, it seems that the switch() field doesn't work correctly with Union[] types. I noticed that too, while trying to use Python's union operator | with several types. I have a pending fix for that issue, published on a separate feature/union-types branch. I have successfully used it like this:

@dataclass
@datastruct(endianness=NETWORK, padding_pattern=b"\x00")
class TlsHandshake(DataStruct):
    class Type(IntEnum):
        CLIENT_HELLO = 1
        SERVER_HELLO = 2
        CERTIFICATE = 11

    type: Type = field("B")
    _1: ... = padding(1)
    length: int = field("H")
    data: bytes | TlsHandshakeHello | TlsHandshakeCertificate = switch(
        lambda ctx: ctx.type
    )(
        CLIENT_HELLO=(TlsHandshakeHello, subfield()),
        SERVER_HELLO=(TlsHandshakeHello, subfield()),
        CERTIFICATE=(TlsHandshakeCertificate, subfield()),
        default=(bytes, field(lambda ctx: ctx.length)),
    )

On the same branch I've also published some other improvements:

Let me know if this resolves your issue :)

P.S.: instead of field("8s") you can (and should) simply use field(8)

P.S. #2: len(ctx.body) won't work, because it's not a string. If you need to know the packed byte length of the body, use ctx.body.sizeof(). Be aware that this will pack the structure (virtually) in order to find out the size. If the structure is very large, it could have a performance penalty. Usually it's not a problem, though.

robots commented 2 weeks ago

Thanks for the prompt response. That error is fixed, but the failure has now moved to another line in __post_init__:

Traceback (most recent call last):
  File "/home/robot/devel-galatech/cukrovar_rf_fw/tool/pytool2/packet.py", line 87, in <module>
    unpak = Packet.unpack(pktMajakRsp)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/robot/devel-galatech/cukrovar_rf_fw/tool/pytool2/datastruct/main.py", line 462, in unpack
    raise e
  File "/home/robot/devel-galatech/cukrovar_rf_fw/tool/pytool2/datastruct/main.py", line 458, in unpack
    return cls(**values)
           ^^^^^^^^^^^^^
  File "<string>", line 8, in __init__
  File "/home/robot/devel-galatech/cukrovar_rf_fw/tool/pytool2/datastruct/main.py", line 51, in __post_init__
    if not isinstance(value, field_type):
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.12/typing.py", line 510, in __instancecheck__
    raise TypeError(f"{self} cannot be used with isinstance()")
TypeError: typing.Union cannot be used with isinstance(); while unpacking 'Packet()'

print(value, field_type) outputs "PacketMajakRsp(loadState=2, latitude=825307441, longitude=842150450) typing.Union". So for a union, a different check is needed to verify that the value's type is one of the union's members.

robots commented 2 weeks ago

This might be a hint on how to fix it: https://stackoverflow.com/questions/45957615/how-to-check-a-variable-against-union-type-during-runtime
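One way such a runtime check could look, using typing.get_origin and typing.get_args from the standard library, is sketched below. This is an illustration only, not the fix that ended up in datastruct; the function name matches_annotation is hypothetical, and parameterised generics are checked only by their container type.

```python
import types
from typing import Any, List, Union, get_args, get_origin

# types.UnionType (the X | Y form) only exists on Python 3.10+.
_UNION_ORIGINS = (Union, getattr(types, "UnionType", Union))


def matches_annotation(value, annotation) -> bool:
    """Return True if value is acceptable for the given type annotation."""
    if annotation is Any:
        return True
    origin = get_origin(annotation)
    if origin in _UNION_ORIGINS:
        # A union matches if any one of its members matches.
        return any(matches_annotation(value, arg) for arg in get_args(annotation))
    if origin is not None:
        # Parameterised generic such as List[int]: check the container only.
        return isinstance(value, origin)
    return isinstance(value, annotation)


int_in_union = matches_annotation(1, Union[int, str])
float_in_union = matches_annotation(1.5, Union[int, str])
```

The key point is that the union's members are checked one by one, rather than passing typing.Union itself to isinstance().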

kuba2k2 commented 2 weeks ago

For now, I think you can use Any instead of the union type. It should not attempt type validation if it's Any.

robots commented 2 weeks ago

That works :) thanks! I will keep an eye open for any updates!