Closed by jbaiera 6 years ago
This is a good idea, but looking at all referenced pull requests they are all requesting a specific, rather simple, capability of ignoring specific HTTP response codes. Is it not possible to implement this feature first as a short-term solution until the proposed error handlers are implemented?
@dmarkhas Thanks for your input here. We're hesitant to introduce a short-term solution that would need to be deprecated and replaced very quickly once this solution arrives.
While the PRs linked above are individually centered around ignoring HTTP responses, there have been other discussions in the community around related problems, like serialization issues killing streaming processes, and the desire for more control over handling data-related exceptions.
@jbaiera That makes sense. The proposed solution seems comprehensive enough to cover all of these cases. Some questions:
1) Would it be possible to register more than one handler? 2) When would these handlers be invoked? Your description mentions "when potentially recoverable error states pertaining to data are encountered", but that seems to leave the definition of what is a recoverable state to the connector author.
Thanks for the response :-)
@dmarkhas To give some answers:
Would it be possible to register more than one handler?
While the structure of this isn't too hard (just one handler that runs a chained set of handlers), the configuration for this could be fairly verbose/clunky. I think the simplest way to do this would be to set a single configuration property with a comma separated list of the handlers in the order that they should run.
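As a rough illustration of the "one handler that runs a chained set of handlers" idea, here is a minimal sketch. Every name in it (`FailureHandler`, `Outcome`, `ChainedHandler`, the handler names `log` and `drop`) is hypothetical and invented for this example; it is not the connector's API, and the choice to abort when no handler takes a decision is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical names throughout; none of this is the connector's actual API.
enum Outcome { HANDLED, PASS, ABORT }

interface FailureHandler {
    Outcome onFailure(String record, Exception cause);
}

// One handler that delegates to an ordered list of handlers.
final class ChainedHandler implements FailureHandler {
    private final List<FailureHandler> chain;

    ChainedHandler(List<FailureHandler> chain) {
        this.chain = new ArrayList<>(chain);
    }

    // Parses a comma-separated property value such as "log, drop" and resolves
    // each name against a registry of known handlers, preserving the order.
    static ChainedHandler fromProperty(String value, Map<String, FailureHandler> registry) {
        List<FailureHandler> handlers = new ArrayList<>();
        for (String name : value.split(",")) {
            String trimmed = name.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate stray commas
            }
            FailureHandler handler = registry.get(trimmed);
            if (handler == null) {
                throw new IllegalArgumentException("Unknown handler: " + trimmed);
            }
            handlers.add(handler);
        }
        return new ChainedHandler(handlers);
    }

    @Override
    public Outcome onFailure(String record, Exception cause) {
        for (FailureHandler handler : chain) {
            Outcome outcome = handler.onFailure(record, cause);
            if (outcome != Outcome.PASS) {
                return outcome; // the first handler that takes a decision wins
            }
        }
        return Outcome.ABORT; // assumption: nobody handled it, so stay fail-fast
    }
}
```

With a registry that maps `log` to a handler returning `PASS` and `drop` to one returning `HANDLED`, `ChainedHandler.fromProperty("log, drop", registry)` would log first and then drop the record.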
When would these handlers be invoked? Your description mentions "when potentially recoverable error states pertaining to data are encountered", but that seems to leave the definition of what is a recoverable state to the connector author.
We'll have to start off with defining the error states that we want to handle. We could define a universal error handler/listener interface, but my concern here is that it would either be a confusingly broad interface, or hard to extend for an error handling situation that doesn't conform to it.
Obviously, we want to catch and handle an exception as close as possible to where it occurred, so that the response to the error is valuable. These locations are diverse, and may be dealing with different objects from different domains. Handlers may have to be structured differently from one another. Ultimately, the failure scenarios are going to dictate the interfaces that handle them.
My thoughts are that there will be two handler interfaces to start off with: a handler interface for serialization errors going from Tuple/Row/Writable/etc. to JSON, and a handler interface for failed bulk entries on write operations. The sections of code enclosed and managed by these handlers would most likely be within the RestRepository, at the writeToIndex method and within the flush method.
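To make the two-interface split more concrete, here is a small sketch. The interfaces, the `Decision` enum, and `ToyWriter` are all hypothetical; `ToyWriter` is only a stand-in for the write path and does not reproduce RestRepository. The "server rejection" is simulated, and the HTTP status passed to the handler is a made-up value.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical types; the thread only says that two handler interfaces are planned.
enum Decision { SKIP, ABORT }

// Called when a record cannot be turned into JSON.
interface SerializationErrorHandler {
    Decision onSerializationError(Object record, Exception cause);
}

// Called for each entry that a bulk request reports as failed.
interface BulkWriteErrorHandler {
    Decision onBulkFailure(String jsonEntry, int httpStatus, String reason);
}

// Toy stand-in for the write path: serialize on write, inspect failures on flush.
final class ToyWriter {
    private final SerializationErrorHandler serializationHandler;
    private final BulkWriteErrorHandler bulkHandler;
    private final List<String> buffer = new ArrayList<>();
    final List<String> indexed = new ArrayList<>();

    ToyWriter(SerializationErrorHandler serializationHandler, BulkWriteErrorHandler bulkHandler) {
        this.serializationHandler = serializationHandler;
        this.bulkHandler = bulkHandler;
    }

    void write(Object record) {
        String json;
        try {
            json = serialize(record);
        } catch (IllegalArgumentException e) {
            if (serializationHandler.onSerializationError(record, e) == Decision.ABORT) {
                throw e; // fail-fast, as today
            }
            return; // SKIP: drop the record and keep going
        }
        buffer.add(json);
    }

    void flush() {
        for (String entry : buffer) {
            if (entry.contains("\"reject\"")) { // simulated rejection by the server
                if (bulkHandler.onBulkFailure(entry, 400, "simulated rejection") == Decision.ABORT) {
                    throw new IllegalStateException("Bulk entry failed: " + entry);
                }
            } else {
                indexed.add(entry);
            }
        }
        buffer.clear();
    }

    // Only strings are "serializable" in this toy example.
    private static String serialize(Object record) {
        if (!(record instanceof String)) {
            throw new IllegalArgumentException("Cannot serialize " + record);
        }
        return "{\"value\":\"" + record + "\"}";
    }
}
```

The point of the sketch is only that the two failure sites see different inputs (a raw record versus a JSON entry plus a server response), which is why a single universal interface would be awkward.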
Guys,
This issue is referenced for ES-Hadoop 6.1 which is already available. But at the same time the status is still "open". So I am just wondering when such functionality will be available? And/or if it is already available could somebody point me to the documentation on how to use it?
Thanks, Myron
@mchelyada This is mislabelled. It will be coming out in a future release.
While there are many ways to address this problem (@jbaiera actually mentions two problems: serialization to JSON and rejected operations), I am currently trying to make the connector behave like a Spark RDD/DStream filter. The records rejected by ES would be returned by the filter. Since the result would be an RDD/DStream, the Spark application can handle those records like any other RDD/DStream (print them, save them to a file, save them to Kafka, etc.). I'm new to Spark, but this approach seems to me consistent with Spark data structures and architecture.
@cerebrotecnologico good luck in that endeavor. If you get it working, feel free to let us know! PRs are always welcome.
For everyone else, I have opened a PR (#1095) for the work I have done to support this feature for bulk writes. We will circle back to serialization error handlers in a later contribution. I want to try and tackle the problem with the most impact first.
With the addition of the experimental failure handler API for bulk write failures, this is marked as completed. More failure handlers will be coming out as we clean up, simplify, and solidify the APIs.
ES-Hadoop is a project that often embraces the fail-fast mantra, that exceptions and errors are often exceptional circumstances that need direct user interaction to fix and should not be ignored. This works great for batch jobs that are idempotent and easy to re-run, but the Hadoop ecosystem has since moved on to streaming applications being just as common as batch with patterns like lambda architecture and near-realtime processing.
As such, users are finding it difficult to use the connector, because a job fails every time the connector experiences a non-recoverable exception, such as when serializing malformed data or when data is rejected by Elasticsearch under consistency guarantees.
We want to implement something close to Logstash's Dead Letter Queues, but persisting bad data can be a challenge when working on a distributed system like Hadoop. Logstash's Dead Letter Queues take advantage of Logstash's internal binary formats and the fact that they are installed on a system with an (often) non-ephemeral filesystem. ES-Hadoop has no such internal binary format, opting to use each integration's data abstraction, and supports a plethora of integrations where HDFS or some other persistent storage is not always guaranteed to be available.
We are planning to implement something like a "Dead Letter Handler" instead of a queue. Users would be able to implement interfaces for certain classes of errors or certain critical sections within the connector. These handlers would allow users to provide implementations that decide the connector's actions when potentially recoverable error states pertaining to data are encountered. Users would register the handlers with the connector by implementation name, and the connector will instantiate and configure the handlers at start up by means of reflection or SPI.
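Registration by implementation name could look roughly like the following. This is a sketch of the reflection route only (not SPI), and `DataErrorHandler`, `HandlerLoader`, `PrefixingHandler`, and the `handler.prefix` setting are hypothetical names made up for the example. It assumes handlers expose a no-argument constructor and receive their settings through an `init` call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical handler contract: configure once, then decide per failure.
interface DataErrorHandler {
    void init(Properties settings);

    // Returns true if the failure was handled and processing may continue.
    boolean handle(String record, Exception cause);
}

final class HandlerLoader {
    // Instantiates a handler from its class name and configures it.
    static DataErrorHandler load(String className, Properties settings) {
        try {
            Class<? extends DataErrorHandler> type =
                    Class.forName(className).asSubclass(DataErrorHandler.class);
            DataErrorHandler handler = type.getDeclaredConstructor().newInstance();
            handler.init(settings);
            return handler;
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalArgumentException("Cannot load handler " + className, e);
        }
    }
}

// Example user implementation: remembers bad records under a configurable prefix.
class PrefixingHandler implements DataErrorHandler {
    final List<String> seen = new ArrayList<>();
    private String prefix = "";

    @Override
    public void init(Properties settings) {
        prefix = settings.getProperty("handler.prefix", ""); // hypothetical setting name
    }

    @Override
    public boolean handle(String record, Exception cause) {
        seen.add(prefix + ":" + record);
        return true;
    }
}
```

A caller would pass the configured class name, for example `HandlerLoader.load(PrefixingHandler.class.getName(), settings)`. Wrapping every failure in `IllegalArgumentException` keeps misconfiguration errors visible at start up rather than at the first bad record.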
The project will also provide out-of-the-box implementations of these handlers to cover the most common use cases, such as "Ignore errors and log them", "Ignore all errors", and the existing behavior, "raise all exceptions".
Users that wish to gain something similar to the Logstash Dead Letter Queue would be able to implement their own handlers to persist the troubled data, for example by sending it to HDFS, storing it in a queuing system like Kafka, or storing it on the local filesystem.
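The three stock behaviors and a user-written "dead letter" handler might be sketched as follows. The `DeadLetterHandler` interface and all class and method names are hypothetical; the local-file variant is just one of the persistence options mentioned above, and it assumes each record fits on a single line.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collections;

// Hypothetical contract: return normally to continue, throw to fail the job.
interface DeadLetterHandler {
    void onBadRecord(String record, Exception cause);
}

// Sketches of the three stock behaviors named in the proposal.
final class Handlers {
    private Handlers() { }

    static DeadLetterHandler ignoreAll() {
        return (record, cause) -> { };
    }

    static DeadLetterHandler logAndIgnore() {
        return (record, cause) ->
                System.err.println("Dropping record " + record + ": " + cause.getMessage());
    }

    static DeadLetterHandler raise() {
        return (record, cause) -> {
            throw new IllegalStateException("Bad record: " + record, cause);
        };
    }
}

// User-provided handler that appends each bad record to a local file.
final class LocalFileDeadLetterHandler implements DeadLetterHandler {
    private final Path file;

    LocalFileDeadLetterHandler(Path file) {
        this.file = file;
    }

    @Override
    public void onBadRecord(String record, Exception cause) {
        try {
            // One record per line; the file is created on first use.
            Files.write(file, Collections.singletonList(record), StandardCharsets.UTF_8,
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

On a cluster, a local file only survives as long as the node does, which is the persistence concern raised in the comparison with Logstash; a handler that writes to HDFS or Kafka would follow the same shape with a different sink.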