tmsmith / Dapper-Extensions

Dapper Extensions is a small library that complements Dapper by adding basic CRUD operations (Get, Insert, Update, Delete) for your POCOs. For more advanced querying scenarios, Dapper Extensions provides a predicate system. The goal of this library is to keep your POCOs pure by not requiring any attributes or base class inheritance.

how to bulk update insert delete list data #240

Closed apchenjun closed 3 years ago

apchenjun commented 3 years ago

I want to modify collection data in batches, as described here: https://stackoverflow.com/questions/20635796/bulk-update-in-c-sharp Would you consider adding batch editing functions?

valfrid-ly commented 3 years ago

We don't have this feature yet. It's a good idea to keep in mind

apchenjun commented 3 years ago

@valfrid-ly I recommend adding common functions such as bulkinsert, bulkupdate, and bulkdelete. The functions of this library could be ported and refined to support generic dynamic parameters: https://github.com/olegil/SqlBulkTools

JMan7777 commented 3 years ago

For MS SQL I posted a bulk insert solution here:

https://github.com/tmsmith/Dapper-Extensions/issues/18#issuecomment-677440696

I can also provide a bulk delete for MS SQL if wanted.

apchenjun commented 3 years ago

@JMan7777 Would Dapper-Extensions consider integrating bulkinsert, bulkupdate, and bulkdelete?

valfrid-ly commented 3 years ago

Just remember guys it's not MS SQL only.

We support Oracle, DB2, SQL Server, SQLite and MySQL too.

So a feature has to be there for all of these databases.

apchenjun commented 3 years ago

@JMan7777 I hope to see the bulkinsert, bulkupdate, and bulkdelete code you can provide. Thank you.

JMan7777 commented 3 years ago

@apchenjun As the developer said, the project supports multiple databases, not only MS SQL. While I can provide bulk insert and delete for MS SQL if requested, the implementation for the other DBs must also be written by someone ;).

apchenjun commented 3 years ago

@JMan7777 I only use MS SQL in my current project, so I need your MS SQL version.

JMan7777 commented 3 years ago

@apchenjun Understood, I will upload my code in the next few days.

valfrid-ly commented 3 years ago

If you can provide it, I'll be grateful. But we have to provide it for all supported databases and include unit tests for all of them too.

If you provide this under these guidelines, I'll validate and approve your PR.

apchenjun commented 3 years ago

@valfrid-ly @JMan7777 Thank you very much, I am waiting for your good news. I think bulkinsert, bulkupdate, and bulkdelete are very common and useful in projects, so this is worth improving.

valfrid-ly commented 3 years ago

@JMan7777 these features are really useful. I have a friend that already implemented and is going to implement

apchenjun commented 3 years ago

@JMan7777 @valfrid-ly Here is some material I referred to. I hope it helps you or gives you some inspiration.

MicroOrm.Dapper.Repositories/DapperRepository.BulkInsert.cs
MicroOrm.Dapper.Repositories/DapperRepository.BulkUpdate.cs
Dapper.FastCrud.BulkUpdate.cs
Dapper.FastCrud.BulkDelete.cs
https://gitee.com/hy59005271/DapperExtensions_Demo
https://github.com/KostovMartin/Dapper.Bulk/blob/master/src/Dapper.Bulk/DapperBulk.cs
https://github.com/ffhighwind/DapperExtraCRUD/blob/master/DapperExtraCRUD/DapperExtraExtensions.cs
https://www.michalbialecki.com/2019/05/21/bulk-insert-in-dapper/
Dapper.SimpleCRUD/SimpleCRUDAsync.cs

JMan7777 commented 3 years ago

@apchenjun My implementation only covers bulk insert and delete. For example, the bulk insert makes use of the bulk copy feature of the SQL client library (SqlBulkCopy). I will leave it up to @valfrid-ly whether he wants it ;).

@valfrid-ly If you want I can give you my code for consideration and some inspiration.

valfrid-ly commented 3 years ago

@JMan7777 I'll be glad to receive your help

JMan7777 commented 3 years ago

@valfrid-ly Below I paste some code. It is not exactly what I use, as I removed a lot of other helper methods from it (e.g. Polly support, a workaround for the MARS multi-threading problem, a custom IClassMapper interface implementation, etc.), but it should give you an idea. The code is not pretty, but I hope it will help you as a starting point.

using Dapper;
using DapperExtensions;
using DapperExtensions.Mapper;
using Microsoft.Data.SqlClient;
using MoreLinq;
using NLog;
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.Data;
using System.Data.Common;
using System.Diagnostics;
using System.Linq;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;

namespace com.sample.classes
{
    public static class CustomDapperAsyncExtensions
    {
        private static readonly Logger Logger = LogManager.GetCurrentClassLogger();

        private static DapperExtensionsConfiguration DapperExtensionsConfiguration;

        public static void SetMappingAssemblies(IList<Assembly> assemblies)
        {
            DapperExtensionsConfiguration = new DapperExtensionsConfiguration(DapperAsyncExtensions.DefaultMapper, assemblies, DapperAsyncExtensions.SqlDialect);
            DapperAsyncExtensions.Configure(DapperExtensionsConfiguration);
        }

        public static async Task BulkDeleteAsync<T>(this SqlTransaction sqlTransaction, IEnumerable<T> entities, IDbTransaction transaction, string tableType, int? commandTimeout = null)
        {
                IClassMapper classMapper = DapperExtensionsConfiguration.GetMap<T>();

                // Pass the table type name as a query parameter instead of interpolating it into the SQL text.
                string tableTypeColumnsSelect = $"SELECT name FROM {classMapper.SchemaName}.sys.columns WHERE object_id IN (SELECT type_table_object_id FROM {classMapper.SchemaName}.sys.table_types WHERE name = @tableType) ORDER BY column_id ASC";
                IEnumerable<string> tableTypeColumns = await sqlTransaction.Connection.QueryAsync<string>(
                    tableTypeColumnsSelect,
                    new { tableType },
                    sqlTransaction,
                    commandTimeout ?? sqlTransaction.Connection.ConnectionTimeout,
                    CommandType.Text
                    );

                if (tableTypeColumns == null || !tableTypeColumns.Any())
                {
                    throw new DataException($"Table type {tableType} is not valid.");
                }

                string targetTable = $"{classMapper.SchemaName}.{classMapper.TableName}";

                DataTable newDeletes = new DataTable("NewDeletes");
                List<DataColumn> keys = new List<DataColumn>();
                List<DataColumn> dataColums = new List<DataColumn>();
                classMapper.Properties.ToList().ForEach(prop =>
                {
                    if (!prop.Ignored)
                    {
                        DataColumn dataColumn = new DataColumn();

                        Type type = System.Nullable.GetUnderlyingType(prop.PropertyInfo.PropertyType);
                        if (type == null)
                        {
                            type = prop.PropertyInfo.PropertyType;
                        }

                        dataColumn.DataType = type;
                        dataColumn.ColumnName = prop.ColumnName;
                        dataColumn.AutoIncrement = prop.KeyType.Equals(KeyType.Identity);
                        if (!prop.KeyType.Equals(KeyType.NotAKey))
                        {
                            keys.Add(dataColumn);
                            if (!prop.KeyType.Equals(KeyType.Identity))
                            {
                                dataColumn.AllowDBNull = false;
                            }
                        }
                        dataColums.Add(dataColumn);
                    }
                });

                //Check if the tableTypeColumns are completely mapped and valid.
                if (tableTypeColumns.Except(dataColums.Select(d => d.ColumnName)).Any())
                {
                    throw new DataException($"Table type {tableType} is not consistent with class mapper for {typeof(T).Name}.");
                }

                // It is important that the columns / keys are in the same order as the table type columns!
                newDeletes.Columns.AddRange(dataColums.OrderBy(d => tableTypeColumns.ToList().IndexOf(d.ColumnName)).ToArray());
                newDeletes.PrimaryKey = keys.OrderBy(k => tableTypeColumns.ToList().IndexOf(k.ColumnName)).ToArray();

                // Add some new rows to the collection.
                entities.ToList().ForEach(entity =>
                {
                    DataRow row = newDeletes.NewRow();
                    classMapper.Properties.ToList().ForEach(prop =>
                    {
                        if (!prop.Ignored)
                        {
                            var value = prop.PropertyInfo.GetValue(entity);
                            row[prop.ColumnName] = value ?? DBNull.Value;
                        }
                    });
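                    // NOTE: GetCacheIdentifier() is not defined in this snippet; it appears to be one of the helper methods removed from this post.
                    Logger.Trace($"Preparing internal bulk delete data table for '{entity.GetType().Name}' as '{typeof(T).Name}' with identifier '{entity.GetCacheIdentifier()}'");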
                    newDeletes.Rows.Add(row);
                });

                //Commit to the internal data table
                newDeletes.AcceptChanges();

                Stopwatch stopwatch = new Stopwatch();
                stopwatch.Start();
                try
                {
                    // Define the DELETE statement.
                    string sqlDelete = $"DELETE A FROM {targetTable} A INNER JOIN @DeleteTable B ON";
                    newDeletes.PrimaryKey.ForEach(pk =>
                    {
                        sqlDelete += $" A.{pk.ColumnName} = B.{pk.ColumnName} AND";
                    });
                    if (sqlDelete.EndsWith(" AND", StringComparison.Ordinal))
                    {
                        sqlDelete = sqlDelete.Remove(sqlDelete.Length - " AND".Length);
                    }

                    // Configure the command and parameter.
                    SqlParameter tvpParam = new SqlParameter();
                    tvpParam.ParameterName = "@DeleteTable";
                    tvpParam.SqlDbType = SqlDbType.Structured;
                    tvpParam.TypeName = $"{classMapper.SchemaName}.{tableType}";
                    tvpParam.Direction = ParameterDirection.Input;
                    tvpParam.Value = newDeletes;

                    SqlCommand deleteCommand = new SqlCommand(sqlDelete, sqlTransaction.Connection, sqlTransaction);
                    deleteCommand.Parameters.Add(tvpParam);
                    deleteCommand.CommandTimeout = commandTimeout ?? sqlTransaction.Connection.ConnectionTimeout;
                    deleteCommand.CommandType = CommandType.Text;

                    // Execute the command.
                    //Don't use async. Still too many issues on MS side :(
                    int rowsDeleted = deleteCommand.ExecuteNonQuery();

                    if (rowsDeleted == entities.Count())
                    {
                        Logger.Trace($"BulkDeleteAsync for {entities.Count()} {typeof(T).Name}'s successful. Elapsed: {stopwatch.ElapsedMilliseconds:0} ms.");
                    }
                    else
                    {
                        throw new DataException($"BulkDeleteAsync for {entities.Count()} {typeof(T).Name}'s failed as the delete affected {rowsDeleted} rows instead.");
                    }
                }
                catch (Exception ex)
                {
                    Logger.Error(ex, $"BulkDeleteAsync for {entities.Count()} {typeof(T).Name}'s failed. Elapsed: {stopwatch.ElapsedMilliseconds:0} ms.");
                    throw;
                }
                finally
                {
                    stopwatch.Stop();
                }
        }

        public static async Task BulkInsertAsync<T>(this SqlTransaction sqlTransaction, IEnumerable<T> entities, IDbTransaction transaction, int? commandTimeout = null)
        {
                //Custom Bulk Insert using SqlBulkCopy
                using (SqlBulkCopy bulkCopy = new SqlBulkCopy(sqlTransaction.Connection, SqlBulkCopyOptions.Default, sqlTransaction))
                {
                    IClassMapper classMapper = DapperExtensionsConfiguration.GetMap<T>();
                    bulkCopy.BatchSize = 25000;
                    bulkCopy.NotifyAfter = 25000;
                    bulkCopy.SqlRowsCopied += (sender, eventArgs) => Logger.Info($"BulkInsertAsync wrote {eventArgs.RowsCopied} records of { entities.Count()} {typeof(T).Name}'s.");
                    bulkCopy.DestinationTableName = $"{classMapper.SchemaName}.{classMapper.TableName}";
                    bulkCopy.BulkCopyTimeout = commandTimeout ?? sqlTransaction.Connection.ConnectionTimeout;

                    DataTable newInserts = new DataTable("NewInserts");
                    List<DataColumn> keys = new List<DataColumn>();
                    classMapper.Properties.ToList().ForEach(prop =>
                    {
                        if (!prop.Ignored)
                        {
                            DataColumn dataColumn = new DataColumn();

                            Type type = System.Nullable.GetUnderlyingType(prop.PropertyInfo.PropertyType);
                            if (type == null)
                            {
                                type = prop.PropertyInfo.PropertyType;
                            }

                            dataColumn.DataType = type;
                            dataColumn.ColumnName = prop.ColumnName;
                            dataColumn.AutoIncrement = prop.KeyType.Equals(KeyType.Identity);
                            if (!prop.KeyType.Equals(KeyType.NotAKey))
                            {
                                keys.Add(dataColumn);
                                if (!prop.KeyType.Equals(KeyType.Identity))
                                {
                                    dataColumn.AllowDBNull = false;
                                }
                            }
                            // We need to specify the column mapping as the column sequence in the Dapper mapping
                            // file might be different than the DB table column sequence
                            bulkCopy.ColumnMappings.Add(dataColumn.ColumnName, dataColumn.ColumnName);
                            newInserts.Columns.Add(dataColumn);
                        }
                    });

                    // Set the primary key from the key columns collected above.
                    newInserts.PrimaryKey = keys.ToArray();

                    // Add some new rows to the collection.
                    entities.ToList().ForEach(entity =>
                    {
                        DataRow row = newInserts.NewRow();
                        classMapper.Properties.ToList().ForEach(prop =>
                        {
                            if (!prop.Ignored)
                            {
                                var value = prop.PropertyInfo.GetValue(entity);
                                row[prop.ColumnName] = value ?? DBNull.Value;
                            }
                        });
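                        // NOTE: GetCacheIdentifier() is not defined in this snippet; it appears to be one of the helper methods removed from this post.
                        Logger.Trace($"Preparing '{bulkCopy.DestinationTableName}' bulk insert for '{entity.GetType().Name}' as '{typeof(T).Name}' with identifier '{entity.GetCacheIdentifier()}'");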
                        newInserts.Rows.Add(row);
                    });

                    //Commit to the internal data table
                    newInserts.AcceptChanges();

                    Stopwatch stopwatch = new Stopwatch();
                    stopwatch.Start();
                    try
                    {
                        // Write from the source to the destination.
                        await bulkCopy.WriteToServerAsync(newInserts).ConfigureAwait(false);
                        Logger.Trace($"BulkInsertAsync for {entities.Count()} {typeof(T).Name}'s successful. Elapsed: {stopwatch.ElapsedMilliseconds:0} ms.");
                    }
                    catch (Exception ex)
                    {
                        Logger.Error(ex, $"BulkInsertAsync for {entities.Count()} {typeof(T).Name}'s failed. Elapsed: {stopwatch.ElapsedMilliseconds:0} ms.");
                        throw;
                    }
                    finally
                    {
                        stopwatch.Stop();
                    }
                }
        }
    }
}

In your Startup.cs class (or wherever you initialize Dapper) please set the mapping assemblies as in the following example:

CustomDapperAsyncExtensions.SetMappingAssemblies(new[] {
                typeof(AAA).Assembly,
                typeof(BBB).Assembly
            }); 

If you have any questions, just contact me.
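For anyone reading the delete part: the join condition there is built by appending " AND" after each key column and trimming the last one. A minimal sketch of the same step using string.Join is below. `DeleteSqlSketch` and `BuildDeleteJoin` are hypothetical names used only for illustration, not part of Dapper Extensions or of the code above, and the table and column names are assumed to be trusted because they are not quoted or escaped here.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper for illustration only.
public static class DeleteSqlSketch
{
    // Builds a DELETE that joins the target table (alias A) to a
    // table-valued parameter named @DeleteTable (alias B) on every key column.
    public static string BuildDeleteJoin(string targetTable, IEnumerable<string> keyColumns)
    {
        if (string.IsNullOrWhiteSpace(targetTable))
            throw new ArgumentException("A target table name is required.", nameof(targetTable));
        if (keyColumns == null)
            throw new ArgumentNullException(nameof(keyColumns));

        List<string> keys = keyColumns.ToList();
        if (keys.Count == 0)
            throw new ArgumentException("At least one key column is required.", nameof(keyColumns));

        // string.Join places " AND " only between conditions, so nothing has to be trimmed afterwards.
        string joinCondition = string.Join(" AND ", keys.Select(k => $"A.{k} = B.{k}"));
        return $"DELETE A FROM {targetTable} A INNER JOIN @DeleteTable B ON {joinCondition}";
    }
}
```

Rejecting an empty key list up front also avoids sending a statement that ends in a dangling "ON" when a mapping has no key columns.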

valfrid-ly commented 3 years ago

DapperExtensions already has a bulk insert that actually runs as individual inserts. It is an overload of the Insert method that receives a list of entities.

It has been there for a while.

valfrid-ly commented 3 years ago

It can be found here: DapperExtensions

We have it for Insert, Update and Delete.

What we are missing is a REAL bulk insert and bulk update that sends only one request to the database to do everything at once.

Lines 151 to 168 are the following for insert and bulk insert:

    /// <summary>
    /// Executes an insert query for the specified entity.
    /// </summary>
    public static void Insert<T>(this IDbConnection connection, IEnumerable<T> entities, IDbTransaction transaction = null, int? commandTimeout = null) where T : class
    {
        Instance.Insert(connection, entities, transaction, commandTimeout);
    }

    /// <summary>
    /// Executes an insert query for the specified entity, returning the primary key.
    /// If the entity has a single key, just the value is returned.
    /// If the entity has a composite key, an IDictionary&lt;string, object&gt; is returned with the key values.
    /// The key value for the entity will also be updated if the KeyType is a Guid or Identity.
    /// </summary>
    public static dynamic Insert<T>(this IDbConnection connection, T entity, IDbTransaction transaction = null, int? commandTimeout = null) where T : class
    {
        return Instance.Insert(connection, entity, transaction, commandTimeout);
    }
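
To illustrate the "one request" idea: one common approach is a single INSERT statement with several VALUES rows. The sketch below only builds the SQL text. It is an assumption-laden example, not something Dapper Extensions provides: `MultiRowInsertSketch`, `BuildInsert` and the `@p{row}_{column}` parameter naming are hypothetical, identifiers are assumed to be trusted, and not every dialect among the supported databases accepts multi-row VALUES.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper for illustration only.
public static class MultiRowInsertSketch
{
    // Builds one INSERT with rowCount value tuples, using parameters
    // named @p{row}_{column} so each value can be bound separately.
    public static string BuildInsert(string table, IReadOnlyList<string> columns, int rowCount)
    {
        if (string.IsNullOrWhiteSpace(table))
            throw new ArgumentException("A table name is required.", nameof(table));
        if (columns == null || columns.Count == 0)
            throw new ArgumentException("At least one column is required.", nameof(columns));
        if (rowCount < 1)
            throw new ArgumentOutOfRangeException(nameof(rowCount), "At least one row is required.");

        string columnList = string.Join(", ", columns);

        // One "(@pR_0, @pR_1, ...)" tuple per row.
        IEnumerable<string> rows = Enumerable.Range(0, rowCount)
            .Select(r => "(" + string.Join(", ", Enumerable.Range(0, columns.Count).Select(c => $"@p{r}_{c}")) + ")");

        return $"INSERT INTO {table} ({columnList}) VALUES {string.Join(", ", rows)}";
    }
}
```

The values could then be bound with Dapper's DynamicParameters and sent in a single Execute call. On SQL Server a command accepts at most 2100 parameters and an INSERT ... VALUES list at most 1000 rows, so larger lists would still need to be split into chunks.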