Euphoria is an open source Java API for creating unified big-data processing flows. It provides an engine independent programming model which can express both batch and stream transformations.
Apache License 2.0
UnboundedDataSource has no way of reporting stream error #262
Hi,
I am facing an issue where a KafkaSource is configured against a broker that is not yet available. The call
List<UnboundedPartition<T, OFFSET>> getPartitions(); (interface UnboundedDataSource)
returns an empty list instead of reporting an error. There is even a comment in KafkaSource.class, line 225:
// ~ FIXME a leader might not be available (check p.leader().id() == -1)
// ... fail in this situation
A stream can fail for a number of reasons. One of them is a connection error (in this case the Kafka broker was starting and was not yet ready, so List<PartitionInfo> ps = c.partitionsFor(topicId); returned an empty list). Returning an empty list hides the error completely, and the operation completes successfully without any data.
Therefore I propose a change to the UnboundedDataSource interface: getPartitions() should throw a checked exception, for example PartitionsUnavailableException.
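To make the proposal concrete, here is a minimal sketch of what the changed contract could look like. It is not Euphoria's actual code: `PartitionSource` is a simplified, hypothetical stand-in for UnboundedDataSource, `PartitionMeta` is a hypothetical stand-in for Kafka's PartitionInfo (only the partition id and leader id), and `resolve` and `sourceFor` are illustrative helpers. The leader check follows the FIXME comment quoted above (leader id == -1).

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionsSketch {

  /** Checked exception named as proposed in this issue (hypothetical, not in Euphoria). */
  public static class PartitionsUnavailableException extends Exception {
    public PartitionsUnavailableException(String message) {
      super(message);
    }
  }

  /** Simplified stand-in for UnboundedDataSource; only the method under discussion. */
  public interface PartitionSource<P> {
    List<P> getPartitions() throws PartitionsUnavailableException;
  }

  /** Stand-in for Kafka's PartitionInfo: partition id plus leader id (-1 means no leader). */
  public static final class PartitionMeta {
    public final int partition;
    public final int leaderId;

    public PartitionMeta(int partition, int leaderId) {
      this.partition = partition;
      this.leaderId = leaderId;
    }
  }

  /** Returns the partition ids, or fails loudly instead of returning an empty list. */
  public static List<Integer> resolve(String topic, List<PartitionMeta> metadata)
      throws PartitionsUnavailableException {
    if (metadata == null || metadata.isEmpty()) {
      // Assumption: empty metadata means the broker is not ready yet.
      throw new PartitionsUnavailableException("No partitions reported for topic " + topic);
    }
    List<Integer> ids = new ArrayList<>();
    for (PartitionMeta m : metadata) {
      if (m.leaderId == -1) {
        // Mirrors the FIXME: a partition without a leader cannot be read.
        throw new PartitionsUnavailableException(
            "No leader for partition " + m.partition + " of topic " + topic);
      }
      ids.add(m.partition);
    }
    return ids;
  }

  /** Wraps the check in a source so callers must handle the checked exception. */
  public static PartitionSource<Integer> sourceFor(String topic, List<PartitionMeta> metadata) {
    return () -> resolve(topic, metadata);
  }
}
```

With a contract like this, the caller (an executor, for instance) is forced by the compiler to decide whether to retry, back off, or fail the flow, rather than silently finishing with no data.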
Hi Pavel,
thanks for the issue. Would you be interested in contributing a PR for this? If so, please open one so that we can iterate on it.
Many thanks!