自研RPC框架之网络通信流程

自研RPC框架之网络通信流程

1.RPC服务端启动

image-20240210151313913

我们的RPC框架采用Netty作为网络通信的基础框架,因为其自带编解码器处理TCP粘包,使用简单高效,对于开发人员来说非常友好。

网络通信是我们RPC框架的重要基石,其中涉及到RPC协议的制定、请求协议解码器、响应协议编码器、日志处理器、方法执行处理器等,如下面代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ProviderChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
// 添加日志处理器(入站/出站)
.addLast(new LoggingHandler())
// 添加心跳检测处理器,60s内没有收到心跳就关闭连接
// .addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS));
// 添加响应协议编码器(出站)
.addLast(new OpenrpcResponseEncoder())
// 添加请求协议解码器(入站)
.addLast(new OpenrpcRequestDecoder())
// 添加方法执行处理器(入站)
.addLast(new MethodCallHandler());
}
}

2.RPC客户端启动

RPC客户端生成代理对象是通过JDK的动态代理机制完成的,利用Proxy.newProxyInstance方法并传入类加载器、接口列表、handler,我们就能拿到一个代理对象,其关键内容正是InvocationHandler对象。

1
2
3
4
5
6
7
8
public T get() {
// 使用动态代理生成代理对象
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
Class<T>[] classes = new Class[]{interfaceRef};
InvocationHandler handler = new RpcConsumerInvocationHandler(interfaceRef, group);
Object proxy = Proxy.newProxyInstance(classLoader, classes, handler);
return interfaceRef.cast(proxy);
}

InvocationHandler中invoke方法的核心逻辑如下,总结来说就是封装请求报文对象,其中包含了请求ID、请求类型(例如普通请求和心跳检测)、压缩类型、序列化类型、请求载荷(其中包含接口全限定名、方法名、方法参数类型列表、方法参数值列表);然后通过ThreadLocal对象保存请求报文并使用完毕后立刻销毁,通过注册中心拉取服务列表,负载均衡器获取到可用服务地址,根据服务地址从客户端连接缓存中获取Channel对象并异步发送消息(客户端通过请求ID与CompletableFuture绑定,通过后面响应中回传的请求ID找到对应的CompletableFuture对象并设置响应结果)。最终,代理对象返回远程调用的执行结果,对于远程调用的业务执行异常客户端需要自行负责捕获处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
try {
// 1.封装报文
RequestPayload payload = RequestPayload.builder()
.interfaceName(interfaceRef.getName())
.methodName(method.getName())
.parametersType(method.getParameterTypes())
.parametersValue(args)
.returnType(method.getReturnType())
.build();
OpenrpcRequest request = OpenrpcRequest.builder()
.requestId(OpenrpcBootStrap.configuration.getIdWorker().nextId())
.requestType(RequestType.REQUEST.getId())
.compressType(OpenrpcBootStrap.configuration.getCompressor().getCode())
.serializeType(OpenrpcBootStrap.configuration.getSerializer().getCode())
.requestPayload(payload)
.build();

// 2.本地线程变量保存请求对象
OpenrpcBootStrap.threadLocal.set(request);

// 3.注册中心拉取服务列表,负载均衡器获取可用服务地址
InetSocketAddress address = OpenrpcBootStrap.configuration.getLoadBalancer().selectServiceAddress(interfaceRef.getName() + group);
log.info("发现可用服务:{},{},{}", interfaceRef.getName(), method.getName(), address);

// 4.从客户端连接缓存中获取channel
Channel channel = getAvaliableChannel(address);

// 5.删除本地线程变量的请求对象,防止内存泄漏
OpenrpcBootStrap.threadLocal.remove();

// 6.异步发送消息
CompletableFuture<Object> completableFuture = new CompletableFuture<>();
OpenrpcBootStrap.pendingCache.put(request.getRequestId(), completableFuture);
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
// 发送完成后触发监听器,检测是否发送成功
if (!channelFuture.isSuccess()) {
completableFuture.completeExceptionally(channelFuture.cause());
}
}
});

// 7.返回远程调用的执行结果
Object result = completableFuture.get(timeout, TimeUnit.MILLISECONDS);

// 8.服务调用端执行异常,抛出InvocationTargetException
if (result instanceof InvocationTargetException exception) {
throw exception;
}
if (result == null) {
throw new RequestFailException("远程调用失败,继续重试:");
}
return result;
} catch (Exception e) {
if (e instanceof InvocationTargetException exception) {
throw exception.getTargetException();
}
}

当然,我们还需要关注客户端连接缓存如何建立的,上面代码中已经说明如果从全局的客户端连接缓存中获取不到channel需要重新创建,这就是getAvaliableChannel方法需要做的事情,等到连接建立成功后,更新客户端连接缓存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private Channel getAvaliableChannel(InetSocketAddress address) {
// 从缓存中获取channel
Channel channel = OpenrpcBootStrap.channelCache.get(address);
if (channel == null) {
Bootstrap bs = NettyBootstrapInitializer.getBootstrap();
// 连接服务器并阻塞等待连接完成
try {
channel = bs.connect(address).sync().channel();
} catch (InterruptedException e) {
log.error("创建channel失败");
throw new NetworkException(e);
}
log.info("创建channel成功");
// 缓存channel对象
OpenrpcBootStrap.channelCache.put(address, channel);
}
return channel;
}

public class NettyBootstrapInitializer {
private static Bootstrap bootstrap = new Bootstrap();
static {
EventLoopGroup group = new NioEventLoopGroup();
// 客户端辅助启动类
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ConsumerChannelInitializer());
}
public static Bootstrap getBootstrap() {
return bootstrap;
}
}

与服务端的代码类似,RPC客户端也需要添加各种处理器,其中包含请求协议编码器、响应协议解码器、响应结果处理器等等

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class ConsumerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline()
// 添加日志处理器(入站/出站)
.addLast(new LoggingHandler(LogLevel.DEBUG))
// 添加心跳检测处理器,如果10s内没有任何数据包发出就发送心跳包
// .addLast(new IdleStateHandler(0, 10, 0, TimeUnit.SECONDS))
// 添加请求协议编码器(出站)
.addLast(new OpenrpcRequestEncoder())
// 添加响应协议解码器(入站)
.addLast(new OpenrpcResponseDecoder())
// 添加响应结果处理器(入站)
.addLast(new ResultProcessHandler());
}
}

3.协议处理器设计

上面我们看了服务端和客户端的启动过程,其中都涉及到RPC协议的请求/响应编解码以及其他的处理器如响应结果处理器、方法执行处理器、日志记录处理器、心跳检测处理器等等,下面我们按照RPC网络报文流转步骤逐个介绍它们的设计。

3.1请求编码处理器

image-20240210161250349

首先介绍我们的请求协议的设计,包含请求头请求体两大部分,其中请求头包含魔数协议版本头部长度总长度请求类型序列化类型压缩类型请求ID请求标志位扩展字段共24个字节,请求体的内容就是请求载荷经过序列化压缩后的字节数组,因此请求体长度需要通过计算得出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class OpenrpcRequestEncoder extends MessageToByteEncoder<OpenrpcRequest> {
private static final Logger log = LoggerFactory.getLogger(OpenrpcRequestEncoder.class);
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, OpenrpcRequest openrpcRequest, ByteBuf byteBuf) throws Exception {
// 写入4字节的魔数值
byteBuf.writeBytes(MessageFormatConstant.MAGIC)
// 写入1字节的版本号
.writeByte(MessageFormatConstant.VERSION)
// 写入2字节的首部长度
.writeShort(MessageFormatConstant.HEADER_LENGTH)
// 预留4字节的报文长度空间
.writerIndex(byteBuf.writerIndex() + MessageFormatConstant.FULL_LENGTH_FIELD_BYTES)
// 写入1字节的请求类型
.writeByte(openrpcRequest.getRequestType())
// 写入1字节的序列化类型
.writeByte(openrpcRequest.getSerializeType())
// 写入1字节的压缩类型
.writeByte(openrpcRequest.getCompressType())
// 写入8字节的请求ID
.writeLong(openrpcRequest.getRequestId());
// 请求载荷的序列化
Serializer serializer = SerializerFactory.getSerializer(openrpcRequest.getSerializeType()).getImpl();
byte[] body = serializer.serialize(openrpcRequest.getRequestPayload());
// 请求载荷的压缩
Compressor compressor = CompressorFactory.getCompressor(openrpcRequest.getCompressType()).getImpl();
// 判断请求体数据长度是否超过压缩阈值
if (body.length >= OpenrpcBootStrap.configuration.getCompressThreshold()) {
body = compressor.compress(body);
// 设置是否压缩标志位
byteBuf.writeByte(MessageFormatConstant.COMPRESSED);
} else {
// 设置是否压缩标志位
byteBuf.writeByte(MessageFormatConstant.NOT_COMPRESSED);
}
// 写入扩展字段
byteBuf.writeByte(0x00);
// 写入请求体数据
byteBuf.writeBytes(body);
// 保存当前写指针位置
int writerIndex = byteBuf.writerIndex();
// 填充报文长度字段
byteBuf.writerIndex(MessageFormatConstant.FULL_LENGTH_FIELD_START);
byteBuf.writeInt(MessageFormatConstant.HEADER_LENGTH + body.length);
// 写指针重新归位
byteBuf.writerIndex(writerIndex);
log.info("调用端的请求编码工作已完成:{}", openrpcRequest.getRequestId());
}
}

3.2请求解码处理器

请求的解码过程就是编码的逆过程,通过继承LengthFieldBasedFrameDecoder处理TCP报文的粘包问题,需要注意的是我们需要针对心跳检测PING请求直接返回PONG响应,对于解析的请求载荷部分需要根据请求头中的标记位(里面有是否压缩的标记)决定是否解压缩。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
public class OpenrpcRequestDecoder extends LengthFieldBasedFrameDecoder {
private static final Logger log = LoggerFactory.getLogger(OpenrpcRequestDecoder.class);
public OpenrpcRequestDecoder() {
super(
// 最大帧长度
MessageFormatConstant.MAX_FRAME_LENGTH,
// 报文长度字段起始位置
MessageFormatConstant.FULL_LENGTH_FIELD_START,
// 报文长度字段字节数
MessageFormatConstant.FULL_LENGTH_FIELD_BYTES,
// 报文长度补偿值,防止截取到后续的报文
-(MessageFormatConstant.FULL_LENGTH_FIELD_START + MessageFormatConstant.FULL_LENGTH_FIELD_BYTES),
// 跳过的字节数
0
);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
Object o = super.decode(ctx, in);
if (o instanceof ByteBuf byteBuf) {
// 解析魔数值
byte[] magic = new byte[MessageFormatConstant.MAGIC.length];
byteBuf.readBytes(magic);
// 校验魔数值
for (int i = 0; i < magic.length; i++) {
if (magic[i] != MessageFormatConstant.MAGIC[i]) {
throw new RuntimeException("请求协议不一致");
}
}
// 解析版本号
byte version = byteBuf.readByte();
if (version > MessageFormatConstant.VERSION) {
throw new RuntimeException("请求协议尚不支持");
}
// 解析首部长度
short headerLength = byteBuf.readShort();
// 解析报文长度
int fullLength = byteBuf.readInt();
// 解析请求类型
byte requestType = byteBuf.readByte();
// 解析序列化类型
byte serializeType = byteBuf.readByte();
// 解析压缩类型
byte compressType = byteBuf.readByte();
// 解析请求ID
long requestId = byteBuf.readLong();
// 解析标志位
byte flag = byteBuf.readByte();
// 解析扩展字段
byte extend = byteBuf.readByte();
// 心跳检测请求没有载荷,解析结束
if (requestType == RequestType.HEART_BEAT.getId()) {
return new OpenrpcRequest(requestId, requestType, compressType, serializeType, null);
}
// 解析请求载荷
byte[] body = new byte[fullLength - headerLength];
byteBuf.readBytes(body);
// 请求载荷的解压缩
Compressor compressor = CompressorFactory.getCompressor(compressType).getImpl();
// 根据标志位中的压缩标记,判断是否需要解压缩
if ((flag & MessageFormatConstant.COMPRESSED) != 0) {
body = compressor.decompress(body);
}
// 请求载荷的反序列化
Serializer serializer = SerializerFactory.getSerializer(serializeType).getImpl();
RequestPayload payload = serializer.deserialize(body, RequestPayload.class);
// 封装请求对象
log.info("服务端的请求解码工作已完成:{}", requestId);
return new OpenrpcRequest(requestId, requestType, compressType, serializeType, payload);
}
return null;
}
}

3.3方法执行处理器

通过请求载荷中的接口全限定名称获取服务端缓存的服务实例对象,进一步根据方法名称、方法参数类型列表利用反射获取指定方法并执行,如果方法不存在则返回失败响应,如果业务执行异常则返回包含异常信息的成功响应,下面就进入响应的编码出站阶段了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public class MethodCallHandler extends SimpleChannelInboundHandler<OpenrpcRequest> {
private static final Logger log = LoggerFactory.getLogger(MethodCallHandler.class);
@Override
protected void channelRead0(ChannelHandlerContext ctx, OpenrpcRequest msg) throws Exception {
// 获取请求载荷
RequestPayload payload = msg.getRequestPayload();
// 执行匹配方法
Object result = callTargetMethod(payload);
if (result == null) {
OpenrpcResponse response = new OpenrpcResponse(msg.getRequestId(), ResponseCode.FAIL.getCode(), msg.getCompressType(), msg.getSerializeType(), "");
ctx.channel().writeAndFlush(response);
log.warn("请求处理失败:{}", msg.getRequestId());
}
log.info("服务端的请求方法调用工作已完成:{}", msg.getRequestId());
// 封装响应
OpenrpcResponse response = new OpenrpcResponse(msg.getRequestId(), ResponseCode.SUCCESS.getCode(), msg.getCompressType(), msg.getSerializeType(), result);
// 返回响应,开始进入服务端出站的流水线处理器
ctx.channel().writeAndFlush(response);
}

private Object callTargetMethod(RequestPayload payload) {
// 获取方法执行的必备参数
String interfaceName = payload.getInterfaceName();
String methodName = payload.getMethodName();
Class<?>[] parametersType = payload.getParametersType();
Object[] parametersValue = payload.getParametersValue();
// 查询匹配的服务实现
ServiceConfig serviceConfig = OpenrpcBootStrap.serviceList.get(interfaceName);
if (serviceConfig == null) {
log.error("未找到匹配的服务:{},{},{}", interfaceName, methodName, parametersType);
return null;
}
// 查询匹配的方法
Object serviceImpl = serviceConfig.getRef();
Class<?> clazz = serviceImpl.getClass();
Object result = null;
try {
Method method = clazz.getMethod(methodName, parametersType);
// 反射执行方法
result = method.invoke(serviceImpl, parametersValue);
} catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | NullPointerException e) {
log.error("方法执行过程中发生异常:{},{},{},{}", interfaceName, methodName, parametersType, parametersValue, e);
if (e instanceof InvocationTargetException exception) {
return exception;
}
return null;
}
return result;
}
}

3.4响应编码处理器

image-20240210163526036

我们介绍一些响应协议的设计,同样是包含响应头响应体两部分,其中响应头中包含魔数协议版本头部长度总长度响应编码序列化类型压缩类型请求ID响应标志位扩展字段共24个字节,响应体的内容就是方法执行结果经过序列化和压缩后的字节数组,因此响应体的长度需要计算得出。

响应的编码和请求的编码过程类似,大家对比看看就会发现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class OpenrpcResponseEncoder extends MessageToByteEncoder<OpenrpcResponse> {
private static final Logger log = LoggerFactory.getLogger(OpenrpcResponseEncoder.class);
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, OpenrpcResponse openrpcResponse, ByteBuf byteBuf) throws Exception {
// 写入4字节的魔数值
byteBuf.writeBytes(MessageFormatConstant.MAGIC)
// 写入1字节的版本号
.writeByte(MessageFormatConstant.VERSION)
// 写入2字节的首部长度
.writeShort(MessageFormatConstant.HEADER_LENGTH)
// 预留4字节的报文长度空间
.writerIndex(byteBuf.writerIndex() + MessageFormatConstant.FULL_LENGTH_FIELD_BYTES)
// 写入1字节的响应编码
.writeByte(openrpcResponse.getResponseCode())
// 写入1字节的序列化类型
.writeByte(openrpcResponse.getSerializeType())
// 写入1字节的压缩类型
.writeByte(openrpcResponse.getCompressType())
// 写入8字节的请求ID
.writeLong(openrpcResponse.getRequestId());
// 响应结果的序列化
Serializer serializer = SerializerFactory.getSerializer(openrpcResponse.getSerializeType()).getImpl();
byte[] body = serializer.serialize(openrpcResponse.getBody());
// 响应结果的压缩
Compressor compressor = CompressorFactory.getCompressor(openrpcResponse.getCompressType()).getImpl();
// 判断响应体数据长度是否超过压缩阈值
if (body.length >= OpenrpcBootStrap.configuration.getCompressThreshold()) {
body = compressor.compress(body);
// 设置是否压缩标志位
byteBuf.writeByte(MessageFormatConstant.COMPRESSED);
} else {
// 设置是否压缩标志位
byteBuf.writeByte(MessageFormatConstant.NOT_COMPRESSED);
}
// 写入扩展字段
byteBuf.writeByte(0x00);
// 写入响应体数据
byteBuf.writeBytes(body);
// 保存当前写指针位置
int writerIndex = byteBuf.writerIndex();
// 填充报文长度字段
byteBuf.writerIndex(MessageFormatConstant.FULL_LENGTH_FIELD_START);
byteBuf.writeInt(MessageFormatConstant.HEADER_LENGTH + body.length);
// 写指针重新归位
byteBuf.writerIndex(writerIndex);
log.info("服务端的响应编码工作已完成:{}", openrpcResponse.getRequestId());
}
}

3.5响应解码处理器

响应的解码过程就是编码的逆过程,通过继承LengthFieldBasedFrameDecoder处理TCP报文的粘包问题,大致流程跟请求的解码类似。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public class OpenrpcResponseDecoder extends LengthFieldBasedFrameDecoder {
private static final Logger log = LoggerFactory.getLogger(OpenrpcResponseDecoder.class);
public OpenrpcResponseDecoder() {
super(
// 最大帧长度
MessageFormatConstant.MAX_FRAME_LENGTH,
// 报文长度字段起始位置
MessageFormatConstant.FULL_LENGTH_FIELD_START,
// 报文长度字段字节数
MessageFormatConstant.FULL_LENGTH_FIELD_BYTES,
// 报文长度补偿值,防止截取到后续的报文
-(MessageFormatConstant.FULL_LENGTH_FIELD_START + MessageFormatConstant.FULL_LENGTH_FIELD_BYTES),
// 跳过的字节数
0
);
}
@Override
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
Object o = super.decode(ctx, in);
if (o instanceof ByteBuf byteBuf) {
// 解析魔数值
byte[] magic = new byte[MessageFormatConstant.MAGIC.length];
byteBuf.readBytes(magic);
// 校验魔数值
for (int i = 0; i < magic.length; i++) {
if (magic[i] != MessageFormatConstant.MAGIC[i]) {
throw new RuntimeException("请求协议不一致");
}
}
// 解析版本号
byte version = byteBuf.readByte();
if (version > MessageFormatConstant.VERSION) {
throw new RuntimeException("请求协议尚不支持");
}
// 解析首部长度
short headerLength = byteBuf.readShort();
// 解析报文长度
int fullLength = byteBuf.readInt();
// 解析响应编码
byte responseCode = byteBuf.readByte();
// 解析序列化类型
byte serializeType = byteBuf.readByte();
// 解析压缩类型
byte compressType = byteBuf.readByte();
// 解析请求ID
long requestId = byteBuf.readLong();
// 解析标志位
byte flag = byteBuf.readByte();
// 解析扩展字段
byte extend = byteBuf.readByte();
// 解析响应结果
byte[] body = new byte[fullLength - headerLength];
byteBuf.readBytes(body);
// 响应结果的解压缩
Compressor compressor = CompressorFactory.getCompressor(compressType).getImpl();
// 根据标志位中的压缩标记,判断是否需要解压缩
if ((flag & MessageFormatConstant.COMPRESSED) != 0) {
body = compressor.decompress(body);
}
// 响应结果的反序列化
Serializer serializer = SerializerFactory.getSerializer(serializeType).getImpl();
Object result = serializer.deserialize(body, Object.class);
// 封装请求对象
log.info("调用端的响应解码工作已完成:{}", requestId);
return new OpenrpcResponse(requestId, responseCode, compressType, serializeType, result);
}
return null;
}
}

3.6响应结果处理器

最终兜兜转转,RPC远程调用响应终于到达最后一站了,还记得响应头中的请求ID字段吗,此时我们就可以根据请求ID判断此响应对于哪个请求,进而找到绑定的CompletableFuture对象,通过complete方法设置响应结果,此时RPC调用方通过get方法阻塞等待线程会被唤醒,完成了响应的异步通知

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class ResultProcessHandler extends SimpleChannelInboundHandler<OpenrpcResponse> {
private static final Logger log = LoggerFactory.getLogger(ResultProcessHandler.class);
@Override
protected void channelRead0(ChannelHandlerContext ctx, OpenrpcResponse response) throws Exception {
// 根据请求ID寻找匹配请求的completableFuture
CompletableFuture<Object> completableFuture = OpenrpcBootStrap.pendingCache.remove(response.getRequestId());
// 判断响应的类型
if (response.getResponseCode() == ResponseCode.FAIL.getCode()) {
// 异步写入结果
completableFuture.complete(null);
log.error("请求调用失败:{}", response.getRequestId());
} else if (response.getResponseCode() == ResponseCode.LIMITING.getCode()) {
// 异步写入结果
completableFuture.complete(null);
log.error("请求被限流:{}", response.getRequestId());
} else if (response.getResponseCode() == ResponseCode.CLOSING.getCode()) {
// 1.异步写入结果
completableFuture.complete(null);
// 2.从负载均衡器的健康服务列表移除正在关闭的服务
// 2.1.获取请求的服务名称
// TODO:这里存在一定问题,因为响应结果接受处理的线程与请求发现的线程可能不是同一个,可以考虑在响应协议中加入服务名称字段(接口全限定名+分组名称+版本号)
String interfaceName = OpenrpcBootStrap.threadLocal.get().getRequestPayload().getInterfaceName();
// 2.2.获取对应的selector
Selector selector = OpenrpcBootStrap.configuration.getLoadBalancer().selector(interfaceName);
// 2.3.移除关闭中的服务
selector.removeServiceAddress((InetSocketAddress) ctx.channel().remoteAddress());
log.info("服务端正在关闭,重试选取其他的服务节点:{}", response.getRequestId());
} else if (response.getResponseCode() == ResponseCode.SUCCESS.getCode()) {
// 异步写入结果
completableFuture.complete(response.getBody());
log.info("调用端的响应结果处理工作已完成:{}", response.getRequestId());
}
}
}