Open ayrloong opened 11 months ago
Related: https://github.com/microsoft/reverse-proxy/pull/2178 and https://github.com/microsoft/reverse-proxy/issues/393
Getting the right design and base support for weighting will unlock several other useful features as well.
How is it going?
@adityamandaleeka Hi, thanks for adding this feature to the plan. I would like to submit a PR to implement it.
@ayrloong you can start by helping define the design. This will be a complex feature and it's better to get the design right before starting a PR. E.g. what do you expect the overall behavior to be, and what APIs and config do you think are necessary to implement that? The linked items above should help.
Thanks for the heads-up. I agree this feature needs a design before any code.
@Tratcher Hi, this is my initial design for this feature.
public sealed record WeightedClusterConfig
{
public string? ClusterId { get; init; }
public int? Weight { get; init; }
}
public sealed record RouteConfig
{
/// <summary>
/// Gets or initializes the weighted clusters that requests matching this route are distributed across.
/// If ClusterId is set, there is no need to set this.
/// </summary>
public IReadOnlyList<WeightedClusterConfig>? WeightedClusters { get; init; }
}
private static RouteConfig CreateRoute(IConfigurationSection section)
{
return new RouteConfig
{
///.....
WeightedClusters = CreateWeightedClusters(section.GetSection(nameof(RouteConfig.WeightedClusters)))
};
}
private static IReadOnlyList<WeightedClusterConfig>? CreateWeightedClusters(IConfigurationSection section)
{
if (!section.Exists())
{
return null;
}
return section.GetChildren().Select(CreateWeightedCluster).ToArray();
}
private static WeightedClusterConfig CreateWeightedCluster(IConfigurationSection section)
{
return new WeightedClusterConfig()
{
ClusterId = section[nameof(WeightedClusterConfig.ClusterId)]!,
Weight = section.ReadInt32(nameof(WeightedClusterConfig.Weight))
};
}
internal static class RoutingHelper
{
public static T SelectByWeight<T>(this IEnumerable<T> endpoints, Func<T, double> weightProvider, Randomizer randomizer)
{
var accumulatedProbability = 0d;
var weightSum = endpoints.Sum(weightProvider);
var randomPercentageValue = randomizer.NextDouble(weightSum);
foreach (var endpoint in endpoints)
{
var weight = weightProvider(endpoint);
if (randomPercentageValue <= weight + accumulatedProbability)
{
return endpoint;
}
accumulatedProbability += weight;
}
throw new InvalidOperationException(
"The item cannot be selected because the weights are not correctly calculated.");
}
}
internal class Randomizer
{
#if NET6_0_OR_GREATER
public virtual double NextDouble(double maxValue) => Random.Shared.NextDouble() * maxValue;
public virtual int NextInt(int maxValue) => Random.Shared.Next(maxValue);
#else
private static readonly System.Threading.ThreadLocal<Random> _randomInstance = new(() => new Random());
public virtual double NextDouble(double maxValue) => _randomInstance.Value!.NextDouble() * maxValue;
public virtual int NextInt(int maxValue) => _randomInstance.Value!.Next(maxValue);
#endif
}
public class CanaryMiddleware(RequestDelegate next, IProxyStateLookup lookup, IRequestClusterPolicy clusterPolicy)
{
public Task InvokeAsync(HttpContext context)
{
var proxyFeature = context.GetReverseProxyFeature();
var weightedClusters = proxyFeature.Route.Config.WeightedClusters;
if (weightedClusters is null) return next(context);
var weightedCluster = clusterPolicy.PickCluster(context, weightedClusters);
if (lookup.TryGetCluster(weightedCluster?.ClusterId, out var cluster))
{
context.ReassignProxyRequest(cluster);
}
return next(context);
}
}
public interface IRequestClusterPolicy
{
WeightedClusterConfig? PickCluster(HttpContext context, IEnumerable<WeightedClusterConfig> clusters);
}
internal class WeightedClusterPolicy(Randomizer randomizer) : IRequestClusterPolicy
{
public WeightedClusterConfig? PickCluster(HttpContext context, IEnumerable<WeightedClusterConfig> clusters)
{
return clusters.SelectByWeight(g => g.Weight ?? 0, randomizer);
}
}
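To make the intended behavior concrete, here is a minimal, self-contained sketch of the cumulative-weight walk that SelectByWeight performs. It is an illustration only, not the proposed implementation: the tuples stand in for WeightedClusterConfig, the cluster names are made up, and the random draw is passed in as `roll` so the result is reproducible. It also compares with `<` rather than `<=`, an assumption made here so that a zero-weight entry can never be selected.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for WeightedClusterConfig: (ClusterId, Weight) tuples.
var clusters = new List<(string ClusterId, int Weight)>
{
    ("stable", 90),
    ("canary", 10),
};

// Walks the running total and returns the first cluster whose range contains the roll.
// `roll` is expected to be in [0, total weight).
string PickByWeight(IReadOnlyList<(string ClusterId, int Weight)> items, double roll)
{
    var accumulated = 0d;
    foreach (var (clusterId, weight) in items)
    {
        accumulated += weight;
        if (roll < accumulated)
        {
            return clusterId;
        }
    }
    throw new InvalidOperationException("The roll is outside the total weight.");
}

Console.WriteLine(PickByWeight(clusters, 0));    // stable
Console.WriteLine(PickByWeight(clusters, 89.9)); // stable
Console.WriteLine(PickByWeight(clusters, 90));   // canary
```

With weights 90 and 10, rolls in [0, 90) go to the first cluster and rolls in [90, 100) go to the second, so roughly 10% of requests reach the canary.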
Interesting approach. I'm a bit surprised the weights are at the route/cluster level, I expected them to be at the destination level and feed into the load balancing algorithms. That said, maybe we should consider both as separate features. Can you break down the tradeoffs of both?
How about session affinity, making sure the same client returns to the same cluster? Or is that a detail of IRequestClusterPolicy?
No need to show straightforward implementation details like ConfigurationConfigProvider. Schemas, APIs, and behavior are the interesting points at this stage.
It might make sense to combine the CanaryMiddleware with ProxyPipelineInitializerMiddleware https://github.com/microsoft/reverse-proxy/blob/4b296825f49e0e643e49c9808ec96a49932b0f6f/src/ReverseProxy/Model/ProxyPipelineInitializerMiddleware.cs#L49-L59
I think a destination-level weight policy may be added in the future as a separate load balancing policy, so I kept it separate from the route/cluster-level weights.
public class WeightedLoadBalancingPolicy : ILoadBalancingPolicy
{
public string Name { get; }
public DestinationState? PickDestination(HttpContext context, ClusterState cluster, IReadOnlyList<DestinationState> availableDestinations)
{
// return selected destination
throw new NotImplementedException();
}
}
Regarding cluster session affinity, I initially ignored it, but after your reminder I re-examined it.
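One possible direction, shown only as a sketch and not as a settled design: derive the draw from a stable client key instead of a random number, so the same client keeps landing on the same cluster while the weights stay unchanged. Everything here is hypothetical, including the `PickSticky` and `StableHash` names and the choice of client key (a cookie value or client IP, for example). The hash is an FNV-1a style hash over UTF-16 code units, used because string.GetHashCode() is randomized per process on .NET Core.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for WeightedClusterConfig: (ClusterId, Weight) tuples.
var clusters = new List<(string ClusterId, int Weight)> { ("stable", 90), ("canary", 10) };

// FNV-1a style hash; gives the same value in every process for the same input.
uint StableHash(string value)
{
    var hash = 2166136261u;
    foreach (var c in value)
    {
        hash = unchecked((hash ^ c) * 16777619u);
    }
    return hash;
}

// Maps the client key to a slot in [0, total weight) and walks the running total.
// Assumes all weights are non-negative.
string PickSticky(IReadOnlyList<(string ClusterId, int Weight)> items, string clientKey)
{
    var total = items.Sum(item => item.Weight);
    if (total <= 0)
    {
        throw new InvalidOperationException("At least one cluster needs a positive weight.");
    }

    var slot = StableHash(clientKey) % (uint)total;
    var accumulated = 0u;
    foreach (var (clusterId, weight) in items)
    {
        accumulated += (uint)weight;
        if (slot < accumulated)
        {
            return clusterId;
        }
    }
    throw new InvalidOperationException("Unreachable when the weights are non-negative.");
}

var first = PickSticky(clusters, "client-42");
var second = PickSticky(clusters, "client-42");
Console.WriteLine(first == second); // True
```

A caveat with this approach: changing the weights remaps some clients to a different cluster, so affinity that must survive config changes would still need something like a cookie that records the chosen cluster.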
Combining CanaryMiddleware with ProxyPipelineInitializerMiddleware is a very good idea, since it removes unnecessary code.
This is the combined code:
ClusterState cluster = null;
var weightedClusters = route.Config.WeightedClusters;
if (weightedClusters is not null && weightedClusters.Count != 0)
{
var weightedCluster = clusterPolicy.PickCluster(context, weightedClusters);
cluster = new ClusterState(weightedCluster.ClusterId);
}
else
{
cluster = route.Cluster;
}
Hi @Tratcher, we implemented WRR (weighted round robin) in a very simple way. This code serves projects that handle 200,000 QPS and need to be available 24/7 with no downtime.
In appsettings.json, add a Weight to each destination:
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft": "Warning",
"Microsoft.Hosting.Lifetime": "Information"
}
},
"AllowedHosts": "*",
"ReverseProxy": {
"Routes": {
"route1" : {
"ClusterId": "cluster1",
"Match": {
"Path": "{**catch-all}"
}
}
},
"Clusters": {
"cluster1": {
"Destinations": {
"destination1": {
"Address": "https://example.com/",
"Weight": 100
}
}
}
}
}
}
WeightRoundRobinLoadBalancingPolicy
public class WeightRoundRobinLoadBalancingPolicy: ILoadBalancingPolicy
{
private ILogger<WeightRoundRobinLoadBalancingPolicy> _logger;
public string Name => "WeightRoundRobin";
public WeightRoundRobinLoadBalancingPolicy(ILogger<WeightRoundRobinLoadBalancingPolicy> logger)
{
_logger = logger;
}
public DestinationState? PickDestination(HttpContext context, ClusterState cluster, IReadOnlyList<DestinationState> availableDestinations)
{
if (Weighting.WeightedClusterWeights.TryGetValue(cluster.ClusterId, out var weightedWeights))
{
if (weightedWeights is null)
{
_logger.LogInformation($"PickDestination Error: Can not get [{cluster.ClusterId}] cluster weightedWeights");
return null;
}
if (weightedWeights.DestinationIds is null)
{
_logger.LogInformation($"PickDestination Error: Can not get [{cluster.ClusterId}] destination, DestinationIds is null");
return null;
}
var destinationId = weightedWeights.DestinationIds[WeightingHelper.GetIndexByRandomWeight(weightedWeights.DestinationWeightedWeights, weightedWeights.DestinationWeights, weightedWeights.TotalWeights ?? 1D)];
return availableDestinations.FirstOrDefault(destination => destination.DestinationId == destinationId);
}
_logger.LogInformation($"PickDestination Error: Can not get [{cluster.ClusterId}] cluster");
return null;
}
}
WeightConfigFilter
public class WeightConfigFilter : IProxyConfigFilter
{
private ILogger<WeightConfigFilter> _logger;
public WeightConfigFilter(ILogger<WeightConfigFilter> logger)
{
_logger = logger;
}
public ValueTask<ClusterConfig> ConfigureClusterAsync(ClusterConfig cluster, CancellationToken cancel)
{
_logger.LogInformation($"[{DateTime.Now}]:{nameof(WeightConfigFilter)}.{nameof(ConfigureClusterAsync)} Started");
try
{
var weights = cluster.Destinations?.ToDictionary(destination => destination.Key, destination =>
{
if (destination.Value.Metadata?.TryGetValue("Weight", out var weight) ?? false)
return double.Parse(weight) / 100D;
else
return 1D;
});
List<string> destinationIds = new();
List<double> destinationWeights = new();
WeightedWeight? weightedWeight = null;
if (weights is not null)
{
foreach (var weight in weights)
{
destinationIds.Add(weight.Key);
destinationWeights.Add(weight.Value);
}
var weightedWeights = WeightingHelper.GetWeightedWeights(destinationWeights.ToArray());
weightedWeight = new()
{
DestinationIds = destinationIds.ToArray(),
DestinationWeights = destinationWeights.ToArray(),
DestinationWeightedWeights = weightedWeights.Weights,
TotalWeights = weightedWeights.TotalWeight
};
}
// The indexer adds the key when it is missing and overwrites the value otherwise
Weighting.ClusterWeights[cluster.ClusterId] = weights;
Weighting.WeightedClusterWeights[cluster.ClusterId] = weightedWeight;
_logger.LogInformation($"[{DateTime.Now}]:{nameof(WeightConfigFilter)}.{nameof(ConfigureClusterAsync)} Set, clusterId: {cluster.ClusterId}, {JsonSerializer.Serialize(Weighting.WeightedClusterWeights[cluster.ClusterId])}");
}
catch (Exception ex)
{
_logger.LogError(ex, $"[{DateTime.Now}]:{nameof(WeightConfigFilter)}.{nameof(ConfigureClusterAsync)} Error");
}
_logger.LogInformation($"[{DateTime.Now}]:{nameof(WeightConfigFilter)}.{nameof(ConfigureClusterAsync)} Finished");
return new ValueTask<ClusterConfig>(cluster);
}
public ValueTask<RouteConfig> ConfigureRouteAsync(RouteConfig route, ClusterConfig? cluster, CancellationToken cancel)
{
return new ValueTask<RouteConfig>(route);
}
}
WeightingHelper
public class WeightingHelper
{
public static (double[]? Weights, double? TotalWeight) GetWeightedWeights(double[] weights)
{
if (weights.Length == 0) return (null, null);
else if (weights.Length == 1) return ([.. weights], weights[0]);
var totalWeight = 0D;
Span<double> newWeights = stackalloc double[weights.Length];
for (int i = 0; i < weights.Length; i++)
{
totalWeight += weights[i];
newWeights[i] = totalWeight;
}
return ([.. newWeights], totalWeight);
}
public static int GetIndexByRandomWeight(Span<double> weightedWeights, Span<double> weights, double totalWeight)
{
// Ignore weight when only one server
if (weightedWeights.Length == 1) return 0;
var randomWeight = Random.Shared.NextDouble() * totalWeight;
var index = weightedWeights.BinarySearch(randomWeight);
if (index < 0)
{
// No exact match: the complement is the index of the first cumulative weight above the draw
index = ~index;
}
if (index >= weights.Length || weights[index] == 0D)
{
// The number of servers decreased, or the weight of the server is 0: draw again.
// Note that this never terminates if every weight is 0.
return GetIndexByRandomWeight(weightedWeights, weights, totalWeight);
}
return index;
}
}
WeightedWeight
public class WeightedWeight
{
public string[]? DestinationIds { get; set; }
public double[]? DestinationWeights { get; set; }
public double[]? DestinationWeightedWeights { get; set; }
public double? TotalWeights { get; set; }
}
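As a worked example of the lookup that WeightingHelper performs, the sketch below builds the cumulative weights for three destinations and maps a few fixed draws to destination indexes. It is a simplified illustration rather than a drop-in replacement: the weights are made up, the draw is passed in as `roll` instead of coming from Random.Shared, and zero-weight destinations are skipped with a short loop instead of drawing again.

```csharp
using System;

// Hypothetical per-destination weights, already divided by 100 as in WeightConfigFilter.
double[] weights = { 0.5, 0.0, 1.5 };

// Running totals: { 0.5, 0.5, 2.0 }.
var cumulative = new double[weights.Length];
var total = 0d;
for (var i = 0; i < weights.Length; i++)
{
    total += weights[i];
    cumulative[i] = total;
}

// Maps a roll in [0, total) to the index of the destination that owns it.
int IndexFor(double roll)
{
    var index = Array.BinarySearch(cumulative, roll);
    // Not found: the complement is the first entry greater than the roll.
    // Exact hit: the roll sits on a boundary and belongs to the next range.
    index = index < 0 ? ~index : index + 1;

    // An exact hit can land on a zero-weight destination; move past it.
    while (index < weights.Length && weights[index] == 0d)
    {
        index++;
    }

    if (index >= weights.Length)
    {
        throw new ArgumentOutOfRangeException(nameof(roll), "The roll must be below the total weight.");
    }
    return index;
}

Console.WriteLine(IndexFor(0.2)); // 0
Console.WriteLine(IndexFor(0.5)); // 2
Console.WriteLine(IndexFor(1.9)); // 2
```

Here the first destination owns [0, 0.5), the second owns nothing, and the third owns [0.5, 2.0), so it receives about 75% of the draws.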
BTW, we also provide some new APIs for dynamically updating the configuration.
Finally, if this is the WRR you want, I can open a PR.
Many reverse proxy tools have built-in weighted cluster support. Although YARP provides extension points for this, I think the feature is common enough that I want to add it to YARP itself.