delta-incubator / delta-dotnet

DeltaLake bindings for dotnet based on delta-rs
Apache License 2.0

This package is a C# wrapper around delta-rs.

It uses the tokio-rs runtime to provide asynchronous behavior, so .NET Tasks and async/await map directly onto the asynchronous operations of the underlying Rust library. The library also uses the Apache Arrow C Data Interface to minimize the copying required to move data between runtimes.

The bridge library incorporates delta-rs and tokio-rs, as shown in the image below.

NOTE: On Unix systems, a stack overflow is possible because of the small default stack size of .NET threads. The default size should correspond to ulimit -s, but it can be overridden by setting the environment variable DOTNET_DefaultStackSize to a hexadecimal number of bytes. The unit tests use 180000 (0x180000 bytes, which is 1.5 MiB).
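
As a minimal sketch of the override described in the note, the variable can be exported in the shell before starting the .NET process. The value 180000 is the one the unit tests use; the `stack_bytes` variable is only a hypothetical helper for illustration, converting the hexadecimal setting to decimal bytes so it can be compared with the shell limit.

```shell
# DOTNET_DefaultStackSize is interpreted as a hexadecimal number of bytes.
# 180000 is the value used by the unit tests.
export DOTNET_DefaultStackSize=180000

# Show the current shell stack limit for comparison
# (most shells report it in KiB, or "unlimited").
ulimit -s

# Convert the hexadecimal setting to decimal bytes (illustrative helper only).
stack_bytes=$(printf '%d' "0x${DOTNET_DefaultStackSize}")
echo "$stack_bytes"
```

Any process started from this shell afterwards, such as `dotnet test`, inherits the exported variable.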

Quick Start

using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow;
using Apache.Arrow.Memory;
using Apache.Arrow.Types;
using DeltaLake.Runtime;
using DeltaLake.Table;

public static DeltaRuntime CreateRuntime()
{
    return new DeltaRuntime(RuntimeOptions.Default);
}

public static Task<DeltaTable> CreateDeltaTable(
    DeltaRuntime runtime,
    string path,
    CancellationToken cancellationToken
)
{
    // Define a schema with a single non-nullable Int32 column named "test".
    var builder = new Apache.Arrow.Schema.Builder();
    builder.Field(fb =>
    {
        fb.Name("test");
        fb.DataType(Int32Type.Default);
        fb.Nullable(false);
    });
    var schema = builder.Build();
    return DeltaTable.CreateAsync(
        runtime,
        new TableCreateOptions(path, schema)
        {
            Configuration = new Dictionary<string, string>(),
        },
        cancellationToken);
}

// Appends `length` consecutive integers to the "test" column of the table.
public static async Task InsertIntoTable(
    DeltaTable table,
    int length,
    CancellationToken cancellationToken)
{
    var allocator = new NativeMemoryAllocator();
    var recordBatch = new RecordBatch.Builder(allocator)
        .Append(
            "test",
            false,
            col => col.Int32(arr => arr.AppendRange(Enumerable.Range(0, length))))
        .Build();
    var options = new InsertOptions
    {
        SaveMode = SaveMode.Append,
    };
    // Reuse the batch's own schema: one non-nullable Int32 column named "test".
    await table.InsertAsync(
        [recordBatch],
        recordBatch.Schema,
        options,
        cancellationToken);
}