localstack / localstack-java-utils

☕ Java utilities and JUnit integration for LocalStack
Apache License 2.0

Kinesis sending Serialized object. #52

Closed hanymorcos closed 3 years ago

hanymorcos commented 3 years ago

I'm trying to write objects as bytes to a Kinesis stream, but I'm getting errors. I wrote a class using Java generics that sends each object as a byte array.

java.io.StreamCorruptedException: invalid stream header:
    at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:1059)

This method produces the bytes that I pass to ByteBuffer.wrap:

public byte[] objectToBuffer(T record) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    // try-with-resources closes the stream even if writeObject throws
    try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
        out.writeObject(record);
        out.flush();
    }
    return bos.toByteArray();
}

This method converts the ByteBuffer back to an object. This is where the error occurs:

@SuppressWarnings("unchecked")
public T bufferToObject(ByteBuffer recordByteBuffer) throws IOException, ClassNotFoundException {
    // Size the array by remaining(), not limit(): get(byte[]) throws
    // BufferUnderflowException when the buffer's position is not 0.
    byte[] bytes = new byte[recordByteBuffer.remaining()];
    recordByteBuffer.get(bytes);

    try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
        return (T) in.readObject();
    }
}

What's really strange is that this code works in JUnit against LocalStack, but it doesn't work against the real AWS Kinesis.
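To rule out the serialization helpers themselves, it may help to run the round trip with no Kinesis client involved at all. The following is a minimal sketch under that assumption; the class name `RecordCodec` and the `main` driver are hypothetical and not taken from the original code. It reads from a duplicate of the buffer so the caller's position is left untouched.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.ByteBuffer;

// Hypothetical wrapper class (an assumption for illustration only).
class RecordCodec<T extends Serializable> {

    // Serialize a record with standard Java serialization.
    public byte[] objectToBuffer(T record) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(record);
        }
        return bos.toByteArray();
    }

    // Deserialize the bytes between the buffer's position and limit.
    @SuppressWarnings("unchecked")
    public T bufferToObject(ByteBuffer recordByteBuffer)
            throws IOException, ClassNotFoundException {
        // duplicate() shares content but has its own position, so the
        // caller's buffer is not consumed by this read.
        ByteBuffer view = recordByteBuffer.duplicate();
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        RecordCodec<String> codec = new RecordCodec<>();
        byte[] payload = codec.objectToBuffer("hello");
        // No Kinesis here: wrap the bytes directly and read them back.
        String back = codec.bufferToObject(ByteBuffer.wrap(payload));
        System.out.println(back);
    }
}
```

If this round trip succeeds locally, the helpers are probably fine and the difference lies in the bytes that come back from the stream.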

hanymorcos commented 3 years ago

I decoded the returned bytes as ASCII, and they contain JSON. Why is AWS returning JSON from Kinesis? I gave it bytes.
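One way to make this kind of mismatch visible before `ObjectInputStream` throws is to inspect the first bytes of the payload. A Java serialization stream starts with the magic number `0xACED` (`ObjectStreamConstants.STREAM_MAGIC`), so anything else, such as JSON text, can be reported up front. This is only a diagnostic sketch; the class `PayloadInspector` and its method names are hypothetical.

```java
import java.io.ObjectStreamConstants;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical diagnostic helper (an assumption, not part of any library).
final class PayloadInspector {

    // True if the buffer's remaining bytes begin with the Java
    // serialization magic number 0xACED. Uses absolute reads, so the
    // buffer's position is not changed.
    static boolean looksLikeJavaSerialized(ByteBuffer buffer) {
        if (buffer.remaining() < 2) {
            return false;
        }
        int p = buffer.position();
        int magic = ((buffer.get(p) & 0xFF) << 8) | (buffer.get(p + 1) & 0xFF);
        return magic == (ObjectStreamConstants.STREAM_MAGIC & 0xFFFF);
    }

    // Decode at most maxBytes of the payload as ASCII for logging.
    static String preview(ByteBuffer buffer, int maxBytes) {
        ByteBuffer view = buffer.duplicate();
        byte[] head = new byte[Math.min(maxBytes, view.remaining())];
        view.get(head);
        return new String(head, StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        // Example payload that is JSON text rather than a serialized object.
        ByteBuffer json = ByteBuffer.wrap(
            "{\"a\":1}".getBytes(StandardCharsets.US_ASCII));
        if (!looksLikeJavaSerialized(json)) {
            System.out.println("Not a Java-serialized payload: " + preview(json, 32));
        }
    }
}
```

Calling `looksLikeJavaSerialized` on each record's data before deserializing would turn the opaque `StreamCorruptedException` into a log line showing what actually arrived.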

hanymorcos commented 3 years ago

I see that Kinesis returns JSON by default, even though it accepts a serialized object as input. LocalStack both accepts and returns the serialized object, and that mismatch makes testing a nightmare.

pinzon commented 3 years ago

Hi @hanymorcos, I just wrote this PR (localstack/localstack#3978), which handles JSON and CBOR serialization for Kinesis and was written for another issue. Could you try it and see if it fixes your problem? Thanks.