pystorm / streamparse

Run Python in Apache Storm topologies. Pythonic API, CLI tooling, and a topology DSL.
http://streamparse.readthedocs.io/
Apache License 2.0

Running bolts with async code. #330

Closed Darkless012 closed 7 years ago

Darkless012 commented 7 years ago

This is more of a question than an issue, but I think it is worth discussing.

We have a bolt that queries an external server, which takes ~2 s to respond. (Not much we can do about that.)

This leaves the bolt waiting on I/O when it could be producing more results. Creating more parallel bolts is not an option: beyond a certain limit it brings no benefit, because maintaining the connections with Storm costs more than doing the real work.

Can async somehow be used to signal that this bolt is free to process other tuples, emitting only after the response is complete?

I'm having trouble combining the streamparse loop with e.g. the asyncio event loop.

Or is there another way to handle "long-running" processing that is essentially just waiting on I/O?
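To illustrate the cost being described, here is a minimal standalone sketch (plain Python, not the streamparse API; `slow_lookup` and the 0.05 s delay are stand-ins for the real ~2 s remote query):

```python
import time

def slow_lookup(gps):
    """Stand-in for the ~2 s external query (shortened to 0.05 s here)."""
    time.sleep(0.05)
    return f"address-for-{gps}"

def process_sequentially(gps_values):
    # Mirrors a blocking bolt: each tuple waits for the previous
    # lookup to finish, so total time grows linearly with tuple count.
    return [(gps, slow_lookup(gps)) for gps in gps_values]

start = time.monotonic()
results = process_sequentially([f"gps{i}" for i in range(5)])
elapsed = time.monotonic() - start
# 5 tuples at 0.05 s each take at least ~0.25 s; with the real 2 s
# latency, 5 tuples would block the bolt for ~10 s.
```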

tdhopper commented 7 years ago

@Darkless012 One option is kicking off the queries asynchronously when a tuple arrives and using tick tuples to collect the results.

Darkless012 commented 7 years ago

@tdhopper I'm not sure I follow. Could you explain the concept a bit more, please?

I have something like this: Topology

In GPSBolt.process() I make a query to the external service, but this blocks until the query returns, and only then can I call self.emit([GPS, Address]).

If I understand you correctly, I would have to start a background process (multiprocessing), return from the process() method, and then use process_tick() to check whether there are results in a Queue and, if so, call self.emit([GPS, Address]).

Wouldn't returning from process() automatically ack the tuple back to the spout (if auto_ack is set)? So I would also have to implement custom acking of tuples?

I would also need process_tick() to fire every second; will that impact performance?

I was thinking instead of running asyncio in the background and, rather than calling process_tick() every second, using callbacks to emit the message when the request completes. But I don't see how that is possible.
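The submit-then-collect mechanics discussed here can be sketched without streamparse at all (all names below are illustrative, not streamparse API; a real bolt would call `submit` from `process()`, call `collect` from `process_tick()`, and ack each collected tuple manually with auto_ack disabled):

```python
import queue
import threading
import time

def slow_lookup(gps):
    """Stand-in for the ~2 s external query (shortened here)."""
    time.sleep(0.05)
    return f"address-for-{gps}"

class AsyncLookupWorker:
    """Background threads drain `pending`; finished results land in
    `done`, where a periodic tick can collect, emit, and ack them."""

    def __init__(self, n_workers=4):
        self.pending = queue.Queue()
        self.done = queue.Queue()
        for _ in range(n_workers):
            threading.Thread(target=self._work, daemon=True).start()

    def _work(self):
        while True:
            tup_id, gps = self.pending.get()
            self.done.put((tup_id, gps, slow_lookup(gps)))

    def submit(self, tup_id, gps):
        # Called on tuple arrival: enqueue and return immediately,
        # so the caller is free to take the next tuple.
        self.pending.put((tup_id, gps))

    def collect(self):
        # Called on each tick: non-blocking drain of finished results.
        results = []
        while True:
            try:
                results.append(self.done.get_nowait())
            except queue.Empty:
                return results
```

With several worker threads the ~2 s waits overlap, so throughput is no longer one tuple per round-trip.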

Darkless012 commented 7 years ago

I guess using asyncio would require rewriting the Bolt's run() method as an asyncio event loop.
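For context, this is roughly what an asyncio-driven flow would buy: one coroutine per tuple, with the event loop interleaving the I/O waits. A minimal standalone sketch (not streamparse code; `slow_lookup` and `handle_tuple` are hypothetical):

```python
import asyncio

async def slow_lookup(gps):
    """Stand-in for the ~2 s IO-bound query (shortened here)."""
    await asyncio.sleep(0.05)
    return f"address-for-{gps}"

async def handle_tuple(gps, emit):
    # One coroutine per tuple: while this lookup awaits, the loop
    # runs other coroutines, so N pending lookups cost roughly one
    # round-trip instead of N.
    address = await slow_lookup(gps)
    emit([gps, address])

async def main():
    emitted = []
    await asyncio.gather(*(handle_tuple(f"gps{i}", emitted.append)
                           for i in range(5)))
    return emitted

results = asyncio.run(main())
```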

dan-blanchard commented 7 years ago

If I ever finish pystorm/pystorm#24, we'll be in much better shape to support these sorts of workflows.

Darkless012 commented 7 years ago

Wow, I had no clue something like this was in progress. Definitely +1 for the asyncio-based approach. I would like to (and will) contribute to this, since it will probably be a must for our processing stream.