Reactor模型是同步I/O事件处理的一种常见模型,其核心思想是:将关注的I/O事件注册到多路复用器上,一旦有I/O事件触发,就将事件分发到事件处理器中,执行就绪I/O事件对应的处理函数。

单线程

单线程Reactor的示例代码如下:
/**
 * Single-threaded Reactor built on Java NIO.
 *
 * <p>One thread runs the select loop in {@link #run()}. Every registered
 * {@link SelectionKey} carries a {@link Runnable} attachment: the listening
 * socket's key carries an {@link Acceptor}, and each client connection's key
 * carries a {@link Handler}. When a key becomes ready, its attachment is run
 * on the reactor thread.
 */
public class Reactor implements Runnable {
    final Selector selector;
    final ServerSocketChannel serverSocketChannel;

    /**
     * Opens the selector and a non-blocking listening socket on {@code port},
     * and registers the socket for accept events.
     *
     * @param port TCP port to listen on
     * @throws IOException if the selector or the server socket cannot be set up;
     *         anything opened so far is closed before the exception propagates
     */
    public Reactor(int port) throws IOException {
        selector = Selector.open();
        ServerSocketChannel server = null;
        boolean ready = false;
        try {
            server = ServerSocketChannel.open();
            server.socket().bind(new InetSocketAddress(port));
            server.configureBlocking(false);
            SelectionKey sk = server.register(selector, SelectionKey.OP_ACCEPT);
            sk.attach(new Acceptor());
            ready = true;
        } finally {
            // Do not leak the selector / listening socket when e.g. the port is in use.
            if (!ready) {
                closeQuietly(server);
                closeQuietly(selector);
            }
        }
        serverSocketChannel = server;
    }

    /** Closes {@code resource} if it is non-null, ignoring any failure (best-effort cleanup). */
    private static void closeQuietly(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException ignored) {
            // Nothing useful can be done if cleanup itself fails.
        }
    }

    /** Accepts a pending connection and hands it to a new {@link Handler}. */
    class Acceptor implements Runnable {
        @Override
        public void run() {
            SocketChannel channel = null;
            try {
                // Non-blocking accept: returns null when no connection is pending.
                channel = serverSocketChannel.accept();
                if (channel != null) {
                    new Handler(selector, channel);
                }
            } catch (IOException e) {
                e.printStackTrace();
                // The handler could not be set up; do not leak the accepted socket.
                closeQuietly(channel);
            }
        }
    }

    /**
     * Per-connection state machine: reads a request, processes it, then sends
     * the response. {@link #inputIsComplete()}, {@link #process()} and
     * {@link #outputIsComplete()} are placeholders for protocol-specific logic.
     */
    static final class Handler implements Runnable {
        static final int MAX_IN = 1024, MAX_OUT = 1024;
        final SocketChannel socketChannl;
        final SelectionKey sk;
        ByteBuffer input = ByteBuffer.allocate(MAX_IN);
        ByteBuffer output = ByteBuffer.allocate(MAX_OUT);
        static final int READING = 0, SENDING = 1;
        int state = READING;

        /**
         * Switches {@code channel} to non-blocking mode and registers it with
         * {@code selector} for read events, with this handler as the attachment.
         *
         * @throws IOException if the channel cannot be configured or registered
         */
        Handler(Selector selector, SocketChannel channel) throws IOException {
            socketChannl = channel;
            socketChannl.configureBlocking(false);
            // Register with no interest first so the attachment is in place
            // before any event can be dispatched for this key.
            sk = socketChannl.register(selector, 0);
            sk.attach(this);
            sk.interestOps(SelectionKey.OP_READ);
            // Make a blocked (or the next) select() return so the new interest set is seen.
            selector.wakeup();
        }

        /** Performs the I/O step for the current state; on I/O failure the connection is closed. */
        @Override
        public void run() {
            try {
                if (state == READING) {
                    read();
                } else if (state == SENDING) {
                    send();
                }
            } catch (IOException e) {
                e.printStackTrace();
                // A failed channel would otherwise stay registered and be dispatched again and again.
                close();
            }
        }

        /**
         * Reads available bytes into {@code input}. Once the request is
         * complete, processes it and switches the key to write interest.
         */
        void read() throws IOException {
            int bytesRead = socketChannl.read(input);
            if (bytesRead < 0) {
                // Peer closed the connection. Without this the key stays
                // read-ready forever and the select loop spins.
                close();
                return;
            }
            if (inputIsComplete()) {
                process();
                state = SENDING;
                sk.interestOps(SelectionKey.OP_WRITE);
            } else if (!input.hasRemaining()) {
                // Request does not fit in MAX_IN bytes: further reads would
                // return 0 while the key stays ready, so drop the connection.
                close();
            }
        }

        /**
         * Writes pending bytes from {@code output}; closes the connection once
         * everything has been sent. {@link #process()} is expected to leave
         * {@code output} positioned for writing.
         */
        void send() throws IOException {
            socketChannl.write(output);
            if (outputIsComplete()) {
                close();
            }
        }

        /** Deregisters the key and closes the socket; safe to call more than once. */
        void close() {
            sk.cancel();
            closeQuietly(socketChannl);
        }

        /** Placeholder: returns whether a full request has been received. */
        boolean inputIsComplete() {
            return false;
        }

        /** Placeholder: returns whether the full response has been written. */
        boolean outputIsComplete() {
            return false;
        }

        /** Placeholder: turns the received request into a response. */
        void process() {
        }
    }

    /**
     * Event loop: blocks in {@code select()}, dispatches every ready key, and
     * repeats until the thread is interrupted. On exit, all registered channels
     * and the selector are closed.
     */
    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                try {
                    selector.select();
                    Set<SelectionKey> selected = selector.selectedKeys();
                    for (SelectionKey key : selected) {
                        dispatch(key);
                    }
                    // The selector never removes keys from the selected set itself.
                    selected.clear();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        } finally {
            if (selector.isOpen()) {
                // Covers the listening socket as well as every client connection.
                for (SelectionKey key : selector.keys()) {
                    closeQuietly(key.channel());
                }
                closeQuietly(selector);
            }
        }
    }

    /** Runs the {@link Runnable} attached to {@code key}, if any. */
    void dispatch(SelectionKey key) {
        Runnable r = (Runnable) key.attachment();
        if (r != null) {
            r.run();
        }
    }

    public static void main(String[] args) throws IOException {
        new Reactor(8080).run();
    }
}

代码逻辑如下图所示。

reactor.png

多线程

主从模式

与Netty线程模型类似。

参考:

Doug Lea,《Scalable IO in Java》(DL’s NIO)