自研RPC框架之SPI插件化机制

自研RPC框架之SPI插件化机制

1.启动类初始化

启动类初始化时会创建默认的Configuration对象,其中包含了一些默认的参数选项,同时还会顺序加载解析XML文件中的配置项和SPI加载插件,JDK默认的SPI机制会遍历加载所有的实现类,这样效率还是相对较低的,无法做到按需加载,因此我们后面会借鉴Dubbo中的SPI改造自己的ServiceLoader。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
public class OpenrpcBootStrap {
private static final Logger log = LoggerFactory.getLogger(OpenrpcBootStrap.class);
// 静态单例对象
private static final OpenrpcBootStrap openrpcBootStrap = new OpenrpcBootStrap();

/*****************************服务提供方相关字段*****************************/

// 全局默认配置
public static Configuration configuration = new Configuration();
// 维护服务提供者发布的服务列表映射
public static final Map<String, ServiceConfig> serviceList = new ConcurrentHashMap<>(16);
// 启动预热钩子方法
OpenrpcStartupHook startupHook = new OpenrpcStartupHook() {
@Override
public void preheat() {
OpenrpcStartupHook.super.preheat();
}
};

/*****************************服务调用方相关字段*****************************/

// 维护服务调用方需要的Netty连接Channel
public static final Map<InetSocketAddress, Channel> channelCache = new ConcurrentHashMap<>(16);
// 维护服务调用方的异步结果
public static final Map<Long, CompletableFuture<Object>> pendingCache = new ConcurrentHashMap<>(128);
// 维护请求的本地线程变量
public static final ThreadLocal<OpenrpcRequest> threadLocal = new ThreadLocal<>();

/*****************************公用的一些方法*****************************/

// 私有构造
private OpenrpcBootStrap() {
}

public static OpenrpcBootStrap getInstance() {
return openrpcBootStrap;
}

public OpenrpcBootStrap application(String appName) {
configuration.setAppName(appName);
return this;
}

/*****************************服务提供方相关方法*****************************/

public void limiter(Limiter limiter) {
configuration.setLimiter(limiter);
}

public void setStartupHook(OpenrpcStartupHook startupHook) {
this.startupHook = startupHook;
}

public OpenrpcBootStrap publish(ServiceConfig service) {
configuration.getRegistry().register(service);
serviceList.put(service.getInterface().getName(), service);
return this;
}

public OpenrpcBootStrap scanPackage(String packageName) {
configuration.setPackageName(packageName);
return this;
}

public void start() {
log.info("服务正在启动...");
// 注册关闭服务端应用程序的钩子函数
Runtime.getRuntime().addShutdownHook(new OpenrpcShutdownHook());
log.info("添加关闭钩子");

// Netty的Reactor线程池,初始化了一个NioEventLoop数组,用来处理I/O操作,如接受新的连接和读写数据
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
// TCP默认开启了Nagle算法,该算法的作用是尽可能的发送大数据块,减少网络传输。TCP_NODELAY参数的作用就是控制是否启用Nagle算法。
.childOption(ChannelOption.TCP_NODELAY, true)
// 是否开启TCP底层心跳机制
.childOption(ChannelOption.SO_KEEPALIVE, true)
// 表示系统用于临时存放已完成三次握手的请求的队列的最大长度,如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数
.option(ChannelOption.SO_BACKLOG, 128)
.localAddress(configuration.getPort())
.childHandler(new ProviderChannelInitializer());
ChannelFuture channelFuture = bootstrap.bind().sync();

// 启动预热钩子,用户可以模拟调用和启动初始化缓存等工作
startupHook.preheat();

// 服务已经完全启动,开始进行服务注册
registerService(configuration.getPackageName());
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
try {
boss.shutdownGracefully().sync();
worker.shutdownGracefully().sync();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}

private void registerService(String packageName) {
// 延迟暴露
log.info("服务已经启动完毕,开始进行服务注册");
// 扫描指定路径下的所有包含OpenrpcService注解的实现类
Set<Class<?>> classes = ClassScanner.scanPackageByAnnotation(packageName, OpenrpcService.class);
// 服务注册
for (Class<?> clazz : classes) {
Class<?>[] interfaces = clazz.getInterfaces();
for (Class<?> anInterface : interfaces) {
ServiceConfig serviceConfig = new ServiceConfig();
serviceConfig.setInterface(anInterface);
serviceConfig.setWeight(clazz.getAnnotation(OpenrpcService.class).weight());
serviceConfig.setGroup(clazz.getAnnotation(OpenrpcService.class).group());
try {
Object serviceImpl = clazz.getConstructor().newInstance();
serviceConfig.setRef(serviceImpl);
} catch (InstantiationException | IllegalAccessException | InvocationTargetException |
NoSuchMethodException e) {
log.error("接口实现类的无参构造函数不存在");
throw new RuntimeException(e);
}
publish(serviceConfig);
log.info("服务注册成功:{}", serviceConfig.getInterface().getName());
}
}
}

/*****************************服务调用方相关方法*****************************/

public OpenrpcBootStrap reference(ReferenceConfig<?> reference) {
reference.setRegistry(configuration.getRegistry());
return this;
}

public OpenrpcBootStrap serializeProtocol(String serializeProtocol) {
configuration.setSerializeProtocol(serializeProtocol);
configuration.setSerializer(SerializerFactory.getSerializer(serializeProtocol));
return this;
}

public OpenrpcBootStrap compressProtocol(String compressProtocol) {
configuration.setCompressProtocol(compressProtocol);
configuration.setCompressor(CompressorFactory.getCompressor(compressProtocol));
return this;
}
}

2.配置解析过程

我们的自研RPC框架有四种配置声明方式,优先级从高到低分别是代码显式配置SPI配置XML配置默认配置,其中应用名称默认为”default”,服务提供方端口号默认为8123,包扫描路径默认为空,注册中心默认为空,负载均衡策略默认为轮询式负载均衡,序列化协议默认是hession,压缩协议默认不开启,压缩阈值默认为0,分布式ID生成器的节点ID默认为空即使用主机MAC地址的低10位,服务限流器默认不开启。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
* 全局配置信息
* 优先级:代码显式声明 > SPI配置 > XML配置 > 默认配置
*/
@Data
public class Configuration {
private static final Logger log = LoggerFactory.getLogger(Configuration.class);
// 默认应用名称
private String appName = "default";
// 默认端口号配置
private int port = 8123;
// 默认包扫描路径
private String packageName = "";
// 注册中心配置
private Registry registry;
private RegistryConfig registryConfig;
// 默认负载均衡器配置
private String loadBalancerType = "roundRobin";
private LoadBalancer loadBalancer = new RoundRobinLoadBalancer();
// 默认序列化协议配置
private String serializeProtocol = "hessian";
private ObjectWrapper<Serializer> serializer = SerializerFactory.getSerializer(serializeProtocol);
// 默认压缩协议配置
private String compressProtocol = "none";
private ObjectWrapper<Compressor> compressor = CompressorFactory.getCompressor(compressProtocol);
// 默认压缩阈值
private int compressThreshold = 0;
// 默认分布式ID生成器
private IdWorker idWorker = new IdWorker(null);
// 默认服务限流器配置
private Limiter limiter = new NoneLimiter();

public Configuration() {
// 1.默认配置

// 2.读取XML配置文件的配置信息
XMLResolver xmlResolver = new XMLResolver(this);
xmlResolver.loadFromXML();

// 3.SPI机制发现相关配置
SPIResolver spiResolver = new SPIResolver(this);
spiResolver.loadFromSPI();

// 4.代码显式配置
}

}

3.XML配置加载

首先我们自定义了XML配置文件的元信息openrpc-config.dtd,其中包含了XML配置文件中所有可能的配置选项和形式,完整的配置文件openrpc-config.xml的内容如下:

image-20240209225707312

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public void loadFromXML() {
try {
// 创建文档对象
DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
// 禁用DTD校验
factory.setValidating(false);
// 禁用外部实体解析
factory.setFeature("http://apache.org/xml/features/nonvalidating/load-external-dtd", false);
// 读取XML配置文件的配置项
DocumentBuilder builder = factory.newDocumentBuilder();
InputStream inputStream = ClassLoader.getSystemClassLoader().getResourceAsStream("openrpc-config.xml");
Document doc = builder.parse(inputStream);
// 获取Xpath解析器
XPathFactory xPathfactory = XPathFactory.newInstance();
XPath xpath = xPathfactory.newXPath();
// 解析端口号
configuration.setPort(resolvePort(doc, xpath));
// 解析应用名称
configuration.setAppName(resolveAppName(doc, xpath));
// 解析注册中心
configuration.setRegistryConfig(resolveRegistryConfig(doc, xpath));
// 解析负载均衡器
configuration.setLoadBalancerType(resolveLoadBalancerType(doc, xpath));
configuration.setLoadBalancer(LoadBalancerFactory.getLoadBalancer(configuration.getLoadBalancerType()));
// 解析压缩类型
configuration.setCompressProtocol(resolveCompressType(doc, xpath));
configuration.setCompressor(CompressorFactory.getCompressor(configuration.getCompressProtocol()));
// 解析压缩阈值
configuration.setCompressThreshold(resolveCompressThreshold(doc, xpath));
// 解析序列化类型
configuration.setSerializeProtocol(resolveSerializeType(doc, xpath));
configuration.setSerializer(SerializerFactory.getSerializer(configuration.getSerializeProtocol()));
// 解析自定义负载均衡器
configuration.setLoadBalancer(resolveLoadBalancer(doc, xpath));
// 解析ID生成器
configuration.setIdWorker(resolveIdWorker(doc, xpath));
} catch (ParserConfigurationException | SAXException | IOException e) {
log.error("解析过程中发生异常", e);
}
}

上面涉及到的所有解析方法内部其实都是调用了三个XML配置解析的工具方法,分别是解析配置节点的内容(如<port>8080</port>中的8080)解析配置节点的属性值(如<serializeType type="hessian"/>中的type属性值)解析并实例化配置节点表示的类对象(如<loadBalancer class="io.github.hulingf.loadbalancer.impl.RoundRobinLoadBalancer"/>中的class属性表示的类对象)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
private String parseString(Document doc, XPath xpath, String expression) {
try {
XPathExpression expr = xpath.compile(expression);
Node targetNode = (Node) expr.evaluate(doc, XPathConstants.NODE);
if (targetNode == null) return null;
return targetNode.getTextContent();
} catch (XPathExpressionException e) {
log.error("解析节点的内容时发生异常", e);
}
return null;
}

private String parseString(Document doc, XPath xpath, String expression, String attributeName) {
try {
XPathExpression expr = xpath.compile(expression);
Node targetNode = (Node) expr.evaluate(doc, XPathConstants.NODE);
if (targetNode == null) return null;
Node item = targetNode.getAttributes().getNamedItem(attributeName);
if (item == null) return null;
return item.getNodeValue();
} catch (XPathExpressionException e) {
log.error("解析节点的属性时发生异常", e);
}
return null;
}

private <T> T parseObject(Document doc, XPath xpath, String expression, Class<?>[] paramTypes, Object... paramValues) {
try {
XPathExpression expr = xpath.compile(expression);
Node targetNode = (Node) expr.evaluate(doc, XPathConstants.NODE);
if (targetNode == null) return null;
Node item = targetNode.getAttributes().getNamedItem("class");
if (item == null) return null;
String className = item.getNodeValue();
Class<?> aClass = Class.forName(className);
Object instant = null;
if (paramTypes == null) {
instant = aClass.getConstructor().newInstance();
} else {
instant = aClass.getConstructor(paramTypes).newInstance(paramValues);
}
return (T) instant;
} catch (ClassNotFoundException | NoSuchMethodException | InstantiationException | IllegalAccessException |
InvocationTargetException | XPathExpressionException e) {
log.error("解析节点对应的类对象时发现异常", e);
}
return null;
}

4.SPI配置加载

1
2
3
4
5
6
7
8
9
10
11
12
public void loadFromSPI() {
try {
// 加载负载均衡器插件
ExtensionLoader<LoadBalancer> extensionLoader = ExtensionLoader.getExtensionLoader(LoadBalancer.class);
LoadBalancer loadBalancer = extensionLoader.getExtension(configuration.getLoadBalancerType());
configuration.setLoadBalancer(loadBalancer);
// 加载服务限流器插件,逻辑跟上面类似,省略
// ...
} catch (Exception e) {

}
}

借鉴Dubbo的SPI机制,我们设计了自己的服务加载器,可以实现按需加载指定插件,效率更高,大致流程就是根据负载均衡插件名称如random从缓存cachedInstances中查找实现类实例,如果发现不存在则需要加锁+双重检查进行实现类单实例的创建;创建时首先从cachedClasses(如果为空同样需要加锁+双重检查创建)中根据指定负载均衡插件名称从META-INF/extensions/目录下的SPI文件中查找其类对象,然后根据类对象从EXTENSION_INSTANCES中获取对应实例,如果不存在则通过反射创建。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
public final class ExtensionLoader<T> {
private static final Logger log = LoggerFactory.getLogger(ExtensionLoader.class);
// SPI配置基础目录
private static final String SERVICE_DIRECTORY = "META-INF/extensions/";
// 每个接口类型对应的扩展加载器
private static final Map<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<>();
// 接口实现类对应的实例对象缓存
private static final Map<Class<?>, Object> EXTENSION_INSTANCES = new ConcurrentHashMap<>();
// 接口类型
private final Class<?> type;
// 接口实现类实例的缓存(SPI文件的内容类似于random=com.huling.RandomLoadBalancer)
private final Map<String, Holder<Object>> cachedInstances = new ConcurrentHashMap<>();
// 接口实现类的缓存,会查看SPI目录下的所有接口实现类
private final Holder<Map<String, Class<?>>> cachedClasses = new Holder<>();

private ExtensionLoader(Class<?> type) {
this.type = type;
}

public static <S> ExtensionLoader<S> getExtensionLoader(Class<S> type) {
// 接口类型不能为空
if (type == null) {
throw new IllegalArgumentException("Extension type should not be null.");
}
// 接口类型保证
if (!type.isInterface()) {
throw new IllegalArgumentException("Extension type must be an interface.");
}
// 接口类型需要标注有@SPI注解
if (type.getAnnotation(SPI.class) == null) {
throw new IllegalArgumentException("Extension type must be annotated by @SPI");
}
// firstly get from cache, if not hit, create one
ExtensionLoader<S> extensionLoader = (ExtensionLoader<S>) EXTENSION_LOADERS.get(type);
if (extensionLoader == null) {
EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<S>(type));
extensionLoader = (ExtensionLoader<S>) EXTENSION_LOADERS.get(type);
}
return extensionLoader;
}

public T getExtension(String name) {
if (StrUtil.isBlank(name)) {
throw new IllegalArgumentException("Extension name should not be null or empty.");
}
// firstly get from cache, if not hit, create one
Holder<Object> holder = cachedInstances.get(name);
if (holder == null) {
cachedInstances.putIfAbsent(name, new Holder<>());
holder = cachedInstances.get(name);
}
// create a singleton if no instance exists
Object instance = holder.get();
if (instance == null) {
// 锁的是接口实现类的holder实例
// 多实例间的并发加锁可以同时进行加快多实例的生成,单个实例的并发加锁只会允许一个实例的生成
synchronized (holder) {
// 双重检查的单例实例生成,holder内部维护的实例使用volatile修饰
instance = holder.get();
if (instance == null) {
instance = createExtension(name);
holder.set(instance);
}
}
}
return (T) instance;
}

private T createExtension(String name) {
// load all extension classes of type T from file and get specific one by name
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw new RuntimeException("No such extension of name " + name);
}
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
try {
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
} catch (Exception e) {
log.error(e.getMessage());
}
}
return instance;
}

private Map<String, Class<?>> getExtensionClasses() {
// get the loaded extension class from the cache
Map<String, Class<?>> classes = cachedClasses.get();
// double check
if (classes == null) {
synchronized (cachedClasses) {
classes = cachedClasses.get();
if (classes == null) {
classes = new HashMap<>();
// load all extensions from our extensions directory
loadDirectory(classes);
cachedClasses.set(classes);
}
}
}
return classes;
}

private void loadDirectory(Map<String, Class<?>> extensionClasses) {
// 拼接SPI基础目录+接口名称获取SPI文件的路径
String fileName = ExtensionLoader.SERVICE_DIRECTORY + type.getName();
try {
Enumeration<URL> urls;
ClassLoader classLoader = ExtensionLoader.class.getClassLoader();
urls = classLoader.getResources(fileName);
if (urls != null) {
while (urls.hasMoreElements()) {
URL resourceUrl = urls.nextElement();
loadResource(extensionClasses, classLoader, resourceUrl);
}
}
} catch (IOException e) {
log.error(e.getMessage());
}
}

private void loadResource(Map<String, Class<?>> extensionClasses, ClassLoader classLoader, URL resourceUrl) {
try (BufferedReader reader = new BufferedReader(new InputStreamReader(resourceUrl.openStream(), UTF_8))) {
String line;
// read every line
while ((line = reader.readLine()) != null) {
// get index of comment
final int ci = line.indexOf('#');
if (ci >= 0) {
// string after # is comment so we ignore it
line = line.substring(0, ci);
}
line = line.trim();
if (line.length() > 0) {
try {
final int ei = line.indexOf('=');
String name = line.substring(0, ei).trim();
String clazzName = line.substring(ei + 1).trim();
// our SPI use key-value pair so both of them must not be empty
if (name.length() > 0 && clazzName.length() > 0) {
Class<?> clazz = classLoader.loadClass(clazzName);
extensionClasses.put(name, clazz);
}
} catch (ClassNotFoundException e) {
log.error(e.getMessage());
}
}

}
} catch (IOException e) {
log.error(e.getMessage());
}
}
}