python-streamz / streamz

Real-time stream processing for python
https://streamz.readthedocs.io/en/latest/
BSD 3-Clause "New" or "Revised" License

Aggregating data every n elements #351

Open argenisleon opened 4 years ago

argenisleon commented 4 years ago

Hi,

I would like to calculate .value_counts() on a pandas dataframe column in chunks of n elements and output the aggregated result. For example, with 1000 elements: calculate value_counts() for the first 100 and output the result, then aggregate that result with the next 100 elements and output it again.

I tried

def func(df):
    return df
stream.map(df["OFFENSE_CODE"]).partition(100).to_batch().map_partitions(func)

but I get

TypeError                                 Traceback (most recent call last)
<ipython-input-291-e68a39a9e8bb> in <module>
      1 def func(df):
      2     return df
----> 3 stream.map(df["OFFENSE_CODE"]).partition(100).to_batch().map_partitions(func)
      4 
      5 # a.value_counts()

~\Anaconda3\lib\site-packages\streamz\collection.py in map_partitions(func, *args, **kwargs)
     19     example = kwargs.pop('example', None)
     20     if example is None:
---> 21         example = func(*[getattr(arg, 'example', arg) for arg in args], **kwargs)
     22 
     23     streams = [arg for arg in args if isinstance(arg, Streaming)]

TypeError: func() missing 1 required positional argument: 'df'

BTW, I am not sure this is the best approach. Any hints?

martindurant commented 4 years ago

This does not seem to be a streaming problem; you should be able to achieve what you want with pandas alone.
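
For reference, a minimal pandas-only sketch of that suggestion (the chunk size and the df["OFFENSE_CODE"] column are taken from the question; the running-total loop is illustrative, not part of the original comment):

import pandas as pd

# Walk the column in chunks of 100 and keep a running total of the
# value counts seen so far; print the aggregate after each chunk.
s = df["OFFENSE_CODE"]  # assumes the dataframe from the question
total = pd.Series(dtype="int64")
for start in range(0, len(s), 100):
    chunk = s.iloc[start:start + 100]
    # Series.add with fill_value=0 keeps counts for values missing on either side
    total = total.add(chunk.value_counts(), fill_value=0).astype(int)
    print(total)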

nils-braun commented 4 years ago

Assuming you have a stream of pandas dataframe rows (i.e. pandas Series) rather than a single dataframe (in which case @martindurant is right: you do not need streamz), you could do something like this:

import pandas as pd
from streamz import Stream
from operator import add

stream = Stream()

# Group incoming rows into batches of 100, compute value_counts per batch,
# and keep a running total across batches by adding successive results.
stream.partition(100) \
  .to_batch(example=pd.DataFrame(columns=["my_column"])) \
  .to_dataframe()["my_column"] \
  .value_counts() \
  .accumulate_partitions(add)
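
One caveat (an observation about pandas behaviour, not part of the original comment): operator.add on two Series produces NaN for labels that appear in only one of them, so if a batch can be missing some values you may want lambda a, b: a.add(b, fill_value=0) instead of add.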

For example, you could feed it with your series:

import pandas as pd
from time import sleep

# Emit one row at a time; every 100th emit triggers a new batch downstream
for i in range(200):
    stream.emit(pd.Series({"my_column": i % 16}))
    sleep(0.001)

martindurant commented 4 years ago

sleep(0.001)

This is to make it "real-time"ish :)

Thinking about it, you could ~aggregate~ accumulate with a collections.Counter; you don't really need dataframes at all here.
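
A minimal sketch of that Counter-based version (assuming raw values are emitted rather than one-row Series; the pipeline shape mirrors the example above but is illustrative, not a confirmed recipe):

from collections import Counter
from operator import add
from streamz import Stream

# Count each batch of 100 raw values with a Counter, then keep a
# running total by adding successive Counters together.
stream = Stream()
stream.partition(100) \
  .map(Counter) \
  .accumulate(add) \
  .sink(print)

for i in range(200):
    stream.emit(i % 16)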