自研RPC框架之异常/超时重试机制
1.重试注解
重试注解的定义如下：
/**
 * Marks a remote interface method as retryable and carries the retry configuration the RPC client
 * reads reflectively when the method is invoked through the proxy.
 *
 * <p>Retained at runtime so it can be looked up via {@code Method.getAnnotation}; applicable to
 * methods only.
 */
@Retention(value = RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface OpenrpcRetry {

    /** How long to wait for a single response, in milliseconds. */
    int timeout() default 5_000;

    /** Retry budget; the client uses the smaller of this and {@link #maxRetries()}. */
    int tryTimes() default 3;

    /** Wait before the first retry, in milliseconds. */
    int initInterval() default 2_000;

    /** Retry budget; the client uses the smaller of this and {@link #tryTimes()}. */
    int maxRetries() default 3;

    /** Factor by which the wait grows after each failed retry. */
    double backoffMultiplier() default 1.75d;

    /** Relative random jitter applied to each computed wait (0.25 means up to plus/minus 25%). */
    double jitterFactor() default 0.25d;

    /**
     * Business exception types that are still worth retrying; any other business exception is
     * rethrown to the caller immediately. Empty means no business exception is retried.
     */
    Class<? extends Throwable>[] whiteList() default {};
}
|
2.指数退避算法
我们的RPC框架可以针对调用超时和业务异常发起重试，所用的核心算法是指数退避算法（Exponential Backoff）。指数退避是一种错误恢复机制，广泛用于计算机网络、分布式系统以及各种需要自适应重试逻辑的场景。它通过动态调整重试等待时间来减少冲突和负载，在高并发环境下可以有效避免系统过载。
指数退避算法先设定一个初始重试间隔，每次连续失败后，都把重试间隔乘以一个大于 1 的固定倍数（退避系数），因此等待时间随失败次数呈指数级增长：若初始间隔为 I、退避系数为 m，则各次等待依次约为 I、I×m、I×m²……为了避免多个客户端或实例在同一时刻重试而造成冲突，通常还会加入随机抖动（jitter），使重试间隔带有一定的随机性。此外，通常还会设置最大重试间隔和最大尝试次数，以避免等待时间增长过长或无限重试。
重试间隔的计算封装在 ExponentialBackoffRetryPolicy 中：
| @Data @AllArgsConstructor @NoArgsConstructor @Builder public class ExponentialBackoffRetryPolicy { private int maxRetries; private int initialInterval; private double backoffMultiplier; private double jitterFactor;
public int getNextRetryInterval(int retryCount) { double backoff = initialInterval * Math.pow(backoffMultiplier, retryCount); double jitter = backoff * jitterFactor * (Math.random() * 2 - 1); return (int) (backoff + jitter); } }
|
3.异常和超时重试
对于标注了 @OpenrpcRetry 注解的方法，RPC 框架会解析注解中的重试参数（最大重试次数取 tryTimes 与 maxRetries 中的较小值）。发起远程调用后，如果在超时时间内没有收到响应，或调用失败，就判断是否需要重试：若返回的是异常白名单中的业务异常类型，或者失败原因不是业务异常（例如超时、网络异常），并且尚未超过最大重试次数，就按指数退避等待一段时间后再次发起请求；否则直接把业务异常抛给调用方，或在重试次数耗尽后宣告调用失败。
客户端代理的 invoke 方法实现如下：
| public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { log.info("执行方法:{}", method.getName()); int timeout = 5000; int retryCount = 0; int intervalTime = 0; Random random = new Random(); OpenrpcRetry retryAnnotation = method.getAnnotation(OpenrpcRetry.class); ExponentialBackoffRetryPolicy retryPolicy = null; List<Class<? extends Throwable>> whiteList = null; if (retryAnnotation != null) { retryPolicy = ExponentialBackoffRetryPolicy.builder() .maxRetries(Math.min(retryAnnotation.tryTimes(), retryAnnotation.maxRetries())) .initialInterval(retryAnnotation.initInterval()) .backoffMultiplier(retryAnnotation.backoffMultiplier()) .jitterFactor(retryAnnotation.jitterFactor()) .build(); whiteList = Arrays.stream(retryAnnotation.whiteList()).toList(); intervalTime = retryPolicy.getInitialInterval(); timeout = retryAnnotation.timeout(); } else { retryPolicy = ExponentialBackoffRetryPolicy.builder() .maxRetries(0) .initialInterval(0) .backoffMultiplier(1) .jitterFactor(0) .build(); }
while(true) { try { RequestPayload payload = RequestPayload.builder() .interfaceName(interfaceRef.getName()) .methodName(method.getName()) .parametersType(method.getParameterTypes()) .parametersValue(args) .returnType(method.getReturnType()) .build(); OpenrpcRequest request = OpenrpcRequest.builder() .requestId(OpenrpcBootStrap.configuration.getIdWorker().nextId()) .requestType(RequestType.REQUEST.getId()) .compressType(OpenrpcBootStrap.configuration.getCompressor().getCode()) .serializeType(OpenrpcBootStrap.configuration.getSerializer().getCode()) .requestPayload(payload) .build();
OpenrpcBootStrap.threadLocal.set(request);
int tryCount = 3; Channel channel = null; while (tryCount > 0) { InetSocketAddress address = OpenrpcBootStrap.configuration.getLoadBalancer().selectServiceAddress(interfaceRef.getName() + group); log.info("发现可用服务:{},{},{}", interfaceRef.getName(), method.getName(), address); channel = getAvaliableChannel(address); if (channel.isActive()) break; LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100)); tryCount--; } if (tryCount == 0) { throw new NetworkException("远程服务暂时不可用"); }
OpenrpcBootStrap.threadLocal.remove();
CompletableFuture<Object> completableFuture = new CompletableFuture<>(); OpenrpcBootStrap.pendingCache.put(request.getRequestId(), completableFuture);
channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if (!channelFuture.isSuccess()) { completableFuture.completeExceptionally(channelFuture.cause()); } } });
Object result = completableFuture.get(timeout, TimeUnit.MILLISECONDS); if (result instanceof InvocationTargetException exception) { throw exception; } if (result == null) { throw new RequestFailException("远程调用失败,继续重试:" + retryCount); } return result; } catch (Exception e) { if (e instanceof InvocationTargetException exception) { if (whiteList == null || whiteList.isEmpty()) throw exception.getTargetException(); boolean isWhite = false; for (Class<? extends Throwable> clazz : whiteList) { if (clazz.isInstance(exception)) { isWhite = true; break; } } if (!isWhite) throw exception.getTargetException(); } retryCount++; if (retryCount > retryPolicy.getMaxRetries()) { log.error("远程调用失败:", e); break; } LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(intervalTime)); intervalTime = retryPolicy.getNextRetryInterval(retryCount); } } throw new RequestFailException("远程调用失败"); }
|