dotnetcore / osharp

OSharp是一个基于.Net6.0的快速开发框架,框架对 AspNetCore 的配置、依赖注入、日志、缓存、实体框架、Mvc(WebApi)、身份认证、功能权限、数据权限等模块进行更高一级的自动化封装,并规范了一套业务实现的代码结构与操作流程,使 .Net 框架更易于应用到实际项目开发中。
Apache License 2.0
2.77k stars 748 forks source link

数据访问层添加主从数据库(读写分离)架构支持 #217

Closed gmf520 closed 3 years ago

gmf520 commented 3 years ago

[toc]

主从数据库(读写分离)设计

需求分析

需求实现

主从数据库配置选项

从数据库配置选项

    /// <summary>
    /// 从数据库选项
    /// </summary>
    public class SlaveDatabaseOptions : DataErrorInfoBase
    {
        /// <summary>
        /// 获取或设置 权重
        /// </summary>
        public int Weight { get; set; }

        /// <summary>
        /// 获取或设置 数据库连接串
        /// </summary>
        public string ConnectionString { get; set; }
    }

在数据上下文配置中使用,数据上下文配置中,配置了主库连接字符串ConnectionString,从数据库选项集合Slaves,从数据库选择策略名SlaveSelectorName

    /// <summary>
    /// 数据上下文配置节点
    /// </summary>
    public class OsharpDbContextOptions
    {
        // 其他属性

        /// <summary>
        /// 获取或设置 主数据库连接字符串
        /// </summary>
        public string ConnectionString { get; set; }

        /// <summary>
        /// 获取或设置 从数据库轮询策略
        /// </summary>
        public string SlaveSelectorName { get; set; }

        /// <summary>
        /// 获取或设置 从数据库选项集合
        /// </summary>
        public SlaveDatabaseOptions[] Slaves { get; set; }

        // 其他属性
    }

相应的appsetting.json中的JSON配置节点

{
  "OSharp": {
    "DbContexts": {
      "SqlServer": {
        "DbContextTypeName": "OSharp.Entity.DefaultDbContext,OSharp.EntityFrameworkCore",
        "ConnectionString": "Server=localhost;Database=osharpns-dev-api;User Id=sa;Password=Abc123456!;MultipleActiveResultSets=true",
        "Slaves": [
          {
            "Name": "Slave01",
            "Weight": 2,
            "ConnectionString": "Server=localhost;Database=osharpns-dev-api-slave01;User Id=sa;Password=Abc123456!;MultipleActiveResultSets=true"
          },
          {
            "Name": "Slave02",
            "Weight": 5,
            "ConnectionString": "Server=localhost;Database=osharpns-dev-api-slave02;User Id=sa;Password=Abc123456!;MultipleActiveResultSets=true"
          }
        ],
        "SlaveSelectorName": "Weight",
        "DatabaseType": "SqlServer",
        "AuditEntityEnabled": true,
        "AutoMigrationEnabled": true
      }
    }
  }
}

主从分离数据存储实现

实现思路:

构建数据上下文DbContext

构建数据上下文选项构建者DbContextOptionsBuilder应用具体的数据库驱动平台

数据库连接字符串提供者IConnectionStringProvider,动态获取数据库连接字符串

数据库主从分离策略IMasterSlaveSplitPolicy,决定走主数据库还是走从数据库

返回主数据库连接字符串

从数据库选择器ISlaveDatabaseSelector,从多个从数据库中选择一个执行数据读取

从数据库随机选择器RandomSlaveDatabaseSelector

从数据库顺序选择器SequenceSlaveDatabaseSelector

从数据库滑动加权选择器WeightSlaveDatabaseSelector

返回从数据库连接字符串

流程图:

具体实现:

业务实现的时候从主从分离的数据库中进行数据存取操作,用哪个数据库,主要就是取决于数据库连接字符串的选择,因此,只要在创建数据上下文DbContext实例之前决定好数据库连接字符串,就能动态选择不同的数据库。

OSharp 框架是在构建DbContextOptionsBuilder的时候使用数据库连接字符串的,定义了一个数据库连接字符串提供者IConnectionStringProvider接口来动态提供要连接的数据库连接字符串

/// <summary>
/// 构建<see cref="DbContextOptionsBuilder"/>,附加必要的扩展属性
/// </summary>
public static DbContextOptionsBuilder BuildDbContextOptionsBuilder<TDbContext>(this IServiceProvider provider, DbContextOptionsBuilder builder) where TDbContext : DbContext
{
    ...

    IDbContextOptionsBuilderDriveHandler driveHandler = provider.GetServices<IDbContextOptionsBuilderDriveHandler>()
        .LastOrDefault(m => m.Type == databaseType);
    if (driveHandler == null)
    {
    throw new OsharpException($"无法解析类型为 {databaseType} 的 {typeof(IDbContextOptionsBuilderDriveHandler).DisplayName()} 实例");
    }

/*
    //旧代码
    ScopedDictionary scopedDictionary = provider.GetService<ScopedDictionary>();
    string key = $"DbConnection_{osharpDbContextOptions.ConnectionString}";
    DbConnection existingDbConnection = scopedDictionary.GetValue<DbConnection>(key);
    builder = driveHandler.Handle(builder, osharpDbContextOptions.ConnectionString, existingDbConnection);
*/

    //新代码
    IConnectionStringProvider connectionStringProvider = provider.GetRequiredService<IConnectionStringProvider>();
    string connectionString = connectionStringProvider.GetConnectionString(typeof(TDbContext));
    ScopedDictionary scopedDictionary = provider.GetService<ScopedDictionary>();
    string key = $"DbConnection_{connectionString}";
    DbConnection existingDbConnection = scopedDictionary.GetValue<DbConnection>(key);
    builder = driveHandler.Handle(builder, connectionString, existingDbConnection);

    ...
}

数据库连接字符串提供者IConnectionStringProvider

    /// <summary>
    /// 数据库连接字符串提供器
    /// </summary>
    public interface IConnectionStringProvider
    {
        /// <summary>
        /// 获取指定数据上下文类型的数据库连接字符串
        /// </summary>
        /// <param name="dbContextType">数据上下文类型</param>
        /// <returns></returns>
        string GetConnectionString(Type dbContextType);
    }

IConnectionStringProvider实现类ConnectionStringProvider

    /// <summary>
    /// 数据库连接字符串提供者
    /// </summary>
    public class ConnectionStringProvider : IConnectionStringProvider
    {
        private readonly IDictionary<string, OsharpDbContextOptions> _dbContexts;
        private readonly ISlaveDatabaseSelector[] _slaveDatabaseSelectors;
        private readonly IMasterSlaveSplitPolicy _masterSlavePolicy;

        /// <summary>
        /// 初始化一个<see cref="ConnectionStringProvider"/>类型的新实例
        /// </summary>
        public ConnectionStringProvider(IServiceProvider provider)
        {
            _dbContexts = provider.GetOSharpOptions().DbContexts;
            _masterSlavePolicy = provider.GetService<IMasterSlaveSplitPolicy>();
            _slaveDatabaseSelectors = provider.GetServices<ISlaveDatabaseSelector>().ToArray();
        }

        /// <summary>
        /// 获取指定数据上下文类型的数据库连接字符串
        /// </summary>
        /// <param name="dbContextType">数据上下文类型</param>
        /// <returns></returns>
        public virtual string GetConnectionString(Type dbContextType)
        {
            OsharpDbContextOptions dbContextOptions = _dbContexts.Values.FirstOrDefault(m => m.DbContextType == dbContextType);
            if (dbContextOptions == null)
            {
                throw new OsharpException($"数据上下文“{dbContextType}”的数据上下文配置信息不存在");
            }

            bool isSlave = _masterSlavePolicy.IsToSlaveDatabase(dbContextOptions);
            if (!isSlave)
            {
                return dbContextOptions.ConnectionString;
            }

            SlaveDatabaseOptions[] slaves = dbContextOptions.Slaves;
            ISlaveDatabaseSelector slaveDatabaseSelector = _slaveDatabaseSelectors.LastOrDefault(m => m.Name == dbContextOptions.SlaveSelectorName)
                ?? _slaveDatabaseSelectors.First(m => m.Name == "Weight");
            SlaveDatabaseOptions slave = slaveDatabaseSelector.Select(slaves);
            return slave.ConnectionString;
        }
    }

数据库主从分离策略IMasterSlaveSplitPolicy

    /// <summary>
    /// 定义数据库主从分离策略
    /// </summary>
    public interface IMasterSlaveSplitPolicy
    {
        /// <summary>
        /// 是否前往从数据库
        /// </summary>
        /// <param name="options">数据上下文选项</param>
        /// <returns></returns>
        bool IsToSlaveDatabase(OsharpDbContextOptions options);
    }

IMasterSlaveSplitPolicy实现类MasterSlaveSplitPolicy

主从分离策略,默认流程如下:

上下文配置中从数据库配置不存在

工作单元启用了事务

当前执行Function为空或者Function上没有设置走从库

走主数据库

Function设置走从库

走从数据库

作为业务功能描述的Function信息中,有一个选项配置Function.IsSlaveDatabase是否走从库

只有只读业务,并且能忍受一定的数据延迟的,才应配置为走从库

如果业务涉及新增、更新、删除操作,默认策略将忽略IsSlaveDatabase配置

    /// <summary>
    /// 定义功能信息
    /// </summary>
    public interface IFunction : IEntity<Guid>, ILockable, IEntityHash
    {
        //...

        /// <summary>
        /// 获取或设置 是否从库读取数据
        /// </summary>
        bool IsSlaveDatabase { get; set; }

        //...
    }

默认主从分离策略实现MasterSlaveSplitPolicy

    /// <summary>
    /// 主从分离策略
    /// </summary>
    internal class MasterSlaveSplitPolicy : IMasterSlaveSplitPolicy
    {
        private readonly IUnitOfWork _unitOfWork;
        private readonly ScopedDictionary _scopedDict;

        /// <summary>
        /// 初始化一个<see cref="MasterSlaveSplitPolicy"/>类型的新实例
        /// </summary>
        public MasterSlaveSplitPolicy(IServiceProvider provider)
        {
            _unitOfWork = provider.GetUnitOfWork(false);
            _scopedDict = provider.GetRequiredService<ScopedDictionary>();
        }

        /// <summary>
        /// 是否前往从数据库
        /// </summary>
        /// <param name="options">数据上下文选项</param>
        /// <returns></returns>
        public bool IsToSlaveDatabase(OsharpDbContextOptions options)
        {
            SlaveDatabaseOptions[] slaves = options.Slaves;
            if (slaves.IsNullOrEmpty())
            {
                return false;
            }

            //允许工作单元事务,走主库
            if (_unitOfWork.IsEnabledTransaction)
            {
                return false;
            }

            // 在Function显式配置走从库,才走从库
            IFunction function = _scopedDict.Function;
            if (function == null || !function.IsSlaveDatabase)
            {
                return false;
            }

            return true;
        }
    }

从数据库选择器ISlaveDatabaseSelector

    /// <summary>
    /// 定义从数据库选择功能
    /// </summary>
    [MultipleDependency]
    public interface ISlaveDatabaseSelector
    {
        /// <summary>
        /// 获取 名称
        /// </summary>
        string Name { get; }

        /// <summary>
        /// 从所有从数据库中返回一个
        /// </summary>
        /// <param name="slaves">所有从数据库</param>
        /// <returns></returns>
        SlaveDatabaseOptions Select(SlaveDatabaseOptions[] slaves);
    }
从数据库随机选择器RandomSlaveDatabaseSelector
    /// <summary>
    /// 随机从数据库选择器
    /// </summary>
    public sealed class RandomSlaveDatabaseSelector : ISlaveDatabaseSelector
    {
        private static readonly Random Random = new Random();
        private readonly ILogger _logger;

        /// <summary>
        /// 初始化一个<see cref="RandomSlaveDatabaseSelector"/>类型的新实例
        /// </summary>
        public RandomSlaveDatabaseSelector(IServiceProvider provider)
        {
            _logger = provider.GetLogger(GetType());
        }

        /// <summary>
        /// 获取 名称
        /// </summary>
        public string Name => "Random";

        /// <summary>
        /// 从所有从数据库中返回一个
        /// </summary>
        /// <param name="slaves">所有从数据库</param>
        /// <returns></returns>
        public SlaveDatabaseOptions Select(SlaveDatabaseOptions[] slaves)
        {
            SlaveDatabaseOptions slave = Random.NextItem(slaves);
            _logger.LogDebug($"随机选取了“{slave.Name}”的从数据库");
            return slave;
        }
    }
从数据库顺序选择器SequenceSlaveDatabaseSelector
    /// <summary>
    /// 顺序轮询从数据库选择器
    /// </summary>
    public sealed class SequenceSlaveDatabaseSelector : ISlaveDatabaseSelector
    {
        private static readonly object LockObj = new object();
        private int _sequenceIndex;
        private readonly ILogger _logger;

        /// <summary>
        /// 初始化一个<see cref="SequenceSlaveDatabaseSelector"/>类型的新实例
        /// </summary>
        public SequenceSlaveDatabaseSelector(IServiceProvider provider)
        {
            _logger = provider.GetLogger(GetType());
        }

        /// <summary>
        /// 获取 名称
        /// </summary>
        public string Name => "Sequence";

        /// <summary>
        /// 从所有从数据库中返回一个
        /// </summary>
        /// <param name="slaves">所有从数据库</param>
        /// <returns></returns>
        public SlaveDatabaseOptions Select(SlaveDatabaseOptions[] slaves)
        {
            lock (LockObj)
            {
                if (_sequenceIndex > slaves.Length - 1)
                {
                    _sequenceIndex = 0;
                }

                SlaveDatabaseOptions slave = slaves[_sequenceIndex];
                _logger.LogDebug($"顺序选取了“{slave.Name}”的从数据库,顺序号:{_sequenceIndex}");
                _sequenceIndex++;

                return slave;
            }
        }
    }
从数据库平滑加权选择器WeightSlaveDatabaseSelector
    /// <summary>
    /// 平滑权重从数据库选择器
    /// </summary>
    public sealed class WeightSlaveDatabaseSelector : ISlaveDatabaseSelector
    {
        private static readonly object LockObj = new object();
        private readonly ILogger _logger;
        private Queue<int> _queue = new Queue<int>();

        /// <summary>
        /// 初始化一个<see cref="WeightSlaveDatabaseSelector"/>类型的新实例
        /// </summary>
        public WeightSlaveDatabaseSelector(IServiceProvider provider)
        {
            _logger = provider.GetLogger(GetType());
        }

        /// <summary>
        /// 获取 名称
        /// </summary>
        public string Name => "Weight";

        /// <summary>
        /// 从所有从数据库中返回一个
        /// </summary>
        /// <param name="slaves">所有从数据库</param>
        /// <returns></returns>
        public SlaveDatabaseOptions Select(SlaveDatabaseOptions[] slaves)
        {
            lock (LockObj)
            {
                if (_queue.Count == 0)
                {
                    _queue = GetIndexes(slaves);
                }

                int index = _queue.Dequeue();
                SlaveDatabaseOptions slave = slaves[index];
                _logger.LogDebug($"平滑权重选取了“{slave.Name}”的从数据库,权重:{slave.Weight}");
                return slave;
            }
        }

        private static Queue<int> GetIndexes(SlaveDatabaseOptions[] slaves)
        {
            SlaveDatabaseOptionsWrap[] wraps = slaves.Select(m => new SlaveDatabaseOptionsWrap(m)).ToArray();
            int sum = wraps.Sum(m => m.Weight);
            Queue<int> queue = new Queue<int>();
            for (int i = 0; i < sum; i++)
            {
                int index = NextIndex(wraps);
                queue.Enqueue(index);
            }

            return queue;
        }

        private static int NextIndex(SlaveDatabaseOptionsWrap[] wraps)
        {
            int index = -1, total = 0;
            for (int i = 0; i < wraps.Length; i++)
            {
                SlaveDatabaseOptionsWrap wrap = wraps[i];
                wrap.Current += wrap.Weight;
                total += wrap.Weight;
                if (index == -1 || wraps[index].Current < wrap.Current)
                {
                    index = i;
                }
            }

            wraps[index].Current -= total;
            return index;
        }

        private class SlaveDatabaseOptionsWrap : SlaveDatabaseOptions
        {
            public SlaveDatabaseOptionsWrap(SlaveDatabaseOptions slave)
            {
                Weight = slave.Weight;
            }

            /// <summary>
            /// 获取或设置 当前权重
            /// </summary>
            public int Current { get; set; }
        }
    }