python-streamz / streamz

Real-time stream processing for python
https://streamz.readthedocs.io/en/latest/
BSD 3-Clause "New" or "Revised" License
1.24k stars 148 forks

What is difference between .emit() and ._emit() #256

Closed MikeChenfu closed 5 years ago

MikeChenfu commented 5 years ago

I am new to Streamz. I tried some examples and found that ._emit() performs better than .emit(). I would really appreciate it if someone could give me some details about them.

CJ-Wright commented 5 years ago

_emit is a very simple implementation: all it does is call .update on all the currently subscribed downstreams, collate the results, and return them.

emit is a much more advanced implementation, which takes into account whether the pipeline is being run asynchronously and handles loop synchronization, coroutine generation, and such.

If you expect to use or support the async aspects of the library, you should use .emit.

How different is the performance? Can you post a profile of the pipeline?

martindurant commented 5 years ago

To be sure, the "performance" will very likely depend upon what your pipeline is doing, and how you have set it up. In some circumstances (no async/ioloops), emit just calls _emit. If you are running async, however, each subsequent task in emit will generally be scheduled on the next ioloop tick, rather than running in blocking mode as in _emit.
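This relationship can be sketched with a simplified, stdlib-only model (this is not the actual streamz source; the method names mirror streamz's, but the bodies are purely illustrative):

```python
class Node:
    """Toy stand-in for a streamz node; not the real implementation."""

    def __init__(self):
        self.downstreams = []  # subscribed downstream nodes
        self.loop = None       # would hold a tornado IOLoop when running async

    def update(self, x, who=None):
        # Subclasses transform x here; identity is enough for this sketch.
        return x

    def _emit(self, x):
        # Call .update on every subscribed downstream and collate the results.
        return [d.update(x, who=self) for d in self.downstreams]

    def emit(self, x, asynchronous=False):
        # The real emit also schedules coroutines on the IOLoop and
        # respects backpressure; with no loop involved it reduces to _emit.
        if self.loop is None:
            return self._emit(x)
        raise NotImplementedError("async path omitted from this sketch")

source, sink = Node(), Node()
source.downstreams.append(sink)
print(source.emit(1) == source._emit(1))  # both fan out to the same downstreams
```

In the no-loop case the two calls produce identical results; the extra machinery in the real emit only changes *when* the downstream work runs, not what it computes.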

MikeChenfu commented 5 years ago

Thanks @CJ-Wright and @martindurant. In my understanding, emit should theoretically perform better than _emit, but my result does not show that.

I process 50 files on two dask workers in my code.

Here is the result when I use emit. It takes about 90 seconds in total.

[screenshot: Screen Shot 2019-06-12 at 9 26 46 AM]

Here is the result when I use _emit. It takes about 40 seconds, and you can see that reading files overlaps with the kernel operations.

[screenshot: Screen Shot 2019-06-12 at 9 23 24 AM]

martindurant commented 5 years ago

emit() calls _emit(), so there is no way it could be more efficient: https://github.com/python-streamz/streamz/blob/master/streamz/core.py#L306

It would be interesting to find out what is happening during those white stripes: I assume this is back-pressure in action, where the system waits for futures to finish and gathers their results before firing off more work.

MikeChenfu commented 5 years ago

@martindurant I am also curious about these white stripes. Besides, I just tried .emit(fn, asynchronous=True) and got good performance, similar to _emit's result. Is it possible that .emit treats the process as synchronous? I process 50 CSV files in my code.

Here is my code to read files.

import glob
from tornado.ioloop import IOLoop

async def f():
    # source is the streamz Stream node defined earlier in the pipeline
    for fn in glob.glob('data/*.csv'):
        source.emit(fn, asynchronous=True)
IOLoop.current().add_callback(f)

CJ-Wright commented 5 years ago

Would it be possible to post the pipeline you are using?

mrocklin commented 5 years ago

I suspect that by calling _emit you're effectively ignoring backpressure. My guess is that you want a buffer somewhere in your pipeline.


MikeChenfu commented 5 years ago

@CJ-Wright Here is the graph of the workflow.

[screenshot: Screen Shot 2019-06-12 at 11 15 12 AM]

CJ-Wright commented 5 years ago

Yep, @mrocklin is correct: you need a buffer before your gather nodes so that multiple things can be processed at once. Otherwise the pipeline waits for each computation to finish before processing the next item.
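A stdlib-only toy model of why the buffer matters (this is not streamz code; the file names and sleep durations are invented): without concurrency, each item waits for the previous computation, while a pool of in-flight futures lets reads overlap with compute, much like a buffer before gather does.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def read(fn):
    time.sleep(0.05)   # pretend file I/O
    return fn

def compute(data):
    time.sleep(0.05)   # pretend kernel work
    return data

files = [f"file{i}.csv" for i in range(4)]

# No buffering: strictly serial, roughly len(files) * (read + compute).
t0 = time.perf_counter()
serial_results = [compute(read(fn)) for fn in files]
serial = time.perf_counter() - t0

# A bounded pool of in-flight futures: later reads overlap earlier computes.
t0 = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(lambda f: compute(read(f)), fn) for fn in files]
    buffered_results = [fut.result() for fut in futures]
buffered = time.perf_counter() - t0

print(f"serial={serial:.2f}s buffered={buffered:.2f}s")
```

With these numbers the serial loop takes roughly four times as long as the pooled version. In streamz, buffer(n) plays the role of the bounded pool: it lets up to n results queue at that point in the pipeline so upstream work can continue instead of blocking.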

MikeChenfu commented 5 years ago

@CJ-Wright @mrocklin Yeah, I get a better result. Are there any tips for setting the size of the buffer? I tried different buffer sizes and got different performance.

[screenshot: Screen Shot 2019-06-12 at 11 31 11 AM]

CJ-Wright commented 5 years ago

The size of the buffer depends on how much compute and RAM you have available. I usually go with larger numbers and let dask figure it out.

CJ-Wright commented 5 years ago

I think this is mostly resolved. Discussion of buffer size should most likely be an independent issue (and a PR into the docs if we are able to come up with a heuristic approach).

@MikeChenfu please feel free to re-open if you need more help with this issue.