(更新时间)2021年5月30日 商城高并发秒杀系统(.NET Core版) 07-负载均衡组件的封装

愚公搬代码 发表于 2021/10/20 00:04:13 2021/10/20
【摘要】 本文介绍负载均衡组件的封装:ServiceCollection 扩展方法、负载均衡选项,以及加权随机、轮询、一致性哈希和最少活跃数四种负载均衡算法的实现。


/// <summary>
/// <see cref="IServiceCollection"/> extensions that register the load-balancing component.
/// </summary>
public static class LoadBalanceServiceCollectionExtensions
{
    /// <summary>
    /// Registers the load balancer with the default options.
    /// </summary>
    /// <param name="services">The service collection to add the registration to.</param>
    /// <returns>The same <paramref name="services"/> instance, to allow call chaining.</returns>
    public static IServiceCollection AddLoadBalance(this IServiceCollection services)
    {
        return AddLoadBalance(services, options => { });
    }

    /// <summary>
    /// Registers the load balancer selected by <see cref="LoadBalanceOptions.Type"/> as a
    /// singleton <see cref="ILoadBalance"/>.
    /// </summary>
    /// <remarks>
    /// Recognised type names (compared case-insensitively) are <c>Random</c>,
    /// <c>RoundRobin</c>, <c>ConsistentHash</c> and <c>LeastActive</c>. Any other value,
    /// including null or empty, falls back to <see cref="RandomLoadBalance"/>.
    /// </remarks>
    /// <param name="services">The service collection to add the registration to.</param>
    /// <param name="options">Callback that configures the options; may be null.</param>
    /// <returns>The same <paramref name="services"/> instance, to allow call chaining.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="services"/> is null.</exception>
    public static IServiceCollection AddLoadBalance(this IServiceCollection services, Action<LoadBalanceOptions> options)
    {
        if (services == null)
        {
            throw new ArgumentNullException(nameof(services));
        }

        // Let the caller customise the options before the implementation is chosen.
        var loadBalanceOptions = new LoadBalanceOptions();
        if (options != null)
        {
            options(loadBalanceOptions);
        }

        // Register the chosen strategy in the IoC container.
        string type = loadBalanceOptions.Type ?? string.Empty;
        if (string.Equals(type, "RoundRobin", StringComparison.OrdinalIgnoreCase))
        {
            services.AddSingleton<ILoadBalance, RoundRobinLoadBalance>();
        }
        else if (string.Equals(type, "ConsistentHash", StringComparison.OrdinalIgnoreCase))
        {
            services.AddSingleton<ILoadBalance, ConsistentHashLoadBalance>();
        }
        else if (string.Equals(type, "LeastActive", StringComparison.OrdinalIgnoreCase))
        {
            services.AddSingleton<ILoadBalance, LeastActiveLoadBalance>();
        }
        else
        {
            // "Random" and every unrecognised value keep the historical registration.
            services.AddSingleton<ILoadBalance, RandomLoadBalance>();
        }

        return services;
    }
}



/// <summary>
/// Options for the load-balancing component.
/// </summary>
public class LoadBalanceOptions
{
    /// <summary>
    /// Creates the options with <see cref="Type"/> set to <c>"Random"</c>.
    /// </summary>
    public LoadBalanceOptions()
    {
        this.Type = "Random";
    }

    /// <summary>
    /// Name of the load-balancing strategy. Defaults to <c>"Random"</c>.
    /// </summary>
    public string Type { get; set; }
}



/// <summary>
/// Base class for load-balancing strategies. Handles the trivial cases (no node, one node)
/// and delegates the real choice to <see cref="DoSelect"/>.
/// </summary>
public abstract class AbstractLoadBalance : ILoadBalance
{
    /// <summary>Weight given to every node.</summary>
    private const int DefaultWeight = 100;

    /// <summary>Length of the warm-up window, in milliseconds (10 minutes).</summary>
    private const int WarmupMilliseconds = 10 * 60 * 1000;

    /// <summary>
    /// Scales <paramref name="weight"/> linearly with the fraction of the warm-up window
    /// that has elapsed, clamped to the range [1, <paramref name="weight"/>].
    /// </summary>
    /// <param name="uptime">Time the node has been up, in the same unit as <paramref name="warmup"/>.</param>
    /// <param name="warmup">Length of the warm-up window.</param>
    /// <param name="weight">Full weight of the node.</param>
    /// <returns>The reduced weight to use during warm-up.</returns>
    static int CalculateWarmupWeight(int uptime, int warmup, int weight)
    {
        int ww = (int)((float)uptime / ((float)warmup / (float)weight));
        return ww < 1 ? 1 : (ww > weight ? weight : ww);
    }

    /// <summary>
    /// Selects one node from <paramref name="serviceUrls"/>.
    /// </summary>
    /// <param name="serviceUrls">Candidate nodes.</param>
    /// <returns>
    /// Null when the list is null or empty, the only element when there is exactly one,
    /// otherwise whatever <see cref="DoSelect"/> returns.
    /// </returns>
    public ServiceNode Select(IList<ServiceNode> serviceUrls)
    {
        if (serviceUrls == null || serviceUrls.Count == 0)
        {
            return null;
        }

        if (serviceUrls.Count == 1)
        {
            return serviceUrls[0];
        }

        return DoSelect(serviceUrls);
    }

    /// <summary>
    /// Strategy-specific selection, implemented by subclasses.
    /// </summary>
    /// <param name="serviceUrls">Candidate nodes.</param>
    /// <returns>The selected node.</returns>
    public abstract ServiceNode DoSelect(IList<ServiceNode> serviceUrls);

    /// <summary>
    /// Returns the weight to use for a node.
    /// </summary>
    /// <returns>
    /// Currently always the default weight (100): the node start timestamp is hard-coded
    /// to 0, so the warm-up reduction never applies.
    /// </returns>
    protected int GetWeight()
    {
        int weight = DefaultWeight;

        // Placeholder: no start timestamp is available here, so warm-up stays disabled.
        long timestamp = 0L;
        if (weight > 0 && timestamp > 0L)
        {
            // Work in milliseconds so uptime and the warm-up window share a unit, and
            // range-check as long before narrowing to int.
            long uptime = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() - timestamp;
            if (uptime > 0 && uptime < WarmupMilliseconds)
            {
                weight = CalculateWarmupWeight((int)uptime, WarmupMilliseconds, weight);
            }
        }

        return weight;
    }
}

/// <summary>
/// Consistent-hash load balancing: every node is mapped to a set of virtual nodes on a
/// hash ring and a key is served by the first virtual node at or after its hash.
/// </summary>
/// <remarks>
/// <see cref="ILoadBalance.Select"/> receives only the node list, so the lookup key is the
/// Url of the first node in that list.
/// </remarks>
public class ConsistentHashLoadBalance : AbstractLoadBalance
{
    // One selector per key, rebuilt whenever a different list instance is seen for that key.
    private readonly ConcurrentDictionary<string, ConsistentHashSelector> selectors = new ConcurrentDictionary<string, ConsistentHashSelector>();

    /// <summary>
    /// Selects a node through the hash ring built for <paramref name="serviceUrls"/>.
    /// </summary>
    /// <param name="serviceUrls">Candidate nodes; must contain at least one element.</param>
    /// <returns>The node that owns the key on the ring, or null when the ring is empty.</returns>
    public override ServiceNode DoSelect(IList<ServiceNode> serviceUrls)
    {
        string key = serviceUrls[0].Url;
        int identityHashCode = serviceUrls.GetHashCode();

        // TryGetValue instead of the indexer: the indexer throws on a missing key.
        ConsistentHashSelector selector;
        if (!selectors.TryGetValue(key, out selector) || selector.IdentityHashCode != identityHashCode)
        {
            // Indexer assignment replaces a stale selector; TryAdd would keep the old one.
            selector = new ConsistentHashSelector(serviceUrls, identityHashCode);
            selectors[key] = selector;
        }

        return selector.Select(key);
    }

    /// <summary>
    /// Immutable hash ring for one node list.
    /// </summary>
    private sealed class ConsistentHashSelector
    {
        // Number of virtual nodes created per real node.
        private const int ReplicaNumber = 160;

        // Ring positions in ascending order, and the node owning each position.
        private readonly long[] ringKeys;
        private readonly ServiceNode[] ringNodes;
        private readonly int identityHashCode;

        /// <summary>
        /// Builds the ring for <paramref name="serviceUrls"/>.
        /// </summary>
        /// <param name="serviceUrls">Nodes to place on the ring.</param>
        /// <param name="identityHashCode">Hash code of the list the ring was built from.</param>
        public ConsistentHashSelector(IList<ServiceNode> serviceUrls, int identityHashCode)
        {
            this.identityHashCode = identityHashCode;

            var ring = new SortedDictionary<long, ServiceNode>();
            foreach (ServiceNode serviceUrl in serviceUrls)
            {
                string address = serviceUrl.Url;
                // Each 16-byte digest yields four 32-bit ring positions.
                for (int i = 0; i < ReplicaNumber / 4; i++)
                {
                    byte[] digest = Md5(address + i);
                    for (int h = 0; h < 4; h++)
                    {
                        // Indexer assignment: a colliding position is overwritten, not rejected.
                        ring[Hash(digest, h)] = serviceUrl;
                    }
                }
            }

            ringKeys = new long[ring.Count];
            ringNodes = new ServiceNode[ring.Count];
            ring.Keys.CopyTo(ringKeys, 0);
            ring.Values.CopyTo(ringNodes, 0);
        }

        /// <summary>Hash code of the node list this ring was built from.</summary>
        public int IdentityHashCode
        {
            get { return identityHashCode; }
        }

        /// <summary>
        /// Returns the node responsible for <paramref name="url"/>.
        /// </summary>
        /// <param name="url">Key to look up on the ring.</param>
        /// <returns>The owning node, or null when the ring is empty.</returns>
        public ServiceNode Select(string url)
        {
            byte[] digest = Md5(url);
            return SelectForKey(Hash(digest, 0));
        }

        /// <summary>
        /// Finds the first ring position at or after <paramref name="hash"/>, wrapping to the
        /// first position when the hash is beyond the last one.
        /// </summary>
        private ServiceNode SelectForKey(long hash)
        {
            if (ringKeys.Length == 0)
            {
                return null;
            }

            int index = Array.BinarySearch(ringKeys, hash);
            if (index < 0)
            {
                // No exact match: the complement is the index of the next larger key.
                index = ~index;
                if (index == ringKeys.Length)
                {
                    index = 0;
                }
            }

            return ringNodes[index];
        }

        /// <summary>
        /// Builds an unsigned 32-bit value from four digest bytes starting at
        /// <c>number * 4</c>, least significant byte first.
        /// </summary>
        private static long Hash(byte[] digest, int number)
        {
            return (((long)(digest[3 + (number * 4)] & 0xFF) << 24)
                    | ((long)(digest[2 + (number * 4)] & 0xFF) << 16)
                    | ((long)(digest[1 + (number * 4)] & 0xFF) << 8)
                    | (long)(digest[number * 4] & 0xFF))
                    & 0xFFFFFFFFL;
        }

        /// <summary>
        /// Returns the raw 16-byte MD5 digest of the UTF-8 encoding of <paramref name="value"/>.
        /// </summary>
        private static byte[] Md5(string value)
        {
            using (var algorithm = System.Security.Cryptography.MD5.Create())
            {
                return algorithm.ComputeHash(Encoding.UTF8.GetBytes(value));
            }
        }
    }
}


/// <summary>
/// Service load balancer: picks one node out of a list of candidates.
/// </summary>
public interface ILoadBalance
{
    /// <summary>
    /// Selects a service node.
    /// </summary>
    /// <param name="serviceUrls">Candidate nodes.</param>
    /// <returns>The selected node.</returns>
    ServiceNode Select(IList<ServiceNode> serviceUrls);
}

/// <summary>
/// Least-active load balancing: prefers the node with the fewest active calls and breaks
/// ties by weighted random choice. The core idea is to track per-node state.
/// </summary>
/// <remarks>
/// The active count is currently a fixed placeholder (10) for every node, so all nodes tie.
/// </remarks>
class LeastActiveLoadBalance : AbstractLoadBalance
{
    private readonly Random random = new Random();

    // System.Random is not thread-safe; every use goes through this gate.
    private readonly object randomGate = new object();

    /// <summary>
    /// Selects the node with the lowest active count, choosing among ties by weight.
    /// </summary>
    /// <param name="serviceUrls">Candidate nodes.</param>
    /// <returns>The selected node, or null when the list is null or empty.</returns>
    public override ServiceNode DoSelect(IList<ServiceNode> serviceUrls)
    {
        if (serviceUrls == null || serviceUrls.Count == 0)
        {
            return null;
        }

        int length = serviceUrls.Count;        // Number of nodes
        int leastActive = -1;                  // Lowest active count seen so far
        int leastCount = 0;                    // How many nodes share that lowest count
        int[] leastIndexes = new int[length];  // Indexes of the nodes sharing it
        int[] weights = new int[length];       // Weight read for each node
        int totalWeight = 0;                   // Sum of the weights of the tied nodes
        int firstWeight = 0;                   // Weight of the first tied node, for comparison
        bool sameWeight = true;                // Do all tied nodes have the same weight?

        for (int i = 0; i < length; i++)
        {
            int active = 10;                   // Active count (placeholder; real state not tracked yet)
            int afterWarmup = GetWeight();
            weights[i] = afterWarmup;

            if (leastActive == -1 || active < leastActive)
            {
                // Found a smaller active count: restart the bookkeeping from this node.
                leastActive = active;
                leastCount = 1;
                leastIndexes[0] = i;
                totalWeight = afterWarmup;
                firstWeight = afterWarmup;
                sameWeight = true;
            }
            else if (active == leastActive)
            {
                // Same active count as the current minimum: add this node to the tie.
                leastIndexes[leastCount++] = i;
                totalWeight += afterWarmup;
                if (sameWeight && i > 0 && afterWarmup != firstWeight)
                {
                    sameWeight = false;
                }
            }
        }

        if (leastCount == 1)
        {
            // Exactly one node has the lowest active count.
            return serviceUrls[leastIndexes[0]];
        }

        if (!sameWeight && totalWeight > 0)
        {
            // Weights differ: pick randomly in proportion to weight, reusing the weights
            // that were summed into totalWeight.
            int offsetWeight = NextRandom(totalWeight) + 1;
            for (int i = 0; i < leastCount; i++)
            {
                int leastIndex = leastIndexes[i];
                offsetWeight -= weights[leastIndex];
                if (offsetWeight <= 0)
                {
                    return serviceUrls[leastIndex];
                }
            }
        }

        // All tied nodes have the same weight (or the total is 0): pick uniformly.
        return serviceUrls[leastIndexes[NextRandom(leastCount)]];
    }

    /// <summary>Returns a random integer in [0, <paramref name="exclusiveMax"/>).</summary>
    private int NextRandom(int exclusiveMax)
    {
        lock (randomGate)
        {
            return random.Next(exclusiveMax);
        }
    }
}

/// <summary>
/// Weighted random load balancing.
/// </summary>
public class RandomLoadBalance : AbstractLoadBalance
{
    private readonly Random random = new Random();

    // System.Random is not thread-safe; every use goes through this gate.
    private readonly object randomGate = new object();

    /// <summary>
    /// Picks a node at random, in proportion to weight when the weights differ and
    /// uniformly otherwise.
    /// </summary>
    /// <param name="serviceUrls">Candidate nodes.</param>
    /// <returns>The selected node, or null when the list is null or empty.</returns>
    public override ServiceNode DoSelect(IList<ServiceNode> serviceUrls)
    {
        if (serviceUrls == null || serviceUrls.Count == 0)
        {
            return null;
        }

        int length = serviceUrls.Count;   // Number of nodes
        int[] weights = new int[length];  // Weight read for each node
        int totalWeight = 0;              // Sum of all weights
        bool sameWeight = true;           // Do all nodes have the same weight?

        for (int i = 0; i < length; i++)
        {
            int weight = GetWeight();
            weights[i] = weight;
            totalWeight += weight;

            // Compare with the previous node's weight, not with a fresh reading.
            if (sameWeight && i > 0 && weight != weights[i - 1])
            {
                sameWeight = false;
            }
        }

        if (totalWeight > 0 && !sameWeight)
        {
            // Weights differ: walk the nodes, subtracting each weight from a random
            // offset until it goes negative.
            int offset = NextRandom(totalWeight);
            for (int i = 0; i < length; i++)
            {
                offset -= weights[i];
                if (offset < 0)
                {
                    return serviceUrls[i];
                }
            }
        }

        // All weights are equal (or the total is 0): pick uniformly.
        return serviceUrls[NextRandom(length)];
    }

    /// <summary>Returns a random integer in [0, <paramref name="exclusiveMax"/>).</summary>
    private int NextRandom(int exclusiveMax)
    {
        lock (randomGate)
        {
            return random.Next(exclusiveMax);
        }
    }
}

/// <summary>
/// Smooth weighted round-robin load balancing.
/// </summary>
public class RoundRobinLoadBalance : AbstractLoadBalance
{
    /// <summary>Entries not touched for this many milliseconds are removed.</summary>
    private const int RecyclePeriodMilliseconds = 60000;

    /// <summary>
    /// Per-node round-robin state: configured weight, running counter and last-use time.
    /// </summary>
    public class WeightedRoundRobin
    {
        private int weight;
        private long current;
        private long lastUpdate;

        /// <summary>Returns the configured weight.</summary>
        public int GetWeight()
        {
            return weight;
        }

        /// <summary>Sets the weight and resets the running counter to 0.</summary>
        /// <param name="weight">New weight.</param>
        public void SetWeight(int weight)
        {
            this.weight = weight;
            Interlocked.Exchange(ref current, 0);
        }

        /// <summary>Adds the weight to the running counter and returns the new value.</summary>
        public long IncreaseCurrent()
        {
            return Interlocked.Add(ref current, weight);
        }

        /// <summary>Subtracts <paramref name="total"/> from the running counter.</summary>
        /// <param name="total">Sum of the weights of all nodes in the round.</param>
        public void Sel(int total)
        {
            Interlocked.Add(ref current, -1L * total);
        }

        /// <summary>Returns the last-use timestamp.</summary>
        public long GetLastUpdate()
        {
            return Interlocked.Read(ref lastUpdate);
        }

        /// <summary>Records the last-use timestamp.</summary>
        /// <param name="lastUpdate">Timestamp to store.</param>
        public void setLastUpdate(long lastUpdate)
        {
            Interlocked.Exchange(ref this.lastUpdate, lastUpdate);
        }
    }

    // Outer key: Url of the first node in the list. Inner key: Url of each node.
    private readonly ConcurrentDictionary<string, ConcurrentDictionary<string, WeightedRoundRobin>> methodWeightMap = new ConcurrentDictionary<string, ConcurrentDictionary<string, WeightedRoundRobin>>();

    // 0 = no recycle pass running, 1 = a recycle pass is in progress.
    private int updateLock;

    /// <summary>
    /// Returns the node addresses cached for the given node list. For unit tests only.
    /// </summary>
    /// <param name="serviceUrls">Node list whose first Url identifies the cache entry.</param>
    /// <returns>The cached addresses, or null when nothing is cached for the list.</returns>
    protected ICollection<string> getInvokerAddrList(IList<ServiceNode> serviceUrls)
    {
        string key = serviceUrls[0].Url;
        ConcurrentDictionary<string, WeightedRoundRobin> map;
        if (methodWeightMap.TryGetValue(key, out map) && map != null)
        {
            return map.Keys;
        }

        return null;
    }

    /// <summary>
    /// Advances every node's counter by its weight, selects the node with the highest
    /// counter and subtracts the total weight from the selected node's counter.
    /// </summary>
    /// <param name="serviceUrls">Candidate nodes; must contain at least one element.</param>
    /// <returns>The selected node.</returns>
    public override ServiceNode DoSelect(IList<ServiceNode> serviceUrls)
    {
        string key = serviceUrls[0].Url;

        // GetOrAdd instead of the indexer: the indexer throws on a missing key.
        ConcurrentDictionary<string, WeightedRoundRobin> map =
            methodWeightMap.GetOrAdd(key, k => new ConcurrentDictionary<string, WeightedRoundRobin>());

        int totalWeight = 0;
        long maxCurrent = long.MinValue;
        long now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
        ServiceNode selected = null;
        WeightedRoundRobin selectedWRR = null;

        foreach (ServiceNode url in serviceUrls)
        {
            string identifyString = url.Url;
            int weight = Math.Max(GetWeight(), 0);

            WeightedRoundRobin weightedRoundRobin = map.GetOrAdd(identifyString, k =>
            {
                var created = new WeightedRoundRobin();
                created.SetWeight(weight);
                return created;
            });

            if (weight != weightedRoundRobin.GetWeight())
            {
                // Weight changed: store it and restart the counter.
                weightedRoundRobin.SetWeight(weight);
            }

            long cur = weightedRoundRobin.IncreaseCurrent();
            weightedRoundRobin.setLastUpdate(now);
            if (cur > maxCurrent)
            {
                maxCurrent = cur;
                selected = url;
                selectedWRR = weightedRoundRobin;
            }

            totalWeight += weight;
        }

        // The cache holds nodes that are no longer in the list: drop entries that have
        // not been used within the recycle period. Only one thread runs the pass.
        if (serviceUrls.Count != map.Count
            && Interlocked.CompareExchange(ref updateLock, 1, 0) == 0)
        {
            try
            {
                foreach (KeyValuePair<string, WeightedRoundRobin> entry in map)
                {
                    if (now - entry.Value.GetLastUpdate() > RecyclePeriodMilliseconds)
                    {
                        WeightedRoundRobin removed;
                        map.TryRemove(entry.Key, out removed);
                    }
                }
            }
            finally
            {
                Interlocked.Exchange(ref updateLock, 0);
            }
        }

        if (selected != null)
        {
            selectedWRR.Sel(totalWeight);
            return selected;
        }

        // Should not happen.
        return serviceUrls[0];
    }
}


