Closed ejoncas closed 8 years ago
I'm aware that I can extend KinesisConnectorRecordProcessorFactory and KinesisConnectorRecordProcessor to modify that behavior. But I'll need to extend 3 classes just to add something that for me it doesn't hurt to put in the library. In addition, I'll be copy pasting 99% of the code from the base classes.
The emitter interface contains a #fail(List records) method. That's quite useful because if for some reason one message cannot be emitted after N retries, we just publish that to an sqs dead letter queue.
We would like to do the same but also when the message cannot be transformed to T. At the moment, if the transformer throws an exception, the KinesisConnectorRecordProcessor class just logs the exception and that's it.
It would be cool if in addition to log the exception, it can call a callback in the transformer or in the interface that you guys think is better, so then we can do something with the messages that can not be transformed (for example, invalid json, or a json without a field that the pipeline assumes present).