aws-samples / amazon-kinesis-data-analytics-examples

Example applications in Java, Python and SQL for Kinesis Data Analytics, demonstrating sources, sinks, and operators.
MIT No Attribution

java.io.NotSerializableException: Non-serializable lambda #50

Status: Open. Opened by GregoryKueski 1 year ago.

GregoryKueski commented 1 year ago

When I try to start my Flink application on Kinesis Data Analytics, I get the following error: `org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Cannot serialize operator object class org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory.`, caused by `java.io.NotSerializableException: Non-serializable lambda`. I'm using exactly the same code as the examples for creating the Kinesis Data Streams sink, and the same lines in `build.sbt`. What could be causing this?

It seems to be the lambda passed to the `setPartitionKeyGenerator` method.
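To confirm that suspicion locally before deploying, a small check can attempt plain Java serialization of the partition key generator. Java serialization is the mechanism that raises the reported `NotSerializableException`. This is a minimal sketch: `SerializationCheck` and `isJavaSerializable` are hypothetical names, not part of Flink or the AWS samples, and the check only tells you whether the one object you pass in can be written.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Hypothetical helper (not part of Flink or the AWS samples).
// Tries to write `obj` with plain Java serialization and reports whether it worked.
object SerializationCheck {
  def isJavaSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj) // throws NotSerializableException for non-serializable objects
      true
    } catch {
      case _: NotSerializableException => false
    } finally {
      out.close()
    }
  }
}
```

For example, calling `SerializationCheck.isJavaSerializable(generator)` on the value you intend to pass to `setPartitionKeyGenerator` should return `false` if that value is what breaks the job.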

What could be a workaround?

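```scala
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kinesis.sink.KinesisStreamsSink

// streamNameKey and defaultOutputStreamName are defined elsewhere in the application.
def createSink: KinesisStreamsSink[String] = {
  val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties
  val outputProperties = applicationProperties.get("ProducerConfigProperties")

  KinesisStreamsSink.builder[String]
    .setKinesisClientProperties(outputProperties)
    .setSerializationSchema(new SimpleStringSchema)
    .setStreamName(outputProperties.getProperty(streamNameKey, defaultOutputStreamName))
    // Scala lambda converted to PartitionKeyGenerator: the suspected source of the error.
    .setPartitionKeyGenerator((element: String) => String.valueOf(element.hashCode))
    .build
}
```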
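A sketch of one possible workaround, assuming the lambda is indeed the culprit: replace it with a named top-level class that implements Flink's `PartitionKeyGenerator` interface (in `org.apache.flink.connector.kinesis.sink`), which extends `java.io.Serializable`. The class name `HashCodePartitionKeyGenerator` is hypothetical, and the sketch assumes the Flink Kinesis Streams connector is on the classpath, as it already is for the code above.

```scala
import org.apache.flink.connector.kinesis.sink.PartitionKeyGenerator

// Hypothetical named class used instead of a Scala lambda.
// PartitionKeyGenerator extends java.io.Serializable; this class has no fields
// and captures nothing from an enclosing scope, so Java serialization can write it.
class HashCodePartitionKeyGenerator extends PartitionKeyGenerator[String] {
  // Same key derivation as the original lambda: the element's hash code as a string.
  override def apply(element: String): String = String.valueOf(element.hashCode)
}
```

In `createSink`, the lambda line would then become `.setPartitionKeyGenerator(new HashCodePartitionKeyGenerator)`. A top-level class is used rather than an anonymous one so that nothing from the enclosing scope can be captured into the serialized object.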