python-streamz / streamz

Real-time stream processing for python
https://streamz.readthedocs.io/en/latest/
BSD 3-Clause "New" or "Revised" License

Real tail of stream (last n elements) #415

Open JulianWgs opened 3 years ago

JulianWgs commented 3 years ago

To my knowledge there is currently no way to show (or plot) the last n elements of a Streamz DataFrame.

I think this would be a very useful function for debugging, but also for slower streams, for example CFD results from OpenFOAM.

Here is a naive implementation:

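import pandas as pd

def tail(length):
    def tail_func(x, y):
        # DataFrame.append was removed in pandas 2.0; pd.concat is the replacement
        x = pd.concat([x, y], ignore_index=True)
        # keep only the last `length` rows and renumber them from 0
        return x.iloc[-length:].reset_index(drop=True)
    return tail_func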
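
To see what the accumulator does without starting a stream, here is a minimal sketch that folds a few hand-written batches with functools.reduce, which mimics how accumulate feeds the previous state and the next element into tail_func. It assumes pd.concat in place of DataFrame.append (removed in pandas 2.0); the batches and the names acc and new are made up for illustration.

```python
from functools import reduce

import pandas as pd


def tail(length):
    def tail_func(acc, new):
        # pd.concat stands in for DataFrame.append, which pandas 2.0 removed
        combined = pd.concat([acc, new], ignore_index=True)
        # keep only the last `length` rows and renumber them from 0
        return combined.iloc[-length:].reset_index(drop=True)
    return tail_func


# Illustrative batches, standing in for what the stream would emit
batches = [
    pd.DataFrame({"name": ["Mike", "Tim"], "age": [20, 25]}),
    pd.DataFrame({"name": ["Anna", "Kim"], "age": [30, 19]}),
    pd.DataFrame({"name": ["Andy"], "age": [22]}),
]

# reduce applies tail_func pairwise, like accumulate does on a stream
result = reduce(tail(3), batches)
print(result)
```

After the third batch only the three most recent rows remain, in arrival order.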

Here is an example: (thanks for the feedback @martindurant)

import pandas as pd
from streamz import Stream
import numpy as np

names = ["Mike", "Tim", "Anna", "Kim", "Andy"]

def emitter():
    n = np.random.randint(1, 5)
    return {"name": np.random.choice(names, n), "age": np.random.randint(18, 32, n)}

stream = Stream.from_periodic(emitter, 1)
df1 = pd.DataFrame({"name": ["test"], "age": [40]})
out = stream.map(pd.DataFrame).accumulate(tail(6), start=pd.DataFrame()).to_dataframe(df1)
# out.stream.sink(print)  # optional, or some other output

stream.start()

# if using JupyterLab, display the live-updating dataframe
out

This could either be the bare representation of a window or replace the existing tail function. I think the name tail suits this function better than the current one. What do you think?

Implementation-wise, one could be much more efficient by using .loc to overwrite values in round-robin fashion and tracking the row order separately, but for a first go I would stick with the naive version.
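
For comparison, the round-robin idea could look roughly like the sketch below. It is only an illustration on plain Python rows rather than a DataFrame, so it does not use .loc; the class RingTail and its methods are hypothetical names, not part of streamz or pandas. The point is that storage is allocated once and old rows are overwritten in place, while a running counter is enough to recover the order.

```python
class RingTail:
    """Keep the last `length` rows in a fixed-size buffer (hypothetical helper)."""

    def __init__(self, length):
        self.length = length
        self.slots = [None] * length  # preallocated storage, overwritten in place
        self.count = 0                # total number of rows written so far

    def extend(self, rows):
        for row in rows:
            # Round robin: the write position wraps around once the buffer is full
            self.slots[self.count % self.length] = row
            self.count += 1

    def ordered(self):
        """Return the stored rows from oldest to newest."""
        if self.count < self.length:
            return self.slots[:self.count]
        start = self.count % self.length  # slot holding the oldest row
        return self.slots[start:] + self.slots[:start]


buf = RingTail(3)
buf.extend([{"name": "Mike", "age": 20}, {"name": "Tim", "age": 25}])
buf.extend([{"name": "Anna", "age": 30}, {"name": "Kim", "age": 19}])
latest = buf.ordered()
print(latest)
```

Each update touches only the new rows instead of copying the whole frame, which is where the efficiency gain over the naive version would come from.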

martindurant commented 3 years ago

You may well be right, and I encourage others to offer an opinion. First, may I nit-pick and suggest this version as a simpler example:

import pandas as pd
from streamz import Stream
import numpy as np

names = ["Mike", "Tim", "Anna", "Kim", "Andy"]

def emitter():
    n = np.random.randint(1, 5)
    return {"name": np.random.choice(names, n), "age": np.random.randint(18, 32, n)}

stream = Stream.from_periodic(emitter, 1)
df1 = pd.DataFrame({"name": ["test"], "age": [40]})
out = stream.map(pd.DataFrame).accumulate(tail(6), start=pd.DataFrame()).to_dataframe(df1)
out.stream.sink(print)  # optional, or some other output

stream.start()

JulianWgs commented 2 years ago

Hey @martindurant ,

it seems that there are not many other opinions on this issue.

How should we proceed? For a pull request, two things must be discussed beforehand:

  1. Round robin, naive, or another implementation?
  2. What would the API look like?

martindurant commented 2 years ago

  1. I am fine with naive - we can always come up with something better in the future.
  2. Your API looks OK? You mean you would like to have a DataFrame.tail method that does the same thing? That would be OK with me.

"To my knowledge there is currently no way to show (or plot) the last n elements of a Streamz DataFrame."

I should have noted that hvplot on a dataframe has a backlog= argument (see here), which can be displayed in a table or plot and might have been what you wanted all along.