自研RPC框架之SPI插件化机制
1.启动类初始化
启动类初始化时会创建默认的Configuration对象,其中包含了一些默认的参数选项;Configuration的构造函数还会依次加载解析XML文件中的配置项,并通过SPI加载插件。JDK默认的SPI机制会遍历加载所有的实现类,效率相对较低,无法做到按需加载,因此我们后面会借鉴Dubbo中的SPI机制改造自己的ServiceLoader。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 public class OpenrpcBootStrap { private static final Logger log = LoggerFactory.getLogger(OpenrpcBootStrap.class); private static final OpenrpcBootStrap openrpcBootStrap = new OpenrpcBootStrap (); public static Configuration configuration = new Configuration (); public static final Map<String, ServiceConfig> serviceList = new ConcurrentHashMap <>(16 ); OpenrpcStartupHook startupHook = new OpenrpcStartupHook () { @Override public void preheat () { OpenrpcStartupHook.super .preheat(); } }; public static final Map<InetSocketAddress, Channel> channelCache = new ConcurrentHashMap <>(16 ); public static final Map<Long, CompletableFuture<Object>> pendingCache = new ConcurrentHashMap <>(128 ); public static final ThreadLocal<OpenrpcRequest> threadLocal = new ThreadLocal <>(); private OpenrpcBootStrap () { } public static OpenrpcBootStrap getInstance () { return openrpcBootStrap; } public OpenrpcBootStrap application (String appName) { configuration.setAppName(appName); return this ; } public void limiter (Limiter limiter) { configuration.setLimiter(limiter); } public void setStartupHook (OpenrpcStartupHook startupHook) { this .startupHook = startupHook; } public OpenrpcBootStrap publish (ServiceConfig service) { configuration.getRegistry().register(service); serviceList.put(service.getInterface().getName(), service); return this ; } public OpenrpcBootStrap scanPackage (String packageName) { configuration.setPackageName(packageName); return this ; } public void start () { 
log.info("服务正在启动..." ); Runtime.getRuntime().addShutdownHook(new OpenrpcShutdownHook ()); log.info("添加关闭钩子" ); EventLoopGroup boss = new NioEventLoopGroup (); EventLoopGroup worker = new NioEventLoopGroup (); try { ServerBootstrap bootstrap = new ServerBootstrap (); bootstrap.group(boss, worker) .channel(NioServerSocketChannel.class) .childOption(ChannelOption.TCP_NODELAY, true ) .childOption(ChannelOption.SO_KEEPALIVE, true ) .option(ChannelOption.SO_BACKLOG, 128 ) .localAddress(configuration.getPort()) .childHandler(new ProviderChannelInitializer ()); ChannelFuture channelFuture = bootstrap.bind().sync(); startupHook.preheat(); registerService(configuration.getPackageName()); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { throw new RuntimeException (e); } finally { try { boss.shutdownGracefully().sync(); worker.shutdownGracefully().sync(); } catch (InterruptedException e) { throw new RuntimeException (e); } } } private void registerService (String packageName) { log.info("服务已经启动完毕,开始进行服务注册" ); Set<Class<?>> classes = ClassScanner.scanPackageByAnnotation(packageName, OpenrpcService.class); for (Class<?> clazz : classes) { Class<?>[] interfaces = clazz.getInterfaces(); for (Class<?> anInterface : interfaces) { ServiceConfig serviceConfig = new ServiceConfig (); serviceConfig.setInterface(anInterface); serviceConfig.setWeight(clazz.getAnnotation(OpenrpcService.class).weight()); serviceConfig.setGroup(clazz.getAnnotation(OpenrpcService.class).group()); try { Object serviceImpl = clazz.getConstructor().newInstance(); serviceConfig.setRef(serviceImpl); } catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) { log.error("接口实现类的无参构造函数不存在" ); throw new RuntimeException (e); } publish(serviceConfig); log.info("服务注册成功:{}" , serviceConfig.getInterface().getName()); } } } public OpenrpcBootStrap reference (ReferenceConfig<?> reference) { 
reference.setRegistry(configuration.getRegistry()); return this ; } public OpenrpcBootStrap serializeProtocol (String serializeProtocol) { configuration.setSerializeProtocol(serializeProtocol); configuration.setSerializer(SerializerFactory.getSerializer(serializeProtocol)); return this ; } public OpenrpcBootStrap compressProtocol (String compressProtocol) { configuration.setCompressProtocol(compressProtocol); configuration.setCompressor(CompressorFactory.getCompressor(compressProtocol)); return this ; } }
2.配置解析过程
我们的自研RPC框架有四种配置声明方式,优先级从高到低分别是代码显式配置、SPI配置、XML配置、默认配置。默认配置中,应用名称默认为"default",服务提供方端口号默认为8123,包扫描路径默认为空,注册中心默认为空,负载均衡策略默认为轮询式负载均衡,序列化协议默认是hessian,压缩协议默认不开启,压缩阈值默认为0,分布式ID生成器的节点ID默认为空(即使用主机MAC地址的低10位),服务限流器默认不开启。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 @Data public class Configuration { private static final Logger log = LoggerFactory.getLogger(Configuration.class); private String appName = "default" ; private int port = 8123 ; private String packageName = "" ; private Registry registry; private RegistryConfig registryConfig; private String loadBalancerType = "roundRobin" ; private LoadBalancer loadBalancer = new RoundRobinLoadBalancer (); private String serializeProtocol = "hessian" ; private ObjectWrapper<Serializer> serializer = SerializerFactory.getSerializer(serializeProtocol); private String compressProtocol = "none" ; private ObjectWrapper<Compressor> compressor = CompressorFactory.getCompressor(compressProtocol); private int compressThreshold = 0 ; private IdWorker idWorker = new IdWorker (null ); private Limiter limiter = new NoneLimiter (); public Configuration () { XMLResolver xmlResolver = new XMLResolver (this ); xmlResolver.loadFromXML(); SPIResolver spiResolver = new SPIResolver (this ); spiResolver.loadFromSPI(); } }
3.XML配置加载
首先我们自定义了XML配置文件的元信息openrpc-config.dtd,其中包含了XML配置文件中所有可能的配置选项和形式,完整的配置文件为openrpc-config.xml。XML配置的加载解析代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 public void loadFromXML () { try { DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance(); factory.setValidating(false ); factory.setFeature("http://apache.org/xml/features/nonvalidating/load-external-dtd" , false ); DocumentBuilder builder = factory.newDocumentBuilder(); InputStream inputStream = ClassLoader.getSystemClassLoader().getResourceAsStream("openrpc-config.xml" ); Document doc = builder.parse(inputStream); XPathFactory xPathfactory = XPathFactory.newInstance(); XPath xpath = xPathfactory.newXPath(); configuration.setPort(resolvePort(doc, xpath)); configuration.setAppName(resolveAppName(doc, xpath)); configuration.setRegistryConfig(resolveRegistryConfig(doc, xpath)); configuration.setLoadBalancerType(resolveLoadBalancerType(doc, xpath)); configuration.setLoadBalancer(LoadBalancerFactory.getLoadBalancer(configuration.getLoadBalancerType())); configuration.setCompressProtocol(resolveCompressType(doc, xpath)); configuration.setCompressor(CompressorFactory.getCompressor(configuration.getCompressProtocol())); configuration.setCompressThreshold(resolveCompressThreshold(doc, xpath)); configuration.setSerializeProtocol(resolveSerializeType(doc, xpath)); configuration.setSerializer(SerializerFactory.getSerializer(configuration.getSerializeProtocol())); configuration.setLoadBalancer(resolveLoadBalancer(doc, xpath)); configuration.setIdWorker(resolveIdWorker(doc, xpath)); } catch (ParserConfigurationException | SAXException | IOException e) { log.error("解析过程中发生异常" , e); } }
上面涉及到的所有解析方法内部其实都是调用了三个XML配置解析的工具方法,分别是解析配置节点的内容(如<port>8080</port>中的8080)、解析配置节点的属性值(如<serializeType type="hessian"/>中的type属性值)、解析并实例化配置节点表示的类对象(如<loadBalancer class="io.github.hulingf.loadbalancer.impl.RoundRobinLoadBalancer"/>中的class属性表示的类对象)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 private String parseString (Document doc, XPath xpath, String expression) { try { XPathExpression expr = xpath.compile(expression); Node targetNode = (Node) expr.evaluate(doc, XPathConstants.NODE); if (targetNode == null ) return null ; return targetNode.getTextContent(); } catch (XPathExpressionException e) { log.error("解析节点的内容时发生异常" , e); } return null ; } private String parseString (Document doc, XPath xpath, String expression, String attributeName) { try { XPathExpression expr = xpath.compile(expression); Node targetNode = (Node) expr.evaluate(doc, XPathConstants.NODE); if (targetNode == null ) return null ; Node item = targetNode.getAttributes().getNamedItem(attributeName); if (item == null ) return null ; return item.getNodeValue(); } catch (XPathExpressionException e) { log.error("解析节点的属性时发生异常" , e); } return null ; } private <T> T parseObject (Document doc, XPath xpath, String expression, Class<?>[] paramTypes, Object... paramValues) { try { XPathExpression expr = xpath.compile(expression); Node targetNode = (Node) expr.evaluate(doc, XPathConstants.NODE); if (targetNode == null ) return null ; Node item = targetNode.getAttributes().getNamedItem("class" ); if (item == null ) return null ; String className = item.getNodeValue(); Class<?> aClass = Class.forName(className); Object instant = null ; if (paramTypes == null ) { instant = aClass.getConstructor().newInstance(); } else { instant = aClass.getConstructor(paramTypes).newInstance(paramValues); } return (T) instant; } catch (ClassNotFoundException | NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException | XPathExpressionException e) { log.error("解析节点对应的类对象时发现异常" , e); } return null ; }
4.SPI配置加载 1 2 3 4 5 6 7 8 9 10 11 12 public void loadFromSPI () { try { ExtensionLoader<LoadBalancer> extensionLoader = ExtensionLoader.getExtensionLoader(LoadBalancer.class); LoadBalancer loadBalancer = extensionLoader.getExtension(configuration.getLoadBalancerType()); configuration.setLoadBalancer(loadBalancer); } catch (Exception e) { } }
借鉴Dubbo的SPI机制,我们设计了自己的服务加载器,可以实现按需加载指定插件,效率更高。大致流程是:根据负载均衡插件名称(如random)从缓存cachedInstances中查找实现类实例,如果不存在则通过加锁+双重检查创建实现类的单实例;创建时首先根据插件名称从cachedClasses中查找其类对象(cachedClasses为空时同样需要加锁+双重检查,从META-INF/extensions/目录下的SPI文件中加载名称到类对象的映射),然后根据类对象从EXTENSION_INSTANCES中获取对应实例,如果不存在则通过反射创建。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 public final class ExtensionLoader <T> { private static final Logger log = LoggerFactory.getLogger(ExtensionLoader.class); private static final String SERVICE_DIRECTORY = "META-INF/extensions/" ; private static final Map<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap <>(); private static final Map<Class<?>, Object> EXTENSION_INSTANCES = new ConcurrentHashMap <>(); private final Class<?> type; private final Map<String, Holder<Object>> cachedInstances = new ConcurrentHashMap <>(); private final Holder<Map<String, Class<?>>> cachedClasses = new Holder <>(); private ExtensionLoader (Class<?> type) { this .type = type; } public static <S> ExtensionLoader<S> getExtensionLoader (Class<S> type) { if (type == null ) { throw new IllegalArgumentException ("Extension type should not be null." ); } if (!type.isInterface()) { throw new IllegalArgumentException ("Extension type must be an interface." 
); } if (type.getAnnotation(SPI.class) == null ) { throw new IllegalArgumentException ("Extension type must be annotated by @SPI" ); } ExtensionLoader<S> extensionLoader = (ExtensionLoader<S>) EXTENSION_LOADERS.get(type); if (extensionLoader == null ) { EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader <S>(type)); extensionLoader = (ExtensionLoader<S>) EXTENSION_LOADERS.get(type); } return extensionLoader; } public T getExtension (String name) { if (StrUtil.isBlank(name)) { throw new IllegalArgumentException ("Extension name should not be null or empty." ); } Holder<Object> holder = cachedInstances.get(name); if (holder == null ) { cachedInstances.putIfAbsent(name, new Holder <>()); holder = cachedInstances.get(name); } Object instance = holder.get(); if (instance == null ) { synchronized (holder) { instance = holder.get(); if (instance == null ) { instance = createExtension(name); holder.set(instance); } } } return (T) instance; } private T createExtension (String name) { Class<?> clazz = getExtensionClasses().get(name); if (clazz == null ) { throw new RuntimeException ("No such extension of name " + name); } T instance = (T) EXTENSION_INSTANCES.get(clazz); if (instance == null ) { try { EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance()); instance = (T) EXTENSION_INSTANCES.get(clazz); } catch (Exception e) { log.error(e.getMessage()); } } return instance; } private Map<String, Class<?>> getExtensionClasses() { Map<String, Class<?>> classes = cachedClasses.get(); if (classes == null ) { synchronized (cachedClasses) { classes = cachedClasses.get(); if (classes == null ) { classes = new HashMap <>(); loadDirectory(classes); cachedClasses.set(classes); } } } return classes; } private void loadDirectory (Map<String, Class<?>> extensionClasses) { String fileName = ExtensionLoader.SERVICE_DIRECTORY + type.getName(); try { Enumeration<URL> urls; ClassLoader classLoader = ExtensionLoader.class.getClassLoader(); urls = classLoader.getResources(fileName); if 
(urls != null ) { while (urls.hasMoreElements()) { URL resourceUrl = urls.nextElement(); loadResource(extensionClasses, classLoader, resourceUrl); } } } catch (IOException e) { log.error(e.getMessage()); } } private void loadResource (Map<String, Class<?>> extensionClasses, ClassLoader classLoader, URL resourceUrl) { try (BufferedReader reader = new BufferedReader (new InputStreamReader (resourceUrl.openStream(), UTF_8))) { String line; while ((line = reader.readLine()) != null ) { final int ci = line.indexOf('#' ); if (ci >= 0 ) { line = line.substring(0 , ci); } line = line.trim(); if (line.length() > 0 ) { try { final int ei = line.indexOf('=' ); String name = line.substring(0 , ei).trim(); String clazzName = line.substring(ei + 1 ).trim(); if (name.length() > 0 && clazzName.length() > 0 ) { Class<?> clazz = classLoader.loadClass(clazzName); extensionClasses.put(name, clazz); } } catch (ClassNotFoundException e) { log.error(e.getMessage()); } } } } catch (IOException e) { log.error(e.getMessage()); } } }