Reactor模型是同步I/O事件处理的一种常见模型,其核心思想是:将关注的I/O事件注册到多路复用器上,一旦有I/O事件触发,就将事件分发到事件处理器,由其执行就绪I/O事件对应的处理函数。
单线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
/**
 * Single-threaded Reactor built on Java NIO.
 *
 * <p>One thread runs {@link #run()}: it blocks in {@link Selector#select()}, then dispatches
 * every ready {@link SelectionKey} to the {@link Runnable} attached to it. The listening
 * socket's key carries an {@link Acceptor}; every accepted connection's key carries its own
 * {@link Handler}.
 *
 * <p>All channel I/O happens on the thread that calls {@link #run()}. Call {@link #close()}
 * only after {@link #run()} has returned.
 */
public class Reactor implements Runnable, AutoCloseable {
    final Selector selector;
    final ServerSocketChannel serverSocketChannel;

    /**
     * Opens the selector, binds a non-blocking server socket to {@code port} and registers it
     * for {@code OP_ACCEPT} with an {@link Acceptor} attached.
     *
     * @param port local TCP port to listen on
     * @throws IOException if the selector or server socket cannot be opened, bound or
     *     registered; anything opened so far is closed before the exception propagates
     */
    public Reactor(int port) throws IOException {
        Selector sel = Selector.open();
        ServerSocketChannel server = null;
        try {
            server = ServerSocketChannel.open();
            server.socket().bind(new InetSocketAddress(port));
            server.configureBlocking(false);
            SelectionKey sk = server.register(sel, SelectionKey.OP_ACCEPT);
            sk.attach(new Acceptor());
        } catch (IOException | RuntimeException e) {
            // Do not leak the selector / listening socket when setup fails (e.g. port in use).
            closeAfterFailure(server, e);
            closeAfterFailure(sel, e);
            throw e;
        }
        selector = sel;
        serverSocketChannel = server;
    }

    /** Closes {@code resource} if non-null, recording any close error as suppressed on {@code failure}. */
    private static void closeAfterFailure(AutoCloseable resource, Throwable failure) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (Exception e) {
            failure.addSuppressed(e);
        }
    }

    /** Accepts a pending connection and hands it to a new {@link Handler}. */
    class Acceptor implements Runnable {
        @Override
        public void run() {
            try {
                SocketChannel channel = serverSocketChannel.accept();
                // accept() returns null on a non-blocking channel when nothing is pending.
                if (channel != null) {
                    try {
                        new Handler(selector, channel);
                    } catch (IOException | RuntimeException e) {
                        // The handler never took ownership, so close the socket here.
                        closeAfterFailure(channel, e);
                        throw e;
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Per-connection state machine: reads until {@link #inputIsComplete()}, calls
     * {@link #process()}, then writes until {@link #outputIsComplete()} and closes the
     * connection.
     *
     * <p>{@link #inputIsComplete()}, {@link #outputIsComplete()} and {@link #process()} are
     * placeholders here; with the placeholder bodies a connection is only ever read from.
     */
    static final class Handler implements Runnable {
        static final int MAX_IN = 1024, MAX_OUT = 1024;
        static final int READING = 0, SENDING = 1;

        final SocketChannel socketChannl;
        final SelectionKey sk;
        ByteBuffer input = ByteBuffer.allocate(MAX_IN);
        ByteBuffer output = ByteBuffer.allocate(MAX_OUT);
        int state = READING;

        /**
         * Switches {@code channel} to non-blocking mode and registers it with {@code selector}
         * for {@code OP_READ}, attaching this handler to the resulting key.
         *
         * @param selector selector the connection is registered with
         * @param channel  freshly accepted connection
         * @throws IOException if the channel cannot be configured or registered
         */
        Handler(Selector selector, SocketChannel channel) throws IOException {
            socketChannl = channel;
            socketChannl.configureBlocking(false);
            // Register with no interest first so the attachment is in place before the key
            // can be selected for reading.
            sk = socketChannl.register(selector, 0);
            sk.attach(this);
            sk.interestOps(SelectionKey.OP_READ);
            selector.wakeup();
        }

        /** Performs the I/O step for the current state; an I/O failure closes the connection. */
        @Override
        public void run() {
            try {
                if (state == READING) {
                    read();
                } else if (state == SENDING) {
                    send();
                }
            } catch (IOException e) {
                e.printStackTrace();
                // A failed connection must not stay registered, or it is selected again forever.
                closeConnection();
            }
        }

        /**
         * Reads available bytes into {@link #input}. When the input is complete, processes it
         * and switches the key to {@code OP_WRITE}. If the input is still incomplete but no
         * more data can ever be accepted (peer closed, or buffer full), the connection is
         * closed instead of leaving a permanently-ready key that would spin the reactor loop.
         */
        void read() throws IOException {
            int n = socketChannl.read(input);
            if (inputIsComplete()) {
                process();
                state = SENDING;
                sk.interestOps(SelectionKey.OP_WRITE);
            } else if (n < 0 || !input.hasRemaining()) {
                closeConnection();
            }
        }

        /**
         * Writes {@link #output} to the channel and closes the connection once the output is
         * complete. NOTE(review): nothing visible here fills or flips {@code output};
         * presumably a real {@link #process()} is expected to do that.
         */
        void send() throws IOException {
            socketChannl.write(output);
            if (outputIsComplete()) {
                closeConnection();
            }
        }

        /** Cancels the key and closes the socket; a close error is only reported. */
        private void closeConnection() {
            sk.cancel();
            try {
                socketChannl.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        /** Placeholder: always reports the request as incomplete. */
        boolean inputIsComplete() {
            return false;
        }

        /** Placeholder: always reports the response as incomplete. */
        boolean outputIsComplete() {
            return false;
        }

        /** Placeholder: does nothing. */
        void process() {
        }
    }

    /**
     * Event loop: waits for ready keys and dispatches each one, until the calling thread is
     * interrupted. An {@link IOException} from the selector is printed and the loop continues.
     */
    @Override
    public void run() {
        while (!Thread.interrupted()) {
            try {
                selector.select();
                Set<SelectionKey> selected = selector.selectedKeys();
                for (SelectionKey key : selected) {
                    dispatch(key);
                }
                // The selector never removes keys from this set itself.
                selected.clear();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /** Runs the {@link Runnable} attached to {@code key}, if there is one. */
    void dispatch(SelectionKey key) {
        Runnable r = (Runnable) key.attachment();
        if (r != null) {
            r.run();
        }
    }

    /**
     * Closes every channel still registered with the selector, then the selector and the
     * listening socket. Safe to call more than once. Call it after {@link #run()} has
     * returned.
     *
     * @throws IOException if closing a channel or the selector fails
     */
    @Override
    public void close() throws IOException {
        try {
            if (selector.isOpen()) {
                for (SelectionKey key : selector.keys()) {
                    key.channel().close();
                }
            }
        } finally {
            try {
                selector.close();
            } finally {
                serverSocketChannel.close();
            }
        }
    }

    /** Starts a reactor on port 8080 and runs its event loop on the main thread. */
    public static void main(String[] args) throws IOException {
        try (Reactor reactor = new Reactor(8080)) {
            reactor.run();
        }
    }
}
|
代码逻辑如下图所示。
多线程
主从模式
与Netty线程模型类似。
参考:
Doug Lea,《Scalable IO in Java》(DL’s NIO)