Dubbo基础知识点
Dubbo 架构简介
下图展示了 Dubbo 核心架构:
Dubbo 核心架构图
- Registry:注册中心。 负责服务地址的注册与查找,服务的 Provider 和 Consumer 只在启动时 与注册中心交互。注册中心通过长连接感知 Provider 的存在,在 Provider 出现宕机的时候,注册 中心会立即推送相关事件通知 Consumer。
- Provider:服务提供者。 在它启动的时候,会向 Registry 进行注册操作,将自己服务的地址和相 关配置信息封装成 URL 添加到 ZooKeeper 中。
- Consumer:服务消费者。 在它启动的时候,会向 Registry 进行订阅操作。订阅操作会从 ZooKeeper 中获取 Provider 注册的 URL,并在 ZooKeeper 中添加相应的监听器。获取到 Provider URL 之后,Consumer 会根据负载均衡算法从多个 Provider 中选择一个 Provider 并与 其建立连接,最后发起对 Provider 的 RPC 调用。 如果 Provider URL 发生变更,Consumer 将会 通过之前订阅过程中在注册中心添加的监听器,获取到最新的 Provider URL 信息,进行相应的调 整,比如断开与宕机 Provider 的连接,并与新的 Provider 建立连接。Consumer 与 Provider 建 立的是长连接,且 Consumer 会缓存 Provider 信息,所以一旦连接建立,即使注册中心宕机,也 不会影响已运行的 Provider 和 Consumer。
- Monitor:监控中心。 用于统计服务的调用次数和调用时间。Provider 和 Consumer 在运行过程 中,会在内存中统计调用次数和调用时间,定时每分钟发送一次统计数据到监控中心。监控中心在 上面的架构图中并不是必要角色,监控中心宕机不会影响 Provider、Consumer 以及 Registry 的 功能,只会丢失监控数据而已。
搭建 Dubbo 源码环境
https://github.com/apache/dubboFork 到自己的仓库,直接执行下面的命令去下载代码:
git clone git@github.com:xxxxxxxx/dubbo.git
然后切换分支,因为目前最新的是 Dubbo 2.7.7 版本,所以这里我们就用这个新版本:
git checkout -b dubbo-2.7.7 dubbo-2.7.7
接下来,执行 mvn 命令进行编译:
mvn clean install -Dmaven.test.skip=true
最后,执行下面的命令转换成 IDEA 项目: 然后,在 IDEA 中导入源码,因为这个导入过程中会下载所需的依赖包,所以会耗费点时间。
Dubbo 源码核心模块
在 IDEA 成功导入 Dubbo 源码之后,你看到的项目结构如下图所示:
下面我们就来简单介绍一下这些核心模块的功能
dubbo-common 模块: Dubbo 的一个公共模块,其中有很多工具类以及公共逻辑,例如 Dubbo SPI 实现、时间轮实现、动态编译器等。
dubbo-remoting 模块: Dubbo 的远程通信模块,其中的子模块依赖各种开源组件实现远程通 信。在 dubbo-remoting-api 子模块中定义该模块的抽象概念,在其他子模块中依赖其他开源组件 进行实现,例如,dubbo-remoting-netty4 子模块依赖 Netty 4 实现远程通信,dubbo-remoting-zookeeper 通过 Apache Curator 实现与 ZooKeeper 集群的交互。
dubbo-rpc 模块: Dubbo 中对远程调用协议进行抽象的模块,其中抽象了各种协议,依赖于 dubbo-remoting 模块的远程调用功能。dubbo-rpc-api 子模块是核心抽象,其他子模块是针对具 体协议的实现,例如,dubbo-rpc-dubbo 子模块是对 Dubbo 协议的实现,依赖了 dubboremoting-netty4 等 dubbo-remoting 子模块。 dubbo-rpc 模块的实现中只包含一对一的调用, 不关心集群的相关内容。
dubbo-cluster 模块: Dubbo 中负责管理集群的模块,提供了负载均衡、容错、路由等一系列集 群相关的功能,最终的目的是将多个 Provider 伪装为一个 Provider,这样 Consumer 就可以像调 用一个 Provider 那样调用 Provider 集群了。
dubbo-registry 模块: Dubbo 中负责与多种开源注册中心进行交互的模块,提供注册中心的能 力。其中, dubbo-registry-api 子模块是顶层抽象,其他子模块是针对具体开源注册中心组件的 具体实现,例如,dubbo-registry-zookeeper 子模块是 Dubbo 接入 ZooKeeper 的具体实现。
dubbo-monitor 模块: Dubbo 的监控模块,主要用于统计服务调用次数、调用时间以及实现调 用链跟踪的服务。
dubbo-config 模块: Dubbo对外暴露的配置都是由该模块进行解析的。例如,dubbo-config-api 子模块负责处理 API 方式使用时的相关配置,dubbo-config-spring 子模块负责处理与 Spring集成使用时的相关配置方式。有了 dubbo-config模块,用户只需要了解Dubbo配置的规则即可,无须了解Dubbo 内部的细节。
dubbo-metadata 模块: Dubbo 的元数据模块 dubbo-metadata 模块的实现套路也是有一个 api 子模块进行抽象,然后其他子模块进行具体实 现。
dubbo-configcenter 模块: Dubbo 的动态配置模块,主要负责外部化配置以及服务治理规则的 存储与通知,提供了多个子模块用来接入多种开源的服务发现组件。
Dubbo 源码中的 Demo 示例
在 Dubbo 源码中我们可以看到一个 dubbo-demo 模块,共包括三个非常基础 的 Dubbo 示例项目,分 别是: 使用 XML 配置的 Demo 示例、使用注解配置的 Demo 示例 以及 直接使用 API 的 Demo 示例 。下面我们将从这三个示例的角度,简单介绍 Dubbo 的基本使用。同时,这三个项目也将作为后续 Debug Dubbo 源码的入口,我们会根据需要在其之上进行修改 。不过在这儿之前,你需要先启动 ZooKeeper 作为注册中心,然后编写一个业务接口作为 Provider 和 Consumer 的公约。
启动 ZooKeeper
在前面 Dubbo 的架构图中,你可以看到 Provider 的地址以及配置信息是通过注册中心传递给 Consumer 的。 Dubbo 支持的注册中心尽管有很多, 但在生产环境中, 基本都是用 ZooKeeper 作为 注册中心 。因此,在调试 Dubbo 源码时,自然需要在本地启动 ZooKeeper。
业务接口
在使用 Dubbo 之前,你还需要一个业务接口,这个业务接口可以认为是 Dubbo Provider 和 Dubbo Consumer 的公约,反映出很多信息:
- Provider ,如何提供服务、提供的服务名称是什么、需要接收什么参数、需要返回什么响应;
- Consumer ,如何使用服务、使用的服务名称是什么、需要传入什么参数、会得到什么响应。
dubbo-demo-interface 模块就是定义业务接口的地方,如下图所示:
其中,DemoService 接口中定义了两个方法:
public interface DemoService {
String sayHello(String name);
default CompletableFuture<String> sayHelloAsync(String name) {
return CompletableFuture.completedFuture(sayHello(name));
}
}
Demo 1:基于 XML 配置
在 dubbo-demo 模块下的 dubbo-demo-xml 模块,提供了基于 Spring XML 的 Provider 和 Consumer。 我们先来看 dubbo-demo-xml-provider 模块,其结构如下图所示:
在其 pom.xml 中除了一堆 dubbo 的依赖之外,还有依赖了 DemoService 这个公共接口:
<dependency>
<groupId>org.apache.dubbo</groupId>
<artifactId>dubbo-demo-interface</artifactId>
<version>${project.parent.version}</version>
</dependency>
DemoServiceImpl 实现了 DemoService 接口,sayHello() 方法直接返回一个字符串,sayHelloAsync() 方法返回一个 CompletableFuture 对象。 在 dubbo-provider.xml 配置文件中,会将 DemoServiceImpl 配置成一个 Spring Bean,并作为 DemoService 服务暴露出去:
<bean id="demoService" class="org.apache.dubbo.demo.provider.DemoServiceImpl"/>
<dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService"/
还有就是指定注册中心地址(就是前面 ZooKeeper 的地址),这样 Dubbo 才能把暴露的 DemoService 服务注册到 ZooKeeper 中:
<dubbo:registry address="zookeeper://127.0.0.1:2181"/>
最后,在 Application 中写个 main() 方法,指定 Spring 配置文件并启动 ClassPathXmlApplicationContext 即可。 接下来再看 dubbo-demo-xml-consumer 模块,结构如下图所示:
在 pom.xml 中同样依赖了 dubbo-demo-interface 这个公共模块。
在 dubbo-consumer.xml 配置文件中,会指定注册中心地址(就是前面 ZooKeeper 的地址),这样 Dubbo 才能从 ZooKeeper 中拉取到 Provider 暴露的服务列表信息:
<dubbo:registry address="zookeeper://127.0.0.1:2181"/>
还会使用 dubbo:reference 引入 DemoService 服务,后面可以作为 Spring Bean 使用:
<dubbo:reference id="demoService" check="false" interface="org.apache.dubbo.demo.DemoService"/>
最后,在 Application 中写个 main() 方法,指定 Spring 配置文件并启动 ClassPathXmlApplicationContext 之后,就可以远程调用 Provider 端的 DemoService 的 sayHello() 方法了。
Demo 2:基于注解配置
dubbo-demo-annotation 模块是基于 Spring 注解配置的示例,无非就是将 XML 的那些配置信息转移 到了注解上。
我们先来看 dubbo-demo-annotation-provider 这个示例模块:
public class Application {
public static void main(String[] args) throws Exception {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ProviderConfiguration.class);
context.start();
System.in.read();
}
@Configuration
@EnableDubbo(scanBasePackages = "org.apache.dubbo.demo.provider")
@PropertySource("classpath:/spring/dubbo-provider.properties")
static class ProviderConfiguration {
@Bean
public RegistryConfig registryConfig() {
RegistryConfig registryConfig = new RegistryConfig();
registryConfig.setAddress("zookeeper://127.0.0.1:2181");
return registryConfig;
}
}
}
这里,同样会有一个 DemoServiceImpl 实现了 DemoService 接口,并且在 org.apache.dubbo.demo.provider 目录下,能被扫描到,暴露成 Dubbo 服务。
接着再来看 dubbo-demo-annotation-consumer 模块,其中 Application 中也是通过 AnnotationConfigApplicationContext 初始化 Spring 容器,也会扫描指定目录下的 Bean,会扫到 DemoServiceComponent 这个 Bean,其中就通过 @Reference 注解注入 Dubbo 服务相关的 Bean:
@Component("demoServiceComponent")
public class DemoServiceComponent implements DemoService {
@Reference
private DemoService demoService;
@Override
public String sayHello(String name) {
return demoService.sayHello(name);
}
@Override
public CompletableFuture<String> sayHelloAsync(String name) {
return null;
}
}
Demo 3:基于 API 配置
在有的场景中,不能依赖于 Spring 框架,只能使用 API 来构建 Dubbo Provider 和 Consumer,比较 典型的一种场景就是在写 SDK 的时候。 先来看 dubbo-demo-api-provider 模块,其中 Application.main() 方法是入口:
public class Application {
public static void main(String[] args) throws Exception {
if (isClassic(args)) {
startWithExport();
} else {
startWithBootstrap();
}
}
private static boolean isClassic(String[] args) {
return args.length > 0 && "classic".equalsIgnoreCase(args[0]);
}
private static void startWithBootstrap() {
ServiceConfig<DemoServiceImpl> service = new ServiceConfig<>();
service.setInterface(DemoService.class);
service.setRef(new DemoServiceImpl());
DubboBootstrap bootstrap = DubboBootstrap.getInstance();
bootstrap.application(new ApplicationConfig("dubbo-demo-api-provider"))
.registry(new RegistryConfig("zookeeper://127.0.0.1:2181"))
.service(service)
.start()
.await();
}
private static void startWithExport() throws InterruptedException {
ServiceConfig<DemoServiceImpl> service = new ServiceConfig<>();
service.setInterface(DemoService.class);
service.setRef(new DemoServiceImpl());
service.setApplication(new ApplicationConfig("dubbo-demo-api-provider"));
service.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
service.export();
System.out.println("dubbo service started");
new CountDownLatch(1).await();
}
}
这里,同样会有一个 DemoServiceImpl 实现了 DemoService 接口,并且在 org.apache.dubbo.demo.provider 目录下,能被扫描到,暴露成 Dubbo 服务。 再来看 dubbo-demo-api-consumer 模块,其中 Application 中包含一个普通的 main() 方法入口:
public class Application {
public static void main(String[] args) {
if (isClassic(args)) {
runWithRefer();
} else {
runWithBootstrap();
}
}
private static boolean isClassic(String[] args) {
return args.length > 0 && "classic".equalsIgnoreCase(args[0]);
}
private static void runWithBootstrap() {
ReferenceConfig<DemoService> reference = new ReferenceConfig<>();
reference.setInterface(DemoService.class);
reference.setGeneric("true");
DubboBootstrap bootstrap = DubboBootstrap.getInstance();
bootstrap.application(new ApplicationConfig("dubbo-demo-api-consumer"))
.registry(new RegistryConfig("zookeeper://127.0.0.1:2181"))
.reference(reference)
.start();
DemoService demoService = ReferenceConfigCache.getCache().get(reference);
String message = demoService.sayHello("dubbo");
System.out.println(message);
// generic invoke
GenericService genericService = (GenericService) demoService;
Object genericInvokeResult = genericService.$invoke("sayHello", new String[] { String.class.getName() },
new Object[] { "dubbo generic invoke" });
System.out.println(genericInvokeResult);
}
private static void runWithRefer() {
ReferenceConfig<DemoService> reference = new ReferenceConfig<>();
reference.setApplication(new ApplicationConfig("dubbo-demo-api-consumer"));
reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
reference.setInterface(DemoService.class);
DemoService service = reference.get();
String message = service.sayHello("dubbo");
System.out.println(message);
}
}
在互联网领域,每个信息资源都有统一的且在网上唯一的地址,该地址就叫 URL(Uniform Resource Locator,统一资源定位符),它是互联网的统一资源定位标志,也就是指网络地址。
URL 本质上就是一个特殊格式的字符串。一个标准的 URL 格式可以包含如下的几个部分:
- protocol:URL 的协议。我们常见的就是 HTTP 协议和 HTTPS 协议,当然,还有其他协议,如 FTP 协议、SMTP 协议等。
- username/password:用户名 / 密码。 HTTP Basic Authentication 中多会使用在 URL 的协议 之后直接携带用户名和密码的方式。
- host/port:主机 / 端口。在实践中一般会使用域名,而不是使用具体的 host 和 port。
- path:请求的路径。
- parameters:参数键值对。一般在 GET 请求中会将参数放到 URL 中,POST 请求会将参数放到 请求体中。
URL 是整个 Dubbo 中非常基础,也是非常核心的一个组件,阅读源码的过程中你会发现很多方法都是 以 URL 作为参数的,在方法内部解析传入的 URL 得到有用的参数,所以有人将 URL 称为 Dubbo 的配 置总线。
Dubbo 中的 URL
Dubbo 中任意的一个实现都可以抽象为一个 URL,Dubbo 使用 URL 来统一描述了所有对象和配置信 息,并贯穿在整个 Dubbo 框架之中。这里我们来看 Dubbo 中一个典型 URL 的示例,如下:
这个 Demo Provider 注册到 ZooKeeper 上的 URL 信息,简单解析一下这个 URL 的各个部分:
- protocol:dubbo 协议。
- username/password:没有用户名和密码。
- host/port:172.17.32.91:20880。
- path:org.apache.dubbo.demo.DemoService。
- parameters:参数键值对,这里是问号后面的参数。
下面是 URL 的构造方法,你可以看到其核心字段与前文分析的 URL 基本一致:
public URL(String protocol,
String username,
String password,
String host,
int port,
String path,
Map<String, String> parameters,
Map<String, Map<String, String>> methodParameters) {
if (StringUtils.isEmpty(username)
&& StringUtils.isNotEmpty(password)) {
throw new IllegalArgumentException("Invalid url, password without username!");
}
this.protocol = protocol;
this.username = username;
this.password = password;
this.host = host;
this.port = Math.max(port, 0);
this.address = getAddress(this.host, this.port);
// trim the beginning "/"
while (path != null && path.startsWith("/")) {
path = path.substring(1);
}
this.path = path;
if (parameters == null) {
parameters = new HashMap<>();
} else {
parameters = new HashMap<>(parameters);
}
this.parameters = Collections.unmodifiableMap(parameters);
this.methodParameters = Collections.unmodifiableMap(methodParameters);
}
另外,在 dubbo-common 包中还提供了 URL 的辅助类:
- URLBuilder, 辅助构造 URL;
- URLStrParser, 将字符串解析成 URL 对象。
契约的力量
对于 Dubbo 中的 URL,很多人称之为 “配置总线”,也有人称之为 “统一配置模型”。虽然说法不同,但 都是在表达一个意思,URL 在 Dubbo 中被当作是 “公共的契约”。一个 URL 可以包含非常多的扩展点参 数,URL 作为上下文信息贯穿整个扩展点设计体系。
其实,一个优秀的开源产品都有一套灵活清晰的扩展契约,不仅是第三方可以按照这个契约进行扩展, 其自身的内核也可以按照这个契约进行搭建。如果没有一个公共的契约,只是针对每个接口或方法进行 约定,就会导致不同的接口甚至同一接口中的不同方法,以不同的参数类型进行传参,一会儿传递 Map,一会儿传递字符串,而且字符串的格式也不确定,需要你自己进行解析,这就多了一层没有明确 表现出来的隐含的约定。
所以说,在 Dubbo 中使用 URL 的好处多多,增加了便捷性:
- 使用 URL 这种公共契约进行上下文信息传递,最重要的就是代码更加易读、易懂,不用花大量时 间去揣测传递数据的格式和含义,进而形成一个统一的规范,使得代码易写、易读。
- 使用 URL 作为方法的入参(相当于一个 Key/Value 都是 String 的 Map),它所表达的含义比单个 参数更丰富,当代码需要扩展的时候,可以将新的参数以 Key/Value 的形式追加到 URL 之中,而 不需要改变入参或是返回值的结构。
- 使用 URL 这种 “公共的契约” 可以简化沟通,人与人之间的沟通消耗是非常大的,信息传递的效率 非常低,使用统一的契约、术语、词汇范围,可以省去很多沟通成本,尽可能地提高沟通效率。
Dubbo 中的 URL 示例
URL 在 SPI 中的应用
Dubbo SPI 中有一个依赖 URL 的重要场景——适配器方法,是被 @Adaptive 注解标注的, URL 一个 很重要的作用就是与 @Adaptive 注解一起选择合适的扩展实现类。
例如,在 dubbo-registry-api 模块中我们可以看到 RegistryFactory 这个接口,其中的 getRegistry() 方 法上有@Adaptive({"protocol"})
注解,说明这是一个适配器方法,Dubbo 在运行时会为其动态生成相 应的 “$Adaptive”
类型,如下所示:
public class RegistryFactory$Adaptive implements RegistryFactory {
public Registry getRegistry(org.apache.dubbo.common.URL arg0) {
if (arg0 == null)
throw new IllegalArgumentException("...");
org.apache.dubbo.common.URL url = arg0;
String extName = (url.getProtocol() == null ? "dubbo" :url.getProtocol());
if (extName == null)
throw new IllegalStateException("...");
RegistryFactory extension = (RegistryFactory) ExtensionLoader
.getExtensionLoader(RegistryFactory.class)
.getExtension(extName);
return extension.getRegistry(arg0);
}
}
我们会看到,在生成的 RegistryFactory$Adaptive
类中会自动实现 getRegistry()
方法,其中会根据 URL 的 Protocol 确定扩展名称,从而确定使用的具体扩展实现类。我们可以找到 RegistryProtocol
这 个类,并在其 getRegistry() 方法中打一个断点, Debug 启动任意一个 Demo 示例中的 Provider,得到如下图所示的内容:
这里传入的 registryUrl 值为: 那么在 RegistryFactory$Adaptive 中得到的扩展名称为 zookeeper,此次使用的 Registry 扩展实现类 就是 ZookeeperRegistryFactory。
URL 在服务暴露中的应用
Provider 在启 动时,会将自身暴露的服务注册到 ZooKeeper 上,具体是注册哪些信息到 ZooKeeper 上呢?我们来看 ZookeeperRegistry.doRegister() 方法,在其中打个断点,然后 Debug 启动 Provider,会得到下图:
传入的 URL 中包含了 Provider 的地址(172.25.48.1:20880)、暴露的接口 (org.apache.dubbo.demo.DemoService)等信息, toUrlPath() 方法会根据传入的 URL 参数确定在 ZooKeeper 上创建的节点路径,还会通过 URL 中的 dynamic 参数值确定创建的 ZNode 是临时节点还是持久节点。
URL 在服务订阅中的应用
Consumer 启动后会向注册中心进行订阅操作,并监听自己关注的 Provider。那 Consumer 是如何告 诉注册中心自己关注哪些 Provider 呢?
我们来看 ZookeeperRegistry 这个实现类,它是由上面的 ZookeeperRegistryFactory 工厂类创建的 Registry 接口实现,其中的 doSubscribe() 方法是订阅操作的核心实现,在第 175 行打一个断点,并 Debug 启动 Demo 中 Consumer,会得到下图所示的内容:
我们看到传入的 URL 参数如下: 其中 Protocol 为 consumer ,表示是 Consumer 的订阅协议,其中的 category 参数表示要订阅的分 类,这里要订阅 providers、configurators 以及 routers 三个分类;interface 参数表示订阅哪个服务 接口,这里要订阅的是暴露 org.apache.dubbo.demo.DemoService 实现的 Provider。 通过 URL 中的上述参数,ZookeeperRegistry 会在 toCategoriesPath() 方法中将其整理成一个 ZooKeeper 路径,然后调用 zkClient 在其上添加监听。
Dubbo 为了更好地达到 OCP 原则(即 “对扩展开放,对修改封闭” 的原则),采用了“微内核 + 插件” 的 架构。那什么是微内核架构呢?
微内核架构也被称为插件化架构(Plug-in Architecture),这是一种面向功能进行拆分的可扩展性架构。内核功能是比较稳定的,只负责管理插件的生命周期,不会因为系统功能的扩展而不断进行修改。功能上的扩展全部封装到插件之中,插件模块是独立存在的模块,包含特 定的功能,能拓展内核系统的功能。 微内核架构中,内核通常采用 Factory、IoC、OSGi 等方式管理插件生命周期,Dubbo 最终决定采用 SPI 机制来加载插件,Dubbo SPI 参考 JDK 原生的 SPI 机制,进行了性能优化以及功能增强。因此,在 讲解 Dubbo SPI 之前,我们有必要先来介绍一下 JDK SPI 的工作原理。
JDK SPI
SPI(Service Provider Interface)主要是被框架开发人员使用的一种技术。例如,使用 Java 语言访问 数据库时我们会使用到 java.sql.Driver 接口,不同数据库产品底层的协议不同,提供的 java.sql.Driver 实现也不同,在开发 java.sql.Driver 接口时,开发人员并不清楚用户最终会使用哪个数据库,在这种情 况下就可以使用 Java SPI 机制在实际运行过程中,为 java.sql.Driver 接口寻找具体的实现。
JDK SPI 机制
当服务的提供者提供了一种接口的实现之后,需要在 Classpath 下的 META-INF/services/
目录里创建 一个以服务接口命名的文件,此文件记录了该 jar 包提供的服务接口的具体实现类。当某个应用引入了 该 jar 包且需要使用该服务时,JDK SPI 机制就可以通过查找这个 jar 包的META-INF/services/
中的配 置文件来获得具体的实现类名,进行实现类的加载和实例化,最终使用该实现类完成业务功能。
下面我们通过一个简单的示例演示下 JDK SPI 的基本使用方式:
首先我们需要创建一个 Log 接口,来模拟日志打印的功能:
public interface Log {
void log(String info);
}
接下来提供两个实现—— Logback 和 Log4j,分别代表两个不同日志框架的实现,如下所示:
public class Log4j implements Log {
@Override
public void log(String info) {
System.out.println("Log4j:" + info);
}
}
public class Logback implements Log {
@Override
public void log(String info) {
System.out.println("Logback:" + info);
}
}
在项目的resources/META-INF/services
目录下添加一个名为 yang.org.Log 的文件,这是 JDK SPI 需要 读取的配置文件,具体内容如下:
yang.org.impl.Log4j
yang.org.impl.Logback
最后创建 main() 方法,其中会加载上述配置文件,创建全部 Log 接口实现的实例,并执行其 log() 方 法,如下所示:
public class Main {
public static void main(String[] args) {
ServiceLoader<Log> serviceLoader = ServiceLoader.load(Log.class);
for (Log log : serviceLoader) {
log.log("JDK SPI");
}
}
}
JDK SPI 源码分析
过上述示例,我们可以看到 JDK SPI 的入口方法是 ServiceLoader.load() 方法,接下来我们就对其具 体实现进行深入分析。 在 ServiceLoader.load() 方法中,首先会尝试获取当前使用的 ClassLoader(获取当前线程绑定的 ClassLoader,查找失败后使用 SystemClassLoader),然后调用 reload() 方法,调用关系如下图所 示:
在 reload() 方法中,首先会清理 providers 缓存(LinkedHashMap 类型的集合),该缓存用来记录 ServiceLoader 创建的实现对象,其中 Key 为实现类的完整类名,Value 为实现类的对象。之后创建 LazyIterator 迭代器,用于读取 SPI 配置文件并实例化实现类对象。
ServiceLoader.reload() 方法的具体实现,如下所示:
// Cached providers, in instantiation order
private LinkedHashMap<String,S> providers = new LinkedHashMap<>();
public void reload() {
providers.clear();
lookupIterator = new LazyIterator(service, loader);
}
在前面的示例中,main() 方法中使用的迭代器底层就是调用了 ServiceLoader.LazyIterator 实现的。 Iterator 接口有两个关键方法:hasNext() 方法和 next() 方法。这里的 LazyIterator 中的 next() 方法最 终调用的是其 nextService() 方法,hasNext() 方法最终调用的是 hasNextService() 方法,调用关系如 下图所示:
public boolean hasNext() {
if (acc == null) {
return hasNextService();
} else {
PrivilegedAction<Boolean> action = new PrivilegedAction<Boolean>() {
public Boolean run() { return hasNextService(); }
};
return AccessController.doPrivileged(action, acc);
}
}
public S next() {
if (acc == null) {
return nextService();
} else {
PrivilegedAction<S> action = new PrivilegedAction<S>() {
public S run() { return nextService(); }
};
return AccessController.doPrivileged(action, acc);
}
}
首先来看 LazyIterator.hasNextService() 方法,该方法主要负责查找 META-INF/services
目录下的 SPI 配置文件,并进行遍历,大致实现如下所示:
private static final String PREFIX = "META-INF/services/";
Enumeration<URL> configs = null;
Iterator<String> pending = null;
String nextName = null;
private boolean hasNextService() {
if (nextName != null) {
return true;
}
if (configs == null) {
try {
String fullName = PREFIX + service.getName();
if (loader == null)
configs = ClassLoader.getSystemResources(fullName);
else
configs = loader.getResources(fullName);
} catch (IOException x) {
fail(service, "Error locating configuration files", x);
}
}
while ((pending == null) || !pending.hasNext()) {
if (!configs.hasMoreElements()) {
return false;
}
pending = parse(service, configs.nextElement());
}
nextName = pending.next();
return true;
}
在 hasNextService() 方法中完成 SPI 配置文件的解析之后,再来看 LazyIterator.nextService() 方法, 该方法负责实例化 hasNextService() 方法读取到的实现类,其中会将实例化的对象放到 providers 集 合中缓存起来,核心实现如下所示:
private S nextService() {
if (!hasNextService())
throw new NoSuchElementException();
String cn = nextName;
nextName = null;
Class<?> c = null;
try {
c = Class.forName(cn, false, loader);
} catch (ClassNotFoundException x) {
fail(service,
"Provider " + cn + " not found");
}
if (!service.isAssignableFrom(c)) {
fail(service,
"Provider " + cn + " not a subtype");
}
try {
S p = service.cast(c.newInstance());
providers.put(cn, p);
return p;
} catch (Throwable x) {
fail(service,
"Provider " + cn + " could not be instantiated",
x);
}
throw new Error(); // This cannot happen
}
以上就是在 main() 方法中使用的迭代器的底层实现。最后,我们再来看一下 main() 方法中使用 ServiceLoader.iterator() 方法拿到的迭代器是如何实现的,这个迭代器是依赖 LazyIterator 实现的一个 匿名内部类,核心实现如下:
public Iterator<S> iterator() {
return new Iterator<S>() {
Iterator<Map.Entry<String,S>> knownProviders
= providers.entrySet().iterator();
public boolean hasNext() {
if (knownProviders.hasNext())
return true;
return lookupIterator.hasNext();
}
public S next() {
if (knownProviders.hasNext())
return knownProviders.next().getValue();
return lookupIterator.next();
}
public void remove() {
throw new UnsupportedOperationException();
}
};
}
JDK SPI 在 JDBC 中的应用
了解了 JDK SPI 实现的原理之后,我们再来看实践中 JDBC 是如何使用 JDK SPI 机制加载不同数据库厂 商的实现类。 JDK 中只定义了一个 java.sql.Driver 接口,具体的实现是由不同数据库厂商来提供的。这里我们就以 MySQL 提供的 JDBC 实现包为例进行分析。 在 mysql-connector-java-*.jar
包中的META-INF/services
目录下,有一个java.sql.Driver
文件中只有 一行内容,如下所示: 在使用 mysql-connector-java-*.jar
包连接 MySQL 数据库的时候,我们会用到如下语句创建数据库连 接:
String url = "jdbc:xxx://xxx:xxx/xxx";
Connection conn = DriverManager.getConnection(url, username, pwd);
DriverManager 是 JDK 提供的数据库驱动管理器,其中的代码片段,如下所示:
static {
loadInitialDrivers();
println("JDBC DriverManager initialized");
}
在调用 getConnection() 方法的时候,DriverManager 类会被 Java 虚拟机加载、解析并触发 static 代 码块的执行;在 loadInitialDrivers() 方法中通过 JDK SPI 扫描 Classpath 下 java.sql.Driver 接口实现类 并实例化,核心实现如下所示:
private static void loadInitialDrivers() {
String drivers = System.getProperty("jdbc.drivers")
ServiceLoader<Driver> loadedDrivers =
ServiceLoader.load(Driver.class);
Iterator<Driver> driversIterator = loadedDrivers.iterator();
while(driversIterator.hasNext()) {
driversIterator.next();
}
String[] driversList = drivers.split(":");
// 初始化Driver实现类
for (String aDriver : driversList) {
Class.forName(aDriver, true,ClassLoader.getSystemClassLoader());
}
}
在 MySQL 提供的 com.mysql.cj.jdbc.Driver
实现类中,同样有一段 static 静态代码块,这段代码会创建一个 com.mysql.cj.jdbc.Driver
对象并注册到 DriverManager.registeredDrivers
集合中 (CopyOnWriteArrayList 类型),如下所示:
static {
java.sql.DriverManager.registerDriver(new Driver());
}
在 getConnection()
方法中,DriverManager
从该 registeredDrivers
集合中获取对应的 Driver 对象创 建 Connection,核心实现如下所示:
private static Connection getConnection(String url, java.util.Properties info,
Class<?> caller) throws SQLException {
for(DriverInfo aDriver : registeredDrivers) {
Connection con = aDriver.driver.connect(url, info);
return con;
}
}
Dubbo SPI
在开始介绍 Dubbo SPI 实现之前,我们先来统一下面两个概念。
- 扩展点:通过 SPI 机制查找并加载实现的接口(又称 “扩展接口”)。前文示例中介绍的 Log 接口、com.mysql.cj.jdbc.Driver 接口,都是扩展点。
- 扩展点实现:实现了扩展接口的实现类。
通过前面的分析可以发现,JDK SPI 在查找扩展实现类的过程中,需要遍历 SPI 配置文件中定义的所有 实现类,该过程中会将这些实现类全部实例化。如果 SPI 配置文件中定义了多个实现类,而我们只需要 使用其中一个实现类时,就会生成不必要的对象。例如,org.apache.dubbo.rpc.Protocol 接口有 InjvmProtocol、DubboProtocol、RmiProtocol、HttpProtocol、HessianProtocol、ThriftProtocol 等多个实现,如果使用 JDK SPI,就会加载全部实现类,导致资源的浪费。
Dubbo SPI 不仅解决了上述资源浪费的问题,还对 SPI 配置文件扩展和修改。
首先,Dubbo 按照 SPI 配置文件的用途,将其分成了三类目录。
META-INF/services/
目录:该目录下的 SPI 配置文件用来兼容 JDK SPI 。META-INF/dubbo/
目录:该目录用于存放用户自定义 SPI 配置文件。META-INF/dubbo/internal/
目录:该目录用于存放 Dubbo 内部使用的 SPI 配置文件。
然后,Dubbo 将 SPI 配置文件改成了 KV 格式,例如
dubbo=org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol
其中 key 被称为扩展名(也就是 ExtensionName),当我们在为一个接口查找具体实现类时,可以指 定扩展名来选择相应的扩展实现。例如,这里指定扩展名为 dubbo,Dubbo SPI 就知道我们要使用: org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol
这个扩展实现类,只实例化这一个扩展实现 即可,无须实例化 SPI 配置文件中的其他扩展实现类。
使用 KV 格式的 SPI 配置文件的另一个好处是:让我们更容易定位到问题。假设我们使用的一个扩展实 现类所在的 jar 包没有引入到项目中,那么 Dubbo SPI 在抛出异常的时候,会携带该扩展名信息,而不 是简单地提示扩展实现类无法加载。这些更加准确的异常信息降低了排查问题的难度,提高了排查问题的效率。
下面我们正式进入 Dubbo SPI 核心实现的介绍。
@SPI 注解
Dubbo 中某个接口被 @SPI 注解修饰时,就表示该接口是扩展接口,前文示例中的 org.apache.dubbo.rpc.Protocol
接口就是一个扩展接口:
@SPI 注解的 value 值指定了默认的扩展名称,例如,在通过 Dubbo SPI 加载 Protocol 接口实现时, 如果没有明确指定扩展名,则默认会将 @SPI 注解的 value 值作为扩展名,即加载 dubbo 这个扩展名 对应的 org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol
这个扩展实现类,相关的 SPI 配置文 件在 dubbo-rpc-dubbo 模块中,如下图所示:
那 ExtensionLoader 是如何处理 @SPI 注解的呢? ExtensionLoader 位于 dubbo-common 模块中的 extension 包中,功能类似于 JDK SPI 中的 java.util.ServiceLoader。Dubbo SPI 的核心逻辑几乎都封装在 ExtensionLoader 之中(其中就包括 @SPI 注解的处理逻辑),其使用方式如下所示:
Protocol protocol = ExtensionLoader
.getExtensionLoader(Protocol.class).getExtension("dubbo");
这里首先来了解一下 ExtensionLoader 中三个核心的静态字段。
strategies(LoadingStrategy[] 类型): LoadingStrategy 接口有三个实现(通过 JDK SPI 方式 加载的),如下图所示,分别对应前面介绍的三个 Dubbo SPI 配置文件所在的目录,且都继承了 Prioritized 这个优先级接口,默认优先级是
DubboInternalLoadingStrategy > DubboLoadingStrategy > ServicesLoadingStrateg
EXTENSION_LOADERS(ConcurrentMap 类型) :Dubbo 中一个扩展接口对应一个 ExtensionLoader 实例,该集合缓存了全部 ExtensionLoader 实例,其中的 Key 为扩展接口,Value 为加载其扩展实现的 ExtensionLoader 实例。
下面我们再来关注一下 ExtensionLoader 的实例字段。
- type(Class 类型):当前 ExtensionLoader 实例负责加载扩展接口。
- cachedDefaultName(String 类型):记录了 type 这个扩展接口上 @SPI 注解的 value 值,也就是默认扩展名。
- cachedNames(ConcurrentMap, String > 类型):缓存了该 ExtensionLoader 加 载的扩展实现类与扩展名之间的映射关系。
- cachedClasses(Holder>> 类型):缓存了该 ExtensionLoader 加载 的扩展名与扩展实现类之间的映射关系。cachedNames 集合的反向关系缓存。
- cachedInstances(ConcurrentMap > 类型):缓存了该 ExtensionLoader 加载的扩展名与扩展实现对象之间的映射关系。
ExtensionLoader.getExtensionLoader() 方法会根据扩展接口从 EXTENSION_LOADERS 缓存中查找相 应的 ExtensionLoader 实例,核心实现如下:
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
if (type == null) {
throw new IllegalArgumentException("Extension type == null");
}
if (!type.isInterface()) {
throw new IllegalArgumentException("Extension type (" + type + ") is not an interface!");
}
if (!withExtensionAnnotation(type)) {
throw new IllegalArgumentException("Extension type (" + type +
") is not an extension, because it is NOT annotated with @" + SPI.class.getSimpleName() + "!");
}
ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
if (loader == null) {
EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
}
return loader;
}
得到接口对应的 ExtensionLoader 对象之后会调用其 getExtension() 方法,根据传入的扩展名称从 cachedInstances 缓存中查找扩展实现的实例,最终将其实例化后返回:
public T getExtension(String name) {
if (StringUtils.isEmpty(name)) {
throw new IllegalArgumentException("Extension name == null");
}
if ("true".equals(name)) {
return getDefaultExtension();
}
final Holder<Object> holder = getOrCreateHolder(name);
Object instance = holder.get();
if (instance == null) {
synchronized (holder) {
instance = holder.get();
if (instance == null) {
instance = createExtension(name);
holder.set(instance);
}
}
}
return (T) instance;
}
在 createExtension() 方法中完成了 SPI 配置文件的查找以及相应扩展实现类的实例化,同时还实现了 自动装配以及自动 Wrapper 包装等功能。其核心流程是这样的:
- 获取 cachedClasses 缓存,根据扩展名从 cachedClasses 缓存中获取扩展实现类。如果 cachedClasses 未初始化,则会扫描前面介绍的三个 SPI 目录获取查找相应的 SPI 配置文件,然后 加载其中的扩展实现类,最后将扩展名和扩展实现类的映射关系记录到 cachedClasses 缓存中。 这部分逻辑在 loadExtensionClasses() 和 loadDirectory() 方法中。
- 根据扩展实现类从 EXTENSION_INSTANCES 缓存中查找相应的实例。如果查找失败,会通过反射 创建扩展实现对象。
- 自动装配扩展实现对象中的属性(即调用其 setter)。
- 自动包装扩展实现对象。
- 如果扩展实现类实现了 Lifecycle 接口,在 initExtension() 方法中会调用 initialize() 方法进行初始 化。
private T createExtension(String name) {
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
injectExtension(instance);
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (CollectionUtils.isNotEmpty(wrapperClasses)) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
initExtension(instance);
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
type + ") couldn't be instantiated: " + t.getMessage(), t);
}
}
@Adaptive 注解与适配器
@Adaptive 注解用来实现 Dubbo 的适配器功能,那什么是适配器呢?这里我们通过一个示例进行说 明。Dubbo 中的 ExtensionFactory 接口有三个实现类,如下图所示,ExtensionFactory 接口上有 @SPI 注解,AdaptiveExtensionFactory 实现类上有 @Adaptive 注解。
AdaptiveExtensionFactory 不实现任何具体的功能,而是用来适配 ExtensionFactory 的 SpiExtensionFactory 和 SpringExtensionFactory 这两种实现。AdaptiveExtensionFactory 会根据运 行时的一些状态来选择具体调用 ExtensionFactory 的哪个实现。
@Adaptive 注解还可以加到接口方法之上,Dubbo 会动态生成适配器类。例如,Transporter 接口有 两个被 @Adaptive 注解修饰的方法:
@SPI("netty")
public interface Transporter {
/**
* Bind a server.
*
* @param url server url
* @param handler
* @return server
* @throws RemotingException
* @see org.apache.dubbo.remoting.Transporters#bind(URL, ChannelHandler...)
*/
@Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException;
/**
* Connect to a server.
*
* @param url server url
* @param handler
* @return client
* @throws RemotingException
* @see org.apache.dubbo.remoting.Transporters#connect(URL, ChannelHandler...)
*/
@Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
Client connect(URL url, ChannelHandler handler) throws RemotingException;
}
Dubbo 会生成一个 Transporter$Adaptive 适配器类,该类继承了 Transporter 接口:
public class Transporter$Adaptive implements Transporter {
public org.apache.dubbo.remoting.Client connect(URL arg0, ChannelHandler arg1)
throws RemotingException {
if (arg0 == null) throw new IllegalArgumentException("url == null");
URL url = arg0;
String extName = url.getParameter("client",
url.getParameter("transporter", "netty"));
if (extName == null)
throw new IllegalStateException("...");
Transporter extension = (Transporter) ExtensionLoader
.getExtensionLoader(Transporter.class)
.getExtension(extName);
return extension.connect(arg0, arg1);
}
...
}
生成 Transporter$Adaptive 这个类的逻辑位于 ExtensionLoader.createAdaptiveExtensionClass() 方 法,若感兴趣你可以看一下相关代码,其中涉及的 javassist 等方面的知识。
明确了 @Adaptive 注解的作用之后,我们回到 ExtensionLoader.createExtension() 方法,其中在扫描 SPI 配置文件的时候,会调用 loadClass() 方法加载 SPI 配置文件中指定的类,如下图所示:
loadClass() 方法中会识别加载扩展实现类上的 @Adaptive 注解,将该扩展实现的类型缓存到 cachedAdaptiveClass 这个实例字段上(volatile 修饰):
private void loadClass(){
if (clazz.isAnnotationPresent(Adaptive.class)) {
// 缓存到cachedAdaptiveClass字段
cacheAdaptiveClass(clazz, overridden);
} else ...
}
我们可以通过 ExtensionLoader.getAdaptiveExtension() 方法获取适配器实例,并将该实例缓存到 cachedAdaptiveInstance 字段(Holder 类型)中,核心流程如下:
- 首先,检查 cachedAdaptiveInstance 字段中是否已缓存了适配器实例,如果已缓存,则直接返回 该实例即可。
- 然后,调用 getExtensionClasses() 方法,其中就会触发前文介绍的 loadClass() 方法,完成 cachedAdaptiveClass 字段的填充。
- 如果存在 @Adaptive 注解修饰的扩展实现类,该类就是适配器类,通过 newInstance() 将其实例 化即可。如果不存在 @Adaptive 注解修饰的扩展实现类,就需要通过 createAdaptiveExtensionClass() 方法扫描扩展接口中方法上的 @Adaptive 注解,动态生成适配 器类,然后实例化。
- 接下来,调用 injectExtension() 方法进行自动装配,就能得到一个完整的适配器实例。
- 最后,将适配器实例缓存到 cachedAdaptiveInstance 字段,然后返回适配器实例。
此外,我们还可以通过 API 方式(addExtension() 方法)设置 cachedAdaptiveClass 这个字段,指定 适配器类型(这个方法你知道即可)。 总之,适配器什么实际工作都不用做,就是根据参数和状态选择其他实现来完成工作。
自动包装特性
Dubbo 中的一个扩展接口可能有多个扩展实现类,这些扩展实现类可能会包含一些相同的逻辑,如果在每个实现类中都写一遍,那么这些重复代码就会变得很难维护。Dubbo 提供的自动包装特性,就可 以解决这个问题。 Dubbo 将多个扩展实现类的公共逻辑,抽象到 Wrapper 类中,Wrapper 类与普通 的扩展实现类一样,也实现了扩展接口,在获取真正的扩展实现对象时,在其外面包装一层 Wrapper 对象,你可以理解成一层装饰器。
了解了 Wrapper 类的基本功能,我们回到 ExtensionLoader.loadClass() 方法中,可以看到:
private void loadClass(){
...
} else if (isWrapperClass(clazz)) {
cacheWrapperClass(clazz);
} else ...
}
- 在 isWrapperClass() 方法中,会判断该扩展实现类是否包含拷贝构造函数(即构造函数只有一个参数且为扩展接口类型),如果包含,则为Wrapper类,这就是判断 Wrapper类的标准。
- 将 Wrapper 类记录到 cachedWrapperClasses(Set> 类型)这个实例字段中进行缓存。
createExtension() 方法时的 4 处,有下面这段代码,其中会遍历全部 Wrapper 类并一层层 包装到真正的扩展实例对象外层:
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (CollectionUtils.isNotEmpty(wrapperClasses)) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass
.getConstructor(type).newInstance(instance));
}
}
自动装配特性
在 createExtension() 方法中我们看到,Dubbo SPI 在拿到扩展实现类的对象(以及 Wrapper 类的对象)之后,还会调用 injectExtension() 方法扫描其全部 setter 方法,并根据 setter 方法的名称以及参 数的类型,加载相应的扩展实现,然后调用相应的 setter 方法填充属性,这就实现了 Dubbo SPI 的自 动装配特性。简单来说,自动装配属性就是在加载一个扩展点的时候,将其依赖的扩展点一并加载,并进行装配。
下面简单看一下 injectExtension() 方法的具体实现:
private T injectExtension(T instance) {
if (objectFactory == null) {
return instance;
}
try {
for (Method method : instance.getClass().getMethods()) {
if (!isSetter(method)) {
continue;
}
/**
* Check {@link DisableInject} to see if we need auto injection for this property
*/
if (method.getAnnotation(DisableInject.class) != null) {
continue;
}
Class<?> pt = method.getParameterTypes()[0];
if (ReflectUtils.isPrimitives(pt)) {
continue;
}
try {
String property = getSetterProperty(method);
Object object = objectFactory.getExtension(pt, property);
if (object != null) {
method.invoke(instance, object);
}
} catch (Exception e) {
logger.error("Failed to inject via method " + method.getName()
+ " of interface " + type.getName() + ": " + e.getMessage(), e);
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return instance;
}
injectExtension() 方法实现的自动装配依赖了 ExtensionFactory(即 objectFactory 字段),前面我们 提到过 ExtensionFactory 有 SpringExtensionFactory 和 SpiExtensionFactory 两个真正的实现(还有 一个实现是 AdaptiveExtensionFactory 是适配器)。下面我们分别介绍下这两个真正的实现。
第一个,SpiExtensionFactory。 根据扩展接口获取相应的适配器,没有到属性名称:
@Override
public <T> T getExtension(Class<T> type, String name) {
if (type.isInterface() && type.isAnnotationPresent(SPI.class)) {
// 查找type对应的ExtensionLoader实例
ExtensionLoader<T> loader = ExtensionLoader.getExtensionLoader(type);
if (!loader.getSupportedExtensions().isEmpty()) {
return loader.getAdaptiveExtension();
}
}
return null;
}
第二个,SpringExtensionFactory。 将属性名称作为 Spring Bean 的名称,从 Spring 容器中获取 Bean:
public <T> T getExtension(Class<T> type, String name) {
//SPI should be get from SpiExtensionFactory
if (type.isInterface() && type.isAnnotationPresent(SPI.class)) {
return null;
}
for (ApplicationContext context : CONTEXTS) {
T bean = BeanFactoryUtils.getOptionalBean(context, name, type);
if (bean != null) {
return bean;
}
}
logger.warn("No spring extension (bean) named:" + name + ", try to find an extension (bean) of type " + type.getName());
return null;
}
@Activate 注解与自动激活特性
这里以 Dubbo 中的 Filter 为例说明自动激活特性的含义,org.apache.dubbo.rpc.Filter 接口有非常多 的扩展实现类,在一个场景中可能需要某几个 Filter 扩展实现类协同工作,而另一个场景中可能需要另 外几个实现类一起工作。这样,就需要一套配置来指定当前场景中哪些 Filter 实现是可用的,这就是 @Activate 注解要做的事情。
@Activate 注解标注在扩展实现类上,有 group、value 以及 order 三个属性。
- group 属性:修饰的实现类是在 Provider 端被激活还是在 Consumer 端被激活。
- value 属性:修饰的实现类只在 URL 参数中出现指定的 key 时才会被激活。
- order 属性:用来确定扩展实现类的排序。
我们先来看 loadClass() 方法对 @Activate 的扫描,其中会将包含 @Activate 注解的实现类缓存到 cachedActivates 这个实例字段(Map 类型,Key 为扩展名,Value 为 @Activate 注 解):
private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name,
boolean overridden) throws NoSuchMethodException {
if (!type.isAssignableFrom(clazz)) {
throw new IllegalStateException("Error occurred when loading extension class (interface: " +
type + ", class line: " + clazz.getName() + "), class "
+ clazz.getName() + " is not subtype of interface.");
}
if (clazz.isAnnotationPresent(Adaptive.class)) {
cacheAdaptiveClass(clazz, overridden);
} else if (isWrapperClass(clazz)) {
cacheWrapperClass(clazz);
} else {
clazz.getConstructor();
if (StringUtils.isEmpty(name)) {
name = findAnnotationName(clazz);
if (name.length() == 0) {
throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + resourceURL);
}
}
String[] names = NAME_SEPARATOR.split(name);
if (ArrayUtils.isNotEmpty(names)) {
cacheActivateClass(clazz, names[0]);
for (String n : names) {
cacheName(clazz, n);
saveInExtensionClass(extensionClasses, clazz, n, overridden);
}
}
}
}
使用 cachedActivates 这个集合的地方是 getActivateExtension() 方法。首先来关注 getActivateExtension() 方法的参数:url 中包含了配置信息,values 是配置中指定的扩展名,group 为 Provider 或 Consumer。下面是 getActivateExtension() 方法的核心逻辑:
- 首先,获取默认激活的扩展集合。默认激活的扩展实现类有几个条件:①在 cachedActivates 集合 中存在;②@Activate 注解指定的 group 属性与当前 group 匹配;③扩展名没有出现在 values 中 (即未在配置中明确指定,也未在配置中明确指定删除);④URL 中出现了 @Activate 注解中指 定的 Key。
- 然后,按照 @Activate 注解中的 order 属性对默认激活的扩展集合进行排序。
- 最后,按序添加自定义扩展实现类的对象。
public List<T> getActivateExtension(URL url, String[] values, String group) {
List<T> activateExtensions = new ArrayList<>();
List<String> names = values == null ? new ArrayList<>(0) : asList(values);
if (!names.contains(REMOVE_VALUE_PREFIX + DEFAULT_KEY)) {
getExtensionClasses();
for (Map.Entry<String, Object> entry : cachedActivates.entrySet()) {
String name = entry.getKey();
Object activate = entry.getValue();
String[] activateGroup, activateValue;
if (activate instanceof Activate) {
activateGroup = ((Activate) activate).group();
activateValue = ((Activate) activate).value();
} else if (activate instanceof com.alibaba.dubbo.common.extension.Activate) {
activateGroup = ((com.alibaba.dubbo.common.extension.Activate) activate).group();
activateValue = ((com.alibaba.dubbo.common.extension.Activate) activate).value();
} else {
continue;
}
if (isMatchGroup(group, activateGroup)
&& !names.contains(name)
&& !names.contains(REMOVE_VALUE_PREFIX + name)
&& isActive(activateValue, url)) {
activateExtensions.add(getExtension(name));
}
}
activateExtensions.sort(ActivateComparator.COMPARATOR);
}
List<T> loadedExtensions = new ArrayList<>();
for (int i = 0; i < names.size(); i++) {
String name = names.get(i);
if (!name.startsWith(REMOVE_VALUE_PREFIX)
&& !names.contains(REMOVE_VALUE_PREFIX + name)) {
if (DEFAULT_KEY.equals(name)) {
if (!loadedExtensions.isEmpty()) {
activateExtensions.addAll(0, loadedExtensions);
loadedExtensions.clear();
}
} else {
loadedExtensions.add(getExtension(name));
}
}
}
if (!loadedExtensions.isEmpty()) {
activateExtensions.addAll(loadedExtensions);
}
return activateExtensions;
}
在很多开源框架中,都需要定时任务的管理功能,例如 ZooKeeper、Netty、Quartz、Kafka 以及 Linux 操作系统。 JDK 提供的 java.util.Timer 和 DelayedQueue 等工具类,可以帮助我们实现简单的定时任务管理,其 底层实现使用的是堆这种数据结构,存取操作的复杂度都是 O(nlog(n)),无法支持大量的定时任务。在 定时任务量比较大、性能要求比较高的场景中,为了将定时任务的存取操作以及取消操作的时间复杂度 降为 O(1),一般会使用时间轮的方式。
时间轮是一种高效的、批量管理定时任务的调度模型。时间轮一般会实现成一个环形结构,类似一个时 钟,分为很多槽,一个槽代表一个时间间隔,每个槽使用双向链表存储定时任务;指针周期性地跳动, 跳动到一个槽位,就执行该槽位的定时任务。
需要注意的是,单层时间轮的容量和精度都是有限的,对于精度要求特别高、时间跨度特别大或是海量 定时任务需要调度的场景,通常会使用多级时间轮以及持久化存储与时间轮结合的方案。
那在 Dubbo 中,时间轮的具体实现方式是怎样的呢?
Dubbo 的时间轮实现 位于 dubbo-common 模块的 org.apache.dubbo.common.timer 包中,下面我们就来分析时间轮涉及 的核心接口和实现。
核心接口
在 Dubbo 中,所有的定时任务都要继承 TimerTask 接口。TimerTask 接口非常简单,只定义了一个 run() 方法,该方法的入参是一个 Timeout 接口的对象。Timeout 对象与 TimerTask 对象一一对应,两 者的关系类似于线程池返回的 Future 对象与提交到线程池中的任务对象之间的关系。通过 Timeout 对 象,我们不仅可以查看定时任务的状态,还可以操作定时任务(例如取消关联的定时任务)。Timeout 接口中的方法如下图所示:
Timer 接口定义了定时器的基本行为,如下图所示,其核心是 newTimeout() 方法:提交一个定时任务 (TimerTask)并返回关联的 Timeout 对象,这有点类似于向线程池提交任务的感觉。
HashedWheelTimeout
HashedWheelTimeout 是 Timeout 接口的唯一实现,是 HashedWheelTimer 的内部类。 HashedWheelTimeout 扮演了两个角色:
- 第一个,时间轮中双向链表的节点,即定时任务 TimerTask 在 HashedWheelTimer 中的容器。
- 第二个,定时任务 TimerTask 提交到 HashedWheelTimer 之后返回的句柄(Handle),用于在 时间轮外部查看和控制定时任务。
HashedWheelTimeout 中的核心字段如下:
- prev、next(HashedWheelTimeout 类型),分别对应当前定时任务在链表中的前驱节点和后 继节点。
- task(TimerTask 类型),指实际被调度的任务。
- deadline(long 类型),指定时任务执行的时间。这个时间是在创建 HashedWheelTimeout 时 指定的,计算公式是:currentTime(创建 HashedWheelTimeout 的时间) + delay(任务延迟 时间) - startTime(HashedWheelTimer 的启动时间),时间单位为纳秒。
- state(volatile int 类型),指定时任务当前所处状态,可选的有三个,分别是 INIT(0)、 CANCELLED(1)和 EXPIRED(2)。另外,还有一个 STATE_UPDATER 字段 (AtomicIntegerFieldUpdater 类型)实现 state 状态变更的原子性。
- remainingRounds(long 类型),指当前任务剩余的时钟周期数。时间轮所能表示的时间长度是有限的,在任务到期时间与当前时刻的时间差,超过时间轮单圈能表示的时长,就出现了套圈的 情况,需要该字段值表示剩余的时钟周期。
HashedWheelTimeout 中的核心方法有:
- isCancelled()、isExpired() 、state() 方法, 主要用于检查当前 HashedWheelTimeout 状态。
- cancel() 方法, 将当前 HashedWheelTimeout 的状态设置为 CANCELLED,并将当前 HashedWheelTimeout 添加到 cancelledTimeouts 队列中等待销毁。
- expire() 方法, 当任务到期时,会调用该方法将当前 HashedWheelTimeout 设置为 EXPIRED 状 态,然后调用其中的 TimerTask 的 run() 方法执行定时任务。
- remove() 方法, 将当前 HashedWheelTimeout 从时间轮中删除。
HashedWheelBucket
HashedWheelBucket 是时间轮中的一个槽,时间轮中的槽实际上就是一个用于缓存和管理双向链表的 容器,双向链表中的每一个节点就是一个 HashedWheelTimeout 对象,也就关联了一个 TimerTask 定 时任务。
HashedWheelBucket 持有双向链表的首尾两个节点,分别是 head 和 tail 两个字段,再加上每个 HashedWheelTimeout 节点均持有前驱和后继的引用,这样就可以正向或是逆向遍历整个双向链表 了。
下面我们来看 HashedWheelBucket 中的核心方法。
- addTimeout() 方法:新增 HashedWheelTimeout 到双向链表的尾部。
- pollTimeout() 方法:移除双向链表中的头结点,并将其返回。
- remove() 方法:从双向链表中移除指定的 HashedWheelTimeout 节点。
- clearTimeouts() 方法:循环调用 pollTimeout() 方法处理整个双向链表,并返回所有未超时或者 未被取消的任务。
- expireTimeouts() 方法:遍历双向链表中的全部 HashedWheelTimeout 节点。 在处理到期的定 时任务时,会通过 remove() 方法取出,并调用其 expire() 方法执行;对于已取消的任务,通过 remove() 方法取出后直接丢弃;对于未到期的任务,会将 remainingRounds 字段(剩余时钟周 期数)减一。
HashedWheelTimer
HashedWheelTimer 是 Timer 接口的实现,它通过时间轮算法实现了一个定时器。
HashedWheelTimer 会根据当前时间轮指针选定对应的槽(HashedWheelBucket),从双向链表的头 部开始迭代,对每个定时任务(HashedWheelTimeout)进行计算,属于当前时钟周期则取出运行, 不属于则将其剩余的时钟周期数减一操作。
下面我们来看 HashedWheelTimer 的核心属性。
- workerState(volatile int 类型):时间轮当前所处状态,可选值有 init、started、 shutdown。同时,有相应的 AtomicIntegerFieldUpdater 实现 workerState 的原子修改。
- startTime(long 类型):当前时间轮的启动时间,提交到该时间轮的定时任务的 deadline 字段 值均以该时间戳为起点进行计算。
- wheel(HashedWheelBucket[] 类型):该数组就是时间轮的环形队列,每一个元素都是一个 槽。当指定时间轮槽数为 n 时,实际上会取大于且最靠近 n 的 2 的幂次方值。
- timeouts、cancelledTimeouts(LinkedBlockingQueue 类型):timeouts 队列用于缓冲外 部提交时间轮中的定时任务,cancelledTimeouts 队列用于暂存取消的定时任务。 HashedWheelTimer 会在处理 HashedWheelBucket 的双向链表之前,先处理这两个队列中的数 据。
- tick(long 类型):该字段在 HashedWheelTimer$Worker 中,是时间轮的指针,是一个步长为 1 的单调递增计数器。
- mask(int 类型):掩码, mask = wheel.length - 1,执行 ticks & mask 便能定位到对应的时钟 槽。
- ticksDuration(long 类型):时间指针每次加 1 所代表的实际时间,单位为纳秒。
- pendingTimeouts(AtomicLong 类型):当前时间轮剩余的定时任务总数。 workerThread(Thread 类型):时间轮内部真正执行定时任务的线程。
- worker(Worker 类型):真正执行定时任务的逻辑封装这个 Runnable 对象中。
时间轮对外提供了一个 newTimeout() 接口用于提交定时任务,在定时任务进入到 timeouts 队列之前 会先调用 start() 方法启动时间轮,其中会完成下面两个关键步骤:
- 确定时间轮的 startTime 字段;
- 启动 workerThread 线程,开始执行 worker 任务。
之后根据 startTime 计算该定时任务的 deadline 字段,最后才能将定时任务封装成 HashedWheelTimeout 并添加到 timeouts 队列。
下面我们来分析时间轮指针一次转动的全流程。
- 时间轮指针转动,时间轮周期开始。
- 清理用户主动取消的定时任务,这些定时任务在用户取消时,会记录到 cancelledTimeouts 队列 中。在每次指针转动的时候,时间轮都会清理该队列。
- 将缓存在 timeouts 队列中的定时任务转移到时间轮中对应的槽中。
- 根据当前指针定位对应槽,处理该槽位的双向链表中的定时任务。
- 检测时间轮的状态。如果时间轮处于运行状态,则循环执行上述步骤,不断执行定时任务。如果时间轮处于停止状态,则执行下面的步骤获取到未被执行的定时任务并加入 unprocessedTimeouts 队列:遍历时间轮中每个槽位,并调用 clearTimeouts() 方法;对 timeouts 队列中未被加入槽中 循环调用 poll()。
- 最后再次清理 cancelledTimeouts 队列中用户主动取消的定时任务。 上述核心逻辑在 HashedWheelTimer$Worker.run() 方法中,若你感兴趣的话,可以翻看一下源码进行分析。
Dubbo 中如何使用定时任务
在 Dubbo 中,时间轮并不直接用于周期性操作,而是只向时间轮提交执行单次的定时任务,在上一次 任务执行完成的时候,调用 newTimeout() 方法再次提交当前任务,这样就会在下个周期执行该任务。 即使在任务执行过程中出现了 GC、I/O 阻塞等情况,导致任务延迟或卡住,也不会有同样的任务源源不 断地提交进来,导致任务堆积。
Dubbo 中对时间轮的应用主要体现在如下两个方面:
- 失败重试, 例如,Provider 向注册中心进行注册失败时的重试操作,或是 Consumer 向注册中心 订阅时的失败重试等。
- 周期性定时任务, 例如,定期发送心跳请求,请求超时的处理,或是网络连接断开后的重连机 制。