alex-petrenko / faster-fifo

Faster alternative to Python's multiprocessing.Queue (IPC FIFO queue)
MIT License

Alternate serialization support #28

Closed beasteers closed 2 years ago

beasteers commented 2 years ago

This would be a really simple change - basically if you take this line

https://github.com/alex-petrenko/faster-fifo/blob/8f59ead3206a2a8e8a377e31a09be4a3ac8f993f/faster_fifo.pyx#L12

and instead do:

# assuming _ForkingPickler is multiprocessing.reduction.ForkingPickler,
# which is what the current faster_fifo.pyx pickling appears to use
from multiprocessing.reduction import ForkingPickler as _ForkingPickler

class Queue:
    dumps = _ForkingPickler.dumps
    loads = _ForkingPickler.loads

Then someone else could do:

import msgpack

class MsgpackQueue(Queue):
    # staticmethod keeps these from being bound as instance methods
    # in case the queue calls them as self.dumps(...) / self.loads(...)
    dumps = staticmethod(msgpack.dumps)
    loads = staticmethod(msgpack.loads)
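
Usage would then be something like this (a hypothetical sketch; it assumes the rest of the Queue API, e.g. max_size_bytes, stays as it is today):

q = MsgpackQueue(max_size_bytes=1_000_000)
q.put({'frame_id': 1, 'payload': [1, 2, 3]})
print(q.get())  # -> {'frame_id': 1, 'payload': [1, 2, 3]} with msgpack >= 1.0
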
alex-petrenko commented 2 years ago

Good change! Is msgpack faster btw?

Consider submitting a PR, otherwise I'll get to it in a few days!

beasteers commented 2 years ago

You know, it's funny: I thought it was supposed to be, but after submitting the issue I figured I should actually test it, and I guess pickling of big numpy arrays has improved, because in my informal tests pickle was faster (must be one of the newer protocols, idk).

So, bad example notwithstanding, it's basically free flexibility, and there are still other optimizations that people could add in -

e.g. do something with a shared memory store? https://arrow.apache.org/docs/python/plasma.html

The other thing I see this being really useful for is skipping deserialization of messages you don't need (e.g. when the task is to always process the latest frame and drop the previous ones):

class LatestQueue(Queue):
    # keep a reference to the real deserializer
    actually_loads = Queue.loads

    # hand messages back still-serialized, so get_many() does no deserialization
    def loads(self, x): return x

    def get_latest(self, *args, **kwargs):
        # drain everything, but only pay the deserialization cost for the newest message
        messages = self.get_many(*args, **kwargs)
        return self.actually_loads(messages[-1]) if messages else None

Edit: I just re-ran the benchmark script this morning on a range of array sizes and msgpack is faster (~2x) for smaller arrays (like (100, 100)), but pickle was faster for (100, 100, 100) - so weird lol

Another use-case is to change the pickle protocol (e.g. use protocol 5 with out-of-band buffers).
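
For example, a protocol-5 serializer pair could look roughly like the sketch below (dumps5/loads5 are made-up names; a real version would move the out-of-band buffers through shared memory instead of re-inlining them, which this toy version does for simplicity):

import pickle

def dumps5(obj):
    buffers = []
    payload = pickle.dumps(obj, protocol=5, buffer_callback=buffers.append)
    # a real implementation would hand the PickleBuffers over out-of-band
    # (e.g. via shared memory); re-inlining them here copies the data
    return pickle.dumps((payload, [bytes(b.raw()) for b in buffers]))

def loads5(data):
    payload, raw_buffers = pickle.loads(data)
    return pickle.loads(payload, buffers=raw_buffers)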

alex-petrenko commented 2 years ago

I think regardless of whether this particular serializer is faster or not, your proposed change is valid. Having the ability to use custom serializers has value.

Maybe even have the ability to specify the serializers on a particular Queue object? Such that you could have different queues with different serializers.

Regarding the serialization of big numpy arrays: I'd definitely consider using some sort of shared memory. If you're open to using something like PyTorch, you can do this with PyTorch tensors out of the box: https://pytorch.org/docs/stable/generated/torch.Tensor.share_memory_.html This is what we do in SampleFactory to move a lot of data around. Serializing a shmem tensor only serializes the header and the pointer to the memory, so it's an O(1) thing.
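
A minimal sketch of that (assumes a recent PyTorch; the cheap pointer-only pickling relies on the shared-memory reducers that torch.multiprocessing registers with ForkingPickler):

import torch
import torch.multiprocessing  # registers shared-memory-aware reducers with ForkingPickler

t = torch.zeros(1000, 1000)
t.share_memory_()  # moves the underlying storage into shared memory
# pickling t through a ForkingPickler-based queue now transfers only a small
# handle plus metadata, not the full 1000x1000 buffer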

Even better - preallocate a big buffer in shared memory and only send indices through the queue.
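
A rough sketch of that pattern (using the stdlib shared_memory module, with slot reuse and synchronization left out for brevity; names and sizes are made up):

import numpy as np
from multiprocessing import shared_memory
from faster_fifo import Queue

N_SLOTS, FRAME_SHAPE, DTYPE = 8, (480, 640, 3), np.uint8
frame_bytes = int(np.prod(FRAME_SHAPE)) * np.dtype(DTYPE).itemsize

# the producer creates the block; a consumer process would re-attach to it
# with shared_memory.SharedMemory(name='frames')
shm = shared_memory.SharedMemory(create=True, size=N_SLOTS * frame_bytes, name='frames')
slots = np.ndarray((N_SLOTS, *FRAME_SHAPE), dtype=DTYPE, buffer=shm.buf)

q = Queue(max_size_bytes=100_000)

def produce(frame, slot_idx):
    slots[slot_idx] = frame  # copy the frame into the preallocated shared buffer
    q.put(slot_idx)          # only the tiny slot index goes through the queue

def consume():
    slot_idx = q.get()
    return slots[slot_idx]   # zero-copy view into the shared buffer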

beasteers commented 2 years ago

Yeah sorry for the msgpack tangent lol.

Maybe even have the ability to specify the serializers on a particular Queue object?

You mean like pass it to the constructor instead? That's a nice option.

Yeah, I absolutely agree with you that queues passing shared-memory indices is a fantastic idea. I didn't know PyTorch had that built in!

The only shared-memory library I've used is Plasma, so idk what other ones there are or what their pros and cons would be, but maybe supporting a few options as optional dependencies would be great.

alex-petrenko commented 2 years ago

It might be somewhat beyond the scope of the queue itself... I mean that you can consider these two problems separately. I.e. if the serializer for a shared-memory tensor knows how to save just the pointer to the shared-memory arena, the queue does not need to know about any of that. It will just serialize the object and deserialize it on the other end, no different from, let's say, the standard multiprocessing.Queue.

You mean like pass it to the constructor instead? That's a nice option.

Ctor or a setter method. But you gotta be careful not to change the serializer while you already have something in the queue. So probably a ctor.
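
From the user's side that could look like this (hypothetical keyword arguments, just to illustrate per-queue serializers):

import msgpack
from faster_fifo import Queue

# each queue instance carries its own (dumps, loads) pair, fixed at construction time
q_default = Queue(max_size_bytes=1_000_000)
q_msgpack = Queue(max_size_bytes=1_000_000, dumps=msgpack.dumps, loads=msgpack.loads)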

Regarding Plasma, I'd say it's probably overkill for what you're trying to do, and it's likely going to be slower than plain shared memory. But I don't have much experience with Plasma. Again, I don't think a queue should know anything about this; as long as you can register custom serializers for your objects that use Plasma or whatever, the queue can do its job.

I found some examples here: https://stackoverflow.com/questions/48772727/python-3-6-pickling-custom-procedure/ Not sure if this is the best way to do it in modern python.
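
Along the lines of that answer, a copyreg-registered reducer for a shared-memory-backed object could look roughly like this (SharedFrame is a made-up class, purely for illustration):

import copyreg
import numpy as np
from multiprocessing import shared_memory

class SharedFrame:
    """Thin numpy view over an already-created, named shared-memory block."""
    def __init__(self, shm_name, shape, dtype='uint8'):
        self.shm_name, self.shape, self.dtype = shm_name, shape, dtype
        self.shm = shared_memory.SharedMemory(name=shm_name)  # attach, don't create
        self.array = np.ndarray(shape, dtype=dtype, buffer=self.shm.buf)

def _reduce_shared_frame(frame):
    # pickle only name/shape/dtype; the receiving process re-attaches to the
    # same shared-memory block instead of copying the actual data
    return SharedFrame, (frame.shm_name, frame.shape, frame.dtype)

copyreg.pickle(SharedFrame, _reduce_shared_frame)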

beasteers commented 2 years ago

Yeah that's true and I agree that it's out of scope for a queue. That line of thinking was more about having presets I guess, but that could be remedied with a nice "recipes" page showing a few different tested ideas.

Thanks for your insights about the topic! I'll see if I can get a PR pushed up soon.