0101 / pipetools

Functional plumbing for Python
https://0101.github.io/pipetools/
MIT License

Streams #6

Closed: RedKhan closed this issue 5 years ago.

RedKhan commented 5 years ago

Hi everyone, just wanted to throw an idea out here: would pipetools be a good fit for stream-based/reactive programming? Specifically, I'm thinking of piping streams of data together rather than piping functions. A stream does not necessarily emit a datum every time it consumes one, whereas a piped function always passes its result to the next function in the sequence.

Would it be enough to override the "compose" method of the Pipe class to adapt to such a use-case?
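To make the distinction concrete, here is a small sketch in plain Python, without pipetools (the names `compose`, `double` and `pairwise_sums` are hypothetical). If every stage takes an iterable and returns an iterable, a stage that emits one item per input and a stage that emits only one item for every two inputs compose the same way, so no special composition rule is needed:

```python
from functools import reduce
from typing import Callable, Iterable, Iterator


def compose(*stages: Callable) -> Callable:
    """Chain stages left to right: compose(f, g)(x) == g(f(x))."""
    return lambda data: reduce(lambda acc, stage: stage(acc), stages, data)


def double(items: Iterable[int]) -> Iterator[int]:
    # 1:1 stage: exactly one output per input.
    for x in items:
        yield x * 2


def pairwise_sums(items: Iterable[int]) -> Iterator[int]:
    # n:m stage: one output per two inputs; a trailing odd item is
    # held back and never emitted.
    pending = None
    for x in items:
        if pending is None:
            pending = x
        else:
            yield pending + x
            pending = None


stream = compose(double, pairwise_sums)
print(list(stream([1, 2, 3, 4, 5])))  # [6, 14]
```

Because generators are lazy, `pairwise_sums` pulls from `double` only as it needs items, which is the "not every input produces an output" behaviour described above.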

0101 commented 5 years ago

Hey, possibly... If you can write down some examples of how you'd expect it to be used, I could tell you more.

Maybe you could also somehow combine it with RxPY to get a nicer syntax on top of their functionality.

RedKhan commented 5 years ago

Example:

So, let's say I have a stream of bytes coming in from a TCP socket where we're expecting to see messages delimited by "\n\r\n\r", the idea would be to do something like this:

(TCPBytesStream(host="", port=5555)
    | concatenate
    | split(b"\n\r\n\r")
    | strip
    | decode("ascii")
    | validate_and_handle_queue.push)

The TCPBytesStream object will "emit" data whenever any arrives on its socket, and each chunk may of course be only a fragment of a complete message. The "concatenate" operation joins the fragments together, and the "split" operation sends only complete, non-fragmented messages downstream. This raises some issues: "concatenate" has to store its inputs in a buffer, and that buffer must be cleared whenever the b"\n\r\n\r" delimiter is reached. That makes it difficult for me to imagine handling this use case with a composed sequence of functions.
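One way to handle the buffering is to merge "concatenate" and "split" into a single generator, so the buffer is simply local state of that generator. This is a minimal sketch, assuming the stream arrives as an iterable of `bytes` chunks; `split_messages` is a hypothetical name, and the strip/decode steps are done with a generator expression:

```python
from typing import Iterable, Iterator

DELIMITER = b"\n\r\n\r"  # delimiter from the example above


def split_messages(chunks: Iterable[bytes], delimiter: bytes = DELIMITER) -> Iterator[bytes]:
    """Join arbitrary byte fragments and yield only complete messages.

    The buffer lives in the generator's local state and is trimmed each
    time a complete message is emitted.
    """
    buffer = b""
    for chunk in chunks:
        buffer += chunk
        # One chunk may complete zero, one or several messages.
        while True:
            head, sep, tail = buffer.partition(delimiter)
            if not sep:
                break  # no complete message yet; wait for more data
            yield head
            buffer = tail
    # Anything left in the buffer is an incomplete message and is dropped.


fragments = [b"hel", b"lo\n\r", b"\n\rwor", b"ld\n\r\n\rpartial"]
messages = (m.strip().decode("ascii") for m in split_messages(fragments))
print(list(messages))  # ['hello', 'world']
```

Since the whole buffer is searched each time, a delimiter that is itself split across two chunks (as in `b"lo\n\r"` followed by `b"\n\rwor"`) is still detected.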

0101 commented 5 years ago

I suppose you could just use iterators/generators for that. I tried to come up with a simple example: instead of a TCP stream, it uses text typed by the user in the console to drive a simple chatbot, which responds as soon as the total number of words from the user since the last response exceeds 10 (across any number of messages). The listener function keeps an internal buffer for this.

import random
from pipetools import pipe, foreach_do

def read_input_forever(prompt):
    print(prompt)
    while True:
        yield input("> ")

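def listener(input_generator, word_count=10):
    buffer = []  # words received since the last response
    for x in input_generator:
        buffer += x.split()
        if len(buffer) > word_count:
            # enough words have accumulated: respond and start a new buffer
            yield random.choice(["mmm", "yeah", "uh huh", "*nods*"])
            buffer = []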

"Tell me a story!" > read_input_forever | pipe | listener | foreach_do(print)

Or another example where a stream of messages is converted to a stream of words and then filtered:

from pipetools import foreach, flatten, X, where

("Tell me a serious story!"
    > read_input_forever
    | foreach(X.split())
    | flatten
    | where(X._in_({"big", "hard", "stick", "long"}))
    | foreach(lambda _: "That's what she said!")
    | foreach_do(print))
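For comparison, here is a rough plain-Python equivalent of the second pipeline using generator expressions, to show roughly what `foreach`, `flatten` and `where` do at each step. This is a sketch, not pipetools itself; the name `respond` is hypothetical, and a fixed list of messages stands in for `read_input_forever` so it runs non-interactively:

```python
from typing import Iterable, Iterator

KEYWORDS = {"big", "hard", "stick", "long"}


def respond(messages: Iterable[str]) -> Iterator[str]:
    # foreach(X.split()) | flatten: a stream of messages becomes a stream of words
    words = (word for message in messages for word in message.split())
    # where(X._in_(KEYWORDS)): keep only the words found in the keyword set
    hits = (word for word in words if word in KEYWORDS)
    # foreach(lambda _: ...): replace each hit with the canned reply
    return ("That's what she said!" for _ in hits)


for reply in respond(["once upon a time", "there was a big dog", "with a long tail"]):
    print(reply)
```

Every step is lazy, so with an endless input source each reply is printed as soon as the matching word has been read.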

So yeah, this kind of thing can be achieved just by composing functions, as long as your stream producer is implemented as an iterable.
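Applied to the original TCP example, the producer can be a generator that yields raw chunks from a socket until the peer closes the connection. This is a sketch under the assumption that a connected `socket.socket` is already available (for a real server, `socket.create_connection((host, port))` would provide one); `tcp_bytes_stream` is a hypothetical stand-in for the `TCPBytesStream` object from the question, and `socket.socketpair()` is used only so the example runs without a server:

```python
import socket
from typing import Iterator


def tcp_bytes_stream(sock: socket.socket, bufsize: int = 4096) -> Iterator[bytes]:
    """Yield raw chunks from a connected socket until the peer closes it.

    A chunk may hold a fragment of a message or several messages;
    framing is left to downstream stages.
    """
    while True:
        chunk = sock.recv(bufsize)
        if not chunk:  # empty bytes means the peer closed the connection
            return
        yield chunk


# socketpair() returns two connected sockets, so no server is needed here.
server_side, client_side = socket.socketpair()
client_side.sendall(b"hello\n\r\n\rwor")
client_side.close()
received = b"".join(tcp_bytes_stream(server_side))
server_side.close()
print(received)
```

Because `tcp_bytes_stream` returns an ordinary iterator, it can sit at the head of a pipeline in the same way `read_input_forever` does above.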

RedKhan commented 5 years ago

That's exactly what I needed, thus closing. Thanks ;)