vasia / gelly-streaming

An experimental Graph Streaming API for Apache Flink
Apache License 2.0

Adding data-parallel window graph aggregation #26

Closed drfloob closed 7 years ago

drfloob commented 7 years ago

@vasia I added a windowAssigner-agnostic slice method to GraphStream / SimpleEdgeStream, hoping to generalize slice for both use cases (see here), but much of the existing logic revolves around neighborhoods and is spread across a couple of classes. I'll open another issue to discuss refactoring it.

I started over and hacked together a non-reducing connected components sliding window example in a unit test, using only the DisjointSet implementation from gelly-streaming (see here). This essentially reproduces most of WindowGraphAggregation with a No-Op reduce step.
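For reference, the core of that DisjointSet fold is just union-find. Here's a minimal, hypothetical plain-Java sketch (not the actual gelly-streaming class) of the structure each window's edges get folded into; with a no-op reduce, a per-partition instance of something like this is what would be emitted:

```java
import java.util.HashMap;
import java.util.Map;

// Minimal union-find sketch (NOT the actual gelly-streaming DisjointSet).
// Each window's edges are folded in via union(); with a no-op reduce,
// the per-partition structure would be emitted as-is.
public class DisjointSetSketch {
    private final Map<Long, Long> parent = new HashMap<>();

    public long find(long x) {
        parent.putIfAbsent(x, x);
        long root = x;
        while (parent.get(root) != root) root = parent.get(root);
        // path compression: point every node on the walk directly at the root
        long cur = x;
        while (parent.get(cur) != root) {
            long next = parent.get(cur);
            parent.put(cur, root);
            cur = next;
        }
        return root;
    }

    public void union(long a, long b) {
        long ra = find(a), rb = find(b);
        if (ra != rb) parent.put(ra, rb);
    }

    public static void main(String[] args) {
        DisjointSetSketch ds = new DisjointSetSketch();
        ds.union(1, 2);   // edges of one window
        ds.union(2, 3);
        ds.union(10, 11);
        System.out.println(ds.find(1) == ds.find(3));   // true: same component
        System.out.println(ds.find(1) == ds.find(10));  // false: separate components
    }
}
```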

I now (again) think PR #25 is the best way forward for these sorts of non-reducing window folds. What do you think?

As an aside, I think there's probably a more powerful abstraction underneath WindowGraphAggregation (other than GraphAggregation), but I'm not sure what to call it. EdgeStream -> distribute edges uniformly -> apply an arbitrary window -> local fold on each graph partition-window. For global connected components aggregation, add a reduce step at the end.
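To make that shape concrete, here's a rough plain-Java simulation (hypothetical code, no Flink types; the local fold is a per-partition degree count purely for illustration, and the reduce step at the end is what turns the partial folds into a global result):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch of the abstraction described above (not gelly-streaming API):
// distribute edges across partitions, fold each partition-window locally,
// and optionally merge the partial results with a final reduce step.
public class PartitionFoldSketch {

    // Local fold: per-partition vertex-degree counts for one window.
    static Map<Integer, Integer> localFold(List<int[]> edges) {
        Map<Integer, Integer> degree = new HashMap<>();
        for (int[] e : edges) {
            degree.merge(e[0], 1, Integer::sum);
            degree.merge(e[1], 1, Integer::sum);
        }
        return degree;
    }

    // Optional global reduce: merge the partial folds into one result.
    static Map<Integer, Integer> reduce(List<Map<Integer, Integer>> partials) {
        Map<Integer, Integer> merged = new HashMap<>();
        for (Map<Integer, Integer> p : partials)
            p.forEach((v, d) -> merged.merge(v, d, Integer::sum));
        return merged;
    }

    public static void main(String[] args) {
        // one window's edges: a triangle 1-2-3
        List<int[]> window = List.of(new int[]{1, 2}, new int[]{2, 3}, new int[]{3, 1});
        // distribute edges round-robin over 2 partitions
        List<List<int[]>> parts = new ArrayList<>();
        parts.add(new ArrayList<>());
        parts.add(new ArrayList<>());
        for (int i = 0; i < window.size(); i++) parts.get(i % 2).add(window.get(i));
        List<Map<Integer, Integer>> partials =
            parts.stream().map(PartitionFoldSketch::localFold).collect(Collectors.toList());
        // each vertex of the triangle ends up with global degree 2
        System.out.println(reduce(partials));
    }
}
```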

senorcarbone commented 7 years ago

Hey @drfloob!

Big thumbs up for the interest btw!

drfloob commented 7 years ago

Hey @senorcarbone! Excellent explanations, I appreciate it. You're right: what I have in mind is essentially a "data-parallel window graph aggregation". My use case is finding clusters of collaborators and their indirect networks from GitHub event data as it happens, for live community metrics and collaborator recommendations. My slides aren't finalized, but these may help you get a feel for the idea: drfloob.com/pulse/slides. The big picture is that "whole-world" computations can be run in batch offline, and that low-latency graph-snapshot analysis of recent activity can provide an interesting context in which to interpret those batch results.

I wasn't aware of the v2 changes, I'll have to take a look at that branch. Sounds exciting!

I totally get the original intention for window graph aggregations: they were meant to produce a global metric, hence the tight coupling to tumbling windows. It's also fairly straightforward to modify the implementation a bit to support both global and window-local use cases (hence this PR), which makes it a much more powerful abstraction IMHO and doesn't hurt otherwise :-). Implementing this feature in gelly-streaming outside of WindowGraphAggregation right now would, I think, either duplicate a bunch of code or require refactoring out a new base class shared by both.

I'd love to see a distributed min-cut / max-flow algorithm. I found a few papers for potential implementations, but it'll be a while before I have a chance to work on it.

P.S. I'm interested in what you think a sophisticated sliding window aggregation implementation would look like. Would it involve retractions (from the Dataflow paper)?

senorcarbone commented 7 years ago

Very interesting slides @drfloob and cool use case! I like the way you think :)

Btw, regarding your use case, you might not really need a window aggregation after all; you could simply do a continuous keyed aggregation on the GitHub events (I am assuming that you already partition your stream by some attribute; otherwise you have to stick to the global aggregation solution). For example, you keyBy the GitHub event type (fork, PR, etc.) and then continuously compute the CCs per key, which is de facto parallel. That means that for the same computational complexity you generate a more up-to-date stream (by 'folding' continuously to update local CCs). Again, if you really need to compute CCs over the complete GitHub graph, the only way to go is to collect everything in one place (as with the existing WindowGraphAggregation); otherwise your metric will be only partial/inconsistent.
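To illustrate the keyed-continuous pattern outside Flink (a hypothetical plain-Java sketch, not our API), the per-key state can be a running components structure that emits an updated value on every edge instead of waiting for a window to fire:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of a continuous keyed aggregation (not Flink API):
// events are keyed (e.g. by GitHub event type) and each key folds edges
// into its own running connected-components state, emitting an updated
// result per edge instead of per window.
public class KeyedContinuousCC {
    // per key: vertex -> the (shared) set of vertices in its component
    private final Map<String, Map<Long, Set<Long>>> state = new HashMap<>();

    // Fold one edge into the state for its key; returns the size of the
    // component containing the edge, i.e. the continuously updated result.
    public int fold(String key, long src, long dst) {
        Map<Long, Set<Long>> comps = state.computeIfAbsent(key, k -> new HashMap<>());
        Set<Long> a = comps.computeIfAbsent(src, x -> new HashSet<>(Set.of(x)));
        Set<Long> b = comps.computeIfAbsent(dst, x -> new HashSet<>(Set.of(x)));
        if (a != b) {
            // merge the smaller component into the larger one
            if (a.size() < b.size()) { Set<Long> t = a; a = b; b = t; }
            a.addAll(b);
            for (long m : b) comps.put(m, a); // repoint merged vertices
        }
        return a.size();
    }

    public static void main(String[] args) {
        KeyedContinuousCC cc = new KeyedContinuousCC();
        cc.fold("fork", 1, 2);
        int n = cc.fold("fork", 2, 3);   // component {1, 2, 3}
        int m = cc.fold("push", 1, 9);   // independent state for "push"
        System.out.println(n + " " + m); // 3 2
    }
}
```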

Now, for implementing the parallel window aggregation (if it is really necessary for you), I would suggest you create a new ParallelWindowGraphAggregation that simply wraps WindowGraphAggregation (without changing the existing implementation) and passes a NoOp reduce at the end. That, I think, we can easily merge, and there will be no conflicts with the changes we are going to add.

Teaser: Gelly-Stream v2 will allow you to run BSP/BAP iterative algorithms on graph streams, so probably anything related to windows, among other things, will change. This is of course part of our research, so we cannot share all the internals yet (we've got to get a PhD at some point :P).

Please create a new issue on the min-cut/max-flow algorithms (with citations) that you have in mind and we can discuss it there. Sounds promising 👍

On sophisticated sliding window aggregation: it means that you do not aggregate the same thing too many times (<= log(#partialAggregates) combine steps). There has been extensive work on this, unfortunately mostly in sequential stream processing and not so much in out-of-order processing. Currently, most systems apply panes, but there are also pairs and higher-order sliding-window trees for efficient aggregation sharing. Here is some recent work from our side. That is just to give you an idea of what I mean. Some groups are already working hard on applying these ideas to out-of-order processing, so we will probably see a Flink-compatible efficient aggregation soon.
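For a quick feel of the panes idea (a toy plain-Java sketch, not any system's actual implementation): split the stream into non-overlapping panes of size gcd(size, slide), pre-aggregate each pane once, then build every sliding window result by combining a few pane partials instead of re-scanning raw elements:

```java
import java.util.ArrayList;
import java.util.List;

// Toy illustration of pane-based sliding aggregation sharing
// (hypothetical sketch): each element is aggregated into exactly one
// pane, and each window combines size/pane pre-aggregated partials.
public class PanesSketch {
    static long gcd(long a, long b) { return b == 0 ? a : gcd(b, a % b); }

    // sums over sliding count-windows of `size` elements with step `slide`
    static List<Long> slidingSums(long[] values, int size, int slide) {
        int pane = (int) gcd(size, slide);
        int numPanes = values.length / pane;
        long[] paneSum = new long[numPanes];            // each element touched once
        for (int i = 0; i < numPanes * pane; i++) paneSum[i / pane] += values[i];
        List<Long> out = new ArrayList<>();
        for (int start = 0; start + size <= values.length; start += slide) {
            long sum = 0;                               // combine size/pane partials
            for (int p = start / pane; p < (start + size) / pane; p++) sum += paneSum[p];
            out.add(sum);
        }
        return out;
    }

    public static void main(String[] args) {
        long[] v = {1, 2, 3, 4, 5, 6};
        // windows [1,2,3,4] and [3,4,5,6], built from panes of size gcd(4,2)=2
        System.out.println(slidingSums(v, 4, 2)); // [10, 18]
    }
}
```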

drfloob commented 7 years ago

@senorcarbone The same "BSP" as in Google Pregel, yes? Very neat! Please send me a link to your dissertation when it's finished, I'd like to read it. And thank you for the links! I had no idea there was so much more potential for sophistication in sliding windows; I look forward to digging in.

In my project, I've simplified the problem by assuming that any kind of GitHub event is as good as any other for the purpose of suggesting collaborators, so the event type is stripped away. And I really am interested in just the graph formed over the past [N] [Time units], with an N/5 slide to show change over time. The global aggregation hasn't been a bottleneck, actually: I'm pushing 15,000 records/second, of which about 2,000 on average make it to Flink after filtering, and gelly-streaming chugs along nicely. It's been great.

I'll be glad to give you a patch that's easier to merge. From an engineering perspective, deep inheritance is a code smell that should usually be solved with composition, and it's especially bad when there's no intention of using the subclass in a "ParallelWindowGraphAggregation is a WindowGraphAggregation" sense (see the Liskov Substitution Principle).

My current PR essentially adds 2 very thin constructors to an existing class, which solves the problem well with very little code, and is fully backwards compatible. I can clean up the PR a bit to make that more clear if you'd like. If this is difficult to merge due to future plans, I think the lesser of two evils (code duplication vs deep inheritance) is code duplication, so I'd opt to create a new class parallel to WindowGraphAggregation that is essentially identical. But I hope the more elegant solution will be accepted :-). Do please let me know.

I'd also be happy to brainstorm around the challenges these changes present, if you'd like. Regardless, I think this is a powerful addition; I'd like to see this feature merged in whichever form it needs to be in.

senorcarbone commented 7 years ago

@drfloob thanks for the clarification, though... I'm a bit confused now.

I thought your initial PR was unrelated to what we are discussing here, but I guess I was wrong; sorry about that. I only skimmed through it (we really need to find some free time to review it properly), but I think it was just about adding general window assigners instead of restricting to tumbling windows. For that case, simply adding extra constructors is OK, I guess.

Let's limit this to the parallel aggregation to avoid confusion. In your example you seem to produce connected components per partition. In MapReduce terms, that is similar to skipping the shuffle+reduce in a map/combine-reduce job and simply writing out the results of the mappers/combiners. That means that when a window triggers, a separate DisjointSet is written to a file, per partition. How exactly do you use the result of that partial computation? Do you do any post-processing? If not, how is this useful for reasoning about CCs in the complete dynamic graph?

P.S. Gelly-Stream BSP is close to Pregel BSP, but for streams... :)

drfloob commented 7 years ago

@senorcarbone Sorry, I fundamentally misunderstood what was going on. It hadn't dawned on me that these windows were emitted per partition; I thought the windows ended up being global despite the partitioning, due to the flatMap that joins the streams at the end. Those results are not so useful. Thank you for helping me understand; I closed the PR. This issue should probably be closed as well, since it's based on the same misunderstanding.

For what it's worth, after realizing my mistake, I found ParallelMerge for ConnectedStreams which seems like it performs the sort of window merge I need, but I haven't been able to determine if it's applicable to keyed/partitioned window streams yet.

senorcarbone commented 7 years ago

No worries @drfloob ! The Flink API still has a lot of improvements to go and can be confusing at times.

Don't hesitate to start new discussions for contributions, especially on adding new algorithms. I think there are lots of cool things to do there.