seznam / euphoria

Euphoria is an open source Java API for creating unified big-data processing flows. It provides an engine independent programming model which can express both batch and stream transformations.
Apache License 2.0

euphoria-core: unbounded source without explicit windowing defined #38

Open xitep opened 7 years ago

xitep commented 7 years ago

Executing a stateful operator which itself does not define a windowing strategy and which consumes - directly or indirectly - a non-windowed, unbounded data set is basically undefined. That is, such an operator, e.g. ReduceByKey, consumes an infinite stream of data and never reaches a point at which results can be emitted. Note:

It'd be good to immediately fail any attempt to translate a flow with the described situation. The situation is mostly unintentional, and the mistake is hard to spot at runtime - basically leaving the programmer wondering why no output is produced.
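The fail-fast check suggested above could be sketched roughly as follows. This is a hypothetical illustration, not Euphoria's actual translation layer; the `Op` descriptor and `validate` method are invented names standing in for whatever flow representation the translator works with.

```java
import java.util.List;

// Hypothetical sketch of a translation-time validation: reject any stateful
// operator that consumes unbounded input without defining windowing.
public class FlowValidation {

  // Simplified operator descriptor; field names are illustrative only.
  public static final class Op {
    public final String name;
    public final boolean stateful;       // e.g. ReduceByKey
    public final boolean windowed;       // explicit windowing defined
    public final boolean unboundedInput; // consumes an unbounded source, directly or transitively

    public Op(String name, boolean stateful, boolean windowed, boolean unboundedInput) {
      this.name = name;
      this.stateful = stateful;
      this.windowed = windowed;
      this.unboundedInput = unboundedInput;
    }
  }

  // Fail fast at translation time instead of silently producing no output.
  public static void validate(List<Op> flow) {
    for (Op op : flow) {
      if (op.stateful && !op.windowed && op.unboundedInput) {
        throw new IllegalStateException(
            "Operator " + op.name + " is stateful and consumes unbounded input "
            + "but defines no windowing; it would never emit results");
      }
    }
  }

  public static void main(String[] args) {
    // A windowed stateful operator over an unbounded source is fine.
    validate(List.of(new Op("ReduceByKey", true, true, true)));
    // The undefined case from this issue triggers the check.
    try {
      validate(List.of(new Op("ReduceByKey", true, false, true)));
      throw new AssertionError("expected validation failure");
    } catch (IllegalStateException expected) {
      System.out.println("validation rejected: " + expected.getMessage());
    }
  }
}
```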

One might argue, though, that the situation is practically valid (e.g. it actually works nicely on flink): the semantics of such a situation might well be that the results are produced when the unbounded data source is closed/cancelled. However, such computed results are non-deterministic and inconsistent with the theory.

je-ik commented 7 years ago

I'm not sure if this is directly related to this - but there is at least one valid use case of stateful non-windowed operation - that is a non-windowed stream join. The example would look like this:

  Dataset<?> stream = ...;
  Dataset<?> table = ...;
  Join.of(stream, table)
    .by(...)
    .using(...);    

This can work without windowing, assuming that there is a possibility to convert the table into random access storage. This can be done in many ways, depending on the physical storage of the dataset (e.g. if it is a Kafka stream, then it can be cached locally in the way a KTable works in KStreams). There is no windowing needed, because the stateful operation is effectively turned into a stateless, one-element-at-a-time operation.
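The one-element-at-a-time behavior can be illustrated with a minimal sketch (not Euphoria's `Join` API): once the table side is materialized as a random-access map, each stream element is joined by a single lookup, with no windowing and no cross-element state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Illustrative stream-table join: the table side is already materialized as
// random-access storage, so each stream element is processed independently.
public class StreamTableJoin {

  // Join each stream element (a key/value pair) against the table by lookup.
  public static List<String> join(List<String[]> stream, Map<String, String> table) {
    List<String> out = new ArrayList<>();
    for (String[] kv : stream) {
      String right = table.get(kv[0]); // random-access lookup, no state kept
      if (right != null) {
        out.add(kv[0] + ":" + kv[1] + "|" + right);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    Map<String, String> table = Map.of("a", "A", "b", "B");
    List<String[]> stream = List.of(
        new String[]{"a", "1"}, new String[]{"c", "3"}, new String[]{"b", "2"});
    System.out.println(join(stream, table)); // prints [a:1|A, b:2|B]
  }
}
```

Because no state survives between elements, the join can run forever over an unbounded stream and still emit each result immediately.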

xitep commented 7 years ago

true enough; looks like the special case mentioned in #41

xitep commented 7 years ago

@je-ik i think your case suggests that the validation mentioned in this ticket is to be done only after the expansion of operators to their basic "atoms" (or, more precisely, after the translation layer has performed its optimizations). if we can optimize the join operator example into an "on-the-fly left/right map-side join" (basically turning it into a stateless operation), the validation would naturally not trigger; in any other case i still see the operation as undefined, for which we'd like to trigger the validation error. do you agree?

je-ik commented 7 years ago

I agree there is a strong connection between my remark and the issue #41. The main difference that I see here is that it is effectively possible (under some circumstances) to turn a stream into random access storage locally (e.g. by reading the whole kafka topic and storing it in memory or a local db). Therefore, it is kind of valid not to specify windowing when joining two streams, provided that at least one of them is a type of stream that is in fact a commit log (the definition of a commit log being that you can seek inside it and therefore read old data from the stream). When the user does not specify windowing in such a case, the API level needs to make sure that one stream is treated as a real stream, and the other one is treated as a stream of updates to a stateful database (that is called table-stream duality in the sense of KafkaStreams).
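The duality described above can be sketched as replaying a commit log into a local table: the log is a seekable stream of key/value updates, and materializing it means keeping the last write per key. This is a hypothetical illustration of the idea, not KafkaStreams' actual KTable implementation.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch of table-stream duality: replay a commit log (a seekable stream of
// key/value updates) into a local random-access table, last write per key wins.
public class CommitLogReplay {

  // Materialize the log into a map: later updates overwrite earlier ones.
  public static Map<String, String> materialize(List<String[]> log) {
    Map<String, String> table = new LinkedHashMap<>();
    for (String[] update : log) {
      table.put(update[0], update[1]);
    }
    return table;
  }

  public static void main(String[] args) {
    List<String[]> log = List.of(
        new String[]{"user1", "v1"},
        new String[]{"user2", "v1"},
        new String[]{"user1", "v2"}); // supersedes the first update for user1
    System.out.println(materialize(log)); // prints {user1=v2, user2=v1}
  }
}
```

Once such a table exists locally, the other input can be joined against it element by element, as in the stream-table join case.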

When we identify that we need to cache one of the input streams locally (and I am for simplicity omitting the problem of partitioning), then we can turn the join into a "standard" map-side join as described in #41. Therefore I see my note as a little generalization of #41. Does that make any sense? :)

xitep commented 7 years ago

@je-ik sorry for the delay. i think i got your point. yes, it makes sense, absolutely.

i just don't have any particular idea about how we could differentiate the two situations - the one you mentioned and the one where none of the streams is "random access" - at the API level. maybe we don't need to. maybe a runtime check - as initially suggested - can handle both cases based on runtime properties of the two data sets. given we - on purpose - do not distinguish between bounded and unbounded data sets at the API level, i even think we should not distinguish at the API level between streams which do and don't support "random access".

right now, we don't have any support for random-access, but that will naturally arise from #41. therefore, i think we can proceed to implement the suggested validation and only later extend it with support for "table-streams".