高性能的网络编程都离不开反应器模式,Nginx、Redis、Netty都采用了反应器模式。
Reactor反应器模式
反应器模式由Reactor反应器线程、Handlers处理器两大角色组成:
Reactor反应器线程的职责:负责响应NIO选择器监控的IO事件,并且分发到Handlers处理器。
Handlers处理器的职责:非阻塞地执行业务处理逻辑。
单线程的Reactor反应器
代码演示:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
| import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set;
/**
 * Single-threaded reactor: one thread waits on a {@link Selector} and runs the
 * {@link Runnable} attached to every key that becomes ready.
 *
 * <p>The listening channel is registered for {@code OP_ACCEPT} with an
 * {@link AcceptorHandler} attachment; each accepted connection gets its own
 * {@code Handler}, which registers itself on the same selector.
 */
public class Reactor implements Runnable {

    final Selector selector;
    final ServerSocketChannel serverSocket;

    /**
     * Opens the selector and a non-blocking server socket bound to {@code port},
     * and registers the acceptor for incoming connections.
     *
     * @param port TCP port to listen on
     * @throws IOException if the selector or the server socket cannot be opened or bound
     */
    Reactor(int port) throws IOException {
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        serverSocket.socket().bind(new InetSocketAddress(port));
        serverSocket.configureBlocking(false);
        SelectionKey selectionKey = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        selectionKey.attach(new AcceptorHandler());
    }

    /**
     * Event loop: blocks in {@code select()}, then dispatches every selected key.
     * Runs until the thread is interrupted or the selector fails.
     */
    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Set<SelectionKey> selected = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selected.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    // The selector never removes keys from the selected set itself;
                    // without this, stale keys would be dispatched again on every pass.
                    iterator.remove();
                    dispatcher(selectionKey);
                }
            }
        } catch (IOException e) {
            // Previously swallowed silently; report why the loop stopped.
            e.printStackTrace();
        }
    }

    /**
     * Runs the handler attached to {@code selectionKey}, if there is one.
     *
     * @param selectionKey a key taken from the selected-key set
     */
    void dispatcher(SelectionKey selectionKey) {
        Runnable handler = (Runnable) (selectionKey.attachment());
        if (handler != null) {
            handler.run();
        }
    }

    /** Accepts a pending connection and hands it to a new {@code Handler}. */
    class AcceptorHandler implements Runnable {
        @Override
        public void run() {
            try {
                SocketChannel socketChannel = serverSocket.accept();
                // accept() returns null in non-blocking mode when nothing is pending.
                if (socketChannel != null) {
                    try {
                        new Handler(selector, socketChannel);
                    } catch (IOException e) {
                        // Registration failed: do not leak the accepted socket.
                        socketChannel.close();
                        throw e;
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| import com.sun.media.jfxmedia.logging.Logger;
import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel;
/**
 * Echo handler for one client connection, driven by the reactor thread.
 *
 * <p>It alternates between two states: {@link #READING} (interested in
 * {@code OP_READ}, fills {@link #byteBuffer}) and {@link #SENDING} (interested in
 * {@code OP_WRITE}, drains the buffer back to the client). The handler attaches
 * itself to its selection key so the reactor can call {@link #run()} when the key
 * is ready.
 */
final class Handler implements Runnable {

    final SocketChannel socketChannel;
    final SelectionKey selectionKey;
    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

    static final int READING = 0, SENDING = 1;

    int state = READING;

    /** Set once read() has returned -1; the connection is closed after the pending echo. */
    private boolean inputClosed;

    /**
     * Switches the channel to non-blocking mode, registers it on {@code selector}
     * with this handler as attachment, and starts listening for readable data.
     *
     * @param selector      selector of the reactor that will drive this handler
     * @param socketChannel freshly accepted client channel
     * @throws IOException if the channel cannot be configured or registered
     */
    Handler(Selector selector, SocketChannel socketChannel) throws IOException {
        this.socketChannel = socketChannel;
        socketChannel.configureBlocking(false);
        // Register with no interest first so the attachment is in place
        // before the key can ever be selected.
        selectionKey = socketChannel.register(selector, 0);
        selectionKey.attach(this);
        selectionKey.interestOps(SelectionKey.OP_READ);
        selector.wakeup();
    }

    /**
     * Performs one step of the echo state machine. On an I/O error the stack trace
     * is printed and the connection is closed.
     */
    @Override
    public void run() {
        if (!selectionKey.isValid()) {
            return;
        }
        try {
            if (state == SENDING) {
                send();
            } else if (state == READING) {
                receive();
            }
        } catch (IOException e) {
            e.printStackTrace();
            close();
        }
    }

    /** Writes the buffered bytes; goes back to READING only once all of them are sent. */
    private void send() throws IOException {
        socketChannel.write(byteBuffer);
        if (byteBuffer.hasRemaining()) {
            // Partial write: stay in SENDING with OP_WRITE so the rest is not lost.
            return;
        }
        byteBuffer.clear();
        if (inputClosed) {
            close();
            return;
        }
        selectionKey.interestOps(SelectionKey.OP_READ);
        state = READING;
    }

    /** Reads whatever is available, prints it, and switches to SENDING if there is data to echo. */
    private void receive() throws IOException {
        int length;
        int start = byteBuffer.position();
        while ((length = socketChannel.read(byteBuffer)) > 0) {
            // Print only the bytes this read added, from where they were stored.
            System.out.println(new String(byteBuffer.array(), start, length,
                    java.nio.charset.StandardCharsets.UTF_8));
            start = byteBuffer.position();
        }
        if (length < 0) {
            // -1 means the peer closed its side of the connection.
            inputClosed = true;
        }
        if (byteBuffer.position() > 0) {
            byteBuffer.flip();
            selectionKey.interestOps(SelectionKey.OP_WRITE);
            state = SENDING;
        } else if (inputClosed) {
            close();
        }
        // Otherwise nothing was read: remain in READING with OP_READ.
    }

    /** Cancels the key and closes the channel, ignoring a failure to close. */
    private void close() {
        selectionKey.cancel();
        try {
            socketChannel.close();
        } catch (IOException ignored) {
            // The connection is being discarded; nothing more can be done here.
        }
    }
}
|
多线程Reactor反应器
总体来说,多线程池反应器的模式,大致如下:
将负责输入输出处理的IOHandler处理器的执行,放入独立的线程池中。这样,业务处理线程与负责服务监听和IO事件查询的反应器线程相隔离,避免服务器的连接监听受到阻塞。
如果服务器为多核的CPU,可以将反应器线程拆分为多个子反应器(SubReactor)线程;同时,引入多个选择器,每一个SubReactor子线程负责一个选择器。这样既能充分利用多核系统的资源,也提高了反应器管理大量连接、选择大量通道的能力。
多线程Reactor反应器实践
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87
| class MultiThreadEchoServerReactor { ServerSocketChannelserverSocket; AtomicInteger next = new AtomicInteger(0); Selector[] selectors = new Selector[2]; SubReactor[] subReactors = null; MultiThreadEchoServerReactor() throws IOException { selectors[0] = Selector.open(); selectors[1] = Selector.open(); serverSocket = ServerSocketChannel.open(); InetSocketAddress address = new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP, NioDemoConfig.SOCKET_SERVER_PORT); serverSocket.socket().bind(address); serverSocket.configureBlocking(false); SelectionKeysk = serverSocket.register(selectors[0], SelectionKey.OP_ACCEPT); sk.attach(new AcceptorHandler()); SubReactor subReactor1 = new SubReactor(selectors[0]); SubReactor subReactor2 = new SubReactor(selectors[1]); subReactors = new SubReactor[]{subReactor1, subReactor2}; }
private void startService() { new Thread(subReactors[0]).start(); new Thread(subReactors[1]).start(); }
class SubReactor implements Runnable { final Selector selector; public SubReactor(Selector selector) { this.selector = selector; } public void run() { try { while (! Thread.interrupted()) { selector.select(); Set<SelectionKey>keySet = selector.selectedKeys(); Iterator<SelectionKey> it = keySet.iterator(); while (it.hasNext()) { SelectionKeysk = it.next(); dispatch(sk); } keySet.clear(); } } catch (IOException ex) { ex.printStackTrace(); } } void dispatch(SelectionKeysk) { Runnable handler = (Runnable) sk.attachment(); if (handler ! = null) { handler.run(); } } } class AcceptorHandler implements Runnable { public void run() { try { SocketChannel channel = serverSocket.accept(); if (channel ! = null) new MultiThreadEchoHandler(selectors[next.get()], channel); } catch (IOException e) { e.printStackTrace(); } if (next.incrementAndGet() == selectors.length) { next.set(0); } } } public static void main(String[] args) throws IOException { MultiThreadEchoServerReactor server = new MultiThreadEchoServerReactor(); server.startService(); } }
|
多线程Handler处理器实践
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| class MultiThreadEchoHandler implements Runnable { final SocketChannel channel; final SelectionKeysk; final ByteBufferbyteBuffer = ByteBuffer.allocate(1024); static final int RECIEVING = 0, SENDING = 1; int state = RECIEVING; static ExecutorService pool = Executors.newFixedThreadPool(4); MultiThreadEchoHandler(Selector selector, SocketChannel c) throws IOException { channel = c; c.configureBlocking(false); sk = channel.register(selector, 0); sk.attach(this); sk.interestOps(SelectionKey.OP_READ); selector.wakeup(); } public void run() { pool.execute(new AsyncTask()); } public synchronized void asyncRun() { try { if (state == SENDING) { channel.write(byteBuffer); byteBuffer.clear(); sk.interestOps(SelectionKey.OP_READ); state = RECIEVING; } else if (state == RECIEVING) { int length = 0; while ((length = channel.read(byteBuffer)) > 0) { Logger.info(new String(byteBuffer.array(), 0, length)); } byteBuffer.flip(); sk.interestOps(SelectionKey.OP_WRITE); state = SENDING; } } catch (IOException ex) { ex.printStackTrace(); } } class AsyncTask implements Runnable { public void run() { MultiThreadEchoHandler.this.asyncRun(); } }
}
|