自研RPC框架之服务发现设计

自研RPC框架之服务发现设计

1.注册中心选择

常见的注册中心选择有ZooKeeper、Nacos、Eureka等,ZooKeeper作为分布式协调服务的应用场景很多,如集群管理、服务注册发现、分布式配置管理、命名服务等等,ZooKeeper的命名服务+Watcher机制可以完美解决服务注册发现和动态上下线的问题,不过ZooKeeper在海量的注册上下线场景下性能会受到影响,如CPU持续飙高甚至导致服务宕机,不过这只有在很大的注册压力下才会发生,我们应该通过合理的配置服务注册与上下线的批次,避免流量高峰期的操作等方案减小这种情况发生的概率。当然,==适配更多种类的注册中心==也是我们自研RPC框架的后续改进的一个方向,此时此刻ZooKeeper对我们来说是最合适的选择(毕竟学习成本也是成本嘛)。

给出极客时间上一个京东大佬的内部自研RPC的注册中心选型的思考:04 | 网络通信:RPC框架在网络通信上更倾向于哪种网络IO模型?-RPC实战与核心原理-极客时间 (geekbang.org)

整体的思路很简单,就是搭建一个ZooKeeper集群作为注册中心集群,服务注册的时候只需要服务节点向ZooKeeper节点写入注册信息即可,利用ZooKeeper的Watcher机制完成服务订阅与服务下发功能,整体流程如下图:

img

  1. 服务平台管理端先在ZooKeeper中创建一个服务根路径,可以根据接口名命名(例如:/service/com.demo.xxService),在这个路径再创建服务提供方目录与服务调用方目录(例如:provider、consumer),分别用来存储服务提供方的节点信息和服务调用方的节点信息。
  2. 当服务提供方发起注册时,会在服务提供方目录中创建一个临时节点,节点中存储该服务提供方的注册信息。
  3. 当服务调用方发起订阅时,则在服务调用方目录中创建一个临时节点,节点中存储该服务调用方的信息,同时服务调用方watch该服务的服务提供方目录(/service/com.demo.xxService/provider)中所有的服务节点数据。
  4. 当服务提供方目录下有节点数据发生变更时,ZooKeeper就会通知给发起订阅的服务调用方。

我所在的技术团队早期使用的RPC框架服务发现就是基于ZooKeeper实现的,并且还平稳运行了一年多,但后续团队的微服务化程度越来越高之后,ZooKeeper集群整体压力也越来越高,尤其在集中上线的时候越发明显。“集中爆发”是在一次大规模上线的时候,当时有超大批量的服务节点在同时发起注册操作,ZooKeeper集群的CPU突然飙升,导致ZooKeeper集群不能工作了,而且我们当时也无法立马将ZooKeeper集群重新启动,一直到ZooKeeper集群恢复后业务才能继续上线。

经过我们的排查,引发这次问题的根本原因就是ZooKeeper本身的性能问题当连接到ZooKeeper的节点数量特别多,对ZooKeeper读写特别频繁,且ZooKeeper存储的目录达到一定数量的时候,ZooKeeper将不再稳定,CPU持续升高,最终宕机。而宕机之后,由于各业务的节点还在持续发送读写请求,刚一启动,ZooKeeper就因无法承受瞬间的读写压力,马上宕机

这次“意外”让我们意识到,ZooKeeper集群性能显然已经无法支撑我们现有规模的服务集群了,我们需要重新考虑服务发现方案。

基于消息总线的最终一致性的注册中心

我们知道,ZooKeeper的一大特点就是强一致性,ZooKeeper集群的每个节点的数据每次发生更新操作,都会通知其它ZooKeeper节点同时执行更新。它要求保证每个节点的数据能够实时的完全一致,这也就直接导致了ZooKeeper集群性能上的下降。这就好比几个人在玩传递东西的游戏,必须这一轮每个人都拿到东西之后,所有的人才能开始下一轮,而不是说我只要获得到东西之后,就可以直接进行下一轮了。

而RPC框架的服务发现,在服务节点刚上线时,服务调用方是可以容忍在一段时间之后(比如几秒钟之后)发现这个新上线的节点的。毕竟服务节点刚上线之后的几秒内,甚至更长的一段时间内没有接收到请求流量,对整个服务集群是没有什么影响的,所以我们可以牺牲掉CP(强制一致性),而选择AP(最终一致),来换取整个注册中心集群的性能和稳定性。

那么是否有一种简单、高效,并且最终一致的更新机制,能代替ZooKeeper那种数据强一致的数据更新机制呢?

因为要求最终一致性,我们可以考虑采用消息总线机制。注册数据可以全量缓存在每个注册中心内存中,通过消息总线来同步数据。当有一个注册中心节点接收到服务节点注册时,会产生一个消息推送给消息总线,再通过消息总线通知给其它注册中心节点更新数据并进行服务下发,从而达到注册中心间数据最终一致性,具体流程如下图所示:

img

  • 当有服务上线,注册中心节点收到注册请求,服务列表数据发生变化,会生成一个消息,推送给消息总线,每个消息都有整体递增的版本
  • 消息总线会主动推送消息到各个注册中心,同时注册中心也会定时拉取消息。对于获取到消息的在消息回放模块里面回放,只接受大于本地版本号的消息,小于本地版本号的消息直接丢弃,从而实现最终一致性
  • 消费者订阅可以从注册中心内存拿到指定接口的全部服务实例,并缓存到消费者的内存里面。
  • 采用推拉模式,消费者可以及时地拿到服务实例增量变化情况,并和内存中的缓存数据进行合并。

为了性能,这里采用了两级缓存,注册中心和消费者的内存缓存,通过异步推拉模式来确保最终一致性。

另外,你也可能会想到,服务调用方拿到的服务节点不是最新的,所以目标节点存在已经下线或不提供指定接口服务的情况,这个时候有没有问题?这个问题我们放到了RPC框架里面去处理,在服务调用方发送请求到目标节点后,目标节点会进行合法性验证,如果指定接口服务不存在正在下线,则会拒绝该请求。服务调用方收到拒绝异常后,会安全重试到其它节点

通过消息总线的方式,我们就可以完成注册中心集群间数据变更的通知,保证数据的最终一致性,并能及时地触发注册中心的服务下发操作。在 RPC领域精耕细作后,你会发现,服务发现的特性是允许我们在设计超大规模集群服务发现系统的时候,舍弃强一致性,更多地考虑系统的健壮性。最终一致性才是分布式系统设计中更为常用的策略。

2.注册中心设计

我们的注册中心暂时只实现了ZooKeeperRegistry,我们后面也会以ZooKeeper为例解释服务注册发现的设计。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class RegistryConfig {
// 连接信息("zookeeper://192.168.12.134:2181")
private final String connectString;

public RegistryConfig(String connectString) {
this.connectString = connectString;
}

public Registry getRegistry() {
// 获取注册中心的类型
String registryType = getRegistryType(connectString, true).toLowerCase().trim();
// TODO:使用工厂设计模式优化
// 通过类型获取具体注册中心
if (registryType.equals("zookeeper")) {
String host = getRegistryType(connectString, false);
return new ZooKeeperRegistry(host, Constant.TIME_OUT);
} else if (registryType.equals("nacos")) {
String host = getRegistryType(connectString, false);
return new NacosRegistry(host, Constant.TIME_OUT);
}
throw new DiscoveryException("未发现合适的注册中心");
}

private String getRegistryType(String connectString, boolean ifType) {
String[] typeAndHost = connectString.split("://");
if (typeAndHost.length != 2) {
throw new RuntimeException("给定的注册中心连接URL不合法");
}
if (ifType) {
return typeAndHost[0];
} else {
return typeAndHost[1];
}
}
}

首先,所有的注册中心都继承统一的Registry接口,其中包含了四个方法:注册服务、服务发现、注销服务、获取服务节点权重。

1
2
3
4
5
6
7
8
9
10
public interface Registry {
// 注册服务
void register(ServiceConfig serviceConfig);
// 服务发现
List<InetSocketAddress> lookup(String serviceName);
// 注销服务
void logOff();
// 获取服务节点权重
int getWeight(String serviceName, InetSocketAddress address);
}

openrpc-common中我们定义了ZooKeeper的工具类ZooKeeperUtil,里面封装了创建节点、检查节点是否存在、获取子节点列表、获取节点数据、删除节点等多个工具方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
public class ZooKeeperRegistry extends AbstractRegistry {
private ZooKeeper zooKeeper;
public ZooKeeperRegistry() {
this.zooKeeper = ZooKeeperUtil.createZookeeper();
}
public ZooKeeperRegistry(String connectString, int timeout) {
this.zooKeeper = ZooKeeperUtil.createZookeeper(connectString, timeout);
}

@Override
public void register(ServiceConfig serviceConfig) {
// 服务节点的名称
String parentNode = Constant.BASE_PROVIDERS_PATH + "/" + serviceConfig.getInterface().getName() + serviceConfig.getGroup();
// 创建持久的服务节点
if (!ZooKeeperUtil.exists(zooKeeper, parentNode, null)) {
ZooKeeperNode zkNode = new ZooKeeperNode(parentNode, null);
ZooKeeperUtil.createNode(zooKeeper, zkNode, null, CreateMode.PERSISTENT);
}
// 创建临时的服务地址子节点
String node = parentNode + "/" + NetUtils.getIp() + ":" + OpenrpcBootStrap.configuration.getPort();
if (!ZooKeeperUtil.exists(zooKeeper, node, null)) {
ZooKeeperNode zkNode = new ZooKeeperNode(node, String.valueOf(serviceConfig.getWeight()).getBytes());
ZooKeeperUtil.createNode(zooKeeper, zkNode, null, CreateMode.EPHEMERAL);
}
}

@Override
public List<InetSocketAddress> lookup(String serviceName) {
// 获取服务节点路径
String serviceNode = Constant.BASE_PROVIDERS_PATH + "/" + serviceName;
// 查询服务地址列表
List<String> addressList = ZooKeeperUtil.getChildren(zooKeeper, serviceNode, new DynamicPerceptionWatcher());
if (addressList == null || addressList.size() == 0) {
throw new DiscoveryException("未发现任何可用的服务地址");
}
// 解析服务地址列表
List<InetSocketAddress> list = addressList.stream().map(ipStr -> {
String[] ipAndPort = ipStr.split(":");
return new InetSocketAddress(ipAndPort[0], Integer.parseInt(ipAndPort[1]));
}).toList();
return list;
}

@Override
public void logOff() {
// 获取服务集合
Set<String> serviceSet = OpenrpcBootStrap.serviceList.keySet();
// 注销所有服务
for (String serviceName : serviceSet) {
// 服务节点的名称
String node = Constant.BASE_PROVIDERS_PATH + "/" + serviceName + "/" + NetUtils.getIp() + ":" + OpenrpcBootStrap.configuration.getPort();
// 删除服务节点
if (ZooKeeperUtil.exists(zooKeeper, node, null)) {
ZooKeeperUtil.deleteNode(zooKeeper, node);
}
}
}

@Override
public int getWeight(String serviceName, InetSocketAddress address) {
// 服务节点的名称
String node = Constant.BASE_PROVIDERS_PATH + "/" + serviceName + "/" + address.getHostName() + ":" + address.getPort();
// 获取服务节点的权重数据
return ZooKeeperUtil.getData(zooKeeper, node);
}
}

注意我们在调用getChildren方法时给服务节点注册了DynamicPerceptionWatcher对象,一旦注册中心发现服务节点(如com.huling.HelloService)下有新服务地址子节点上线或有旧服务地址子节点下线,注册中心的WatcherManager模块就会通知ZK Server,ZK Server会根据变化类型通知相应客户端,告知他们发生了哪些变化。客户端收到ZooKeeper Server的通知后,客户端的ZkWatcherManager会根据watcher的类型如child watcher来触发相应的事件处理方法如processChildChanged。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class DynamicPerceptionWatcher implements Watcher {
private static final Logger log = LoggerFactory.getLogger(DynamicPerceptionWatcher.class);

@Override
public void process(WatchedEvent event) {
if (event.getType() == Event.EventType.NodeChildrenChanged) {
log.info("发现服务变动:{}", event.getPath());
// 1.动态感知服务上线
// 1.1.获取服务名称
String serviceName = getServiceName(event.getPath());
// 1.2.拉取最新服务列表
List<InetSocketAddress> serviceList = OpenrpcBootStrap.configuration.getRegistry().lookup(serviceName);
log.info("最新的服务列表:{}", serviceList);
// 1.3.获取负载均衡选择器中缓存的服务列表
Selector selector = OpenrpcBootStrap.configuration.getLoadBalancer().selector(serviceName);
List<InetSocketAddress> selectorServiceList = selector.getServiceList();
for (InetSocketAddress address : serviceList) {
// 1.4.通知selector服务上线
if (!selectorServiceList.contains(address)) {
log.info("发现新的服务上线:{}", event.getPath());
selector.addServiceAddress(address);
// 后续负载均衡器选择到新上线服务节点后会自动建立通道缓存,不需要预先缓存
}
}
// 2.动态感知服务下线
for (InetSocketAddress address : selectorServiceList) {
// 2.1.通知selector服务下线
if (!serviceList.contains(address)) {
log.info("发现服务下线:{}", event.getPath());
// 注意:如果使用ArrayList会发生并发修改异常,因为增强For循环使用迭代器遍历过程中不能发生新增或删除元素的操作,除非使用迭代器的remove方法
selector.removeServiceAddress(address);
// 清除已关闭的通道缓存 (TODO:是否需要延迟关闭连接)
OpenrpcBootStrap.channelCache.remove(address);
}
}
}
}

private String getServiceName(String servicePath) {
return servicePath.substring(servicePath.lastIndexOf("/") + 1, servicePath.length());
}

}