defnull / multipart

Multipart parser for Python 3

Remove IO. #17

Open theelous3 opened 5 years ago

theelous3 commented 5 years ago

So I'm thinking through how to implement a version of this without any io.

Some initial thoughts:

A multipart parser is the wrong layer at which to try to control DoS attacks, generally.

In addition to the architectural error above, I'm pretty sure the complete request body is required anyway, so the system would already have been DoSed before the data ever reached the parser (incomplete streams raise an error instead of requesting more data). Example:

from io import BytesIO
from multipart import MultipartParser

m = '--8banana133744910kmmr13a56!102!2405\r\nContent-Disposition: form-data; name="file_1"; filename="test_file1.txt"; Content-Type: application/octet-stream\r\n\r\nCompoo'

boundary = '8banana133744910kmmr13a56!102!2405'.encode()

parser = MultipartParser(BytesIO(m.encode()), boundary)

parser.parts()[0]

raised multipart.MultipartError: Unexpected end of multipart stream.

It should request more data from the stream instead of dying? If the whole thing is required to be in the stream at time of iteration, then we're already memory-DoSed, and we're making things worse if the stream's read isn't destructive (StringIO/BytesIO, for example).

Even if that's not the case, and I missed some trickery for incomplete streams, I don't like that the file system is secretly being used for larger requests and blocking on io without any indication.

We should have a system in place so that we can request events from the parser. There is a decision to be made: when a part needs more data, should we pass that data to the parser or to the part? At some point we always need to give data to the parser to begin the process, as by default returning an empty part that will never have data makes no sense. On the other hand, it seems a little odd to send data to the parser even though it's a part that's requesting more data.

An example of the flow when we send data always to the parser:


>>> parser = MultipartParser(boundary=boundary)
>>> maybe_part = parser.next_event()
>>> maybe_part
multipart.NEED_DATA # complaining we don't have any data whatsoever
>>> parser.recv_data(socket.read(100)) # headers are 101 bytes long. We probably don't want to hand out parts without headers, as they are the first point at which decisions can be made about the situation that doesn't essentially mandate just blindly reading from the socket.
>>> maybe_part = parser.next_event()
>>> maybe_part
multipart.NEED_DATA # complaining we still need to complete the headers for the first part, which requires blindly reading data
>>> parser.recv_data(socket.read(100)) # now we have enough data for the headers and some arbitrary amount of the content (it could also be the case we have the entire part and the beginning of the next part)
>>> maybe_part = parser.next_event()
>>> maybe_part
<multipart.MultipartPart object at 0xb70eba6c>  # we have a part. The part has headers, but an incomplete body.
>>> part = maybe_part # we have a part, alias for clarity
>>> maybe_part.headers # display headers for fun
Headers([('Content-Disposition', 'form-data; name="file_1"; filename="test_file1.txt"; Content-Type: application/octet-stream')])
>>> maybe_data = part.next_event()
>>> maybe_data
"blah blah bl"
>>> maybe_data = part.next_event()
multipart.NEED_DATA

# this is where we need to make a decision about where to send the new data. To the parser or to the part? Probably the parser.

>>> parser.recv_data(socket.read(100))
>>> maybe_data = part.next_event()
>>> maybe_data
"ah"
>>> maybe_data = part.next_event()
>>> maybe_data
multipart.PART_COMPLETE

# here we can repeat the process until we get all of the parts. In our example we only have one part.
>>> parser.next_event()
multipart.COMPLETE

# finished parsing.

The above is pretty easy to wrap in some small funcs. I've ripped this style from h2 and h11:

Example over a socket:

def parser_next_part(parser, sock):
    while True:
        event = parser.next_event()
        if event is multipart.NEED_DATA:
            parser.recv_data(sock.read(100))
            continue
        return event

def part_next_chunk(parser, part, sock):
    while True:
        event = part.next_event()
        if event is multipart.NEED_DATA:
            parser.recv_data(sock.read(100))
            continue
        return event

So you can just make requests for the next whatever from those funcs and it's all gucci.
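To make that concrete, here is a rough consumer built on those two helpers (everything here is hypothetical and assumes the API sketched above: MultipartParser, MultipartPart, and module-level NEED_DATA / PART_COMPLETE / COMPLETE markers):

def collect_parts(sock, boundary):
    # hypothetical consumer: gather every part and its body chunks
    parser = multipart.MultipartParser(boundary=boundary)
    parts = []
    while True:
        event = parser_next_part(parser, sock)
        if event is multipart.COMPLETE:
            return parts
        part = event  # headers are complete, body may not be
        chunks = []
        while True:
            data = part_next_chunk(parser, part, sock)
            if data is multipart.PART_COMPLETE:
                break
            chunks.append(data)
        parts.append((part, chunks))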

Thoughts?

defnull commented 5 years ago

I'd imagine the following approach for a low-level non-blocking API:

An async consumer would look like this:

async def handle(request):
    with AIOMultipartParser(request.content_type, "utf8") as parser:
        current = None
        async for chunk in request.iterbody:
            for event in parser.parse(chunk):
                if isinstance(event, PartInfo):
                    current = open(...)
                elif event:
                    await current.write(event)
                else:
                    await current.close()
    # All done. Send response

Thoughts:

theelous3 commented 5 years ago

How do you propose dealing with a chunk of incomplete headers? We cannot presume that the headers will be complete and a PartInfo will be constructed reliably.

This is why I like the generic "gimmie data" approach.

defnull commented 5 years ago

PartInfo is only returned after all headers of a part have been parsed and the \r\n\r\n was consumed. So, as long as the header section is not complete, an empty iterator is returned. If you did not feed enough data in, you won't get any events back (zero or more events per call).

defnull commented 5 years ago

The iterator-of-events approach (in contrast to the poll-individual-events approach) does not need an "I need more data" signal, because it always accepts more data. If you feed in a small chunk, it can return an empty iterator or an incomplete part. If you feed in a large chunk, it can return multiple complete parts and perhaps another half-completed part in one go. It only buffers headers, because returning individual headers is inconvenient for the consumer. Data chunks are returned immediately.
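To illustrate with the data from the first example (a hypothetical session; the exact event objects are placeholders, but the zero-or-more-events-per-call behaviour is the point):

>>> parser = AIOMultipartParser(boundary)
>>> list(parser.parse(b'--8banana...\r\nContent-Disposition: form-data; name="fi'))
[]                                          # headers incomplete, nothing to report yet
>>> list(parser.parse(b'le_1"\r\n\r\nCompoo'))
[<PartInfo object>, bytearray(b'Compoo')]   # headers done, plus the body bytes seen so far
>>> list(parser.parse(b'per\r\n--8banana...--\r\n'))
[bytearray(b'per'), None]                   # rest of the body, then the end-of-part marker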

theelous3 commented 5 years ago

I've made an attempt to write a complete consumer for each suggestion, where our goal is to build a list of parts.

The examples begin with a little abstract version of multipart.py, followed by consumer code.

I began with your idea:

from typing import Union

from abc import abstractmethod

class AIOMultipartParser:

    @abstractmethod
    def parse(self, byte_chunk) -> Union["PartInfo", bytearray, None]: # our type annotation is a bit disparate
        ...

    @abstractmethod
    def finish(self):
        ...

    @property
    @abstractmethod
    def finished(self):
        ...

class PartInfo:
    ...

# thus ends multipart.py

from multipart import AIOMultipartParser

def main(socket, boundary):
    # where socket is some socket waiting to recv the request's body
    # and boundary has already been extracted from the request headers

    parser = AIOMultipartParser(boundary)

    parts = []

    # This is a list. The head will be a PartInfo followed by n bytearrays.
    current_part = []

    # run until finished
    while True:
        byte_chunk = socket.recv(1000)

        # I don't like the parser.parse being an iterable
        # It means we may get 0 or more events per byte_chunk instead of always just getting
        # the next actionable event. This makes things a bit non-linear in terms of thinking
        # about the inner and outer loop. Sometimes I get one part per run of the while loop,
        # sometimes no parts, sometimes n parts.

        # This try/except is another downside of having parser.parse be an iterable instead of
        # just getting the single next event every time. When your iterator can throw an error
        # you want to handle, you have to wrap everything indented in its for loop.
        try:
            for event in parser.parse(byte_chunk):

                # event is None in the case that there's nothing ready, for example
                # 1. We have no data at all
                # 2. We have some data but not enough to build a PartInfo
                # 3. We have completed the previous part.
                # I find this highly confusing. We need to build sublogic in to
                # the None case to handle these differences.

                if event is None:
                    # See if we have a current part to work with at all, or if we need
                    # more data to get the current part.
                    if current_part:
                        # Case of the previous part being complete
                        if isinstance(current_part[-1], bytearray):

                            # hidden away here in two loops and some ifs, is where our parts complete.
                            # Me no likey.
                            parts.append(current_part)
                            current_part = []
                        else:
                            # We shouldn't ever get here.
                            # It would mean we've had two PartInfos in a row with no data, or
                            # the parser is broken etc.
                            raise Exception("lol")
                    else:
                        # This is the case where we're still building the PartInfo but don't yet
                        # have enough data. This logic is pretty implicit and unclear.
                        # we have to break to get to the next iteration of the outer while loop :/
                        break

                # event is a PartInfo in the case that we've recvd at *least* enough data
                # so far to build a set of headers.
                if isinstance(event, PartInfo):
                    # We need to again handle the case that we already have a PartInfo,
                    # though it should never happen.
                    if current_part:
                        raise Exception("lol")
                    else:
                        current_part.append(event)

                # event is a bytearray in cases where we have a PartInfo already, and 0 or more bytearrays.
                if isinstance(event, bytearray):
                    # again we need to handle cases of badly constructed current parts
                    if not current_part:
                        raise Exception
                    if not isinstance(current_part[0], PartInfo):
                        raise Exception
                    # the happy path
                    else:
                        current_part.append(event)

        # handle any exceptions
        except Exception as e:
            some_exception_handler(e)

        # we need to check at the end of every loop to see if the parsing is complete
        # or we will hit an error on reading from a dead socket.
        if parser.finished:
            break

# In the interest of presenting something closer to
# actual code, here is the above main func without comments.
# It looks a lot cleaner of course, but the logic is just as confusing,
# maybe even more so now.
# Without keeping track of types and knowing the mechanisms by which the
# parser works (Union[PartInfo, bytearray, None] and what they stand for) the
# decisions look almost random.

def main(socket, boundary):
    parser = AIOMultipartParser(boundary)

    parts = []
    current_part = []

    while True:
        byte_chunk = socket.recv(1000)

        for event in parser.parse(byte_chunk):
            if event is None:
                if current_part:
                    if isinstance(current_part[-1], bytearray):
                        parts.append(current_part)
                        current_part = []
                    else:
                        raise Exception("lol")
                else:
                    continue

            if isinstance(event, PartInfo):
                if current_part:
                    raise Exception("lol")
                else:
                    current_part.append(event)

            if isinstance(event, bytearray):
                if not current_part:
                    raise Exception
                if not isinstance(current_part[0], PartInfo):
                    raise Exception
                else:
                    current_part.append(event)

        if parser.finished:
            break

I very much do not like the logic involved here, and the end result, a list of lists of [PartInfo] + [bytearray] * n per part, isn't any nicer.

All of the business logic we would need for the actual handling of data would be further interspersed with the logic of consumption, compounding our already complex boilerplate.

The single event based system:

from typing import Union
from enum import Enum, auto
from abc import abstractmethod

class Events(Enum):
    NEED_DATA = auto()
    COMPLETE = auto()

class MultipartParser:

    @abstractmethod
    def recv_data(self, byte_chunk) -> None:
        ...

    @abstractmethod
    def next_event(self) -> "Union[MultipartPart, Events.NEED_DATA, Events.COMPLETE]":
        ...

class PartData:
    """data class type of guy"""
    ...

class MultipartPart:

    # note that we get more flexibility about how to handle the data
    # because it's all part of the MultipartPart instead of being separate
    # and distinct bytearrays or info objects.

    @abstractmethod
    def next_event(self) -> "Union[PartData, Events.NEED_DATA, Events.COMPLETE]":
        """spit out the next bit of data, or mark complete"""
        ...

    @abstractmethod
    def store_next_event(self) -> "Union[Events.NEED_DATA, Events.COMPLETE]":
        """request more data and store it on the part, or mark complete"""
        ...

# thus ends multipart.py

from multipart import MultipartParser, MultipartPart, Events

# could also have something like this for working with the raw data directly
# instead of storing it on the MultipartPart

# def part_next_chunk(parser, part, socket):
#     while True:
#         event = part.next_event()
#         if event is multipart.NEED_DATA:
#             parser.recv_data(socket.read(100))
#             continue
#         return event

def parser_next_part(parser, socket) -> "Union[MultipartPart, Events.COMPLETE]":
    while True:
        event = parser.next_event()
        if event is Events.NEED_DATA:
            parser.recv_data(socket.read(100))
            continue
        return event

def complete_part(parser, part, socket) -> None:
    while True:
        event = part.store_next_event()
        if event is Events.NEED_DATA:

            # I believe this is the only weirdness here, where we
            # pass all new data to the parser no matter what.
            # Still, nothing crazy, having a single entry point for all bytes.
            parser.recv_data(socket.read(100))
            continue
        break

def main(socket, boundary):
    # where socket is some socket waiting to recv the request's body
    # and boundary has already been extracted from the request headers

    parser = MultipartParser(boundary)

    parts = []

    while True:
        event = parser_next_part(parser, socket)

        if isinstance(event, MultipartPart):
            # alias for clarity
            part = event
            complete_part(parser, part, socket)
            parts.append(part)
        else:
            break

I find this much easier to deal with. Our business logic section has absolutely minimal boilerplate, leaving us free to work on the rest of our application. The parser should not dominate the code. We just get an event and do something with it.

Furthermore, this approach is already familiar to users of h11, h2, wsproto, and (in a similar vein) ircproto and fcgiproto. Pretty much the https://sans-io.readthedocs.io/ style of implementation.

defnull commented 5 years ago

Much of the error checking in your main() example for my approach can be removed if we actually trust the API to do what is documented. The parser works as a state machine. For each part, it will yield exactly one PartInfo, zero or more non-empty byte strings and exactly one None end marker, in that order. For each chunk, it will yield as many events as it can, which may be none if we cannot emit an event from a given chunk.

This allows us to skip most checks and if-statements in your version of my approach. For example, event is only None after we fully parsed a part, so current is guaranteed to be present. No need to check that. More importantly, event is not None if we need more data. In that case, the iterator itself is empty and the entire inner for-loop is skipped. No need to handle that case either. Then, there is no need to call parser.finish(), as it is called by __exit__() at the end of the with block. Parsing errors can only happen during parse() and at the end in __exit__(). Both are handled by the same except block.

For the examples, let us assume we want to store all data into files named after some header value. In an actual application, text fields would instead be buffered, decoded as strings and handled some other way, but that would quickly get too complicated for examples. Let's also just ignore any obvious security issues with these examples for now.

Here is my version for my approach:

from functools import partial

def main(socket, boundary):
    uploads = []
    current = None
    try:
        with AIOMultipartParser(boundary) as parser:
            for chunk in iter(partial(socket.recv, 1024), b''):
                for event in parser.parse(chunk):
                    if isinstance(event, PartInfo):
                        current = open(event.filename, "wb")
                    elif event: # must be non-empty bytes
                        current.write(event)
                    else: # must be None
                        current.close()
                        uploads.append(current)
    except MultipartError:
        if current:
            current.close()
        if uploads:
            remove_files(uploads)

As you can see, most of the complexity goes away if we trust the parser. Please correct me if I'm missing an edge case here.

Also note that we need to support three flavors of IO with this new parser:

blocking IO (file-like streams, as in the current implementation)
coroutine-based IO (async/await)
event-driven IO (callbacks, e.g. twisted)

The last case is actually the most important one. The first is already implemented, and the second could be implemented by throwing some await and async keywords around and otherwise leaving the parser as it is.

With my approach, the dataReceived callback would simply contain the inner for-loop of my last example, or call parser.finish() on EOF. A chunk of data comes in, and we simply handle all events that this chunk emits. If there are no events, we do nothing.
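Roughly like this, as a sketch only (the empty-chunk-as-EOF convention matches the twisted skeleton further down; AIOMultipartParser and PartInfo are the names from the example above, and the boundary is assumed to be known already):

from twisted.internet import protocol

class FormReader(protocol.Protocol):
    def __init__(self, boundary):
        self.parser = AIOMultipartParser(boundary)
        self.current = None

    def dataReceived(self, data):
        if not data:                       # empty chunk signals EOF here
            self.parser.finish()
            return
        for event in self.parser.parse(data):
            if isinstance(event, PartInfo):
                self.current = open(event.filename, "wb")
            elif event:                    # non-empty bytes: body data
                self.current.write(event)
            else:                          # None: part complete
                self.current.close()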

I actually tried to implement such an API with your approach and failed. My problems were:

I need to feed bytes into the parser at two places: In the outer loop when waiting for the next part, and in an inner loop while copying data to a target file. I cannot do that in a non-blocking way, so I have to break out of and jump back into multiple levels of loops, which is really hard to do in python.

Instead of three events (headers, data, end-of-part) I now have six (waiting-for-headers, headers, waiting-for-data, data, end-of-part, end-of-multipart) with the added complexity that the "data" events can be interrupted by any number of "waiting-for-data" events and that events come from two different sources.

I doubt that a simple loop like my example above would even be possible without goto statements.

Perhaps I am missing something here. For the case that we cannot call a blocking recv() but instead get bytes from the outside (or in a loop like my example), I really have no idea how to do that with your approach.

theelous3 commented 5 years ago

First of all, thanks for the clarification of the use of None in your approach, I clearly misunderstood.

Secondly, apologies for the incoming wall of text and rambling :)

On multiple parsers / many apis to one parser

I'm confused as to why we would need multiple types of parser for blocking / async etc.

If we have no io happening internally, it by default supports every means of concurrency. Taking your latest example, the __exit__ is only dealing with raising exceptions for parsing errors, the .parse is only dealing with computing bytes / spitting out parts or bytearrays, and the rest of the code is outside of the scope of the parser's internals - the user can do whatever they want here.

I don't see a need for anything other than a single rewrite of the current MultipartParser class in this respect.

In your example, we just replace iter(partial(socket.recv, 1024), b'') with some async sock stuff and we're good for async. Maybe there's some complexity regarding callbacks and leaving the scope of the context manager with twisted and so on, but I think it's just complexity rather than requiring different parsers or many apis to one parser.

Am I missing something here?
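For instance, something like this (hypothetical; reader stands in for whatever async transport is in use, the rest reuses the names from your example):

async def main(reader, boundary):
    uploads = []
    current = None
    with AIOMultipartParser(boundary) as parser:
        while True:
            chunk = await reader.read(1024)   # async stand-in for iter(partial(socket.recv, 1024), b'')
            if not chunk:
                break
            for event in parser.parse(chunk):
                if isinstance(event, PartInfo):
                    current = open(event.filename, "wb")
                elif event:
                    current.write(event)
                else:
                    current.close()
                    uploads.append(current)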

On your example.

Off the bat, I certainly like it more now that I understand it a bit better, but I still think we can improve.

I think the most common use for forms isn't file sending by a long shot. Usually you'll be dealing with...forms! :D (Please see https://github.com/defnull/multipart/pull/19)

I bring this up because just appending stuff to a file in your example is clean looking, but as a user I'd much prefer to be able to do something like MultipartPart.data to fetch my form than have to b''.join(parts_list[0][1:]) or something to concat it all.

To expand on that: I strongly dislike the PartInfo + absolutely-indeterminate-size-list-of-bytearrays thing, per part. I think once you are dealing with a part, that part should encapsulate the entirety of the multipart block, from headers to body. Our apis are somewhat similar here, the difference being that in mine we access a part's data through the part, and in yours we access the part's data through the parser. I like the idea of keeping whole blocks associated with a part.

Regardless of what approach is ultimately taken, I think PartInfo should remain MultipartPart and come with methods for handling the body of the multipart part through the MultipartPart object, or have the list of bytearrays optionally appended to it, or something along those lines.
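Something like this, roughly (a minimal sketch; buffer_data and data are names I'm making up for illustration):

class MultipartPart:
    def __init__(self, headers):
        self.headers = headers
        self._chunks = []

    def buffer_data(self, raw):
        # body chunks accumulate on the part itself rather than floating around as siblings
        self._chunks.append(raw)

    @property
    def data(self):
        # the whole body in one go, no manual b''.join for the user
        return b"".join(self._chunks)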

Regarding the iterator .parse method, some personal preference. I'd rather not be at the mercy of the iterator here. I find it cleaner to just request the next event, working one by one with events as I ask for them, not as they are given. There's a slightly different mode of thinking between those two ideas, and at least to me, one by one is simpler.

More importantly, event is not None if we need more data. In that case, the iterator itself is empty and the entire inner for-loop is skipped.

This is fine and makes sense, but is definitely not the most welcoming idiom. Instead of always being guaranteed an event when we ask for one, we have to think about the case of the empty iterator, and like, mentally run a continue statement to the outer loop. Woah dude.

This is why I like the simple conversation you have with the parser in the other approach. If it needs data, it says so, if it's finished, it says so, if it has an object, it says so. The code is extremely linear and easy to follow (except of course when we async it :D)

On your thoughts about my implementation.

I think there is a miscommunication happening here:

I need to feed bytes into the parser at two places: In the outer loop when waiting for the next part, and in an inner loop while copying data to a target file. I cannot do that in a non-blocking way, so I have to break out of and jump back into multiple levels of loops, which is really hard to do in python.

I'll write a close enough replica of what you did in your latest example with my implementation, in async with some curio sprinkled just for convenience and completeness' sake:

import curio
from multipart import MultipartParser, MultipartPart, PartData, Events

async def get_next_event(parser, socket, part=None):
    if part is not None:
        emitter = part
    else:
        emitter = parser
    while True:
        event = emitter.next_event()
        if event is Events.NEED_DATA:
            parser.recv_data((await socket.read(100)))
            continue
        return event

async def save_part_to_file(parser, part, socket):
    async with curio.aopen(part.filename, "ab") as f:
        while True:
            event = await get_next_event(parser, socket, part)
            if isinstance(event, PartData):
                await f.write(event.raw)
            else:
                return f

async def main(socket, boundary):
    parser = MultipartParser(boundary)
    uploads = []
    while True:
        event = await get_next_event(parser, socket)
        if isinstance(event, MultipartPart):
            part = event
            uploads.append((await save_part_to_file(parser, part, socket)))
        else:  # Only other event from the parser is that it is complete.
            break

So there we have our non-blocking from socket to file thingy. We don't need to jump in and out of loops or anything.

Instead of three events (headers, data, end-of-part) I now have six (waiting-for-headers, headers, waiting-for-data, data, end-of-part, end-of-multipart) with the added complexity that the "data" events can be interrupted by any number of "waiting-for-data" events and that events come from two different sources.

I'm not sure what happened here. You don't need any more events than I've used above. (I never really explicitly used the Events.COMPLETE event, because it was the default event in the case it wasn't a part / part data or a request for more data.)

To be clear, any waiting-for-x event is NEED_DATA and all NEED_DATA events tell you the parser needs more data, regardless of their source.

Any time we don't have enough data, we get a NEED_DATA. Any time we do have enough data, we get one of our actual objects (MultipartPart | PartData). When finished, a part emits a COMPLETE. When finished, the parser emits a COMPLETE. So we only need two "real" events, aside from our part and part data.

I doubt that a simple loop like my example above would even be possible without goto statements.

This style doesn't lend itself well to the iterator approach you've taken. I don't see this as a problem, as I don't like the sort of empty-iterator-continue-the-outer-loop thing. Also, it requires deep nesting.

This is fun.

Another thought: not all content types are to be handled equally. Ideally we'd like to be able to factor our logic out to functions designed to deal with, say, json, files, forms. Handling all of that in one loop would be a disaster. It's easy to factor these things out in my version:

import curio

from multipart import MultipartParser, MultipartPart, PartData, Events

async def get_next_event(parser, socket, part=None):
    if part is not None:
        emitter = part
    else:
        emitter = parser
    while True:
        event = emitter.next_event()
        if event is Events.NEED_DATA:
            parser.recv_data((await socket.read(100)))
            continue
        return event

async def save_part_to_file(parser, part, socket):
    async with curio.aopen(part.filename, "ab") as f:
        while True:
            event = await get_next_event(parser, socket, part)
            if isinstance(event, PartData):
                await f.write(event.raw)
            else:
                return f

async def do_stuff_with_form_data(parser, part, socket):
    while True:
        event = await get_next_event(parser, socket, part)
        if isinstance(event, PartData):
            part.buffer_data(event.raw)
        else:
            break

async def do_stuff_with_json(parser, part, socket):
    buffer = bytearray()
    while True:
        event = await get_next_event(parser, socket, part)
        if isinstance(event, PartData):
            buffer += event.raw
        else:
            decode_and_send_to_third_party(buffer)
            break

async def main(socket, boundary):
    parser = MultipartParser(boundary)
    uploads = []
    while True:
        event = await get_next_event(parser, socket)
        if isinstance(event, MultipartPart):
            part = event
            if part.file:
                uploads.append((await save_part_to_file(parser, part, socket)))
            elif part.form_data:
                await do_stuff_with_form_data(parser, part, socket)
            elif part.json:
                await do_stuff_with_json(parser, part, socket)
        else:  # Only other event from the parser is that it is complete.
            break

In contrast, I find it difficult to do the same factoring out in the iterator version. Honestly I gave up. It's late! I'm sure you can probably work out a better way of doing this, as you've a better understanding of your version. Anyway:

async def socket_wrapper(socket):
    # neither of those async things exist, but I'm lazy
    async for chunk in async_iter(async_partial(socket.recv, 1024), b''):
        yield chunk

async def save_part_to_file(socket_gen, parser, part_info):
    async with curio.aopen(part_info.filename, 'ab') as file:
        async for chunk in socket_gen:
            for event in parser.parse(chunk):
                if isinstance(event, bytearray):
                    await file.write(event)
                elif isinstance(event, PartInfo):
                    # god knows what happens here, I suppose we have to return the
                    # new part
                    return event

async def do_stuff_with_form_data(socket_gen, parser, part_info):
    ...

async def main(socket, boundary):
    socket_gen = socket_wrapper(socket)
    uploads = []
    try:
        with AIOMultipartParser(boundary) as parser:
            async for chunk in socket_gen:
                for event in parser.parse(chunk):
                    if isinstance(event, PartInfo):
                        if event.file:
                            new_part = await save_part_to_file(socket_gen, parser, event)
                            # I'm not even going to try and work out the logic here
                            # for dealing with a new part coming from somewhere else
                            # My brain is dead. It's late.
                        if event.form_data:
                            # more new part problems
                            new_part = await do_stuff_with_form_data(socket_gen, parser, event)

                    elif event: # must be non-empty bytes
                        current.write(event)
    except MultipartError:
       ...
defnull commented 5 years ago

I'm confused as to why we would need multiple types of parser for blocking / async etc.

We don't. We need one non-blocking parser, which can be used to implement both blocking and non-blocking APIs in various flavors. This is the confusion here I think: You try to define an API you like to use. I try to define an API that people can use to implement their own APIs, in any environment.

For example, you do not like the fact that PartInfo and the data chunks are siblings (both emitted by the parser) instead of parent/child (data chunks emitted by the part). This is a flavor. You can implement the latter easily with the first approach, by adding a very thin layer around it. It would also be possible the other way around, although a little bit more awkward I think. You can also wrap my approach easily to emit a single event at a time, like yours. These are flavors. I actually agree that my approach is not very convenient for end-users. There need to be additional higher-level public APIs optimized for the environment. Namely three: One for blocking io (as the current implementation), one for coroutines (async/await, but otherwise no big changes), and one for event-driven io (twisted).

And this is the most important point, actually: I find it unnecessarily hard to implement your approach for twisted, or any other environment where you do not control IO yourself but listen for events. All your examples have a blocking recv() buried deep into the callstack. Note that await recv() is still a 'blocking' API. It blocks the coroutine, not a thread, but the program flow is still sequential and you can call it in a deep call-stack without worrying too much. This is not possible in some environments, e.g. twisted.

Let's look at your get_next_events() helper-function. I think this is part of the parser, not the business logic. As part of the parser, it calls recv() in a (pseudo)blocking way (with or without await). So, consequently, that parser is not non-blocking (a call to get_next_events() may block (the coroutine)) and it is not decoupled from IO (the parser requires a data source it can ask for more data. It cannot be fed with data from the outside).

Again, it's trivial to modify the existing parser to support coroutine-based async frameworks. Just add async and await keywords to the existing parser. This should not be the goal. Although, if that is all you need, then why not. But this should be a different issue. "Parser for asyncio" instead of "Remove IO".

A real no-io non-blocking parser gets fed chunks of data from the outside. We do not have a recv() function that returns more data as we need it. Each chunk may emit zero or more events. A single chunk may be big enough to fit dozens of small text fields, or so small that it does not fit a single header line. You have to process all these events, or they pile up and you run out of memory. This is why I find the iterator approach more natural. You do not ask the parser for more events, the events are already there. (I just speak about an iterator to be more general. It may be a generator or just a list of events. The iterator itself never throws any exceptions. The events in it are already fully materialized the moment the parse function returns.)

Please try to think about the following environment, very common to twisted:

class FormReader(protocol.Protocol):
    def dataReceived(self, data):
        # Hint: EOF is signaled by an empty chunk
        # Your code here

Some thoughts at the end: I am not sure if we actually need a parser that supports the pure, non-io approach required by twisted. Python has coroutines for a reason. They make everything so much easier nowadays. Last time I implemented a non-blocking multipart parser was in java, and there are no coroutines in Java. APIs are either blocking, or event-based. No fun at all :/

I think most python users would be happy with a multipart parser that simply supports async/await style IO, and otherwise works similar to the existing parser. It was already designed with usability in mind, but in a time where async/await were not available. It would be MUCH easier to implement, too.
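For what it's worth, a rough sketch of what that might look like from the consumer side (names here are hypothetical; .name / .filename / .value are meant to mirror the attributes of the existing MultipartPart):

async def handle(request, boundary):
    # hypothetical async flavor of the existing API: same kind of part
    # objects, but the body is consumed with await instead of blocking reads
    parser = AsyncMultipartParser(request.body_stream, boundary, charset="utf8")
    fields, files = {}, []
    async for part in parser:
        if part.filename:
            files.append(part)               # file upload
        else:
            fields[part.name] = part.value   # small text field, decoded to str
    return fields, files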

theelous3 commented 5 years ago

So, consequently, that parser is not non-blocking (a call to get_next_events() may block (the coroutine)) and it is not decoupled from IO (the parser requires a data source it can ask for more data. It cannot be fed with data from the outside).

Ok, now I understand your everything-comes-from-the-parser idea. I was trying to build some convenience directly into the parser. I still think it's a decent idea to build convenience in at some point, as requiring people to write random wrappers can lead to chaos and implementation errors.

All your examples have a blocking recv() buried deep into the callstack.

Let's look at your get_next_events() helper-function. I think this is part of the parser, not the business logic. As part of the parser, it calls recv() in a (pseudo)blocking way (with or without await). So, consequently, that parser is not non-blocking (a call to get_next_events() may block (the coroutine)) and it is not decoupled from IO (the parser requires a data source it can ask for more data. It cannot be fed with data from the outside).

Defo some crosstalk happening between us here. I make a distinction between the parser, the code someone needs to write to use the parser, and the business logic. In any protocol impl without batteries included, the user must write their own code for dealing with IO. It seems like you do not see my helper function as distinct from the parser, but I do. There are no bounds on what those helper functions look like, other than that at some point data gets to the parser, and at some other point the emitted events are dealt with. Specifically:

It cannot be fed with data from the outside

The parser is getting data from the outside :D At no point does the parser touch any io. The fact that we recv in that helper is incidental. We can write the same kind of thing without any io.
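For example, the same kind of consumer fed from a list of already-received chunks instead of a socket (same hypothetical API as my sketches above, no io anywhere):

def next_parser_event(parser, chunks):
    # same shape as parser_next_part above, but fed from an in-memory iterator
    while True:
        event = parser.next_event()
        if event is Events.NEED_DATA:
            parser.recv_data(next(chunks))
            continue
        return event

def store_part(parser, part, chunks):
    # same shape as complete_part above: buffer the body on the part itself
    while True:
        event = part.store_next_event()
        if event is Events.NEED_DATA:
            parser.recv_data(next(chunks))
            continue
        return part

def parse_chunks(chunk_list, boundary):
    # chunk_list: bytes already received by whoever owns the IO
    parser = MultipartParser(boundary)
    chunks = iter(chunk_list)
    parts = []
    while True:
        event = next_parser_event(parser, chunks)
        if not isinstance(event, MultipartPart):   # Events.COMPLETE
            return parts
        parts.append(store_part(parser, event, chunks))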

This is the confusion here I think: You try to define an API you like to use. I try to define an API that people can use to implement their own APIs, in any environment.

I think we're actually doing the same thing. I'll expand on this and my point above, by using my idea of the api to copy yours. Importantly, I'm not sure it's possible to do the reverse - at least not so simply.

from enum import Enum, auto
from functools import partial

import curio

from multipart import MultipartParser, MultipartPart, PartData  # redefined events to add one, below

class Events(Enum):
    NEED_DATA = auto()
    PART_COMPLETE = auto()
    PARSER_FINISHED = auto()

def processor(gen_func):
    """wrap a generator function so the generator is advanced to its first yield"""
    def wrapper(*args, **kwargs):
        gen = gen_func(*args, **kwargs)
        next(gen)
        return gen
    return wrapper

@processor
def event_generator(boundary):
    parser = MultipartParser(boundary)
    events = []
    while True:
        event = parser.next_event()
        if event is Events.NEED_DATA:
            # hand back everything gathered so far and wait for the next chunk to be .send() in
            chunk = yield events
            events = []
            parser.recv_data(chunk or b"")
        elif event is Events.PARSER_FINISHED:
            yield events
            return
        else:
            events.append(event)

async def main(socket, boundary):
    g = event_generator(boundary)
    uploads = []
    current = None
    try:
        # hand-waving an async equivalent of iter(partial(socket.recv, 1024), b'')
        async for chunk in aiter(partial(socket.recv, 1024), b''):
            for event in g.send(chunk):
                if isinstance(event, MultipartPart):
                    current = open(event.filename, "wb")
                elif isinstance(event, PartData):
                    current.write(event.raw)
                elif event is Events.PART_COMPLETE:
                    current.close()
                    uploads.append(current)
    except MultipartError:
        if current:
            current.close()
        if uploads:
            remove_files(uploads)

In my idea, the only way to deal with the parser is to ask for events, one at a time. Much like calling next() on an iterator gives you one thing at a time. We can use this similarity to write a generator such that it replicates the functionality of your parser.parse.

At the end of the day, all interactions with the parser are just regular function calls, and they can be used in a bajillion different ways.

defnull commented 5 years ago

So we can both build the other approach as a wrapper around our own. Nice trick with the generator as a way to send data into a deep callstack. I did not think of that. Whether the wrapper itself is simple or nice is not really important, as long as it is possible. So, neither of our approaches is 'more powerful' than the other. It boils down to preference. Let's try and discuss on that level? (If that's possible. We are discussing taste, after all.)

List of events / one event at a time: I find it perfectly clear and intuitive that a single chunk of data produces multiple events, and that I get all these events in one step. This is what the parser does, internally, after all. It parses the entire chunk, as far as it can, and produces a number of events. My parser just returns the list of events. Your parser buffers these events internally, and returns them one by one. You could also delay the parsing to the point the event is actually requested, but I'd not do that either. See next point. Iterators are perfect for what we want to express: We have N of something (events), and want to handle these in order. Python has a very well established iterator protocol. Why not use it? Why ask a parser for individual events, one by one? I do not see the benefit of not using iterators (or lists) of events.

Parsing in one step / delayed parsing: I like to get errors as early as possible and not delay or split any significant work. Or in other words: I'd prefer parse(chunk) to be expensive and require error-checking, while get_next_event() is basically free and can never throw an error. Delaying parsing up until the event is actually requested would mean that both can be expensive and need error-checking, doubling the try/except and cognitive overhead. I see no reason not to parse all data that is available, and return (or, in your case, buffer) all events that were found.

Stream of events / stream of streams of events: I also find it easier to reason about a single parser emitting three possible events in a well defined order, instead of two entities (the parser, and the part), one emitting the other, as well as three more events. The additional level of nested for-loops and complexity is a high price for the detail that now a part emits data chunks, and not the parser. This is pure taste, though.

Perhaps we can agree on some of these points more easily than on the whole complex :)

theelous3 commented 5 years ago

Lazy writing at the moment, but to summarise a response to the above, 95% yes.

I've rewritten much of it locally. Here are two examples of usage. One in your style, where parser.parse(chunk) gives a list of the events created by the parse, and one in my style, where each event is requested individually. Under the hood .parse and .recv are both just calling a method .queue_events, which does all the heavy lifting. The rest is just fluff regarding iteration or one-by-oneing.

Python 3.7.2 (default, Jan 11 2019, 21:31:15) 
[GCC 5.4.0 20160609] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from multipart import MultipartParser
>>> 
>>> # setup data
>>>
>>> chunk_0 = '--8banana133744910kmmr13a56!102!2405\r\nContent-Disposition: form-data; name="file_1"; filename="test_file1.txt"; Content-Type: application/octet-strea'
>>> chunk_1 = 'm\r\n\r\nCompoo'
>>> chunk_2 = 'per\r\n--8banana'
>>> chunk_3 = '133744910kmmr13a5'
>>> chunk_4 = '6!102!2405--\r\n'
>>> 
>>> boundary = '8banana133744910kmmr13a56!102!2405'.encode()
>>>
>>> # parsing all at once with .parse
>>> 
>>> parser = MultipartParser(boundary)
>>> events0 = parser.parse(chunk_0.encode())
>>> events1 = parser.parse(chunk_1.encode())
>>> events2 = parser.parse(chunk_2.encode())
>>> events3 = parser.parse(chunk_3.encode())
>>> events4 = parser.parse(chunk_4.encode())
>>> 
>>> events0
[<Events.NEED_DATA: 1>]
>>> events1
[<multipart.Part object at 0xb6ffeb4c>, <Events.NEED_DATA: 1>]
>>> events2
[PartData(raw=bytearray(b'Compooper'), length=9), <Events.NEED_DATA: 1>]
>>> events3
[<Events.NEED_DATA: 1>]
>>> events4
[<Events.FINISHED: 2>]
>>> 
>>>
>>> # parsing one by one with .next_event 
>>> 
>>> parser = MultipartParser(boundary)
>>> parser.next_event()
<Events.NEED_DATA: 1>
>>> parser.recv(chunk_0.encode())
>>> parser.next_event()
<Events.NEED_DATA: 1>
>>> parser.recv(chunk_1.encode())
>>> parser.next_event()
<multipart.Part object at 0xb6ffea2c>
>>> parser.next_event()
<Events.NEED_DATA: 1>
>>> parser.recv(chunk_2.encode())
>>> parser.next_event()
PartData(raw=bytearray(b'Compooper'), length=9)
>>> parser.next_event()
<Events.NEED_DATA: 1>
>>> parser.recv(chunk_3.encode())
>>> parser.next_event()
<Events.NEED_DATA: 1>
>>> parser.recv(chunk_4.encode())
>>> parser.next_event()
<Events.FINISHED: 2>
>>> 
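For reference, a minimal sketch of how the pieces can fit together (hypothetical internals, not the actual code in the branch): .recv appends bytes to a buffer and runs .queue_events, which parses as far as it can and pushes events onto an internal queue; .parse and .next_event only differ in how they hand that queue back:

from collections import deque
from enum import Enum, auto

class Events(Enum):
    NEED_DATA = auto()
    FINISHED = auto()

class MultipartParser:
    def __init__(self, boundary):
        self.boundary = boundary
        self._buffer = bytearray()
        self._events = deque()

    def recv(self, byte_chunk):
        self._buffer += byte_chunk
        self.queue_events()

    def parse(self, byte_chunk):
        # defnull-style: feed a chunk, get back every event it produced
        self.recv(byte_chunk)
        events, self._events = list(self._events), deque()
        return events or [Events.NEED_DATA]

    def next_event(self):
        # theelous3-style: one event at a time, NEED_DATA when the queue runs dry
        if self._events:
            return self._events.popleft()
        return Events.NEED_DATA

    def queue_events(self):
        # the actual boundary and header parsing would live here: consume
        # self._buffer as far as possible and append Part / PartData events,
        # plus FINISHED once the closing boundary has been seen
        ...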
theelous3 commented 5 years ago

You can have a look-see here: https://github.com/theelous3/multipart/tree/remove_io

theelous3 commented 5 years ago

Question: what does this code cover?

        elif line[0] in ' \t' and self.headerlist:
            name, value = self.headerlist.pop()
            self.headerlist.append((name, value+line.strip()))

Something about spaces before the header name and attaching stuff to stuff... not really sure.