Open billoley opened 6 years ago
Why don't you just use ByteBuffer.duplicate()? It shares the same backing array, but the duplicate has its own position, limit, and mark, which lets you read the underlying data without disrupting the original process.
https://docs.oracle.com/javase/7/docs/api/java/nio/ByteBuffer.html#duplicate()
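(For illustration, a small JDK-only sketch of what duplicate() gives you: the duplicate shares the backing array, but rewinding it for reading does not move the writer's position.)

import java.nio.ByteBuffer;

public class DuplicateDemo {
    public static void main(String[] args) {
        ByteBuffer writer = ByteBuffer.allocate(64);
        writer.putLong(42L);                    // writer position is now 8

        ByteBuffer reader = writer.duplicate(); // same backing array, independent pointers
        reader.flip();                          // limit = 8, position = 0 on the duplicate only
        System.out.println(reader.getLong());   // prints 42

        System.out.println(writer.position());  // still 8; the writer is unaffected
    }
}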
My understanding is that Compressor.close() needs to be called before passing the data from a Compressor to a Decompressor.
After I get a copy of the Compressor's BitOutput object, I call:
// replicate the end-of-stream marker that Compressor.close() would write
newCompressorOutput.writeBits(0x0F, 4);
newCompressorOutput.writeBits(0xFFFFFFFF, 32);
newCompressorOutput.skipBit();
newCompressorOutput.flush();
.. and then initialize the Decompressor.
If this is not true, then I may have other options.
I think there are also a few synchronization issues with sharing the underlying data.
1) Continue to ingest and compress TSDB data while queries are happening.
2) If I need to age off the compressor data (N-hour cache) at the same time that it is being queried, I would have to synchronize around the use of the Compressor and Decompressor.
You only need to call close() before reading an unknown number of items with the Decompressor. If you know how many items to read, then there's no need for close(). Given that, you could create a wrapper around your active compressor, such as in the following example (modify it to your needs, including any concurrency protection):
import java.util.stream.Stream;

import fi.iki.yak.ts.compression.gorilla.*;

public class CompressInterface {
    private int count = 0;
    private ByteBufferBitOutput out;
    private GorillaCompressor compressor;

    public CompressInterface(long blockStamp) {
        out = new ByteBufferBitOutput(8192);
        compressor = new GorillaCompressor(blockStamp, out);
    }

    /**
     * Add a new value to the compression stream
     */
    public void put(long timestamp, double value) {
        compressor.addValue(timestamp, value);
        count++;
    }

    /**
     * Read the current values from the compression stream
     */
    public Stream<Pair> stream() {
        Stream.Builder<Pair> builder = Stream.builder();
        GorillaDecompressor decompressor = new GorillaDecompressor(
                new ByteBufferBitInput(out.getByteBuffer().duplicate()));
        for (int i = 0; i < count; i++) {
            builder.accept(decompressor.readPair());
        }
        return builder.build();
    }
}
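(For illustration, a minimal usage sketch of this wrapper; the timestamps and values are made up:)

long blockStart = System.currentTimeMillis();
CompressInterface series = new CompressInterface(blockStart);
series.put(blockStart, 1.0);
series.put(blockStart + 60_000, 2.5);

// read back the values added so far, without closing the compressor
series.stream().forEach(p ->
        System.out.println(p.getTimestamp() + " -- " + p.getDoubleValue()));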
As for aging the data, you should close() the compressor as soon as a datapoint passes your block mark. At that point the underlying buffer is immutable, so there are no synchronization problems. You only need this type of wrapper before the block is closed. Unless I misunderstood your last requirement.
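(A minimal sketch of that block-rollover idea, assuming fixed one-hour blocks and single-threaded writes; the class name and block size are made up, so adapt and add locking as needed:)

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import fi.iki.yak.ts.compression.gorilla.*;

// Rolls to a new compressor when a datapoint passes the current block mark.
// Closed blocks are immutable and can be handed to a GorillaDecompressor freely.
public class BlockRollover {
    private static final long BLOCK_MILLIS = TimeUnit.HOURS.toMillis(1); // assumed block size

    private final List<LongArrayOutput> closedBlocks = new ArrayList<>();
    private long blockStart;
    private LongArrayOutput out;
    private GorillaCompressor compressor;

    public BlockRollover(long firstTimestamp) {
        openBlock(firstTimestamp - (firstTimestamp % BLOCK_MILLIS));
    }

    public void put(long timestamp, double value) {
        if (timestamp >= blockStart + BLOCK_MILLIS) {
            compressor.close();          // finalize the old block; its buffer is now read-only
            closedBlocks.add(out);       // keep (or age off) the closed block as needed
            openBlock(timestamp - (timestamp % BLOCK_MILLIS));
        }
        compressor.addValue(timestamp, value);
    }

    private void openBlock(long start) {
        blockStart = start;
        out = new LongArrayOutput(480);
        compressor = new GorillaCompressor(blockStart, out);
    }
}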
@billoley Did the previous comment solve your issue? Or is there still something you would like changed? I might consider bookkeeping "helpers" for the next release if you wish to have them in the library.
I believe the concept of a wrapper class is working for me. It might be useful to create helper classes that handle common use cases.
1) Compressor / Decompressor access from one place
2) Periodic closing of Compressors
3) Age-off of Compressors
I'm actually using the LongArrayOutput version of BitOutput. The accessor for the backing array makes a copy before returning it, but it sizes the copy to length + 1. When I use this array in a Decompressor and read the number of entries that I added, I get an ArrayIndexOutOfBoundsException. This does not happen if I close the compressor first.
Considering that the getLongArray() method is already copying the array (and this solution doesn't work without replicating a lot of the code to implement close()), I think that adding a copy constructor would be a good idea.
Can you create a test that causes that ArrayIndexOutOfBoundsException? If you added n-items and can't read n-items back, then there's a bug somewhere.
I pushed version 2.1.0, which allows getting the underlying array with getLongArray(); it no longer makes a copy.
Note that the GorillaDecompressor works fine without closing in most cases. This particular case was caught by one of my regression tests.
package test.package;

import fi.iki.yak.ts.compression.gorilla.*;
import org.junit.Test;

import java.util.concurrent.TimeUnit;

public class TestGorillaDecompressor {

    @Test
    public void testReadWithoutClose() {
        long start = System.currentTimeMillis();
        LongArrayOutput compressorOutput = new LongArrayOutput(480);
        GorillaCompressor compressor = new GorillaCompressor(start, compressorOutput);

        long ONE_DAY = TimeUnit.DAYS.toMillis(1);
        long t = start;
        compressor.addValue(t, 1.0);
        compressor.addValue(t + ONE_DAY, 3.0);

        // note: the compressor is intentionally not closed before reading
        LongArrayInput decompressorInput = new LongArrayInput(compressorOutput.getLongArray());
        GorillaDecompressor decompressor = new GorillaDecompressor(decompressorInput);

        for (int x = 1; x <= 2; x++) {
            Pair pair = decompressor.readPair();
            System.out.println(pair.getTimestamp() + " -- " + pair.getDoubleValue());
        }
    }
}
Fixed in commit 32807f2f29b50a76ac67c1fbc5b5e1e1619b16ba
Provide a mechanism to get an exact copy of each BitOutput implementation.
If a user keeps a reference to the BitOutput implementation passed to a Compressor/GorillaCompressor, this will allow an exact copy to be made, finalized (as if Compressor.close() was called), and passed to a Decompressor/GorillaDecompressor without affecting the original Compressor/GorillaCompressor.
Useful in a production environment where we want to read the contents of the Compressor/GorillaCompressor but are not ready to close it.
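(For illustration, a rough sketch of that workflow; the copy constructor LongArrayOutput(LongArrayOutput) is a hypothetical signature used only for this example, and the actual mechanism added in the commit may differ:)

import fi.iki.yak.ts.compression.gorilla.*;

public class SnapshotRead {
    // Reads the current contents of a still-open compressor by copying and
    // finalizing its BitOutput, leaving the original untouched.
    public static void dump(LongArrayOutput liveOutput, int count) {
        // hypothetical copy constructor producing an exact copy of the live output
        LongArrayOutput snapshot = new LongArrayOutput(liveOutput);

        // finalize the copy, as if Compressor.close() had been called
        snapshot.writeBits(0x0F, 4);
        snapshot.writeBits(0xFFFFFFFF, 32);
        snapshot.skipBit();
        snapshot.flush();

        GorillaDecompressor decompressor =
                new GorillaDecompressor(new LongArrayInput(snapshot.getLongArray()));
        for (int i = 0; i < count; i++) {
            Pair pair = decompressor.readPair();
            System.out.println(pair.getTimestamp() + " -- " + pair.getDoubleValue());
        }
    }
}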