自研RPC框架之负载均衡器设计

自研RPC框架之负载均衡器设计

1.负载均衡器抽象

我们的负载均衡器LoadBalancer中可能需要对不同的服务维护不同的服务地址列表,因此单独抽象出一个Selector的概念,由Selector来维护服务对应的服务地址列表信息,LoadBalancer只需要保存服务名与Selector的映射关系即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@SPI
public interface LoadBalancer {
// 根据服务名获取可用服务地址
InetSocketAddress selectServiceAddress(String serviceName);
// 根据服务名获取对应的selector选择器
Selector selector(String serviceName);
}

public interface Selector {
// 负载均衡策略选择的下一个可用服务地址
InetSocketAddress getNext();
// 获取selector中维护的服务地址列表缓存
List<InetSocketAddress> getServiceList();
// 新增上线服务地址
void addServiceAddress(InetSocketAddress address);
// 删除下线服务地址
void removeServiceAddress(InetSocketAddress address);
}

2.模板设计方法应用

我们实现了一个继承LoadBalancer的抽象负载均衡器,其中定义了负载均衡选择算法的模板流程,并预留getSelector抽象方法交给不同类型的负载均衡器实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public abstract class AbstractLoadBalancer implements LoadBalancer{
// 每个服务对应的负载均衡选择器
public final Map<String, Selector> selectorMap;

public AbstractLoadBalancer() {
this.selectorMap = new ConcurrentHashMap<>(8);
}

@Override
public InetSocketAddress selectServiceAddress(String serviceName) {
Selector selector = selectorMap.get(serviceName);
if (selector == null) {
List<InetSocketAddress> serviceList = new CopyOnWriteArrayList<>(OpenrpcBootStrap.configuration.getRegistry().lookup(serviceName));
selector = getSelector(serviceList, serviceName);
selectorMap.put(serviceName, selector);
}
return selector.getNext();
}

@Override
public Selector selector(String serviceName) {
Selector selector = selectorMap.get(serviceName);
if (selector == null) {
throw new DiscoveryException("服务正在上线或下线");
}
return selector;
}

protected abstract Selector getSelector(List<InetSocketAddress> serviceList, String serviceName);
}

3.随机负载均衡策略

随机负载均衡策略很好理解,就是从服务地址列表中随机选择一个可用服务,需要使用到随机数生成器如Random、SecureRandom、ThreadLocalRandom等,其实现简单高效,不过随机策略不考虑服务器的当前负载或性能,可能会导致某些服务器过载而其他服务器处于低负载状态。而且,虽然长期来看分配可能相对均衡,但短期内可能出现不均衡现象,特别是在服务器性能差异大的情况下更加明显。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class RandomLoadBalancer extends AbstractLoadBalancer {
private static final Logger log = LoggerFactory.getLogger(RandomLoadBalancer.class);

@Override
protected Selector getSelector(List<InetSocketAddress> serviceList, String serviceName) {
return new RandomSelector(serviceList);
}

static class RandomSelector implements Selector {
private List<InetSocketAddress> serviceList;

public RandomSelector(List<InetSocketAddress> serviceList) {
if (serviceList == null || serviceList.isEmpty()) {
throw new LoadBalanceException("未找到任何可用的服务");
}
this.serviceList = serviceList;
}

@Override
public InetSocketAddress getNext() {
int serverIndex = ThreadLocalRandom.current().nextInt(serviceList.size());
log.info("随机负载均衡选择服务节点:{}", serviceList.get(serverIndex));
return serviceList.get(serverIndex);
}

@Override
public List<InetSocketAddress> getServiceList() {
return this.serviceList;
}

@Override
public void addServiceAddress(InetSocketAddress address) {
this.serviceList.add(address);
}

@Override
public void removeServiceAddress(InetSocketAddress address) {
this.serviceList.remove(address);
}
}
}

4.轮询负载均衡策略

轮询负载均衡策略旨在实现请求分配的均匀性,确保所有服务器都能平等地参与处理请求,但是不考虑服务器的当前负载或性能差异,可能会导致某些服务器过载,尤其是在服务器性能不均等的情况下。而且,在需要会话持久性(Session Persistence)的应用场景中,纯粹的轮询策略可能无法保证用户的连续请求总是被发送到相同的服务器上,除非增加额外的会话跟踪机制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class RoundRobinLoadBalancer extends AbstractLoadBalancer {
private static final Logger log = LoggerFactory.getLogger(RoundRobinLoadBalancer.class);

@Override
protected Selector getSelector(List<InetSocketAddress> serviceList, String serviceName) {
return new RoundRobinSelector(serviceList);
}

static class RoundRobinSelector implements Selector {
private List<InetSocketAddress> serviceList;
private AtomicInteger index;

public RoundRobinSelector(List<InetSocketAddress> serviceList) {
this.serviceList = serviceList;
this.index = new AtomicInteger(0);
}

@Override
public InetSocketAddress getNext() {
if (serviceList == null || serviceList.isEmpty()) {
throw new LoadBalanceException("未找到任何可用的服务");
}
// (1)仅仅在整型溢出时出现几次的不严格轮询,对于足够大的整型范围这个影响几乎可以忽略不计
// int nextIndex = Math.abs(index.getAndIncrement() % serviceList.size());
// (2)或者保证形成环形循环
int nextIndex = (index.getAndIncrement() & 0x7fffffff) % serviceList.size();
log.info("轮询负载均衡选择服务节点:{}", serviceList.get(nextIndex));
return serviceList.get(nextIndex);
}

@Override
public List<InetSocketAddress> getServiceList() {
return this.serviceList;
}

@Override
public void addServiceAddress(InetSocketAddress address) {
this.serviceList.add(address);
}

@Override
public void removeServiceAddress(InetSocketAddress address) {
this.serviceList.remove(address);
}
}
}

5.一致性哈希负载均衡策略

一致性哈希负载均衡策略通过一致性哈希算法来决定如何分配请求到服务器,以达到负载均衡的目的。一致性哈希算法通过将数据和服务器都映射到同一个哈希环上,然后根据请求的哈希值来选择在环上顺时针方向遇到的第一个服务器。通过使用虚拟节点(即在哈希环上为每个实际服务器创建多个虚拟映射点),一致性哈希可以更均匀地分布负载,避免某单一节点成为热点。并且,一致性哈希有助于保持数据的局部性,适合缓存和分布式存储系统,因为它减少了节点变更导致的缓存失效,从而提高了缓存效率。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
public class ConsistentHashLoadBalancer extends AbstractLoadBalancer {
private static final Logger log = LoggerFactory.getLogger(ConsistentHashLoadBalancer.class);

@Override
protected Selector getSelector(List<InetSocketAddress> serviceList, String serviceName) {
return new ConsistentHashSelector(serviceList, 128);
}

static class ConsistentHashSelector implements Selector {
// 哈希环存储服务节点
private SortedMap<Integer, InetSocketAddress> circle = new TreeMap<>();
// 虚拟节点的个数
private int virtualNodes = 128;

public ConsistentHashSelector(List<InetSocketAddress> serviceList, int virtualNodes) {
if (serviceList == null || serviceList.isEmpty()) {
throw new LoadBalanceException("未找到任何可用的服务");
}
this.virtualNodes = virtualNodes;
// 每个服务节点转化为若干个虚拟节点挂载到哈希环
for (InetSocketAddress inetSocketAddress : serviceList) {
addNodeToCircle(inetSocketAddress);
}
}

@Override
public InetSocketAddress getNext() {
// 从本地线程变量获取请求对象
RequestPayload payload = OpenrpcBootStrap.threadLocal.get().getRequestPayload();
// 根据服务名+参数列表进行一致性哈希负载均衡
String selectKey = payload.getInterfaceName() + Arrays.stream(payload.getParametersValue());
int hash = hash(selectKey.getBytes(), 0, selectKey.length(), 0);
if (!circle.containsKey(hash)) {
// 顺时针寻找第一个大于请求hash的虚拟节点,如果不存在则选择哈希环中第一个即最小虚拟节点
SortedMap<Integer, InetSocketAddress> tailMap = circle.tailMap(hash);
hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
}
log.info("一致性哈希负载均衡选择服务节点:{}", circle.get(hash));
return circle.get(hash);
}

@Override
public List<InetSocketAddress> getServiceList() {
return this.circle.values().stream().toList();
}

@Override
public void addServiceAddress(InetSocketAddress address) {
addNodeToCircle(address);
}

@Override
public void removeServiceAddress(InetSocketAddress address) {
removeNodeFromCircle(address);
}

private void addNodeToCircle(InetSocketAddress inetSocketAddress) {
// 服务节点泛化为多个虚拟节点,避免节点过少时节点哈希分布不均匀
for (int i = 0; i < virtualNodes; i++) {
String hashStr = inetSocketAddress + "-" + i;
int hash = hash(hashStr.getBytes(), 0, hashStr.length(), 0);
log.info("插入虚拟节点:{}", hash);
circle.put(hash, inetSocketAddress);
}
}

private void removeNodeFromCircle(InetSocketAddress inetSocketAddress) {
for (int i = 0; i < virtualNodes; i++) {
String hashStr = inetSocketAddress + "-" + i;
int hash = hash(hashStr.getBytes(), 0, hashStr.length(), 0);
log.info("删除虚拟节点:{}", hash);
circle.remove(hash);
}
}

// 不要使用String的原生哈希算法,因为其根据底层字节数组进行逐位运算,连续的字符串的哈希值很容易特别接近
// MurmurHash是一种非加密型哈希函数,以高性能和良好的分散性闻名。它可以生成32位或64位的哈希值,适用于一致性哈希,因为它在不同的输入下产生的哈希值分布非常均匀
private static int hash(byte[] data, int offset, int len, int seed) {

final int c1 = 0xcc9e2d51;
final int c2 = 0x1b873593;
final int c3 = 0xe6546b64;
final int c4 = 0x85ebca6b;
final int c5 = 0xc2b2ae35;

int h1 = seed;
int roundedEnd = offset + (len & 0xfffffffc); // round down to 4 byte block

for (int i = offset; i < roundedEnd; i += 4) {
// little endian load order
int k1 = (data[i] & 0xff) | ((data[i + 1] & 0xff) << 8) | ((data[i + 2] & 0xff) << 16) | (data[i + 3] << 24);
k1 *= c1;
k1 = Integer.rotateLeft(k1, 15);
k1 *= c2;

h1 ^= k1;
h1 = Integer.rotateLeft(h1, 13);
h1 = h1 * 5 + c3;
}

// handle any remaining bytes
int k1 = 0;
switch (len & 0x03) {
case 3:
k1 = (data[roundedEnd + 2] & 0xff) << 16;
// fall through
case 2:
k1 |= (data[roundedEnd + 1] & 0xff) << 8;
// fall through
case 1:
k1 |= (data[roundedEnd] & 0xff);
k1 *= c1;
k1 = Integer.rotateLeft(k1, 15);
k1 *= c2;
h1 ^= k1;
}

// finalization
h1 ^= len;
h1 ^= (h1 >>> 16);
h1 *= c4;
h1 ^= (h1 >>> 13);
h1 *= c5;
h1 ^= (h1 >>> 16);

return h1;
}

}
}

另外,虚拟节点除了会提高节点的均衡度,还会提高系统的稳定性。当节点变化时,会有不同的节点共同分担系统的变化,因此稳定性更高。比如,当某个节点被移除时,对应该节点的多个虚拟节点均会移除,而这些虚拟节点按顺时针方向的下一个虚拟节点,可能会对应不同的真实节点,即这些不同的真实节点共同分担了节点变化导致的压力。而且,有了虚拟节点后,还可以为硬件配置更好的节点增加权重,比如对权重更高的节点增加更多的虚拟机节点即可。

因此,带虚拟节点的一致性哈希方法不仅适合硬件配置不同的节点的场景,而且适合节点规模会发生变化的场景

6.平滑加权轮询负载均衡策略

平滑加权轮询负载均衡策略通过考虑服务器的权重(通常反映了其处理能力或容量),这种策略能更公平地分配负载,确保高性能服务器承担更多的负载。平滑加权轮询允许动态调整服务器权重,以适应服务器性能变化或维护状态,无需中断服务(需要配置中心配合做配置下发工作)。

NGINX平滑的基于权重轮询算法其实很简单,原文表述如下:

Algorithm is as follows: on each peer selection we increase current_weight of each eligible peer by its weight, select peer with greatest current_weight and reduce its current_weight by total number of weight points distributed among peers.

算法执行2步,选择出1个当前节点。

  1. 每个节点,用它们的当前值加上它们自己的权重。
  2. 选择当前值最大的节点为选中节点,并把它的当前值减去所有节点的权重总和。

例如{a:5, b:1, c:1}三个节点。一开始我们初始化三个节点的当前值为{0, 0, 0}。 选择过程如下表:

轮数 选择前的当前权重 选择节点 选择后的当前权重
1 {5, 1, 1} a {-2, 1, 1}
2 {3, 2, 2} a {-4, 2, 2}
3 {1, 3, 3} b {1, -4, 3}
4 {6, -3, 4} a {-1, -3, 4}
5 {4, -2, 5} c {4, -2, -2}
6 {9, -1, -1} a {2, -1, -1}
7 {7, 0, 0} a {0, 0, 0}

我们可以发现,a, b, c选择的次数符合 5:1:1,而且权重大的不会被连续选择。7轮选择后,当前值又回到{0, 0, 0},以上操作可以一直循环,一样符合平滑和基于权重。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
public class WeightRoundRobinLoadBalancer extends AbstractLoadBalancer {
@Override
protected Selector getSelector(List<InetSocketAddress> serviceList, String serviceName) {
return new WeightRoundRobinSelector(serviceList, serviceName);
}

static class WeightRoundRobinSelector implements Selector {
private List<InetSocketAddress> serviceList;
private List<Element> elementList;
private String serviceName;
public WeightRoundRobinSelector(List<InetSocketAddress> serviceList, String serviceName) {
this.serviceList = serviceList;
this.serviceName = serviceName;
this.elementList = new ArrayList<>();
// 通过注册中心拉取节点的权重信息
for (InetSocketAddress address : serviceList) {
int weight = OpenrpcBootStrap.configuration.getRegistry().getWeight(serviceName, address);
// 初始化时所有Element的当前权重值都为0
elementList.add(new Element(address, weight, 0));
}
}

@Override
public InetSocketAddress getNext() {
if (serviceList.isEmpty() || elementList.isEmpty()) {
throw new DiscoveryException("未找到任何可用的服务节点");
}
int total = 0;
Element result = elementList.get(0);
for (Element element : elementList) {
// 计算总的权值和
total += element.weight;
// 每一轮开始各个节点的当前权重都加上节点的初始权重
element.currentWeight += element.weight;
// 选择当前轮次当前权重值最大的节点
if (result.currentWeight < element.currentWeight) {
result = element;
}
}
// 选择完成后,选中节点的当前权重值需要减去总体权重和
result.currentWeight -= total;
return result.address;
}

@Override
public List<InetSocketAddress> getServiceList() {
return this.serviceList;
}

@Override
public void addServiceAddress(InetSocketAddress address) {
serviceList.add(address);
int weight = OpenrpcBootStrap.configuration.getRegistry().getWeight(serviceName, address);
elementList.add(new Element(address, weight, 0));
}

@Override
public void removeServiceAddress(InetSocketAddress address) {
serviceList.remove(address);
elementList.remove(new Element(address, 0, 0));
}
}

static class Element {
// 服务节点地址
InetSocketAddress address;
// 服务节点的初始权重,不变
int weight;
// 服务节点的当前权重,改变
int currentWeight;

public Element(InetSocketAddress address, int weight, int currentWeight) {
this.address = address;
this.weight = weight;
this.currentWeight = currentWeight;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Element element = (Element) o;
return Objects.equals(address, element.address);
}

@Override
public int hashCode() {
return Objects.hash(address);
}
}
}