microsoft / FASTER

Fast persistent recoverable log and key-value store + cache, in C# and C++.
https://aka.ms/FASTER
MIT License
6.32k stars 567 forks source link

Missing records after compaction and checkpointing. #780

Closed wassim-k closed 1 year ago

wassim-k commented 1 year ago

Hi,

I'm experminting with FASTER KV as a state storage for Trill. I'm having an odd issue where when I run the following test, it's failing due to the number of records stored on disk being less than the records inserted,

In this case I hvae intentionally set the index size to a small number in order to demonstrate the issue I'm having.

Can you please explain the behaviour I'm seeing here.

The test result:

Message: 
Assert.Equal() Failure
Expected: 70000
Actual:   37386

  Stack Trace: 
StreamableStateTests.StreamableState_Benchmark() line 51
--- End of stack trace from previous location ---

  Standard Output: 

1073ms / 1.07s: Initiating store

1106ms / 1.11s: Upserting 70000 records

34ms / 0.03s: Checkpointing records before compaction

1312ms / 1.31s: Compacted unitl 1680064

8ms / 0.01s: Checkpointing records after compaction

6ms / 0.01s: Checkpointing index

6450ms / 6.45s: Initiating store

475ms / 0.48s: Querying 37386 records
internal class StreamableState : IDisposable
{
    private const string LogFileName = "hlog.log";
    private readonly IDevice _logDevice;
    private readonly ICheckpointManager _checkpointManager;
    private readonly FasterKV<long, long> _fasterKV;
    private bool _disposedValue;

    public StreamableState(string directory)
    {
        if (!Directory.Exists(directory))
        {
            Directory.CreateDirectory(directory);
        }

        _logDevice = Devices.CreateLogDevice(Path.Join(directory, LogFileName));

        _checkpointManager = new DeviceLogCommitCheckpointManager(
            new LocalStorageNamedDeviceFactory(),
            new DefaultCheckpointNamingScheme(Path.Join(directory, "checkpoints")));

        _fasterKV = new FasterKV<long, long>(
            1L << 2,
            new LogSettings
            {
                LogDevice = _logDevice
            },
            new CheckpointSettings
            {
                CheckpointManager = _checkpointManager
            },
            tryRecoverLatest: true);
    }

    public async Task UpsertAsync(long key, long value)
    {
        using var session = NewSession();
        await session.UpsertAsync(ref key, ref value);
    }

    public async Task RemoveAsync(long key)
    {
        using var session = NewSession();
        await session.DeleteAsync(key);
    }

    public IEnumerable<long> Query()
    {
        using var session = NewSession();
        using var iter = session.Iterate();

        while (iter.GetNext(out var recordInfo))
        {
            yield return iter.GetValue();
        }
    }

    public async Task CheckpointAsync()
    {
        await _fasterKV.TakeHybridLogCheckpointAsync(CheckpointType.FoldOver);
    }

    public async Task CheckpointIndexAsync()
    {
        await _fasterKV.TakeIndexCheckpointAsync();
    }

    public long Compact()
    {
        using var session = NewSession();
        return session.Compact(_fasterKV.Log.SafeReadOnlyAddress);
    }

    public void Dispose()
    {
        Dispose(disposing: true);
        GC.SuppressFinalize(this);
    }

    protected virtual void Dispose(bool disposing)
    {
        if (!_disposedValue)
        {
            if (disposing)
            {
                _checkpointManager.Dispose();
                _logDevice.Dispose();
                _fasterKV.Dispose();
            }

            _disposedValue = true;
        }
    }

    private ClientSession<long, long, long, long, Empty, IFunctions<long, long, long, long, Empty>> NewSession() =>
        _fasterKV.NewSession(new SimpleFunctions<long, long>());
}
public class StreamableStateTests
{
    private readonly ITestOutputHelper _testOutputHelper;

    public StreamableStateTests(ITestOutputHelper testOutputHelper)
    {
        _testOutputHelper = testOutputHelper;
    }

    [Fact]
    public async Task StreamableState_Benchmark()
    {
        var count = 70_000;
        int actualCount = 0;

        var directory = Path.Join(Path.Join(Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location), "faster"));

        using (var state = await BenchmarkOperation(state => $"Initiating store", () => Task.FromResult(new StreamableState(directory))))
        {
            var values = Enumerable.Range(0, count).ToList();

            await BenchmarkOperation($"Upserting {values.Count} records", async () =>
            {
                foreach (var value in values)
                {
                    await state.UpsertAsync(value, value);
                }
            });

            await BenchmarkOperation($"Checkpointing records before compaction", () => state.CheckpointAsync());
            await BenchmarkOperation(compacted => $"Compacted until {compacted}", () => Task.FromResult(state.Compact()));
            await BenchmarkOperation($"Checkpointing records after compaction", () => state.CheckpointAsync());
            await BenchmarkOperation($"Checkpointing index", () => state.CheckpointIndexAsync());
        }

        using (var state = await BenchmarkOperation(state => $"Initiating store", () => Task.FromResult(new StreamableState(directory))))
        {
            var all = await BenchmarkOperation(all => $"Querying {all.Count} records", () => Task.FromResult(state.Query().ToList()));
            actualCount = all.Count;
        }

        Directory.Delete(directory, true);

        Assert.Equal(count, actualCount);
    }

    private async Task BenchmarkOperation(string description, Func<Task> operation)
    {
        // Upsert
        var stopWatch = Stopwatch.StartNew();
        await operation();
        stopWatch.Stop();

        _testOutputHelper.WriteLine(string.Empty);
        _testOutputHelper.WriteLine($"{stopWatch.ElapsedMilliseconds}ms / {stopWatch.Elapsed.TotalSeconds:0.00}s: {description}");
    }

    private async Task<T> BenchmarkOperation<T>(Func<T, string> description, Func<Task<T>> operation)
    {
        // Upsert
        var stopWatch = Stopwatch.StartNew();
        var result = await operation();
        stopWatch.Stop();

        _testOutputHelper.WriteLine(string.Empty);
        _testOutputHelper.WriteLine($"{stopWatch.ElapsedMilliseconds}ms / {stopWatch.Elapsed.TotalSeconds:0.00}s: {description(result)}");

        return result;
    }
}
badrishc commented 1 year ago

Fix linked in PR. Also, you cannot use a hash table smaller than 512 bytes, i.e., 8 cache lines, so set index size in constructor to at least 1L << 3.

badrishc commented 1 year ago

Another comment, creating a new session per upsert/delete call is hugely wasteful. Please consider using a pool of sessions, as done in https://github.com/microsoft/FASTER/blob/main/cs/playground/AsyncStress/SerializedFasterWrapper.cs using _sessionPool.

wassim-k commented 1 year ago

Thanks for the quick response, I've tested with larger index sizes but still doesn't matter how large the index, after inserting enough records, some are lost, I'll wait for the PR before testing again.

I had another question about compaction, I've modified the test slightly to insert the same set of records twice, and as expected the log file doubled in size, but then I ran Compact and CheckpointAsync expecting the log size to half again but instead it increased to 3x the first size, can you please explain this behaviour?

32ms / 0.03s: Initiating store

88ms / 0.09s: Upserting 10000 records

21ms / 0.02s: Checkpoint after 10000 records upserted.
hlog.log size: 240128

51ms / 0.05s: Upserting 10000 records

5ms / 0.01s: Checkpoint after 10000 duplicate records upserted.
hlog.log size: 480256

125ms / 0.13s: Compacted #480064

3ms / 0.00s: Checkpointing records after compaction
hlog.log size: 720384

95ms / 0.10s: Initiating store

7ms / 0.01s: Checkpointing index

47ms / 0.05s: Querying 10000 records
[Fact]
public async Task StreamableState_Benchmark()
{
    var count = 10_000;
    int actualCount = 0;

    var directory = Path.Join(Path.Join(Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location), "faster"));

    using (var state = await BenchmarkOperation(state => $"Initiating store", () => Task.FromResult(new StreamableState(directory))))
    {
        var values = Enumerable.Range(0, count).ToList();

        await BenchmarkOperation($"Upserting {values.Count} records", async () =>
        {
            foreach (var value in values)
            {
                await state.UpsertAsync(value, value);
            }
        });

        await BenchmarkOperation($"Checkpoint after {count} records upserted.", () => state.CheckpointAsync());
        _testOutputHelper.WriteLine($"{StreamableState.LogFileName} size: {GetLogSize(directory)}");

        await BenchmarkOperation($"Upserting {values.Count} records", async () =>
        {
            foreach (var value in values)
            {
                await state.UpsertAsync(value, value);
            }
        });

        await BenchmarkOperation($"Checkpoint after {count} duplicate records upserted.", () => state.CheckpointAsync());
        _testOutputHelper.WriteLine($"{StreamableState.LogFileName} size: {GetLogSize(directory)}");

        await BenchmarkOperation(compacted => $"Compacted #{compacted}", () => Task.FromResult(state.Compact()));
        await BenchmarkOperation($"Checkpointing records after compaction", () => state.CheckpointAsync());
        _testOutputHelper.WriteLine($"{StreamableState.LogFileName} size: {GetLogSize(directory)}");
    }

    using (var state = await BenchmarkOperation(state => $"Initiating store", () => Task.FromResult(new StreamableState(directory))))
    {
        await BenchmarkOperation($"Checkpointing index", () => state.CheckpointIndexAsync());
        var all = await BenchmarkOperation(all => $"Querying {all.Count} records", () => Task.FromResult(state.Query().ToList()));
        actualCount = all.Count;
    }

    Directory.Delete(directory, true);

    Assert.Equal(count, actualCount);
}
badrishc commented 1 year ago

The fix is at https://github.com/microsoft/FASTER/pull/781

badrishc commented 1 year ago

The compaction does not reduce file size, rather it works at the granularity of entire files. Size of file is based on segment size (1gb default). This is configurable via settings.

wassim-k commented 1 year ago

If I'm using a slow storage for example, would it be more performant to reduce segment size in order to read less data on FasterKV initialization? or does it only read the data it needs from a certain address in the larger log file?

badrishc commented 1 year ago

It only reads the data it needs from the begin address, in the larger log file. The only benefit of reducing segment size is some disk space savings.