denis-ivanov / EntityFrameworkCore.ClickHouse

ClickHouse provider for Entity Framework Core.
https://clickhouse.tech/

(Question) Is there a bulk insert here or planned? #51

Closed: Exzept1on closed this issue 3 months ago

TPCharts commented 3 months ago

The ClickHouse.Client library (which this project references) includes a good bulk inserter.

If you're fine with loading everything into memory and then inserting, it's very easy to use (see the ClickHouse.Client documentation on ClickHouseBulkCopy).
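For the in-memory case, here is a minimal sketch following the usage pattern from the ClickHouse.Client README. The table `default.example`, its `id`/`name` columns, and the batch size are hypothetical placeholders, and `connectionString` is assumed to point at a reachable server:

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using ClickHouse.Client.ADO;
using ClickHouse.Client.Copy;

public static class InMemoryClickHouseInsert
{
    // Inserts rows that are already fully loaded in memory.
    // Assumes a hypothetical table: default.example (id UInt64, name String).
    public static async Task<long> InsertAllAsync(string connectionString, IReadOnlyList<object[]> rows)
    {
        await using var connection = new ClickHouseConnection(connectionString);
        using var bulkCopy = new ClickHouseBulkCopy(connection)
        {
            DestinationTableName = "default.example",
            ColumnNames = new[] { "id", "name" }, // each row's values must follow this order
            BatchSize = 100_000,                  // rows per batch sent to the server
        };

        await bulkCopy.InitAsync();              // loads the target column types once
        await bulkCopy.WriteToServerAsync(rows); // the library splits rows into batches
        return bulkCopy.RowsWritten;
    }
}
```

Here `rows` is fully materialized before the call, so peak memory grows with the number of rows, which is exactly the limitation described next.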

If you're doing a large bulk insert, you might run into the same issue I did, where the items loaded for insertion exhaust memory (e.g. when importing 40M+ items). So, for any readers' convenience:

Here's an example of an extension method I use on IAsyncEnumerable that incorporates the ClickHouse.Client bulk insert (it could easily be converted to accept IList, IEnumerable, etc.) to simplify the bulk insert process from LINQ projections.

T is the type of the Entity Framework DTOs that I use with EntityFrameworkCore.ClickHouse in non-bulk insertion scenarios.

Its use case is streaming results from CSV files, database queries, HTTP requests, other databases, etc.
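If your source is a plain `IEnumerable<T>` or `IList<T>`, one alternative to converting the method is a thin adapter that exposes it as an `IAsyncEnumerable<T>`. A minimal sketch; `ToAsyncSequence` is a hypothetical name (the System.Linq.Async package already ships an equivalent `ToAsyncEnumerable()` extension):

```csharp
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class EnumerableAsyncAdapter
{
    // Hypothetical adapter: exposes an IEnumerable<T> (an IList<T>, a LINQ projection,
    // File.ReadLines, ...) as an IAsyncEnumerable<T>, so an IAsyncEnumerable-based bulk
    // insert method can be reused unchanged. Items are pulled one at a time, so a lazy
    // source is never materialized by this adapter.
    public static async IAsyncEnumerable<T> ToAsyncSequence<T>(
        this IEnumerable<T> source,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        foreach (var item in source)
        {
            cancellationToken.ThrowIfCancellationRequested();
            yield return item;
        }

        // The loop never awaits; this no-op await keeps the compiler from warning
        // that the async method lacks an await (CS1998).
        await Task.CompletedTask;
    }
}
```

For example, `File.ReadLines(path).Select(ParseRow).ToAsyncSequence()` could then be passed to `DoBulkClickHouseInsertAsync`, where `ParseRow` is a placeholder for your own parsing function.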

It's worth experimenting with the batchSize parameter depending on the types of objects you're inserting. In some cases, 25k is the maximum I can use without running into problems; in others, over 250k is fine.
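Whatever batchSize you settle on, what keeps memory flat is that only the batch currently being filled is buffered. A minimal, ClickHouse-independent sketch of that buffering step, assuming .NET 6+ (`BatchAsync` is a hypothetical helper, not part of ClickHouse.Client; for a synchronous `IEnumerable<T>`, .NET 6's `Enumerable.Chunk` does the same job):

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncBatchingExtensions
{
    // Hypothetical helper: groups an async stream into lists of at most `batchSize` items.
    // Only the list currently being filled is held by this method, so its memory use is
    // proportional to batchSize rather than to the total number of items.
    public static async IAsyncEnumerable<List<T>> BatchAsync<T>(
        this IAsyncEnumerable<T> source,
        int batchSize,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        // In an async iterator this check runs on the first MoveNextAsync, not at call time.
        if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize));

        var batch = new List<T>(batchSize);
        await foreach (var item in source.WithCancellation(cancellationToken))
        {
            batch.Add(item);
            if (batch.Count == batchSize)
            {
                yield return batch;
                // Start a new list; the caller may still be using the one just yielded.
                batch = new List<T>(batchSize);
            }
        }

        // Flush the final, partially filled batch, if any.
        if (batch.Count > 0)
            yield return batch;
    }
}
```

Each yielded `List<T>` can then be mapped to `object[]` rows and passed to `ClickHouseBulkCopy.WriteToServerAsync`. A new list is allocated per batch instead of clearing and reusing one, because a write that is still running may hold a reference to the previous batch.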

EDIT: Worth mentioning: if you're inserting or querying large batches of items, you might run into the same issue I did here: https://github.com/DarkWanderer/ClickHouse.Client/issues/507

Recent activity suggests the issue has been resolved, but at the moment (a day later) a NuGet package with the fix probably hasn't been released yet.

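using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using ClickHouse.Client.ADO;
using ClickHouse.Client.Copy;

// Guard, SplitCamelCase, IMapperCustom<,>, IProgressReporter and DbContextClickHouse
// are not part of ClickHouse.Client; they come from the application's own code.

public static class ClickHouseDatabaseExtensions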
{
    /// <summary>
    /// Consumes an async enumerable and bulk inserts it into the database in batches,
    /// limiting the memory consumed as long as database writes keep pace with mapping.
    /// </summary>
    /// <typeparam name="T">Item type. Its property names must match the table's column names.</typeparam>
    /// <param name="source">The source.</param>
    /// <param name="tableName">Name of the table.</param>
    /// <param name="mapper">The mapper. If not provided, reflection will be used to map.</param>
    /// <param name="afterWrite">Executes after each batch write. Use to update UI, etc.</param>
    /// <param name="batchSize">Size of each batch. 25,000 may be reasonable for larger objects.</param>
    /// <param name="countTotal">Optional total count. Used to provide more precise progress reports.</param>
    /// <param name="progress">Progress reporter for updating UI.</param>
    /// <returns>The number of rows written to the database.</returns>
    public static async Task<long> DoBulkClickHouseInsertAsync<T>(
        this IAsyncEnumerable<T> source,
        string tableName,
        IMapperCustom<T, object?[]>? mapper = null,
        Action? afterWrite = null,
        int batchSize = 25000,
        int? countTotal = null,
        IProgressReporter? progress = null
    )
    {
        Guard.NotNoText(tableName);
        Guard.NotNull(source);

        // Setup values for reporting and writing to database
        Type type = typeof(T);

        string typeName = type.Name.SplitCamelCase().ToLowerInvariant();

        string itemProcessingMessage = $"Processing {typeName}s for next database write batch...";

        // Could remove this and explicitly provide table columns if needed.
        // Otherwise it's cheap, since it only runs once per bulk insert.
        List<System.Reflection.PropertyInfo> propsOrderedByName =
        [
            .. type.GetProperties().OrderBy(p => p.Name)
        ];

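        // Assumes property names match the table's column names. Since ColumnNames is
        // set explicitly, each row only has to follow this (alphabetical) order, not the
        // table's own column order; a custom mapper must emit values in the same order.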
        List<string> columns = [.. propsOrderedByName.Select(p => p.Name)];

        // Store batches in a dictionary. This allows database writes
        // to occur while the source is still being processed and mapped.
        // (At 25,000 items, the database write seems to take 50-75% of
        // the time taken to map the next batch for my typical use case.)
        Guid? currentBuildingBatchId;

        List<object?[]>? currentBuildingBatch;

        Dictionary<Guid, List<object?[]>> batches = [];

        createAndSetNewCurrentBuildingBatch();

        // Setup ClickHouse bulk copy.
        await using var connection = new ClickHouseConnection(DbContextClickHouse.ConnectionString);

        using var bulkCopy = new ClickHouseBulkCopy(connection)
        {
            DestinationTableName = $"default.{tableName}",
            ColumnNames = columns,

            // Updated in write tasks to actual batch size
            BatchSize = 0,

            // Not useful to increase in our case: since we're consuming
            // an async enumerable, we can't use the built-in
            // batching mechanism (otherwise we'd have to load
            // everything into memory, which isn't feasible)
            MaxDegreeOfParallelism = 1
        };

        // If refactoring, ensure that InitAsync
        // is only called once or timeout exceptions will start.
        await bulkCopy.InitAsync();

        // Process the source enumerable
        // into batches and write when
        // batchSize is reached
        int countProcessed = 0;

        // Batch writing tasks stored here
        // to run while we continue processing
        // new batches
        List<Task> writeTasks = [];

        await foreach (var item in source)
        {
            countProcessed++;

            progress?.Report(() => countProcessed, () => $"Processing {typeName}...");

            var obj = mapper is not null

                // Maintenance nightmare, but potentially useful micro-optimization
                ? mapper.Map(item)

                // Use reflection for convenience when performance isn't as critical
                // Also much easier to maintain (TODO: might remove mapper option eventually)
                : propsOrderedByName.Select(p => p.GetValue(item, null)).ToArray();

            currentBuildingBatch!.Add(obj);

            // Wrapper around IProgress<T> that offers convenience
            // around throttling message building, reporting
            // to UI, percentage calculations, etc. In this case
            // it only reports once per 500ms
            progress?.Report(
                () => countProcessed,
                () => countTotal ?? 0,
                () => itemProcessingMessage
            );

            if (currentBuildingBatch.Count >= batchSize)
            {
                writeTasks.Add(createWriteBatchTask(currentBuildingBatchId!.Value));

                createAndSetNewCurrentBuildingBatch();
            }
        }

        if (currentBuildingBatch!.Count > 0)
        {
            // Add remaining items (will usually be triggered unless
            // the source had an exact multiple of batchSize items)
            writeTasks.Add(createWriteBatchTask(currentBuildingBatchId!.Value));
        }

        await Task.WhenAll(writeTasks);

        return bulkCopy.RowsWritten;

        void createAndSetNewCurrentBuildingBatch()
        {
            currentBuildingBatch = [];
            currentBuildingBatchId = Guid.NewGuid();

            // Write tasks remove finished batches from this dictionary, possibly on
            // thread-pool threads (e.g. when there is no SynchronizationContext),
            // while the main loop keeps adding new ones. Dictionary<,> isn't
            // thread-safe, so every access to it is locked.
            lock (batches)
            {
                batches.Add(currentBuildingBatchId.Value, currentBuildingBatch);
            }
        }

        Task createWriteBatchTask(Guid batchId)
        {
            return WriteBatchAsync(bulkCopy, batchId, batches, typeName, progress, afterWrite);
        }
    }

    private static async Task WriteBatchAsync(
        ClickHouseBulkCopy bulkCopy,
        Guid batchId,
        Dictionary<Guid, List<object?[]>> batchDict,
        string typeName,
        IProgressReporter? progress = null,
        Action? afterWrite = null
    )
    {
        List<object?[]> batch;

        // Locked because other write tasks may be removing entries concurrently
        lock (batchDict)
        {
            batch = batchDict[batchId];
        }

        // Wrapper around IProgress<T> that offers convenience
        // around throttling message building, reporting
        // to UI, percentage calculations, etc. In this case
        // it only reports once per 500ms. Could replace with
        // IProgress<T> if you don't need anything fancy.
        progress?.Report(
            () =>
                $"Writing batch with {batch.Count:N0} {typeName}s to database at {DateTime.Now.ToLongTimeString()}...",
            forceShow: true
        );

        bulkCopy.BatchSize = batch.Count;

        // ClickHouseBulkCopy expects each row as an object array with one value per column
        await bulkCopy.WriteToServerAsync(batch);

        // Free up used batch memory (locked for the same reason as above)
        lock (batchDict)
        {
            batchDict.Remove(batchId);
        }

        // Optional function that might be used to refresh UI, etc.
        // It may not run on the UI thread, so marshal UI updates accordingly.
        afterWrite?.Invoke();
    }
}
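One caveat with the method above: `writeTasks` grows without limit, so if writing a batch ever takes longer than mapping the next one, filled-but-unwritten batches pile up in memory. A minimal sketch of capping in-flight writes with `SemaphoreSlim`, assuming the write step is passed in as a delegate; `StreamBatchesAsync`, `maxPendingWrites` and `writeBatch` are hypothetical names, not ClickHouse.Client APIs:

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedBatchWriter
{
    // Hypothetical helper: groups `source` into batches of `batchSize` and hands each
    // batch to `writeBatch`, allowing at most `maxPendingWrites` writes in flight.
    // When every slot is busy, reading pauses until a write finishes, so roughly
    // (maxPendingWrites + 1) * batchSize items are held in memory at most.
    public static async Task<long> StreamBatchesAsync<T>(
        IAsyncEnumerable<T> source,
        int batchSize,
        int maxPendingWrites,
        Func<List<T>, Task> writeBatch,
        CancellationToken cancellationToken = default)
    {
        if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize));
        if (maxPendingWrites <= 0) throw new ArgumentOutOfRangeException(nameof(maxPendingWrites));

        // Not disposed on purpose: no wait handle is ever created, and a write that is
        // still running when reading fails may release it later.
        var slots = new SemaphoreSlim(maxPendingWrites, maxPendingWrites);
        var pending = new List<Task>();
        var batch = new List<T>(batchSize);
        long itemCount = 0;

        async Task WriteAndReleaseAsync(List<T> toWrite)
        {
            try { await writeBatch(toWrite); }
            finally { slots.Release(); }
        }

        async Task StartWriteAsync(List<T> toWrite)
        {
            await slots.WaitAsync(cancellationToken); // wait for a free slot
            // Forget writes that already succeeded; failed ones stay so WhenAll rethrows.
            pending.RemoveAll(t => t.IsCompletedSuccessfully);
            pending.Add(WriteAndReleaseAsync(toWrite));
        }

        await foreach (var item in source.WithCancellation(cancellationToken))
        {
            batch.Add(item);
            itemCount++;
            if (batch.Count == batchSize)
            {
                await StartWriteAsync(batch);
                batch = new List<T>(batchSize);
            }
        }

        if (batch.Count > 0)
            await StartWriteAsync(batch);

        // Wait for the remaining writes and surface any failure.
        await Task.WhenAll(pending);
        return itemCount;
    }
}
```

With `maxPendingWrites: 1` this keeps the overlap the original relies on (mapping the next batch while the previous one is written), but reading waits whenever a write falls behind. `writeBatch` may run up to `maxPendingWrites` times concurrently, so anything it shares must be thread-safe.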
Exzept1on commented 3 months ago

This is awesome, thanks