microsoft / FASTER

Fast persistent recoverable log and key-value store + cache, in C# and C++.
https://aka.ms/FASTER
MIT License
6.32k stars 567 forks source link

Problem with version 2.5.1 #820

Closed Aldebaran91 closed 1 year ago

Aldebaran91 commented 1 year ago

There seems to by a problem with the default the version 2.5.1. If I insert about > 15 000 messeges into the FasterKV store and ask for a few messages it will return the wrong values. I insert about 10 to 25 million messages daily.

Aldebaran91 commented 1 year ago

Code fails to read correctly if FASTER.core is used in version "2.5.1" and count of messages inserted is 10 million or more. Code seems to work if 1 million or less messages are inserted. If I downgrade to "2.5.0" all is working again. Breaking change in code? No difference between .Net6 (6.0.407) and .Net7 (7.0.104).

using FASTER.core;

namespace ReadCacheBug
{
    public struct CacheKey : IFasterEqualityComparer<CacheKey>
    {
        public long key;

        public CacheKey(long first)
        {
            key = first;
        }

        public long GetHashCode64(ref CacheKey key)
        {
            return key.key;
        }

        public bool Equals(ref CacheKey k1, ref CacheKey k2)
        {
            return long.Equals(k1.key, k2.key);
        }
    }

    public struct CacheValue
    {
        public string value;

        public CacheValue(string first)
        {
            value = first;
        }
    }

    public class CacheKeySerializer : BinaryObjectSerializer<CacheKey>
    {
        public override void Deserialize(out CacheKey obj)
        {
            obj = new CacheKey(reader.ReadInt64());
        }

        public override void Serialize(ref CacheKey obj)
        {
            writer.Write(obj.key);
        }
    }

    public class CacheValueSerializer : BinaryObjectSerializer<CacheValue>
    {
        public override void Deserialize(out CacheValue obj)
        {
            obj = new CacheValue(reader.ReadString());
        }

        public override void Serialize(ref CacheValue obj)
        {
            writer.Write(obj.value);
        }
    }

    public struct CacheContext
    {
        public int type;
        public long ticks;
    }

    public class CacheFunctions : SimpleFunctions<CacheKey, CacheValue, CacheContext>
    {
        public override void ReadCompletionCallback(ref CacheKey key, ref CacheValue input, ref CacheValue output, CacheContext ctx, Status status, RecordMetadata recordMetadata)
        {
        }
    }

    internal class Program
    {
        static FasterKV<CacheKey, CacheValue> fasterKVStorage;

        static void Main(string[] args)
        {
            IDevice log = Devices.CreateLogDevice("ClassCache.log", preallocateFile: true, capacity: 25_000_000, deleteOnClose: true);
            IDevice objLog = Devices.CreateLogDevice("ObjectCache.data", preallocateFile: true, capacity: 25_000_000, deleteOnClose: true);

            LogSettings logSettings = new LogSettings
            {
                LogDevice = log,
                ObjectLogDevice = objLog,
                MutableFraction = 0,
                ReadCacheSettings = null,
                PageSizeBits = 14,
                MemorySizeBits = 20
            };

            SerializerSettings<CacheKey, CacheValue> serializerSettings = new SerializerSettings<CacheKey, CacheValue>
            {
                keySerializer = () => new CacheKeySerializer(),
                valueSerializer = () => new CacheValueSerializer()
            };

            fasterKVStorage = new FasterKV<CacheKey, CacheValue>(
                size: 1L << 23,
                logSettings: logSettings,
                serializerSettings: serializerSettings,
                comparer: new CacheKey(0)
                );

            InsertAndRead();

            Console.WriteLine("Finished");
        }

        public static void InsertAndRead()
        {
            for (int i = 0; i < 10_000_000; i++)
            {
                using (ClientSession<CacheKey, CacheValue, CacheValue, CacheValue, CacheContext, CacheFunctions> fasterKVInputSession = fasterKVStorage.For(new CacheFunctions()).NewSession<CacheFunctions>())
                {
                    CacheKey keyValue = new CacheKey(i);
                    CacheValue valueValue = new CacheValue(i.ToString());

                    fasterKVInputSession.Upsert(ref keyValue, ref valueValue);

                    if (i % 100_000 == 0)
                    {
                        Console.Clear();
                        Console.WriteLine(i.ToString("N0"));
                    }
                }
            }

            int found = 0;
            for (int i = 101; i > 1; i--)
            {
                long key = i * 10;
                string message = GetMessage(key).Result;

                if (string.Equals(message, (key).ToString()))
                {
                    found++;
                }
            }

            Console.WriteLine($"Found {found} out of 100 correctly.");
        }

        public static async Task<string> GetMessage(long referenceKey)
        {
            using (ClientSession<CacheKey, CacheValue, CacheValue, CacheValue, CacheContext, CacheFunctions> session = fasterKVStorage.For(new CacheFunctions()).NewSession<CacheFunctions>())
            {
                CacheValue output = default;
                CacheKey key = new CacheKey(referenceKey);

                (Status status, CacheValue output) status = (await session.ReadAsync(ref key, ref output)).Complete();

                string itemValue = status.output.value;

                return itemValue;
            }
        }
    }
}
TedHartMS commented 1 year ago

The capacity limit is resulting in truncation. You can remove that parameter and use periodic compaction to control space.

You also revealed a bug in ReadAsync, which will be fixed soon in our new release. Thanks!