Disruptor 系列一

很久之前就听说过这个框架,不过之前有点跟消息队列混起来,这个也是种队列,但不是跟 rocketmq,nsq 那种一样的,而是在进程内部提供队列服务的,偏向于取代ArrayBlockingQueue,因为这个阻塞队列是使用了锁来控制阻塞,关于并发其实有一些通用的最佳实践,就是用锁,即使是 JDK 提供的锁,也是比较耗资源的,当然这是跟不加锁的对比,同样是锁,JDK 的实现还是性能比较优秀的。常见的阻塞队列中例如 ArrayBlockingQueueLinkedBlockingQueue 都有锁的身影的存在,区别在于 ArrayBlockingQueue 是一把锁,后者是两把锁,不过重点不在几把锁,这里其实是两个问题,一个是所谓的 lock free, 对于一个单生产者的 disruptor 来说,因为写入是只有一个线程的,是可以不用加锁,多生产者的时候使用的是 cas 来获取对应的写入坑位,另一个是解决“伪共享”问题,后面可以详细点分析,先介绍下使用
首先是数据源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class LongEvent {
private long value;

public void set(long value) {
this.value = value;
}

public long getValue() {
return value;
}

public void setValue(long value) {
this.value = value;
}
}

事件生产

1
2
3
4
5
6
7
public class LongEventFactory implements EventFactory<LongEvent>
{
public LongEvent newInstance()
{
return new LongEvent();
}
}

事件处理器

1
2
3
4
5
6
7
8
9
10
11
12
public class LongEventHandler implements EventHandler<LongEvent> {

// event 事件,
// sequence 当前的序列
// 是否当前批次最后一个数据
public void onEvent(LongEvent event, long sequence, boolean endOfBatch)
{
String str = String.format("long event : %s l:%s b:%s", event.getValue(), sequence, endOfBatch);
System.out.println(str);
}
}

主方法代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package disruptor;

import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;

import java.nio.ByteBuffer;

public class LongEventMain
{
public static void main(String[] args) throws Exception
{
// 这个需要是 2 的幂次,这样在定位的时候只需要位移操作,也能减少各种计算操作
int bufferSize = 1024;

Disruptor<LongEvent> disruptor =
new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);

// 类似于注册处理器
disruptor.handleEventsWith(new LongEventHandler());
// 或者直接用 lambda
disruptor.handleEventsWith((event, sequence, endOfBatch) ->
System.out.println("Event: " + event));
// 启动我们的 disruptor
disruptor.start();


RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
ByteBuffer bb = ByteBuffer.allocate(8);
for (long l = 0; true; l++)
{
bb.putLong(0, l);
// 生产事件
ringBuffer.publishEvent((event, sequence, buffer) -> event.set(buffer.getLong(0)), bb);
Thread.sleep(1000);
}
}
}

运行下可以看到运行结果

这里其实就只是最简单的使用,生产者只有一个,然后也不是批量的。