EsotericSoftware / kryo

Java binary serialization and cloning: fast, efficient, automatic
BSD 3-Clause "New" or "Revised" License

IllegalStateException on calling free for a pooled item on a synchronized pool that uses soft references and is full #804

Closed wyhasany closed 3 years ago

wyhasany commented 3 years ago

Using com.esotericsoftware.kryo.util.Pool with threadSafe = true, softReferences = true and a limit.

java.lang.IllegalStateException: Queue full
    at java.base/java.util.AbstractQueue.add(AbstractQueue.java:98)
    at com.esotericsoftware.kryo.util.Pool$1.add(Pool.java:52)
    at com.esotericsoftware.kryo.util.Pool$SoftReferenceQueue.offer(Pool.java:168)
    at com.esotericsoftware.kryo.util.Pool.free(Pool.java:95)
    at serializers.KryoSnappyRedisSerializer.serialize(KryoSnappyRedisSerializer.java:79)

I'm running the following code under JMH with 8 concurrent threads. Is that the correct behavior?

package tech.viacomcbs.serializers;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.SerializerFactory;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.CompatibleFieldSerializer;
import com.esotericsoftware.kryo.util.DefaultInstantiatorStrategy;
import com.esotericsoftware.kryo.util.Pool;
import org.objenesis.strategy.StdInstantiatorStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.SerializationException;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class KryoRedisSerializer implements RedisSerializer<Object> {

    public static final Logger log = LoggerFactory.getLogger(KryoRedisSerializer.class);

    static final byte[] EMPTY_ARRAY = new byte[0];

    private static final Pool<Kryo> kryoPool = new Pool<Kryo>(true, true, 16) {
        @Override
        protected Kryo create() {
            Kryo kryo = new Kryo();
            try {
                kryo.setRegistrationRequired(false);
                CompatibleFieldSerializer.CompatibleFieldSerializerConfig config = new CompatibleFieldSerializer.CompatibleFieldSerializerConfig();
                config.setExtendedFieldNames(false);
                config.setReadUnknownFieldData(true);
                kryo.setDefaultSerializer(new SerializerFactory.CompatibleFieldSerializerFactory(config));
                kryo.setInstantiatorStrategy(new DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
            } catch (Exception e) {
                log.error(e.getMessage(), e);
            }
            return kryo;
        }
    };

    Pool<Output> outputPool = new Pool<Output>(true, false, 16) {
        protected Output create () {
            return new Output(1024, -1);
        }
    };

    Pool<Input> inputPool = new Pool<Input>(true, false, 16) {
        protected Input create () {
            return new Input(4096);
        }
    };

    @Override
    public byte[] serialize(Object t) throws SerializationException {
        if (t == null) {
            return EMPTY_ARRAY;
        } else {
            Kryo kryo = null;
            Output output = null;
            try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
                kryo = kryoPool.obtain();
                output = outputPool.obtain();
                output.setOutputStream(stream);
                kryo.writeClassAndObject(output, t);
                output.flush();
                return stream.toByteArray();
            } catch (IOException e) {
                throw new SerializationException("Unable to close stream", e);
            } finally {
                if (kryo != null) {
                    kryoPool.free(kryo);
                }
                if (output != null) {
                    outputPool.free(output);
                }
            }
        }
    }

    @Override
    public Object deserialize(byte[] bytes) throws SerializationException {
        if (bytes == null || bytes.length == 0) {
            return null;
        } else {
            Kryo kryo = null;
            Input input = null;
            try {
                kryo = kryoPool.obtain();
                input = inputPool.obtain();
                input.setBuffer(bytes);
                return kryo.readClassAndObject(input);
            } finally {
                if (kryo != null) {
                    kryoPool.free(kryo);
                }
                if (input != null) {
                    inputPool.free(input);
                }
            }
        }
    }

}

wyhasany commented 3 years ago

It seems this happened because I shared the Pool<Kryo> between many instances of KryoRedisSerializer, as the pool was static.

wyhasany commented 3 years ago

Oh, it seems I was mistaken. It looks like we can't use Pool with threadSafe=true and softReferences=true in a multithreaded environment, because under contention the queue's limit can be reached.

Then freeObjects.offer(object) at com.esotericsoftware.kryo.util.Pool.free(Pool.java:95) calls SoftReferenceQueue.offer, which delegates to delegate.add() on this anonymously extended LinkedBlockingQueue:

queue = new LinkedBlockingQueue<T>(maximumCapacity) {
    @Override
    public boolean add (T o) {
        if (size() >= maximumCapacity) return false;
        super.add(o);
        return true;
    }
};

There is a size check, but no locking around it, so another thread can change the size between the check and the call to super.add(o). In that case super.add(o) throws an exception that is not handled anywhere. To prevent the "Queue full" ISE, you can make the following change:

-    if (size() >= maximumCapacity) return false;
-    super.add(o);
-    return true;
+    return super.offer(o);
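
To illustrate why the change helps, here is a minimal sketch that uses only java.util.concurrent and no Kryo classes. The class name BoundedQueueDemo and its method names are made up for this example, and the offer/poll loop only imitates the free/obtain pattern; it is not the actual Pool code. It shows that add() on a full LinkedBlockingQueue throws IllegalStateException, while offer() just returns false, so an offer-based implementation should not throw even when several threads hit a small queue at once.

```java
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of Kryo.
class BoundedQueueDemo {

    /** Fills a bounded queue, then reports whether one more add() throws IllegalStateException. */
    static boolean addThrowsWhenFull(int capacity) {
        Queue<Integer> queue = new LinkedBlockingQueue<>(capacity);
        for (int i = 0; i < capacity; i++) queue.add(i);
        try {
            queue.add(-1); // AbstractQueue.add throws when offer() fails
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    /** Fills a bounded queue, then returns the result of one more offer(). */
    static boolean offerOnFullQueue(int capacity) {
        Queue<Integer> queue = new LinkedBlockingQueue<>(capacity);
        for (int i = 0; i < capacity; i++) queue.add(i);
        return queue.offer(-1); // no exception, the element is simply rejected
    }

    /** Runs offer()/poll() from several threads and counts IllegalStateExceptions. */
    static int exceptionsUnderContention(int threads, int capacity, int iterations) {
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(capacity);
        AtomicInteger failures = new AtomicInteger();
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            executor.submit(() -> {
                for (int i = 0; i < iterations; i++) {
                    try {
                        queue.offer(i); // imitates free(): item is dropped if the queue is full
                        queue.poll();   // imitates obtain(): may return null if the queue is empty
                    } catch (IllegalStateException e) {
                        failures.incrementAndGet();
                    }
                }
            });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return failures.get();
    }

    public static void main(String[] args) {
        System.out.println("add() throws when full: " + addThrowsWhenFull(2));
        System.out.println("offer() on full queue returns: " + offerOnFullQueue(2));
        System.out.println("exceptions with offer() under contention: "
                + exceptionsUnderContention(8, 2, 10_000));
    }
}
```

The sketch deliberately does not try to reproduce the original race, since a check-then-add failure depends on thread timing and may not show up on every run.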

theigl commented 3 years ago

PR is merged and will be part of the next release. Thanks!

wyhasany commented 3 years ago

Until Kryo 5.0.4 (probably) is released, if you hit a similar bug, the basic workaround is to switch off soft references:

// fails under contention
Pool<Kryo> kryoPool = new Pool<Kryo>(true, true, 16) { /* create() override omitted */ };
// passes
Pool<Kryo> kryoPool = new Pool<Kryo>(true, false, 16) { /* create() override omitted */ };

From the next release on, the first version should work without problems in a multithreaded environment.

@theigl thanks for cooperation 👍