聊聊 Dubbo 的 SPI 续之自适应拓展

Adaptive

这个应该是 Dubbo SPI 里最玄妙的东西了,一开始没懂,自适应扩展点加载,
dubbo://123.123.123.123:1234/com.nicksxs.demo.service.HelloWorldService?anyhost=true&application=demo&default.loadbalance=random&default.service.filter=LoggerFilter&dubbo=2.5.3&interface=com.nicksxs.demo.service.HelloWorldService&logger=slf4j&methods=method1,method2,method3,method4&pid=4292&retries=0&side=provider&threadpool=fixed&threads=200&timeout=2000&timestamp=1590647155886
那我从比较能理解的角度或者说思路去讲讲我的理解,因为直接将原理如果脱离了使用,对于我这样的理解能力比较差的可能会比较吃力,从使用场景开始讲可能会比较舒服了,这里可以看到参数里有蛮多的,举个例子,比如这个 threadpool = fixed,说明线程池使用的是 fixed 对应的实现,也就是下图的这个

这样子似乎没啥问题了,反正就是用dubbo 的 spi 加载嘛,好像没啥问题,其实问题还是存在的,或者说不太优雅,比如要先判断我这个 fixed 对应的实现类是哪个,这里可能就有个 if-else 判断了,但是 dubbo 的开发人员似乎不太想这么做这个事情,

譬如我们在引用一个服务时,在ReferenceConfig 中的

1
private static final Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

就获取了自适应拓展,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public T getAdaptiveExtension() {
Object instance = cachedAdaptiveInstance.get();
if (instance == null) {
if (createAdaptiveInstanceError == null) {
synchronized (cachedAdaptiveInstance) {
instance = cachedAdaptiveInstance.get();
if (instance == null) {
try {
instance = createAdaptiveExtension();
cachedAdaptiveInstance.set(instance);
} catch (Throwable t) {
createAdaptiveInstanceError = t;
throw new IllegalStateException("fail to create adaptive instance: " + t.toString(), t);
}
}
}
} else {
throw new IllegalStateException("fail to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
}
}

return (T) instance;
}

这里也使用了 DCL,来锁cachedAdaptiveInstance,当缓存中没有时就去创建自适应拓展

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private T createAdaptiveExtension() {
try {
// 获取自适应拓展类然后实例化
return injectExtension((T) getAdaptiveExtensionClass().newInstance());
} catch (Exception e) {
throw new IllegalStateException("Can not create adaptive extension " + type + ", cause: " + e.getMessage(), e);
}
}

private Class<?> getAdaptiveExtensionClass() {
// 这里会获取拓展类,如果没有自适应的拓展类,那么就需要调用createAdaptiveExtensionClass
getExtensionClasses();
if (cachedAdaptiveClass != null) {
return cachedAdaptiveClass;
}
return cachedAdaptiveClass = createAdaptiveExtensionClass();
}
private Class<?> createAdaptiveExtensionClass() {
// 这里去生成了自适应拓展的代码,具体生成逻辑比较复杂先不展开讲
String code = createAdaptiveExtensionClassCode();
ClassLoader classLoader = findClassLoader();
com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
return compiler.compile(code, classLoader);
}

生成的代码像这样

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package com.alibaba.dubbo.rpc;

import com.alibaba.dubbo.common.extension.ExtensionLoader;


public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
public void destroy() {
throw new UnsupportedOperationException(
"method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}

public int getDefaultPort() {
throw new UnsupportedOperationException(
"method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}

public com.alibaba.dubbo.rpc.Exporter export(
com.alibaba.dubbo.rpc.Invoker arg0)
throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null) {
throw new IllegalArgumentException(
"com.alibaba.dubbo.rpc.Invoker argument == null");
}

if (arg0.getUrl() == null) {
throw new IllegalArgumentException(
"com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
}

com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = ((url.getProtocol() == null) ? "dubbo"
: url.getProtocol());

if (extName == null) {
throw new IllegalStateException(
"Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" +
url.toString() + ") use keys([protocol])");
}

com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class)
.getExtension(extName);

return extension.export(arg0);
}

public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0,
com.alibaba.dubbo.common.URL arg1)
throws com.alibaba.dubbo.rpc.RpcException {
if (arg1 == null) {
throw new IllegalArgumentException("url == null");
}

com.alibaba.dubbo.common.URL url = arg1;
// 其实前面所说的逻辑就在这里呈现了
String extName = ((url.getProtocol() == null) ? "dubbo"
: url.getProtocol());

if (extName == null) {
throw new IllegalStateException(
"Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" +
url.toString() + ") use keys([protocol])");
}
// 在这就是实际的通过dubbo 的 spi 去加载实际对应的扩展
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class)
.getExtension(extName);

return extension.refer(arg0, arg1);
}
}