taoensso / nippy

The fastest serialization library for Clojure
https://www.taoensso.com/nippy
Eclipse Public License 1.0
1.04k stars 60 forks source link

Decompress/Deserialize form Java worker when data inserted via amazon ica #55

Closed ryanmmmmm closed 9 years ago

ryanmmmmm commented 9 years ago

We are trying to utilize the Amazon Connector Library:

https://github.com/awslabs/amazon-kinesis-connectors

I read through this issue:

https://github.com/mcohen01/amazonica/issues/36

I still want to compress on the wire and like how nippy doe sit, I just want to comsume messages in other code like Java.

We are inserting records via amazonica but we need to decompress/deserialize via java so I even created a small API method in a small clojure library to use nippy since that seemed to make sense:

Im using a String since it makes the calling of the Clojure API simpler from Java and can use the default Amazon connector library code which goes ahead and converts the HeapByteBuffer to string. I'll optimize that later.

(defn- -deserializeKinesisEvent
  [this byte-buffer-string]
  (let [
        byte-buffer (ByteBuffer/wrap (.getBytes byte-buffer-string))
        b (byte-array (.remaining byte-buffer))]
    (.get byte-buffer b)
    (nippy/thaw b)))

It looks like Sandy is the default for thaw/freeze correct for amazonica? I just want to confirm there is nothing else needed more thso i can try to get this working on java or clojure.

Exception when i try to call that method from Java to thaw:

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 [java]     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 [java]     at java.lang.Thread.run(Thread.java:744)
 [java] Caused by: java.lang.Exception: Thaw failed: Encrypted data or wrong compressor?
 [java]     at taoensso.nippy$thaw$ex__3212.doInvoke(nippy.clj:359)
 [java]     at clojure.lang.RestFn.invoke(RestFn.java:423)
 [java]     at taoensso.nippy$thaw$try_thaw_data__3216.invoke(nippy.clj:377)
 [java]     at taoensso.nippy$thaw.doInvoke(nippy.clj:393)
 [java]     at clojure.lang.RestFn.invoke(RestFn.java:410)
 [java]     at datasnap_core.csv_gen.transformer$_deserializeKinesisEvent.invoke(transformer.clj:67)
 [java]     at datasnap_core.csv_gen.Transformer.deserializeKinesisEvent(Unknown Source)
 [java]     ... 15 more
 [java] Caused by: org.iq80.snappy.CorruptionException: last byte of compressed length int has high bit set
 [java]     at org.iq80.snappy.SnappyDecompressor.readUncompressedLength(SnappyDecompressor.java:425)
 [java]     at org.iq80.snappy.SnappyDecompressor.uncompress(SnappyDecompressor.java:38)
 [java]     at org.iq80.snappy.Snappy.uncompress(Snappy.java:37)
 [java]     at taoensso.nippy.compression.SnappyCompressor.decompress(compression.clj:19)
 [java]     at taoensso.nippy$thaw$try_thaw_data__3216.invoke(nippy.clj:369)
 [java]     ... 19 more
 [java] java.lang.reflect.InvocationTargetException
 [java]     at sun.reflect.NativeMethodAcc
ryanmmmmm commented 9 years ago

I'm trying this:

        byte[] uncompressed = Snappy.uncompress(bytes.array());
        String result = new String(uncompressed, "UTF-8");

but i get a vague:

[java] at samples.redshiftbasic.KinesisEventRedshiftTransformer.toClass(KinesisEventRedshiftTransformer.java:48)ERROR samples.redshiftbasic.KinesisEventRedshiftTransformer: ---------------------------------java.io.IOException: FAILED_TO_UNCOMPRESS(5) [java] [java] at com.amazonaws.services.kinesis.connectors.KinesisConnectorRecordProcessor.processRecords(KinesisConnectorR

ryanmmmmm commented 9 years ago

trying other version of Snappy:

        byte[] bytes = buffer.array();
        int bytesSize = bytes.length;
        byte[] uncompressed = Snappy.uncompress(bytes, 0, bytesSize);
        String result = new String(uncompressed, "UTF-8");

[java] INFO samples.redshiftbasic.KinesisEventRedshiftTransformer: toClass22222222222222222222 [java] INFO samples.redshiftbasic.KinesisEventRedshiftTransformer: XXXXXXXXXXXXXXXXXXXXXXXXXXX--------------Invalid copy offset for opcode starting at 23 [java] [java] ERROR samples.redshiftbasic.KinesisEventRedshiftTransformer: [Ljava.lang.StackTraceElement;@55c45fa4 [java] ERROR samples.redshiftbasic.KinesisEventRedshiftTransformer: ---------------------------------org.iq80.snappy.CorruptionException: Invalid copy offset for opcode starting at 23 [java] at samples.redshiftbasic.KinesisEventRedshiftTransformer.toClass(KinesisEventRedshiftTransformer.java:48) [java] at com.amazonaws.services.kinesis.connectors.KinesisConnectorRecordProcessor.processRecords(KinesisConnectorRecordProcessor.java:115) [java] at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.call(ProcessTask.java:125) [java] at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:48) [java] at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:23) [java] at java.util.concurrent.FutureTask.run(FutureTask.java:262) [java] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [java] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [java] at java.lang.Thread.run(Thread.java:744) [java] org.iq80.snappy.CorruptionException: Invalid copy offset for opcode starting at 23 [java] at org.iq80.snappy.SnappyDecompressor.decompressAllTags(SnappyDecompressor.java:165) [java] IN

ryanmmmmm commented 9 years ago

sorry wrong repository for this use sorry about that..