Nicksxs's Blog

What hurts more, the pain of hard work or the pain of regret?

Adaptive

这个应该是 Dubbo SPI 里最玄妙的东西了,一开始没懂,自适应扩展点加载,
dubbo://123.123.123.123:1234/com.nicksxs.demo.service.HelloWorldService?anyhost=true&application=demo&default.loadbalance=random&default.service.filter=LoggerFilter&dubbo=2.5.3&interface=com.nicksxs.demo.service.HelloWorldService&logger=slf4j&methods=method1,method2,method3,method4&pid=4292&retries=0&side=provider&threadpool=fixed&threads=200&timeout=2000&timestamp=1590647155886
那我从比较能理解的角度或者说思路去讲讲我的理解,因为直接将原理如果脱离了使用,对于我这样的理解能力比较差的可能会比较吃力,从使用场景开始讲可能会比较舒服了,这里可以看到参数里有蛮多的,举个例子,比如这个 threadpool = fixed,说明线程池使用的是 fixed 对应的实现,也就是下图的这个

这样子似乎没啥问题了,反正就是用dubbo 的 spi 加载嘛,好像没啥问题,其实问题还是存在的,或者说不太优雅,比如要先判断我这个 fixed 对应的实现类是哪个,这里可能就有个 if-else 判断了,但是 dubbo 的开发人员似乎不太想这么做这个事情,

譬如我们在引用一个服务时,在ReferenceConfig 中的

1
private static final Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

就获取了自适应拓展,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public T getAdaptiveExtension() {
Object instance = cachedAdaptiveInstance.get();
if (instance == null) {
if (createAdaptiveInstanceError == null) {
synchronized (cachedAdaptiveInstance) {
instance = cachedAdaptiveInstance.get();
if (instance == null) {
try {
instance = createAdaptiveExtension();
cachedAdaptiveInstance.set(instance);
} catch (Throwable t) {
createAdaptiveInstanceError = t;
throw new IllegalStateException("fail to create adaptive instance: " + t.toString(), t);
}
}
}
} else {
throw new IllegalStateException("fail to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
}
}

return (T) instance;
}

这里也使用了 DCL,来锁cachedAdaptiveInstance,当缓存中没有时就去创建自适应拓展

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private T createAdaptiveExtension() {
try {
// 获取自适应拓展类然后实例化
return injectExtension((T) getAdaptiveExtensionClass().newInstance());
} catch (Exception e) {
throw new IllegalStateException("Can not create adaptive extension " + type + ", cause: " + e.getMessage(), e);
}
}

private Class<?> getAdaptiveExtensionClass() {
// 这里会获取拓展类,如果没有自适应的拓展类,那么就需要调用createAdaptiveExtensionClass
getExtensionClasses();
if (cachedAdaptiveClass != null) {
return cachedAdaptiveClass;
}
return cachedAdaptiveClass = createAdaptiveExtensionClass();
}
private Class<?> createAdaptiveExtensionClass() {
// 这里去生成了自适应拓展的代码,具体生成逻辑比较复杂先不展开讲
String code = createAdaptiveExtensionClassCode();
ClassLoader classLoader = findClassLoader();
com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
return compiler.compile(code, classLoader);
}

生成的代码像这样

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package com.alibaba.dubbo.rpc;

import com.alibaba.dubbo.common.extension.ExtensionLoader;


public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
public void destroy() {
throw new UnsupportedOperationException(
"method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}

public int getDefaultPort() {
throw new UnsupportedOperationException(
"method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
}

public com.alibaba.dubbo.rpc.Exporter export(
com.alibaba.dubbo.rpc.Invoker arg0)
throws com.alibaba.dubbo.rpc.RpcException {
if (arg0 == null) {
throw new IllegalArgumentException(
"com.alibaba.dubbo.rpc.Invoker argument == null");
}

if (arg0.getUrl() == null) {
throw new IllegalArgumentException(
"com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
}

com.alibaba.dubbo.common.URL url = arg0.getUrl();
String extName = ((url.getProtocol() == null) ? "dubbo"
: url.getProtocol());

if (extName == null) {
throw new IllegalStateException(
"Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" +
url.toString() + ") use keys([protocol])");
}

com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class)
.getExtension(extName);

return extension.export(arg0);
}

public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0,
com.alibaba.dubbo.common.URL arg1)
throws com.alibaba.dubbo.rpc.RpcException {
if (arg1 == null) {
throw new IllegalArgumentException("url == null");
}

com.alibaba.dubbo.common.URL url = arg1;
// 其实前面所说的逻辑就在这里呈现了
String extName = ((url.getProtocol() == null) ? "dubbo"
: url.getProtocol());

if (extName == null) {
throw new IllegalStateException(
"Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" +
url.toString() + ") use keys([protocol])");
}
// 在这就是实际的通过dubbo 的 spi 去加载实际对应的扩展
com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class)
.getExtension(extName);

return extension.refer(arg0, arg1);
}
}

SPI全称是Service Provider Interface,咋眼看跟api是不是有点相似,api是application interface,这两个其实在某些方面有类似的地方,也有蛮大的区别,比如我们基于 dubbo 的微服务,一般我们可以提供服务,然后非泛化调用的话,我们可以把 api 包提供给应用调用方,他们根据接口签名传对应参数并配置好对应的服务发现如 zk 等就可以调用我们的服务了,然后 spi 会有点类似但是是反过来的关系,相当于是一种规范,比如我约定完成这个功能需要两个有两个接口,一个是连接的,一个是断开的,其实就可以用 jdbc 的驱动举例,比较老套了,然后各个厂家去做具体的实现吧,到时候根据我接口的全限定名的文件来加载实际的实现类,然后运行的时候调用对应实现类的方法就完了

3sKdpg

看上面的图,java.sql.Driver就是 spi,对应在classpath 的 META-INF/services 目录下的这个文件,里边的内容就是具体的实现类

1590735097909

简单介绍了 Java的 SPI,再来说说 dubbo 的,dubbo 中为啥要用 SPI 呢,主要是为了框架的可扩展性和性能方面的考虑,比如协议层 dubbo 默认使用 dubbo 协议,同时也支持很多其他协议,也支持用户自己实现协议,那么跟 Java 的 SPI 会有什么区别呢,我们也来看个文件

bqxWMp

是不是看着很想,又有点不一样,在 Java 的 SPI 配置文件里每一行只有一个实现类的全限定名,在 Dubbo的 SPI配置文件中是 key=value 的形式,我们只需要对应的 key 就能加载对应的实现,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* 返回指定名字的扩展。如果指定名字的扩展不存在,则抛异常 {@link IllegalStateException}.
*
* @param name
* @return
*/
@SuppressWarnings("unchecked")
public T getExtension(String name) {
if (name == null || name.length() == 0)
throw new IllegalArgumentException("Extension name == null");
if ("true".equals(name)) {
return getDefaultExtension();
}
Holder<Object> holder = cachedInstances.get(name);
if (holder == null) {
cachedInstances.putIfAbsent(name, new Holder<Object>());
holder = cachedInstances.get(name);
}
Object instance = holder.get();
if (instance == null) {
synchronized (holder) {
instance = holder.get();
if (instance == null) {
instance = createExtension(name);
holder.set(instance);
}
}
}
return (T) instance;
}

这里其实就可以看出来第二个不同点了,就是这个cachedInstances,第一个是不用像 Java 原生的 SPI 那样去遍历加载对应的服务类,只需要通过 key 去寻找,并且寻找的时候会先从缓存的对象里去取,还有就是注意下这里的 DCL(double check lock)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@SuppressWarnings("unchecked")
private T createExtension(String name) {
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
injectExtension(instance);
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (wrapperClasses != null && wrapperClasses.size() > 0) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
type + ") could not be instantiated: " + t.getMessage(), t);
}
}

然后就是创建扩展了,这里如果 wrapperClasses 就会遍历生成wrapper实例,并做 setter 依赖注入,但是这里cachedWrapperClasses的来源还是有点搞不清楚,得再看下 com.alibaba.dubbo.common.extension.ExtensionLoader#loadFile的具体逻辑
又看了遍新的代码,这个函数被抽出来了

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
* test if clazz is a wrapper class
* <p>
* which has Constructor with given class type as its only argument
*/
private boolean isWrapperClass(Class<?> clazz) {
try {
clazz.getConstructor(type);
return true;
} catch (NoSuchMethodException e) {
return false;
}
}

是否是 wrapperClass 其实就看构造函数的。

因为传说中的出身问题,我以前写的是PHP,在使用 swoole 之前,基本的应用调试手段就是简单粗暴的 var_dump,exit,对于单进程模型的 PHP 也是简单有效,技术栈换成 Java 之后,就变得没那么容易,一方面是需要编译,另一方面是一般都是基于 spring 的项目,如果问题定位比较模糊,那框架层的是很难靠简单的 System.out.println 或者打 log 解决,(PS:我觉得可能我写的东西比较适合从 PHP 这种弱类型语言转到 Java 的小白同学)这个时候一方面因为是 Java,有了非常好用的 idea IDE 的支持,可以各种花式调试,条件断点尤其牛叉,但是又因为有 Spring+Java 的双重原因,有些情况下单步调试可以把手按废掉,这也是我之前一直比较困惑苦逼的点,后来随着慢慢精(jiang)进(you)之后,比如对于一个 oom 的情况,我们可以通过启动参数加上-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=xx/xx 来配置溢出时的堆dump 日志,获取到这个文件后,我们可以通过像 Memory Analyzer (MAT)[https://www.eclipse.org/mat/] (The Eclipse Memory Analyzer is a fast and feature-rich Java heap analyzer that helps you find memory leaks and reduce memory consumption.)来查看诊断问题所在,之前用到的时候是因为有个死循环一直往链表里塞数据,属于比较简单的,后来一次是由于运维进行应用迁移时按默认的统一配置了堆内存大小,导致内存的确不够用,所以溢出了,
今天想说的其实主要是我们的 thread dump,这也是我最近才真正用的一个方法,可能真的很小白了,用过 ide 的单步调试其实都知道会有一个一层层的玩意,比如函数从 A,调用了 B,再从 B 调用了 C,一直往下(因为是 Java,所以还有很多🤦‍♂️),这个其实也是大部分语言的调用模型,利用了栈这个数据结构,通过这个结构我们可以知道代码的调用链路,由于对于一个 spring 应用,在本身框架代码量非常庞大的情况下,外加如果应用代码也是非常多的时候,有时候通过单步调试真的很难短时间定位到问题,需要非常大的耐心和仔细观察,当然不是说完全不行,举个例子当我的应用经常启动需要非常长的时间,因为本身应用有非常多个 bean,比较难说究竟是 bean 的加载的确很慢还是有什么异常原因,这种时候就可以使用 thread dump 了,具体怎么操作呢

如果在idea 中运行或者调试时,可以直接点击这个照相机一样的按钮,右边就会出现了左边会显示所有的线程,右边会显示线程栈,

1
2
3
4
5
6
7
"main@1" prio=5 tid=0x1 nid=NA runnable
java.lang.Thread.State: RUNNABLE
at TreeDistance.treeDist(TreeDistance.java:64)
at TreeDistance.treeDist(TreeDistance.java:65)
at TreeDistance.treeDist(TreeDistance.java:65)
at TreeDistance.treeDist(TreeDistance.java:65)
at TreeDistance.main(TreeDistance.java:45)

这就是我们主线程的堆栈信息了,main 表示这个线程名,prio表示优先级,默认是 5,tid 表示线程 id,nid 表示对应的系统线程,后面的runnable 表示目前线程状态,因为是被我打了断点,所以是就许状态,然后下面就是对应的线程栈内容了,在TreeDistance类的 treeDist方法中,对应的文件行数是 64 行。
这里使用 thread dump一般也不会是上面我截图代码里的这种代码量很少的,一般是大型项目,有时候跑着跑着没反应,又不知道跑到哪了,特别是一些刚接触的大项目或者需要定位一个大项目的一个疑难问题,一时没思路时,可以使用这个方法,个人觉得非常有帮助。

前面说了mysql数据库的事务相关的,那事务是用来干嘛的,这里得补一下ACID,

ACID,是指数据库管理系统DBMS)在写入或更新资料的过程中,为保证事务(transaction)是正确可靠的,所必须具备的四个特性:原子性(atomicity,或称不可分割性)、一致性(consistency)、隔离性(isolation,又称独立性)、持久性(durability)。

  • Atomicity(原子性):一个事务(transaction)中的所有操作,或者全部完成,或者全部不完成,不会结束在中间某个环节。事务在执行过程中发生错误,会被回滚(Rollback)到事务开始前的状态,就像这个事务从来没有执行过一样。即,事务不可分割、不可约简。[1]

  • Consistency(一致性):在事务开始之前和事务结束以后,数据库的完整性没有被破坏。这表示写入的资料必须完全符合所有的预设约束触发器级联回滚等。[1]

  • Isolation(隔离性):数据库允许多个并发事务同时对其数据进行读写和修改的能力,隔离性可以防止多个事务并发执行时由于交叉执行而导致数据的不一致。事务隔离分为不同级别,包括未提交读(Read uncommitted)、提交读(read committed)、可重复读(repeatable read)和串行化(Serializable)。[1]

  • Durability(持久性):事务处理结束后,对数据的修改就是永久的,即便系统故障也不会丢失。[1]

在mysql中,借助于MVCC,各种级别的锁,日志等特性来实现了事务的ACID,但是这个我们通常是对于一个数据库服务的定义,常见的情况下我们的数据库随着业务发展也会从单实例变成多实例,组成主从Master-Slave架构,这个时候其实会有一些问题随之出现,比如说主从同步延迟,假如在业务代码中做了读写分离,对于一些敏感度较低的数据其实问题不是很大,只要主从延迟不到特别夸张的地步一般都是可以忍受的,但是对于一些核心的业务数据,比如订单之类的,不能忍受数据不一致,下了单了,付了款了,一刷订单列表,发现这个订单还没支付,甚至订单都没在,这对于用户来讲是恨不能容忍的错误,那么这里就需要一些措施,要不就不读写分离,要不就在 redis 这类缓存下订单,或者支付后加个延时等,这些都是一些补偿措施,并且这也是一个不太切当的例子,比较合适的例子也可以用这个下单来说,一般在电商平台下单会有挺多要做的事情,比如像下面这个图

下单的是后要冻结核销优惠券,如果账户里有钱要冻结扣除账户里的钱,如果使用了J 豆也一样,可能还有 E 卡,忽略我借用的平台,因为目前一般后台服务化之后,可能每一项都是对应的一个后台服务,我们期望的执行过程是要不全成功,要不就全保持执行前状态,不能是部分扣减核销成功了,部分还不行,所以我们处理这种情况会引入一些通用的方案,第一种叫二阶段提交,

二阶段提交(英语:Two-phase Commit)是指在计算机网络以及数据库领域内,为了使基于分布式系统架构下的所有节点在进行事务提交时保持一致性而设计的一种算法。通常,二阶段提交也被称为是一种协议(Protocol)。在分布式系统中,每个节点虽然可以知晓自己的操作时成功或者失败,却无法知道其他节点的操作的成功或失败。当一个事务跨越多个节点时,为了保持事务的ACID特性,需要引入一个作为协调者的组件来统一掌控所有节点(称作参与者)的操作结果并最终指示这些节点是否要把操作结果进行真正的提交(比如将更新后的数据写入磁盘等等)。因此,二阶段提交的算法思路可以概括为: 参与者将操作成败通知协调者,再由协调者根据所有参与者的反馈情报决定各参与者是否要提交操作还是中止操作。

对于上面的例子,我们将整个过程分成两个阶段,首先是提交请求阶段,这个阶段大概需要做的是确定资源存在,锁定资源,可能还要做好失败后回滚的准备,如果这些都 ok 了那么就响应成功,这里其实用到了一个叫事务的协调者的角色,类似于裁判员,每个节点都反馈第一阶段成功后,开始执行第二阶段,也就是实际执行操作,这里也是需要所有节点都反馈成功后才是执行成功,要不就是失败回滚。其实常用的分布式事务的解决方案主要也是基于此方案的改进,比如后面介绍的三阶段提交,有三阶段提交就是因为二阶段提交比较尴尬的几个点,

  • 第一是对于两阶段提交,其中默认只有协调者有超时时间,当一个参与者进入卡死状态时只能依赖协调者的超时来结束任务,这中间的时间参与者都是锁定着资源
  • 第二是协调者的单点问题,万一挂了,参与者就会在那傻等着

所以三阶段提交引入了各节点的超时机制和一个准备阶段,首先是一个can commit阶段,询问下各个节点有没有资源,能不能进行操作,这个阶段不阻塞,只是提前做个摸底,这个阶段其实人畜无害,但是能提高成功率,在这个阶段如果就有节点反馈是不接受的,那就不用执行下去了,也没有锁资源,然后第二阶段是 pre commit ,这个阶段做的事情跟原来的 第一阶段比较类似,然后是第三阶段do commit,其实三阶段提交我个人觉得只是加了个超时,和准备阶段,好像木有根本性的解决的两阶段提交的问题,后续可以再看看一些论文来思考讨论下。

2020年05月24日22:11 更新
这里跟朋友讨论了下,好像想通了最核心的一点,对于前面说的那个场景,如果是两阶段提交,如果各个节点中有一个没回应,并且协调者也挂了,这个时候会有什么情况呢,再加一个假设,其实比如这个一阶段其实是检验就失败的,理论上应该大家都释放资源,那么对于这种异常情况,其他的参与者就不知所措了,就傻傻地锁着资源阻塞着,那么三阶段提交的意义就出现了,把第一阶段拆开,那么即使在这个阶段出现上述的异常,即也不会锁定资源,同时参与者也有超时机制,在第二阶段锁定资源出现异常是,其他参与者节点等超时后就自动释放资源了,也就没啥问题了,不过对于那种异常恢复后的一些情况还是没有很好地解决,需要借助 zk 等,后面有空可以讲讲 paxos 跟 raft 等

看完前面两篇水文之后,感觉不得不来分析下 mysql 的锁了,其实前面说到幻读的时候是有个前提没提到的,比如一个select * from table1 where id = 1这种查询语句其实是不会加传说中的锁的,当然这里是指在 RR 或者 RC 隔离级别下,
看一段 mysql官方文档

SELECT ... FROM is a consistent read, reading a snapshot of the database and setting no locks unless the transaction isolation level is set to SERIALIZABLE. For SERIALIZABLE level, the search sets shared next-key locks on the index records it encounters. However, only an index record lock is required for statements that lock rows using a unique index to search for a unique row.

纯粹的这种一致性读,实际读取的是快照,也就是基于 read view 的读取方式,除非当前隔离级别是SERIALIZABLE
但是对于以下几类

  • select * from table where ? lock in share mode;
  • select * from table where ? for update;
  • insert into table values (...);
  • update table set ? where ?;
  • delete from table where ?;

除了第一条是 S 锁之外,其他都是 X 排他锁,这边在顺带下,S 锁表示共享锁, X 表示独占锁,同为 S 锁之间不冲突,S 与 X,X 与 S,X 与 X 之间都冲突,也就是加了前者,后者就加不上了
我们知道对于 RC 级别会出现幻读现象,对于 RR 级别不会出现,主要的区别是 RR 级别下对于以上的加锁读取都根据情况加上了 gap 锁,那么是不是 RR 级别下以上所有的都是要加 gap 锁呢,当然不是
举个例子,RR 事务隔离级别下,table1 有个主键id 字段
select * from table1 where id = 10 for update
这条语句要加 gap 锁吗?
答案是不需要,这里其实算是我看了这么久的一点自己的理解,啥时候要加 gap 锁,判断的条件是根据我查询的数据是否会因为不加 gap 锁而出现数量的不一致,我上面这条查询语句,在什么情况下会出现查询结果数量不一致呢,只要在这条记录被更新或者删除的时候,有没有可能我第一次查出来一条,第二次变成两条了呢,不可能,因为是主键索引。
再变更下这个题的条件,当 id 不是主键,但是是唯一索引,这样需要怎么加锁,注意问题是怎么加锁,不是需不需要加 gap 锁,这里呢就是稍微延伸一下,把聚簇索引(主键索引)和二级索引带一下,当 id 不是主键,说明是个二级索引,但是它是唯一索引,体会下,首先对于 id = 10这个二级索引肯定要加锁,要不要锁 gap 呢,不用,因为是唯一索引,id = 10 只可能有这一条记录,然后呢,这样是不是就好了,还不行,因为啥,因为它是二级索引,对应的主键索引的记录才是真正的数据,万一被更新掉了咋办,所以在 id = 10 对应的主键索引上也需要加上锁(默认都是 record lock行锁),那主键索引上要不要加 gap 呢,也不用,也是精确定位到这一条记录
最后呢,当 id 不是主键,也不是唯一索引,只是个普通的索引,这里就需要大名鼎鼎的 gap 锁了,
是时候画个图了

其实核心的目的还是不让这个 id=10 的记录不会出现幻读,那么就需要在 id 这个索引上加上三个 gap 锁,主键索引上就不用了,在 id 索引上已经控制住了id = 10 不会出现幻读,主键索引上这两条对应的记录已经锁了,所以就这样 OK 了

0%