自研RPC框架之异常/超时重试机制

自研RPC框架之异常/超时重试机制

1.重试注解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 只能作用于[幂等性]接口上
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface OpenrpcRetry {
// 超时时间
int timeout() default 5000;
// 重试次数
int tryTimes() default 3;
// 初始重试间隔
int initInterval() default 2000;
// 最大重试次数
int maxRetries() default 3;
// 退避系数
double backoffMultiplier() default 1.75;
// 抖动因子
double jitterFactor() default 0.25;
// 异常白名单
Class<? extends Throwable>[] whiteList() default {};
}

2.指数退避算法

我们的RPC框架可以针对调用超时和业务异常发起重试,用到的核心算法就是指数退避算法,指数退避算法是一种错误恢复机制,广泛用于计算机网络、分布式系统、以及各种需要自适应重试逻辑的场景中。这种算法通过动态调整重试等待时间来减少冲突或负载,特别是在高并发环境下,可以有效地避免系统过载。

指数退避算法设定一个初始的重试间隔时间,在每次连续的失败尝试之后,将重试间隔时间乘以一个指数,这意味着每次失败后,等待时间将指数级增加。为了避免多个客户端或实例在相同的时间重试造成的冲突,通常会加入随机化因素,使得重试间隔有一定的随机性。并且,通常会设置一个最大重试间隔时间和最大尝试次数,以避免等待时间增长过长或无限重试。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class ExponentialBackoffRetryPolicy {
// 最大重试次数
private int maxRetries;
// 初始重试间隔
private int initialInterval;
// 退避系数
private double backoffMultiplier;
// 抖动因子
private double jitterFactor;

// 获取下一次重试的间隔时间
public int getNextRetryInterval(int retryCount) {
// 计算指数退避的时间间隔
double backoff = initialInterval * Math.pow(backoffMultiplier, retryCount);
// 计算抖动时间间隔
double jitter = backoff * jitterFactor * (Math.random() * 2 - 1);
// 返回指数退避和抖动的总和
return (int) (backoff + jitter);
}
}

3.异常和超时重试

对于有OpenrpcRetry注解的方法,RPC会解析其中的重试参数,在RPC发起远程调用请求后如果超时时间内没有获得响应,则判断是否需要重试,如果响应的结果是异常白名单中的异常类型,或者响应的结果不是业务异常且没有超过最大重试次数,那么会指数退避等待后发起重试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
log.info("执行方法:{}", method.getName());
// 超时时间默认5000ms
int timeout = 5000;
// 当前重试次数默认0次
int retryCount = 0;
// 重试间隔时长默认0ms
int intervalTime = 0;
Random random = new Random();
OpenrpcRetry retryAnnotation = method.getAnnotation(OpenrpcRetry.class);
// 根据@OpenrpcRetry注解构建重试策略
ExponentialBackoffRetryPolicy retryPolicy = null;
// 根据@OpenrpcRetry注解获取异常白名单列表
List<Class<? extends Throwable>> whiteList = null;
if (retryAnnotation != null) {
retryPolicy = ExponentialBackoffRetryPolicy.builder()
.maxRetries(Math.min(retryAnnotation.tryTimes(), retryAnnotation.maxRetries()))
.initialInterval(retryAnnotation.initInterval())
.backoffMultiplier(retryAnnotation.backoffMultiplier())
.jitterFactor(retryAnnotation.jitterFactor())
.build();
whiteList = Arrays.stream(retryAnnotation.whiteList()).toList();
intervalTime = retryPolicy.getInitialInterval();
timeout = retryAnnotation.timeout();
} else {
retryPolicy = ExponentialBackoffRetryPolicy.builder()
.maxRetries(0)
.initialInterval(0)
.backoffMultiplier(1)
.jitterFactor(0)
.build();
}

// 超时/异常重试机制
while(true) {
try {
// 1.封装报文
RequestPayload payload = RequestPayload.builder()
.interfaceName(interfaceRef.getName())
.methodName(method.getName())
.parametersType(method.getParameterTypes())
.parametersValue(args)
.returnType(method.getReturnType())
.build();
OpenrpcRequest request = OpenrpcRequest.builder()
.requestId(OpenrpcBootStrap.configuration.getIdWorker().nextId())
.requestType(RequestType.REQUEST.getId())
.compressType(OpenrpcBootStrap.configuration.getCompressor().getCode())
.serializeType(OpenrpcBootStrap.configuration.getSerializer().getCode())
.requestPayload(payload)
.build();

// 2.本地线程变量保存请求对象
OpenrpcBootStrap.threadLocal.set(request);

int tryCount = 3;
Channel channel = null;
while (tryCount > 0) {
// 3.注册中心拉取服务列表,负载均衡器获取可用服务地址
InetSocketAddress address = OpenrpcBootStrap.configuration.getLoadBalancer().selectServiceAddress(interfaceRef.getName() + group);
log.info("发现可用服务:{},{},{}", interfaceRef.getName(), method.getName(), address);
// 4.从缓存中获取channel
channel = getAvaliableChannel(address);
// 5.选中下线服务的无效channel,等待重试
if (channel.isActive()) break;
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100));
tryCount--;
}
if (tryCount == 0) {
throw new NetworkException("远程服务暂时不可用");
}

// 6.删除本地线程变量的请求对象,防止内存泄漏
OpenrpcBootStrap.threadLocal.remove();

// 7.异步发送消息
CompletableFuture<Object> completableFuture = new CompletableFuture<>();
OpenrpcBootStrap.pendingCache.put(request.getRequestId(), completableFuture);

channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
// 发送完成后触发监听器,检测是否发送成功
if (!channelFuture.isSuccess()) {
completableFuture.completeExceptionally(channelFuture.cause());
}
}
});

// 8.返回远程调用的执行结果
Object result = completableFuture.get(timeout, TimeUnit.MILLISECONDS);
// 9.服务调用端执行异常,抛出InvocationTargetException,异常重试机制会检查业务异常白名单
if (result instanceof InvocationTargetException exception) {
throw exception;
}
if (result == null) {
throw new RequestFailException("远程调用失败,继续重试:" + retryCount);
}
return result;
} catch (Exception e) {
// 检查自定义的异常白名单
if (e instanceof InvocationTargetException exception) {
// 白名单为空,直接抛出业务异常
if (whiteList == null || whiteList.isEmpty()) throw exception.getTargetException();
boolean isWhite = false;
// 检查白名单列表
for (Class<? extends Throwable> clazz : whiteList) {
if (clazz.isInstance(exception)) {
isWhite = true;
break;
}
}
if (!isWhite) throw exception.getTargetException();
}
retryCount++;
if (retryCount > retryPolicy.getMaxRetries()) {
log.error("远程调用失败:", e);
break;
}
// 8.等待intervalTime时间后重试请求,随机值可以缓解重试风暴问题
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(intervalTime));
intervalTime = retryPolicy.getNextRetryInterval(retryCount);
}
}
throw new RequestFailException("远程调用失败");
}