IBMStreams / streamsx.topology

Develop streaming applications for IBM Streams in Python, Java & Scala.
http://ibmstreams.github.io/streamsx.topology
Apache License 2.0

I want to be able to join arbitrary streams, without going through a TWindow #93

Closed scotts closed 4 years ago

scotts commented 9 years ago

What I think I want is something like TStream<T>.join(TStream<U> other).

In LogWatch, I need to implement a deterministic join. That is, I want the results of my join to be independent of the tuple rates on either side of the join. The windows I need to maintain are also dependent not just on the data in that window, but on the data I see on the other stream. I cannot express this with the window clause in SPL. But, in SPL, because I can have arbitrary streams converge on different ports of an operator, I can implement my own ad-hoc windows and implement a deterministic join.

The SPL code for this is here: https://github.com/scotts/streamsx.demo.logwatch/blob/master/language/com.ibm.streamsx.demo.logwatch.language/DeterministicJoin.spl

What I'm doing there is not obvious. I'm maintaining partitioned windows for each stream (which is why I have a map that goes from an rstring to a list of tuples). When I receive a tuple on the RealTime side, I check to see if there is a match in logins, which is the window for the Successes side. If there is no match, then I add the RealTime tuple to the suspects. When I receive a tuple on the Successes side, I check to see if there is a match in suspects. If there is no match, then I add the Successes tuple to logins, so that it can be matched when/if I receive a tuple on the RealTime side.

Note that I evict tuples from the window for the RealTime side based on tuples seen on the Successes stream, which is something I cannot express using SPL window clauses.
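The matching logic described above can be sketched in plain Java, outside of any Streams API. This is only a sketch under stated assumptions: the class name `DeterministicJoinSketch`, the use of `String` for keys and tuples, and the behaviour on a match (emit one joined result per waiting tuple and evict the matched partition) are illustrative choices, not taken from the linked SPL. The map names `suspects` and `logins` follow the description above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration; not part of SPL or the Java Topology API. */
final class DeterministicJoinSketch {
    // Partitioned ad-hoc windows: key -> tuples still waiting for the other side.
    private final Map<String, List<String>> suspects = new HashMap<>(); // RealTime side
    private final Map<String, List<String>> logins = new HashMap<>();   // Successes side

    /** A tuple arrives on the RealTime side; returns the joined results, if any. */
    List<String> onRealTime(String key, String tuple) {
        // Assumption: a match consumes (evicts) the waiting Successes tuples.
        List<String> matches = logins.remove(key);
        List<String> out = new ArrayList<>();
        if (matches == null) {
            // No match yet: remember the tuple so the Successes side can find it later.
            suspects.computeIfAbsent(key, k -> new ArrayList<>()).add(tuple);
            return out;
        }
        for (String login : matches) {
            out.add(tuple + "+" + login);
        }
        return out;
    }

    /** A tuple arrives on the Successes side; returns the joined results, if any. */
    List<String> onSuccess(String key, String tuple) {
        // Eviction from the RealTime window is driven by the Successes stream.
        List<String> matches = suspects.remove(key);
        List<String> out = new ArrayList<>();
        if (matches == null) {
            // No match yet: remember the tuple so the RealTime side can find it later.
            logins.computeIfAbsent(key, k -> new ArrayList<>()).add(tuple);
            return out;
        }
        for (String suspect : matches) {
            out.add(suspect + "+" + tuple);
        }
        return out;
    }
}
```

Because each side stores its own unmatched tuples and consults the other side's store, the set of joined results does not depend on which stream delivers its tuple first. Both handlers need access to both maps, which is the part that is hard to express when the two sides live in separate joins.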

I tried implementing this in the Java API, and I thought I was able to do it, but @hildrum and I just talked it over at length and convinced ourselves it does not work. My attempt is here: https://github.com/scotts/streamsx.demo.logwatch/blob/master/topology/src/streamsx/demo/logwatch/topology/DeterministicJoin.java

The technique that we came up with was to do a TStream.join on both streams, and call TStream.last on the other stream. That way we had mirrored joins, and we did a TStream.union on the mirrored joins. That produces the correct results when I execute it, but we don't think it's guaranteed to do so.

We thoroughly confused ourselves (well, at least I was confused) by trying to reason about which tuples were received when, and when exactly we would see tuples in the TStream.last() window.

What convinced me is that in the Java code, when I receive a failure tuple, I look for matches on the success side, but then I never store the failure tuple. And I can't, because the logic that needs to see that failure tuple lives in a completely different BiFunction, in a completely different join. Because I don't store that tuple, it's possible for me to miss matches.

My conclusion, then, is that I can't implement a deterministic join where I maintain my own windows in the current Java API. I think that an interface like TStream<T>.join(TStream<U> other) would allow me to do that. I'm not entirely sure how it would work - at first it seems like it would be okay for users to provide a BiFunction as before, but the problem is that that interface does not provide a way for the user to know which side received the tuple. We could ask users to provide two BiFunctions, one for each side, but that may be going too much towards the SPL side. (Since they're essentially onTuple Left and onTuple Right.)
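To make the two-BiFunction idea concrete, here is one possible shape, written as a standalone class rather than as a TStream method. Everything in it is hypothetical: `TwoSidedJoin`, its type parameters, the shared state object `S`, and the `left`/`right` entry points are assumptions made for illustration, and nothing like it exists in the Java Topology API as far as this thread establishes.

```java
import java.util.List;
import java.util.function.BiFunction;

/**
 * Hypothetical sketch of a join driven by two handlers, one per input side.
 * T and U are the tuple types of the two streams, S is user-managed state
 * shared by both handlers, and R is the type of the joined results.
 */
final class TwoSidedJoin<T, U, S, R> {
    private final S state;
    private final BiFunction<T, S, List<R>> onLeft;
    private final BiFunction<U, S, List<R>> onRight;

    TwoSidedJoin(S state,
                 BiFunction<T, S, List<R>> onLeft,
                 BiFunction<U, S, List<R>> onRight) {
        this.state = state;
        this.onLeft = onLeft;
        this.onRight = onRight;
    }

    /** Called for each tuple arriving on the left stream. */
    List<R> left(T tuple) {
        return onLeft.apply(tuple, state);
    }

    /** Called for each tuple arriving on the right stream. */
    List<R> right(U tuple) {
        return onRight.apply(tuple, state);
    }
}
```

The point of the shape is that each handler knows which side its tuple came from, and both handlers see the same state, so a user could keep their own windows for both streams in `S`. How a runtime would serialize calls to `left` and `right` is left open here.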

I know this may not be a high priority at the moment, as we're focusing on making the simple things simple, and this is not a simple thing.

ddebrunner commented 9 years ago

Maybe the correct approach is the ability to have a TWindow where the contents are defined as a function (somehow), so it's application defined what is in the window. That's been a customer request in SPL as well, but might be easier to do here.

ddebrunner commented 9 years ago

One option is to drop into the more complex Java primitive operator mode, so implement a Java primitive operator that performs the join, and then invoke it from the topology. Though that might have to wait until multiple ports are supported.

scotts commented 9 years ago

I was thinking it would make sense to have something like TStream<T>.last(Predicate<T> pred) to support some notion of delta-based windows. But that does not actually solve my current problem, because I need to define the contents for window A based on the tuples seen on window B. In order to do what I want to do, I need to have access to the tuples and windows for both A and B at the same time.

I had not considered a Java primitive - I looked a bit at how to integrate one, but not in detail. I could see that being a workaround, although that also opens up using an SPL operator as a workaround. An unspoken rule I was adhering to was to use only the Java Topology API.