自研RPC框架之网络通信流程
1.RPC服务端启动
我们的RPC框架采用Netty作为网络通信的基础框架,因为其自带编解码器处理TCP粘包,使用简单高效,对于开发人员来说非常友好。
网络通信是我们RPC框架的重要基石,其中涉及到RPC协议的制定、请求协议解码器、响应协议编码器、日志处理器、方法执行处理器等,如下面代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public class ProviderChannelInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline() .addLast(new LoggingHandler()) .addLast(new OpenrpcResponseEncoder()) .addLast(new OpenrpcRequestDecoder()) .addLast(new MethodCallHandler()); } }
|
2.RPC客户端启动
RPC客户端生成代理对象是通过JDK的动态代理机制完成的,利用Proxy.newProxyInstance
方法并传入类加载器、接口列表、handler,我们就能拿到一个代理对象,其关键内容正是InvocationHandler
对象。
1 2 3 4 5 6 7 8
| public T get() { ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); Class<T>[] classes = new Class[]{interfaceRef}; InvocationHandler handler = new RpcConsumerInvocationHandler(interfaceRef, group); Object proxy = Proxy.newProxyInstance(classLoader, classes, handler); return interfaceRef.cast(proxy); }
|
InvocationHandler中invoke方法的核心逻辑如下,总结来说就是封装请求报文对象
,其中包含了请求ID、请求类型(例如普通请求和心跳检测)、压缩类型、序列化类型、请求载荷(其中包含接口全限定名、方法名、方法参数类型列表、方法参数值列表);然后通过ThreadLocal
对象保存请求报文并使用完毕后立刻销毁
,通过注册中心拉取服务列表,负载均衡器获取到可用服务地址,根据服务地址从客户端连接缓存
中获取Channel对象并异步发送消息
(客户端通过请求ID与CompletableFuture绑定
,通过后面响应中回传的请求ID找到对应的CompletableFuture对象并设置响应结果)。最终,代理对象返回远程调用的执行结果,对于远程调用的业务执行异常
客户端需要自行负责捕获处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
| try { RequestPayload payload = RequestPayload.builder() .interfaceName(interfaceRef.getName()) .methodName(method.getName()) .parametersType(method.getParameterTypes()) .parametersValue(args) .returnType(method.getReturnType()) .build(); OpenrpcRequest request = OpenrpcRequest.builder() .requestId(OpenrpcBootStrap.configuration.getIdWorker().nextId()) .requestType(RequestType.REQUEST.getId()) .compressType(OpenrpcBootStrap.configuration.getCompressor().getCode()) .serializeType(OpenrpcBootStrap.configuration.getSerializer().getCode()) .requestPayload(payload) .build();
OpenrpcBootStrap.threadLocal.set(request);
InetSocketAddress address = OpenrpcBootStrap.configuration.getLoadBalancer().selectServiceAddress(interfaceRef.getName() + group); log.info("发现可用服务:{},{},{}", interfaceRef.getName(), method.getName(), address); Channel channel = getAvaliableChannel(address);
OpenrpcBootStrap.threadLocal.remove();
CompletableFuture<Object> completableFuture = new CompletableFuture<>(); OpenrpcBootStrap.pendingCache.put(request.getRequestId(), completableFuture); channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if (!channelFuture.isSuccess()) { completableFuture.completeExceptionally(channelFuture.cause()); } } });
Object result = completableFuture.get(timeout, TimeUnit.MILLISECONDS); if (result instanceof InvocationTargetException exception) { throw exception; } if (result == null) { throw new RequestFailException("远程调用失败,继续重试:"); } return result; } catch (Exception e) { if (e instanceof InvocationTargetException exception) { throw exception.getTargetException(); } }
|
当然,我们还需要关注客户端连接缓存如何建立的,上面代码中已经说明如果从全局的客户端连接缓存
中获取不到channel需要重新创建,这就是getAvaliableChannel
方法需要做的事情,等到连接建立成功后,更新客户端连接缓存。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| private Channel getAvaliableChannel(InetSocketAddress address) { Channel channel = OpenrpcBootStrap.channelCache.get(address); if (channel == null) { Bootstrap bs = NettyBootstrapInitializer.getBootstrap(); try { channel = bs.connect(address).sync().channel(); } catch (InterruptedException e) { log.error("创建channel失败"); throw new NetworkException(e); } log.info("创建channel成功"); OpenrpcBootStrap.channelCache.put(address, channel); } return channel; }
public class NettyBootstrapInitializer { private static Bootstrap bootstrap = new Bootstrap(); static { EventLoopGroup group = new NioEventLoopGroup(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ConsumerChannelInitializer()); } public static Bootstrap getBootstrap() { return bootstrap; } }
|
与服务端的代码类似,RPC客户端也需要添加各种处理器,其中包含请求协议编码器、响应协议解码器、响应结果处理器等等
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public class ConsumerChannelInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline() .addLast(new LoggingHandler(LogLevel.DEBUG)) .addLast(new OpenrpcRequestEncoder()) .addLast(new OpenrpcResponseDecoder()) .addLast(new ResultProcessHandler()); } }
|
3.协议处理器设计
上面我们看了服务端和客户端的启动过程,其中都涉及到RPC协议的请求/响应编解码以及其他的处理器如响应结果处理器、方法执行处理器、日志记录处理器、心跳检测处理器等等,下面我们按照RPC网络报文流转步骤逐个介绍它们的设计。
3.1请求编码处理器
首先介绍我们的请求协议的设计,包含请求头
和请求体
两大部分,其中请求头包含魔数
、协议版本
、头部长度
、总长度
、请求类型
、序列化类型
、压缩类型
、请求ID
、请求标志位
和扩展字段
共24个字节,请求体的内容就是请求载荷经过序列化
和压缩
后的字节数组,因此请求体长度需要通过计算得出。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| public class OpenrpcRequestEncoder extends MessageToByteEncoder<OpenrpcRequest> { private static final Logger log = LoggerFactory.getLogger(OpenrpcRequestEncoder.class); @Override protected void encode(ChannelHandlerContext channelHandlerContext, OpenrpcRequest openrpcRequest, ByteBuf byteBuf) throws Exception { byteBuf.writeBytes(MessageFormatConstant.MAGIC) .writeByte(MessageFormatConstant.VERSION) .writeShort(MessageFormatConstant.HEADER_LENGTH) .writerIndex(byteBuf.writerIndex() + MessageFormatConstant.FULL_LENGTH_FIELD_BYTES) .writeByte(openrpcRequest.getRequestType()) .writeByte(openrpcRequest.getSerializeType()) .writeByte(openrpcRequest.getCompressType()) .writeLong(openrpcRequest.getRequestId()); Serializer serializer = SerializerFactory.getSerializer(openrpcRequest.getSerializeType()).getImpl(); byte[] body = serializer.serialize(openrpcRequest.getRequestPayload()); Compressor compressor = CompressorFactory.getCompressor(openrpcRequest.getCompressType()).getImpl(); if (body.length >= OpenrpcBootStrap.configuration.getCompressThreshold()) { body = compressor.compress(body); byteBuf.writeByte(MessageFormatConstant.COMPRESSED); } else { byteBuf.writeByte(MessageFormatConstant.NOT_COMPRESSED); } byteBuf.writeByte(0x00); byteBuf.writeBytes(body); int writerIndex = byteBuf.writerIndex(); byteBuf.writerIndex(MessageFormatConstant.FULL_LENGTH_FIELD_START); byteBuf.writeInt(MessageFormatConstant.HEADER_LENGTH + body.length); byteBuf.writerIndex(writerIndex); log.info("调用端的请求编码工作已完成:{}", openrpcRequest.getRequestId()); } }
|
3.2请求解码处理器
请求的解码过程就是编码的逆过程,通过继承LengthFieldBasedFrameDecoder
处理TCP报文的粘包问题,需要注意的是我们需要针对心跳检测PING请求直接返回PONG响应,对于解析的请求载荷部分需要根据请求头中的标记位(里面有是否压缩的标记)决定是否解压缩。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
| public class OpenrpcRequestDecoder extends LengthFieldBasedFrameDecoder { private static final Logger log = LoggerFactory.getLogger(OpenrpcRequestDecoder.class); public OpenrpcRequestDecoder() { super( MessageFormatConstant.MAX_FRAME_LENGTH, MessageFormatConstant.FULL_LENGTH_FIELD_START, MessageFormatConstant.FULL_LENGTH_FIELD_BYTES, -(MessageFormatConstant.FULL_LENGTH_FIELD_START + MessageFormatConstant.FULL_LENGTH_FIELD_BYTES), 0 ); } @Override protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { Object o = super.decode(ctx, in); if (o instanceof ByteBuf byteBuf) { byte[] magic = new byte[MessageFormatConstant.MAGIC.length]; byteBuf.readBytes(magic); for (int i = 0; i < magic.length; i++) { if (magic[i] != MessageFormatConstant.MAGIC[i]) { throw new RuntimeException("请求协议不一致"); } } byte version = byteBuf.readByte(); if (version > MessageFormatConstant.VERSION) { throw new RuntimeException("请求协议尚不支持"); } short headerLength = byteBuf.readShort(); int fullLength = byteBuf.readInt(); byte requestType = byteBuf.readByte(); byte serializeType = byteBuf.readByte(); byte compressType = byteBuf.readByte(); long requestId = byteBuf.readLong(); byte flag = byteBuf.readByte(); byte extend = byteBuf.readByte(); if (requestType == RequestType.HEART_BEAT.getId()) { return new OpenrpcRequest(requestId, requestType, compressType, serializeType, null); } byte[] body = new byte[fullLength - headerLength]; byteBuf.readBytes(body); Compressor compressor = CompressorFactory.getCompressor(compressType).getImpl(); if ((flag & MessageFormatConstant.COMPRESSED) != 0) { body = compressor.decompress(body); } Serializer serializer = SerializerFactory.getSerializer(serializeType).getImpl(); RequestPayload payload = serializer.deserialize(body, RequestPayload.class); log.info("服务端的请求解码工作已完成:{}", requestId); return new OpenrpcRequest(requestId, requestType, compressType, serializeType, payload); } return null; } }
|
3.3方法执行处理器
通过请求载荷中的接口全限定名称
获取服务端缓存的服务实例对象
,进一步根据方法名称、方法参数类型列表
利用反射
获取指定方法并执行,如果方法不存在则返回失败响应,如果业务执行异常则返回包含异常信息的成功响应,下面就进入响应的编码出站阶段了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| public class MethodCallHandler extends SimpleChannelInboundHandler<OpenrpcRequest> { private static final Logger log = LoggerFactory.getLogger(MethodCallHandler.class); @Override protected void channelRead0(ChannelHandlerContext ctx, OpenrpcRequest msg) throws Exception { RequestPayload payload = msg.getRequestPayload(); Object result = callTargetMethod(payload); if (result == null) { OpenrpcResponse response = new OpenrpcResponse(msg.getRequestId(), ResponseCode.FAIL.getCode(), msg.getCompressType(), msg.getSerializeType(), ""); ctx.channel().writeAndFlush(response); log.warn("请求处理失败:{}", msg.getRequestId()); } log.info("服务端的请求方法调用工作已完成:{}", msg.getRequestId()); OpenrpcResponse response = new OpenrpcResponse(msg.getRequestId(), ResponseCode.SUCCESS.getCode(), msg.getCompressType(), msg.getSerializeType(), result); ctx.channel().writeAndFlush(response); }
private Object callTargetMethod(RequestPayload payload) { String interfaceName = payload.getInterfaceName(); String methodName = payload.getMethodName(); Class<?>[] parametersType = payload.getParametersType(); Object[] parametersValue = payload.getParametersValue(); ServiceConfig serviceConfig = OpenrpcBootStrap.serviceList.get(interfaceName); if (serviceConfig == null) { log.error("未找到匹配的服务:{},{},{}", interfaceName, methodName, parametersType); return null; } Object serviceImpl = serviceConfig.getRef(); Class<?> clazz = serviceImpl.getClass(); Object result = null; try { Method method = clazz.getMethod(methodName, parametersType); result = method.invoke(serviceImpl, parametersValue); } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | NullPointerException e) { log.error("方法执行过程中发生异常:{},{},{},{}", interfaceName, methodName, parametersType, parametersValue, e); if (e instanceof InvocationTargetException exception) { return exception; } return null; } return result; } }
|
3.4响应编码处理器
我们介绍一些响应协议的设计,同样是包含响应头
和响应体
两部分,其中响应头中包含魔数
、协议版本
、头部长度
、总长度
、响应编码
、序列化类型
、压缩类型
、请求ID
、响应标志位
和扩展字段
共24个字节,响应体的内容就是方法执行结果经过序列化和压缩后的字节数组,因此响应体的长度需要计算得出。
响应的编码和请求的编码过程类似,大家对比看看就会发现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| public class OpenrpcResponseEncoder extends MessageToByteEncoder<OpenrpcResponse> { private static final Logger log = LoggerFactory.getLogger(OpenrpcResponseEncoder.class); @Override protected void encode(ChannelHandlerContext channelHandlerContext, OpenrpcResponse openrpcResponse, ByteBuf byteBuf) throws Exception { byteBuf.writeBytes(MessageFormatConstant.MAGIC) .writeByte(MessageFormatConstant.VERSION) .writeShort(MessageFormatConstant.HEADER_LENGTH) .writerIndex(byteBuf.writerIndex() + MessageFormatConstant.FULL_LENGTH_FIELD_BYTES) .writeByte(openrpcResponse.getResponseCode()) .writeByte(openrpcResponse.getSerializeType()) .writeByte(openrpcResponse.getCompressType()) .writeLong(openrpcResponse.getRequestId()); Serializer serializer = SerializerFactory.getSerializer(openrpcResponse.getSerializeType()).getImpl(); byte[] body = serializer.serialize(openrpcResponse.getBody()); Compressor compressor = CompressorFactory.getCompressor(openrpcResponse.getCompressType()).getImpl(); if (body.length >= OpenrpcBootStrap.configuration.getCompressThreshold()) { body = compressor.compress(body); byteBuf.writeByte(MessageFormatConstant.COMPRESSED); } else { byteBuf.writeByte(MessageFormatConstant.NOT_COMPRESSED); } byteBuf.writeByte(0x00); byteBuf.writeBytes(body); int writerIndex = byteBuf.writerIndex(); byteBuf.writerIndex(MessageFormatConstant.FULL_LENGTH_FIELD_START); byteBuf.writeInt(MessageFormatConstant.HEADER_LENGTH + body.length); byteBuf.writerIndex(writerIndex); log.info("服务端的响应编码工作已完成:{}", openrpcResponse.getRequestId()); } }
|
3.5响应解码处理器
响应的解码过程就是编码的逆过程,通过继承LengthFieldBasedFrameDecoder
处理TCP报文的粘包问题,大致流程跟请求的解码类似。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
| public class OpenrpcResponseDecoder extends LengthFieldBasedFrameDecoder { private static final Logger log = LoggerFactory.getLogger(OpenrpcResponseDecoder.class); public OpenrpcResponseDecoder() { super( MessageFormatConstant.MAX_FRAME_LENGTH, MessageFormatConstant.FULL_LENGTH_FIELD_START, MessageFormatConstant.FULL_LENGTH_FIELD_BYTES, -(MessageFormatConstant.FULL_LENGTH_FIELD_START + MessageFormatConstant.FULL_LENGTH_FIELD_BYTES), 0 ); } @Override protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { Object o = super.decode(ctx, in); if (o instanceof ByteBuf byteBuf) { byte[] magic = new byte[MessageFormatConstant.MAGIC.length]; byteBuf.readBytes(magic); for (int i = 0; i < magic.length; i++) { if (magic[i] != MessageFormatConstant.MAGIC[i]) { throw new RuntimeException("请求协议不一致"); } } byte version = byteBuf.readByte(); if (version > MessageFormatConstant.VERSION) { throw new RuntimeException("请求协议尚不支持"); } short headerLength = byteBuf.readShort(); int fullLength = byteBuf.readInt(); byte responseCode = byteBuf.readByte(); byte serializeType = byteBuf.readByte(); byte compressType = byteBuf.readByte(); long requestId = byteBuf.readLong(); byte flag = byteBuf.readByte(); byte extend = byteBuf.readByte(); byte[] body = new byte[fullLength - headerLength]; byteBuf.readBytes(body); Compressor compressor = CompressorFactory.getCompressor(compressType).getImpl(); if ((flag & MessageFormatConstant.COMPRESSED) != 0) { body = compressor.decompress(body); } Serializer serializer = SerializerFactory.getSerializer(serializeType).getImpl(); Object result = serializer.deserialize(body, Object.class); log.info("调用端的响应解码工作已完成:{}", requestId); return new OpenrpcResponse(requestId, responseCode, compressType, serializeType, result); } return null; } }
|
3.6响应结果处理器
最终兜兜转转,RPC远程调用响应终于到达最后一站了,还记得响应头中的请求ID字段吗,此时我们就可以根据请求ID判断此响应对于哪个请求,进而找到绑定的CompletableFuture对象,通过complete
方法设置响应结果,此时RPC调用方通过get
方法阻塞等待线程会被唤醒,完成了响应的异步通知
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| public class ResultProcessHandler extends SimpleChannelInboundHandler<OpenrpcResponse> { private static final Logger log = LoggerFactory.getLogger(ResultProcessHandler.class); @Override protected void channelRead0(ChannelHandlerContext ctx, OpenrpcResponse response) throws Exception { CompletableFuture<Object> completableFuture = OpenrpcBootStrap.pendingCache.remove(response.getRequestId()); if (response.getResponseCode() == ResponseCode.FAIL.getCode()) { completableFuture.complete(null); log.error("请求调用失败:{}", response.getRequestId()); } else if (response.getResponseCode() == ResponseCode.LIMITING.getCode()) { completableFuture.complete(null); log.error("请求被限流:{}", response.getRequestId()); } else if (response.getResponseCode() == ResponseCode.CLOSING.getCode()) { completableFuture.complete(null); String interfaceName = OpenrpcBootStrap.threadLocal.get().getRequestPayload().getInterfaceName(); Selector selector = OpenrpcBootStrap.configuration.getLoadBalancer().selector(interfaceName); selector.removeServiceAddress((InetSocketAddress) ctx.channel().remoteAddress()); log.info("服务端正在关闭,重试选取其他的服务节点:{}", response.getRequestId()); } else if (response.getResponseCode() == ResponseCode.SUCCESS.getCode()) { completableFuture.complete(response.getBody()); log.info("调用端的响应结果处理工作已完成:{}", response.getRequestId()); } } }
|