awslabs / kinesis-kafka-connector

kinesis-kafka-connector is a connector based on Kafka Connect that publishes messages to Amazon Kinesis streams or Amazon Kinesis Firehose.
Apache License 2.0

AmazonKinesisSinkTask.addUserRecord requires schema #53

Open desavera opened 3 years ago

desavera commented 3 years ago

I have a use case where topics hold data in JSON format. No matter what combination of schema-enabling settings I try, AmazonKinesisSinkTask.addUserRecord always calls sinkRecord.valueSchema():

private ListenableFuture<UserRecordResult> addUserRecord(KinesisProducer kp, String streamName, String partitionKey,
        boolean usePartitionAsHashKey, SinkRecord sinkRecord) {

    // If configured use kafka partition key as explicit hash key
    // This will be useful when sending data from same partition into
    // same shard
    if (usePartitionAsHashKey)
        return kp.addUserRecord(streamName, partitionKey, Integer.toString(sinkRecord.kafkaPartition()),
                DataUtility.parseValue(sinkRecord.valueSchema(), sinkRecord.value()));
    else
        return kp.addUserRecord(streamName, partitionKey,
                DataUtility.parseValue(sinkRecord.valueSchema(), sinkRecord.value()));

}

I guess this means I need to provide a schema for the value? I am getting the following exception because valueSchema() returns null:

Caused by: java.lang.NullPointerException
    at com.amazon.kinesis.kafka.DataUtility.parseValue(DataUtility.java:31)
    at com.amazon.kinesis.kafka.AmazonKinesisSinkTask.addUserRecord(AmazonKinesisSinkTask.java:235)
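For illustration, here is a minimal sketch of the kind of null-schema fallback that would avoid this NullPointerException. It is not the connector's code: the class SchemalessValues and its method toByteBuffer are hypothetical names, and the sketch assumes the schemaless value arrives as a String, byte[], or ByteBuffer. A Map value (which a JSON converter can produce when schemas are disabled) would have to be re-serialized to JSON first, which is not shown here.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of kinesis-kafka-connector: turns a value that
// arrived without a Connect schema into the bytes to send to Kinesis.
final class SchemalessValues {

    private SchemalessValues() {
    }

    static ByteBuffer toByteBuffer(Object value) {
        if (value == null) {
            // Assumption: a null value is sent as an empty payload.
            return ByteBuffer.allocate(0);
        }
        if (value instanceof byte[]) {
            return ByteBuffer.wrap((byte[]) value);
        }
        if (value instanceof ByteBuffer) {
            // duplicate() leaves the caller's position and limit untouched.
            return ((ByteBuffer) value).duplicate();
        }
        if (value instanceof String) {
            // Raw JSON text is passed through as UTF-8 bytes.
            return ByteBuffer.wrap(((String) value).getBytes(StandardCharsets.UTF_8));
        }
        // Anything else (e.g. a Map) needs an explicit serializer.
        throw new IllegalArgumentException(
                "Unsupported schemaless value type: " + value.getClass().getName());
    }
}
```

In addUserRecord, such a helper would only be called when sinkRecord.valueSchema() is null; records that carry a schema would still go through DataUtility.parseValue as they do today.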

wookasz commented 3 years ago

@desavera Were you able to find a way to avoid publishing the schema along with your payload?