自研RPC框架之负载均衡器设计 1.负载均衡器抽象 我们的负载均衡器LoadBalancer中可能需要对不同的服务维护不同的服务地址列表,因此单独抽象出一个Selector的概念,由Selector来维护服务对应的服务地址列表信息,LoadBalancer只需要保存服务名与Selector的映射关系即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @SPI public interface LoadBalancer { InetSocketAddress selectServiceAddress (String serviceName) ; Selector selector (String serviceName) ; } public interface Selector { InetSocketAddress getNext () ; List<InetSocketAddress> getServiceList () ; void addServiceAddress (InetSocketAddress address) ; void removeServiceAddress (InetSocketAddress address) ; }
2.模板设计方法应用 我们实现了一个继承LoadBalancer的抽象负载均衡器,其中定义了负载均衡选择算法的模板流程
,并预留getSelector
抽象方法交给不同类型的负载均衡器实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public abstract class AbstractLoadBalancer implements LoadBalancer { public final Map<String, Selector> selectorMap; public AbstractLoadBalancer () { this .selectorMap = new ConcurrentHashMap <>(8 ); } @Override public InetSocketAddress selectServiceAddress (String serviceName) { Selector selector = selectorMap.get(serviceName); if (selector == null ) { List<InetSocketAddress> serviceList = new CopyOnWriteArrayList <>(OpenrpcBootStrap.configuration.getRegistry().lookup(serviceName)); selector = getSelector(serviceList, serviceName); selectorMap.put(serviceName, selector); } return selector.getNext(); } @Override public Selector selector (String serviceName) { Selector selector = selectorMap.get(serviceName); if (selector == null ) { throw new DiscoveryException ("服务正在上线或下线" ); } return selector; } protected abstract Selector getSelector (List<InetSocketAddress> serviceList, String serviceName) ; }
3.随机负载均衡策略 随机负载均衡策略很好理解,就是从服务地址列表中随机选择一个可用服务,需要使用到随机数生成器如Random、SecureRandom、ThreadLocalRandom等,其实现简单高效,不过随机策略不考虑服务器的当前负载或性能,可能会导致某些服务器过载而其他服务器处于低负载状态。而且,虽然长期来看分配可能相对均衡,但短期内可能出现不均衡现象 ,特别是在服务器性能差异大的情况下更加明显。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 public class RandomLoadBalancer extends AbstractLoadBalancer { private static final Logger log = LoggerFactory.getLogger(RandomLoadBalancer.class); @Override protected Selector getSelector (List<InetSocketAddress> serviceList, String serviceName) { return new RandomSelector (serviceList); } static class RandomSelector implements Selector { private List<InetSocketAddress> serviceList; public RandomSelector (List<InetSocketAddress> serviceList) { if (serviceList == null || serviceList.isEmpty()) { throw new LoadBalanceException ("未找到任何可用的服务" ); } this .serviceList = serviceList; } @Override public InetSocketAddress getNext () { int serverIndex = ThreadLocalRandom.current().nextInt(serviceList.size()); log.info("随机负载均衡选择服务节点:{}" , serviceList.get(serverIndex)); return serviceList.get(serverIndex); } @Override public List<InetSocketAddress> getServiceList () { return this .serviceList; } @Override public void addServiceAddress (InetSocketAddress address) { this .serviceList.add(address); } @Override public void removeServiceAddress (InetSocketAddress address) { this .serviceList.remove(address); } } }
4.轮询负载均衡策略 轮询负载均衡策略旨在实现请求分配的均匀性,确保所有服务器都能平等地参与处理请求,但是不考虑服务器的当前负载或性能差异,可能会导致某些服务器过载,尤其是在服务器性能不均等的情况下。而且,在需要会话持久性(Session Persistence)的应用场景中,纯粹的轮询策略可能无法保证用户的连续请求总是被发送到相同的服务器上,除非增加额外的会话跟踪机制。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 public class RoundRobinLoadBalancer extends AbstractLoadBalancer { private static final Logger log = LoggerFactory.getLogger(RoundRobinLoadBalancer.class); @Override protected Selector getSelector (List<InetSocketAddress> serviceList, String serviceName) { return new RoundRobinSelector (serviceList); } static class RoundRobinSelector implements Selector { private List<InetSocketAddress> serviceList; private AtomicInteger index; public RoundRobinSelector (List<InetSocketAddress> serviceList) { this .serviceList = serviceList; this .index = new AtomicInteger (0 ); } @Override public InetSocketAddress getNext () { if (serviceList == null || serviceList.isEmpty()) { throw new LoadBalanceException ("未找到任何可用的服务" ); } int nextIndex = (index.getAndIncrement() & 0x7fffffff ) % serviceList.size(); log.info("轮询负载均衡选择服务节点:{}" , serviceList.get(nextIndex)); return serviceList.get(nextIndex); } @Override public List<InetSocketAddress> getServiceList () { return this .serviceList; } @Override public void addServiceAddress (InetSocketAddress address) { this .serviceList.add(address); } @Override public void removeServiceAddress (InetSocketAddress address) { this .serviceList.remove(address); } } }
5.一致性哈希负载均衡策略 一致性哈希负载均衡策略通过一致性哈希算法来决定如何分配请求到服务器,以达到负载均衡的目的。一致性哈希算法通过将数据和服务器都映射到同一个哈希环上,然后根据请求的哈希值来选择在环上顺时针方向遇到的第一个服务器。 通过使用虚拟节点
(即在哈希环上为每个实际服务器创建多个虚拟映射点),一致性哈希可以更均匀地分布负载
,避免某单一节点成为热点。并且,一致性哈希有助于保持数据的局部性,适合缓存和分布式存储系统,因为它减少了节点变更导致的缓存失效,从而提高了缓存效率。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 public class ConsistentHashLoadBalancer extends AbstractLoadBalancer { private static final Logger log = LoggerFactory.getLogger(ConsistentHashLoadBalancer.class); @Override protected Selector getSelector (List<InetSocketAddress> serviceList, String serviceName) { return new ConsistentHashSelector (serviceList, 128 ); } static class ConsistentHashSelector implements Selector { private SortedMap<Integer, InetSocketAddress> circle = new TreeMap <>(); private int virtualNodes = 128 ; public ConsistentHashSelector (List<InetSocketAddress> serviceList, int virtualNodes) { if (serviceList == null || serviceList.isEmpty()) { throw new LoadBalanceException ("未找到任何可用的服务" ); } this .virtualNodes = virtualNodes; for (InetSocketAddress inetSocketAddress : serviceList) { addNodeToCircle(inetSocketAddress); } } @Override public InetSocketAddress getNext () { RequestPayload payload = OpenrpcBootStrap.threadLocal.get().getRequestPayload(); String selectKey = payload.getInterfaceName() + Arrays.stream(payload.getParametersValue()); int hash = hash(selectKey.getBytes(), 0 , selectKey.length(), 0 ); if (!circle.containsKey(hash)) { SortedMap<Integer, InetSocketAddress> tailMap = circle.tailMap(hash); hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey(); } log.info("一致性哈希负载均衡选择服务节点:{}" , circle.get(hash)); return circle.get(hash); } @Override public List<InetSocketAddress> getServiceList () { return this .circle.values().stream().toList(); } @Override public void addServiceAddress (InetSocketAddress address) { addNodeToCircle(address); } @Override public void removeServiceAddress (InetSocketAddress address) { removeNodeFromCircle(address); } private void addNodeToCircle (InetSocketAddress inetSocketAddress) { for (int i = 0 ; i < virtualNodes; i++) { String hashStr = inetSocketAddress + "-" + i; int hash = hash(hashStr.getBytes(), 0 , hashStr.length(), 0 ); log.info("插入虚拟节点:{}" , hash); circle.put(hash, inetSocketAddress); } } private void removeNodeFromCircle (InetSocketAddress inetSocketAddress) { for (int i = 0 ; i < virtualNodes; i++) { String hashStr = inetSocketAddress + "-" + i; int hash = hash(hashStr.getBytes(), 0 , hashStr.length(), 0 ); log.info("删除虚拟节点:{}" , hash); circle.remove(hash); } } private static int hash (byte [] data, int offset, int len, int seed) { final int c1 = 0xcc9e2d51 ; final int c2 = 0x1b873593 ; final int c3 = 0xe6546b64 ; final int c4 = 0x85ebca6b ; final int c5 = 0xc2b2ae35 ; int h1 = seed; int roundedEnd = offset + (len & 0xfffffffc ); for (int i = offset; i < roundedEnd; i += 4 ) { int k1 = (data[i] & 0xff ) | ((data[i + 1 ] & 0xff ) << 8 ) | ((data[i + 2 ] & 0xff ) << 16 ) | (data[i + 3 ] << 24 ); k1 *= c1; k1 = Integer.rotateLeft(k1, 15 ); k1 *= c2; h1 ^= k1; h1 = Integer.rotateLeft(h1, 13 ); h1 = h1 * 5 + c3; } int k1 = 0 ; switch (len & 0x03 ) { case 3 : k1 = (data[roundedEnd + 2 ] & 0xff ) << 16 ; case 2 : k1 |= (data[roundedEnd + 1 ] & 0xff ) << 8 ; case 1 : k1 |= (data[roundedEnd] & 0xff ); k1 *= c1; k1 = Integer.rotateLeft(k1, 15 ); k1 *= c2; h1 ^= k1; } h1 ^= len; h1 ^= (h1 >>> 16 ); h1 *= c4; h1 ^= (h1 >>> 13 ); h1 *= c5; h1 ^= (h1 >>> 16 ); return h1; } } }
另外,虚拟节点除了会提高节点的均衡度,还会提高系统的稳定性。当节点变化时,会有不同的节点共同分担系统的变化,因此稳定性更高 。比如,当某个节点被移除时,对应该节点的多个虚拟节点均会移除,而这些虚拟节点按顺时针方向的下一个虚拟节点,可能会对应不同的真实节点,即这些不同的真实节点共同分担了节点变化导致的压力。而且,有了虚拟节点后,还可以为硬件配置更好的节点增加权重,比如对权重更高的节点增加更多的虚拟机节点
即可。
因此,带虚拟节点的一致性哈希方法不仅适合硬件配置不同的节点的场景,而且适合节点规模会发生变化的场景 。
6.平滑加权轮询负载均衡策略 平滑加权轮询负载均衡策略通过考虑服务器的权重(通常反映了其处理能力或容量),这种策略能更公平地分配负载,确保高性能服务器承担更多的负载。平滑加权轮询允许动态调整服务器权重
,以适应服务器性能变化或维护状态,无需中断服务(需要配置中心配合做配置下发工作
)。
NGINX平滑的基于权重轮询算法其实很简单,原文表述如下:
Algorithm is as follows: on each peer selection we increase current_weight of each eligible peer by its weight, select peer with greatest current_weight and reduce its current_weight by total number of weight points distributed among peers.
算法执行2步,选择出1个当前节点。
每个节点,用它们的当前值加上它们自己的权重。
选择当前值最大的节点为选中节点,并把它的当前值减去所有节点的权重总和。
例如{a:5, b:1, c:1}
三个节点。一开始我们初始化三个节点的当前值为{0, 0, 0}
。 选择过程如下表:
轮数
选择前的当前权重
选择节点
选择后的当前权重
1
{5, 1, 1}
a
{-2, 1, 1}
2
{3, 2, 2}
a
{-4, 2, 2}
3
{1, 3, 3}
b
{1, -4, 3}
4
{6, -3, 4}
a
{-1, -3, 4}
5
{4, -2, 5}
c
{4, -2, -2}
6
{9, -1, -1}
a
{2, -1, -1}
7
{7, 0, 0}
a
{0, 0, 0}
我们可以发现,a, b, c选择的次数符合 5:1:1,而且权重大的不会被连续选择。7轮选择后,当前值又回到{0, 0, 0},以上操作可以一直循环,一样符合平滑和基于权重。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 public class WeightRoundRobinLoadBalancer extends AbstractLoadBalancer { @Override protected Selector getSelector (List<InetSocketAddress> serviceList, String serviceName) { return new WeightRoundRobinSelector (serviceList, serviceName); } static class WeightRoundRobinSelector implements Selector { private List<InetSocketAddress> serviceList; private List<Element> elementList; private String serviceName; public WeightRoundRobinSelector (List<InetSocketAddress> serviceList, String serviceName) { this .serviceList = serviceList; this .serviceName = serviceName; this .elementList = new ArrayList <>(); for (InetSocketAddress address : serviceList) { int weight = OpenrpcBootStrap.configuration.getRegistry().getWeight(serviceName, address); elementList.add(new Element (address, weight, 0 )); } } @Override public InetSocketAddress getNext () { if (serviceList.isEmpty() || elementList.isEmpty()) { throw new DiscoveryException ("未找到任何可用的服务节点" ); } int total = 0 ; Element result = elementList.get(0 ); for (Element element : elementList) { total += element.weight; element.currentWeight += element.weight; if (result.currentWeight < element.currentWeight) { result = element; } } result.currentWeight -= total; return result.address; } @Override public List<InetSocketAddress> getServiceList () { return this .serviceList; } @Override public void addServiceAddress (InetSocketAddress address) { serviceList.add(address); int weight = OpenrpcBootStrap.configuration.getRegistry().getWeight(serviceName, address); elementList.add(new Element (address, weight, 0 )); } @Override public void removeServiceAddress (InetSocketAddress address) { serviceList.remove(address); elementList.remove(new Element (address, 0 , 0 )); } } static class Element { InetSocketAddress address; int weight; int currentWeight; public Element (InetSocketAddress address, int weight, int currentWeight) { this .address = address; this .weight = weight; this .currentWeight = currentWeight; } @Override public boolean equals (Object o) { if (this == o) return true ; if (o == null || getClass() != o.getClass()) return false ; Element element = (Element) o; return Objects.equals(address, element.address); } @Override public int hashCode () { return Objects.hash(address); } } }