krasserm / streamz

A combinator library for integrating Functional Streams for Scala (FS2), Akka Streams and Apache Camel
Apache License 2.0

Bug in producing to aws-kinesis? #59

Closed PaulAtFormation closed 5 years ago

PaulAtFormation commented 5 years ago

I don't really know if this is a streamz issue; my apologies for the error if not.

I'm attaching a small project that uses Testcontainers and LocalStack for some very preliminary experimentation with streamz-camel-fs2. Specifically, it just tries to send a String to a Kinesis stream, being careful to first create the stream in beforeAll. This fails in the guts of the KinesisProducer with a NullPointerException that strongly suggests the PARTITION_KEY header is not set on the message. And that might be true; I certainly don't do anything to set it. But the documentation neither suggests that I need to nor tells me how.

To run the test, you'll need a working Docker environment; then, in the sbt shell, run it:test.

So again, my apologies for some vagueness here. But I'm happy to help with whatever the path forward is, whether it's documentation, chasing down a bug, or what have you.

data.tar.gz

krasserm commented 5 years ago

Hi Paul,

I'm pretty sure this is a Camel issue or at least Camel-related. Please try connecting to Kinesis using Camel alone. If this succeeds but then fails in combination with streamz, it makes sense to take a closer look at the streamz Camel integration itself. See also the Camel AWS Kinesis documentation for setting the CamelAwsKinesisPartitionKey header, and StreamMessage for setting headers in streamz (this requires creating a new StreamMessage instance with an updated headers map).
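
For later readers, here is a rough sketch of what "a new StreamMessage instance with an updated headers map" could look like. To keep it runnable without any dependencies, it uses a local stand-in case class whose shape (a body plus a headers map) is an assumption based on the description above, not the streamz class itself; the helper name withPartitionKey is hypothetical as well. Only the header name comes from this thread.

```scala
// Local stand-in, NOT the streamz class: assumed shape is a body plus a headers map.
final case class StreamMessage[A](body: A, headers: Map[String, Any] = Map.empty)

object PartitionKeyExample {
  // Header name as mentioned in this thread.
  val PartitionKeyHeader = "CamelAwsKinesisPartitionKey"

  // Hypothetical helper: returns a NEW message whose headers include the partition key.
  // The original message is left untouched because case classes are immutable.
  def withPartitionKey[A](msg: StreamMessage[A], key: String): StreamMessage[A] =
    msg.copy(headers = msg.headers + (PartitionKeyHeader -> key))

  def main(args: Array[String]): Unit = {
    val original = StreamMessage("hello kinesis")
    val keyed    = withPartitionKey(original, "partition-1")

    println(keyed.headers(PartitionKeyHeader)) // prints "partition-1"
    println(original.headers.isEmpty)          // prints "true"
  }
}
```

With the real library, the keyed message (rather than the bare String) would be the element sent to the Kinesis endpoint.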

PaulAtFormation commented 5 years ago

Thanks Martin! I hadn't understood from the Camel AWS Kinesis documentation that the CamelAwsKinesisPartitionKey header is required, or that sending a plain value of type A wouldn't necessarily construct a valid StreamMessage for a given component. I also wish the component threw something more explanatory instead of an NPE. But explicitly constructing a StreamMessage with the CamelAwsKinesisPartitionKey header does indeed work. Thanks so much!
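
In case it helps anyone who hits the same NPE, below is a minimal sketch of a client-side check that reports a missing partition key with a readable message before anything reaches the producer. Everything in it is illustrative: Outgoing is a local stand-in for a body-plus-headers message, and requirePartitionKey is a hypothetical helper, not part of streamz or Camel.

```scala
object KinesisSendGuard {
  // Local stand-in for a message with a body and a headers map (assumed shape).
  final case class Outgoing[A](body: A, headers: Map[String, Any] = Map.empty)

  // Header name as mentioned in this thread.
  val PartitionKeyHeader = "CamelAwsKinesisPartitionKey"

  // Hypothetical guard: Right(msg) if the header is present and non-null,
  // otherwise Left with an explanatory message instead of a later NPE.
  def requirePartitionKey[A](msg: Outgoing[A]): Either[String, Outgoing[A]] =
    msg.headers.get(PartitionKeyHeader) match {
      case Some(value) if value != null => Right(msg)
      case _ =>
        Left(s"Missing header $PartitionKeyHeader: set it on the message before sending to Kinesis")
    }

  def main(args: Array[String]): Unit = {
    val bare  = Outgoing("hello kinesis")
    val keyed = bare.copy(headers = bare.headers + (PartitionKeyHeader -> "partition-1"))

    println(requirePartitionKey(bare).isLeft)   // prints "true"
    println(requirePartitionKey(keyed).isRight) // prints "true"
  }
}
```

Returning an Either keeps the check composable inside a stream; throwing an IllegalArgumentException with the same message would be an equally reasonable choice.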

krasserm commented 5 years ago

You're welcome, glad it works now!