RuedigerMoeller / fast-serialization

FST: fast Java serialization drop-in replacement
Apache License 2.0

Not releasing references to InputStreams and OutputStreams #110

Closed: jedvardsson closed this issue 7 years ago

jedvardsson commented 8 years ago

Hi,

I am reviewing FST to see whether we can use it in our projects. I may be overlooking something, so please correct me if I am wrong. Looking at the code, it seems that FST holds on to the InputStream passed to org.nustaq.serialization.FSTConfiguration#getObjectInput(java.io.InputStream) (and analogously for getObjectOutput()).

The wiki examples state that one shouldn't close the resulting ObjectInput, and since the cached ObjectInput is held strongly (not through a SoftReference), I can't see how the InputStream will ever be released unless getObjectInput() is called again with a new stream.
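To make the concern concrete, here is a stdlib-only model of the pattern described above. It is not FST's code: `CachedReaderDemo`, `getReader` and `release` are hypothetical names. The sketch only shows that a per-thread cached reader keeps the last InputStream reachable after the caller is done, until the next call replaces it or something clears it explicitly.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

/** Hypothetical model of a per-thread cached reader that retains its last stream. */
final class CachedReaderDemo {

    static final class CachedReader {
        private InputStream in;                 // retained after use: nothing clears it
        private final byte[] buffer = new byte[16];

        CachedReader reset(InputStream newIn) {
            in = newIn;                         // the previous stream is dropped only here
            return this;
        }

        /** Drains the current stream and returns the number of bytes read. */
        int readAll() {
            try {
                int total = 0;
                int n;
                while ((n = in.read(buffer)) != -1) {
                    total += n;
                }
                return total;
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        InputStream currentStream() {
            return in;
        }

        /** Hypothetical explicit release, the kind of call the reporter is asking about. */
        void release() {
            in = null;
        }
    }

    private static final ThreadLocal<CachedReader> READER =
            ThreadLocal.withInitial(CachedReader::new);

    /** Similar in spirit to getObjectInput(InputStream): reuses this thread's reader. */
    static CachedReader getReader(InputStream in) {
        return READER.get().reset(in);
    }

    static CachedReader cached() {
        return READER.get();
    }

    public static void main(String[] args) {
        InputStream stream = new ByteArrayInputStream(new byte[100]);
        System.out.println(getReader(stream).readAll());          // 100
        // The caller is finished, but the thread-local reader still points at the stream.
        System.out.println(cached().currentStream() == stream);   // true
        cached().release();
        System.out.println(cached().currentStream() == null);     // true
    }
}
```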

RuedigerMoeller commented 8 years ago

FST caches some state, such as buffers and metadata, in ThreadLocals. Since messaging applications typically send only small objects, rebuilding buffers and streams on every call usually costs more CPU than the serialization itself. In practice this is not an issue: flat byte arrays (which make up most of an input stream's memory) do not create GC pressure, so this is a good trade-off.

The memory consumption is of fixed size: on reuse, the previously cached objects are reused or overwritten, so it's not a memory leak.
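A minimal sketch of the reuse strategy described above, assuming one cached buffer per thread that is overwritten on each call and grows only when a payload does not fit. `ReusableBuffer` is hypothetical, and the doubling growth policy is an assumption rather than FST's actual policy. It illustrates why memory per thread is bounded by the largest payload seen, not by the number of calls.

```java
/**
 * Hypothetical model of the reuse strategy, not FST's implementation: one buffer per
 * thread, overwritten on every call, grown only when a payload does not fit.
 */
final class ReusableBuffer {
    private static final ThreadLocal<ReusableBuffer> TLS =
            ThreadLocal.withInitial(() -> new ReusableBuffer(4000));

    private byte[] buf;

    ReusableBuffer(int initialSize) {
        buf = new byte[initialSize];
    }

    /** The calling thread's cached instance. */
    static ReusableBuffer get() {
        return TLS.get();
    }

    /** Overwrites the cached buffer with the payload; reallocates only if it does not fit. */
    void write(byte[] payload) {
        if (payload.length > buf.length) {
            // Assumed growth policy: at least double. The old array becomes garbage;
            // the new, larger one stays cached for this thread.
            buf = new byte[Math.max(payload.length, buf.length * 2)];
        }
        System.arraycopy(payload, 0, buf, 0, payload.length);
    }

    int capacity() {
        return buf.length;
    }

    public static void main(String[] args) {
        ReusableBuffer b = ReusableBuffer.get();
        for (int i = 0; i < 1_000; i++) {
            b.write(new byte[100]);         // small messages: cached buffer is not reallocated
        }
        System.out.println(b.capacity());   // 4000
        b.write(new byte[10_000]);          // one large message grows the cached buffer
        System.out.println(b.capacity());   // 10000
        for (int i = 0; i < 1_000; i++) {
            b.write(new byte[100]);         // later small messages do not shrink it again
        }
        System.out.println(b.capacity());   // 10000
    }
}
```

The last line is the catch raised later in the thread: the bound is the largest payload a thread has ever handled, and that array stays reachable for as long as the thread (and its ThreadLocal) lives.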

If the overhead is not significant for your app (e.g. you serialize large objects), consider a pull request enabling an explicit reset/nulling of the cached streams (the performance penalty can be significant, especially for messaging apps).
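The opt-in reset suggested above might look roughly like this. `ReleasingCodecCache` and its `releaseAfterUse` flag are hypothetical and not part of FST; the sketch just counts buffer allocations to show the trade-off being described: releasing after each call frees the memory immediately, but every call then pays for a fresh allocation.

```java
/**
 * Hypothetical sketch of an opt-in "reset/null the cached state" switch.
 * Nothing here is FST API; the names are illustrative.
 */
final class ReleasingCodecCache {
    private final boolean releaseAfterUse;   // hypothetical opt-in flag
    private byte[] cached;                   // cached buffer (would be per thread in practice)
    private int allocations;                 // how often a fresh buffer had to be created

    ReleasingCodecCache(boolean releaseAfterUse) {
        this.releaseAfterUse = releaseAfterUse;
    }

    /** Simulates one serialization call that needs a buffer of the given size. */
    void serialize(int size) {
        if (cached == null || cached.length < size) {
            cached = new byte[size];
            allocations++;
        }
        // ... encode into cached ...
        if (releaseAfterUse) {
            cached = null;                   // frees memory now, costs an allocation next time
        }
    }

    boolean holdsBuffer() {
        return cached != null;
    }

    int allocations() {
        return allocations;
    }

    public static void main(String[] args) {
        ReleasingCodecCache keep = new ReleasingCodecCache(false);
        ReleasingCodecCache release = new ReleasingCodecCache(true);
        for (int i = 0; i < 100; i++) {
            keep.serialize(1_000);
            release.serialize(1_000);
        }
        System.out.println(keep.allocations() + " " + keep.holdsBuffer());       // 1 true
        System.out.println(release.allocations() + " " + release.holdsBuffer()); // 100 false
    }
}
```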

jellevictoor commented 7 years ago

Is there a pull request for this already, or does this remain open? I'm seeing the same thing. Since I'm serializing very large objects and the buffers are stored in a ThreadLocal, I end up with (number of threads) × (size of the cached byte array) in memory, which requires a lot of memory.
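A back-of-the-envelope version of that estimate, assuming each thread keeps one cached buffer sized to the largest object it has handled (the numbers are hypothetical, not measurements from this thread; if both the input and output side of a thread keep such a buffer, roughly double the result):

```latex
M_{\text{retained}} \approx N_{\text{threads}} \times B_{\max},
\qquad \text{e.g.}\quad 200 \times 50\ \text{MB} = 10\,000\ \text{MB} \approx 10\ \text{GB}
```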

smigfu commented 7 years ago

We have a similar issue in our product (lots of threads and large byte arrays). After every read/write we check whether the ThreadLocal byte[] has changed (which happens when something larger than the original byte[] has been serialized/deserialized). If it has, we reset it to the original one. We had to copy the default coder implementation (FstDefaultCoder below) in order to access its fields, though. The utility class looks like this:

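import java.io.IOException;
import java.io.UncheckedIOException;

import org.nustaq.serialization.*;
import org.nustaq.serialization.simpleapi.*;
import org.nustaq.serialization.util.FSTUtil;

public final class FstUtil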
{
    static final class FstDefaultCoder implements FSTCoder
    {
        private final FSTConfiguration conf;
        private final FSTObjectInput input;
        private final FSTObjectOutput output;

        private FstDefaultCoder()
        {
            conf = FSTConfiguration.createDefaultConfiguration();
            conf.setShareReferences(true);
            input = new FSTObjectInputNoShared(conf);
            output = new FSTObjectOutputNoShared(conf);
        }

        @Override
        public FSTConfiguration getConf()
        {
            return conf;
        }

        @Override
        public byte[] toByteArray(final Object o)
        {
            output.resetForReUse();
            try
            {
                output.writeObject(o);
            }
            catch (final IOException e)
            {
                FSTUtil.rethrow(e);
            }
            return output.getCopyOfWrittenBuffer();
        }

        @Override
        public int toByteArray(final Object obj, final byte[] result, final int resultOffset, final int availableSize)
        {
            output.resetForReUse();
            try
            {
                output.writeObject(obj);
            }
            catch (final IOException e)
            {
                FSTUtil.rethrow(e);
            }
            final int written = output.getWritten();
            if (written > availableSize)
            {
                throw FSTBufferTooSmallException.Instance;
            }
            System.arraycopy(output.getBuffer(), 0, result, resultOffset, written);
            return written;
        }

        @Override
        public Object toObject(final byte[] arr)
        {
            return toObject(arr, 0, arr.length);
        }

        @Override
        public Object toObject(final byte[] arr, final int off, final int len)
        {
            try
            {
                if (off == 0)
                {
                    input.resetForReuseUseArray(arr);
                }
                else
                {
                    input.resetForReuseCopyArray(arr, off, len);
                }
                return input.readObject();
            }
            catch (final Exception e)
            {
                FSTUtil.rethrow(e);
            }
            return null;
        }

        FSTObjectInput getObjectInput()
        {
            return input;
        }

        FSTObjectOutput getObjectOutput()
        {
            return output;
        }
    }

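    // Size of the small per-thread default buffer; larger payloads get a temporary array
    private static final int TLS_BYTE_SIZE = 4000;
    private static final ThreadLocal<byte[]> BYTES = ThreadLocal.withInitial(() -> new byte[TLS_BYTE_SIZE]);
    // One coder per thread, primed with the default buffer on both the input and the output side
    private static final ThreadLocal<FstDefaultCoder> CODER = ThreadLocal.withInitial(() ->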
    {
        try
        {
            final FstDefaultCoder coder = new FstDefaultCoder();
            coder.getObjectInput().resetForReuseUseArray(BYTES.get());
            coder.getObjectOutput().resetForReUse(BYTES.get());

            return coder;
        }
        catch (final IOException e)
        {
            throw new UncheckedIOException(e);
        }
    });

    private FstUtil()
    {
        // no instantiation
    }

    public static <T> T decode(final byte[] bytes)
    {
        return decode(bytes, 0, bytes.length);
    }

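    @SuppressWarnings("unchecked")
    public static <T> T decode(final byte[] bytes, final int offset, final int length)
    {
        try
        {
            // Copy the requested range into the small thread-local buffer if it fits,
            // otherwise into a temporary array, so the coder never keeps the caller's array
            final byte[] copy = length <= TLS_BYTE_SIZE ? BYTES.get() : new byte[length];
            System.arraycopy(bytes, offset, copy, 0, length);
            return (T) CODER.get().toObject(copy, 0, length);
        }
        catch (final Exception e)
        {
            throw new MyApplicationException(e); // application-specific unchecked exception
        }
        finally
        {
            // A large payload left the coder holding the temporary array: point it back
            // at the small default buffer so the large array can be garbage-collected
            if (length > TLS_BYTE_SIZE)
            {
                try
                {
                    CODER.get().getObjectInput().resetForReuseUseArray(BYTES.get());
                }
                catch (final IOException e)
                {
                    throw new UncheckedIOException(e);
                }
            }
        }
    }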

    public static byte[] encode(final Object in)
    {
        byte[] bytes = null;
        try
        {
            bytes = CODER.get().toByteArray(in);
            return bytes;
        }
        catch (final Exception e)
        {
            throw new MyApplicationException(e);
        }
        finally
        {
            // Writing a large object grew the output buffer: swap the small default buffer back in
            if (bytes != null && bytes.length > TLS_BYTE_SIZE)
            {
                CODER.get().getObjectOutput().resetForReUse(BYTES.get());
            }
        }
    }

    static FstDefaultCoder getCoder()
    {
        return CODER.get();
    }
}

It works ok for us. Maybe it helps you as well.
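The same idea can be modeled without FST. In this stdlib-only sketch (hypothetical names, not the code above), each thread keeps a small default buffer, an oversized payload gets a larger array, and a `finally` block swaps the default back in after checking whether the cached array is still the original one, as the comment above describes. Unlike `FstUtil`, which decides based on the payload length, this version compares array identity.

```java
/**
 * Hypothetical stdlib-only model of "reset the ThreadLocal buffer after an oversized call".
 * ShrinkingBufferCache and its methods are illustrative, not FST API.
 */
final class ShrinkingBufferCache {
    static final int DEFAULT_SIZE = 4000;   // same threshold as TLS_BYTE_SIZE in FstUtil
    private static final ThreadLocal<byte[]> DEFAULT =
            ThreadLocal.withInitial(() -> new byte[DEFAULT_SIZE]);
    private static final ThreadLocal<byte[]> CURRENT = ThreadLocal.withInitial(DEFAULT::get);

    /** Copies the payload into the cached buffer, then restores the default if it was replaced. */
    static int process(byte[] payload) {
        byte[] buf = CURRENT.get();
        if (payload.length > buf.length) {
            buf = new byte[payload.length];
            CURRENT.set(buf);                   // what a codec does internally when it grows
        }
        try {
            System.arraycopy(payload, 0, buf, 0, payload.length);
            return payload.length;              // stand-in for the real (de)serialization work
        } finally {
            if (CURRENT.get() != DEFAULT.get()) {
                CURRENT.set(DEFAULT.get());     // drop the reference to the oversized array
            }
        }
    }

    static int cachedCapacity() {
        return CURRENT.get().length;
    }

    public static void main(String[] args) {
        process(new byte[100]);
        System.out.println(cachedCapacity());   // 4000
        process(new byte[1_000_000]);
        System.out.println(cachedCapacity());   // 4000: the 1 MB array is no longer cached
    }
}
```

The price is one allocation per oversized call, which matches the trade-off described earlier in the thread: small messages keep the cheap reuse path, and only large ones pay extra.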

jellevictoor commented 7 years ago

Thanks. We have now cut FST out, since it was causing way too much overhead. I'll have a look at your implementation and see whether it works for us. I just think we were using the framework for the wrong purpose.

(Email reply to Philipp Marx's comment of Wed, 22 Mar 2017, 09:02.)


carryxyh commented 5 years ago

@RuedigerMoeller Has this problem been fixed in the latest version? I'd like to update the version in our project to solve it.