nio Selector를 이용한 Multiplexed I/O (Java, 2006. 10. 30. 19:26)
새로운 소켓 컨넥션(채널)마다 새로운 쓰레드를 생성하는 대신 채널을 multiplexing하면 하나 이상의 쓰레드가 모든 채널을 처리할 수 있다.
특정 오퍼레이션에 관심사항을 등록한다. 그러면 가용한 쓰레드가 SelectionKey의 isAcceptable(), isConnectable(), isReadable(), isWritable() 메소드를 사용해서 키의 상태를 검사할 수 있다. 이러한 메소드들은 해당 키가 기다리는 오퍼레이션 중 어떤 것이 처리될 준비가 되었는지 알려준다.
// Handle a key whose channel is ready to accept a new connection.
if (selectionKey.isAcceptable()) {
// Accept the pending connection through the server socket.
socket = serverSocket.accept();
// Fetch the channel associated with the accepted socket; guarded against null below.
channel = socket.getChannel();
if (channel != null) {
// Put the channel into non-blocking mode before registering it.
channel.configureBlocking(false);
// Register the channel with the selector for read readiness.
channel.register(selector, SelectionKey.OP_READ);
}
}
만일 exception이 발생하면 아래와 같이 key를 cancel할 수 있다.
} catch (SomeException e) {
selectionKey.cancel();
}
위의 코드는 key와 관련된 컨넥션을 무효화한다.
다음은 지금까지 설명한 내용을 바탕으로 작성한 서버와 클라이언트 예제이다.
package test;
import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.util.*;
/**
 * Single-threaded echo server that multiplexes every connection over one
 * {@link Selector}.
 *
 * <p>The listening channel is registered for {@code OP_ACCEPT}; every accepted
 * connection is switched to non-blocking mode and registered for
 * {@code OP_READ}. When a connection becomes readable, the bytes currently
 * available are written straight back to the peer and the connection is then
 * closed.
 */
public class SelectorTest {
    /** TCP port the server listens on. */
    private static final int PORT = 9876;
    /** Capacity, in bytes, of the buffer shared by all connections. */
    private static final int BUFFER_SIZE = 1024;

    /**
     * Binds the listening channel and runs the select loop until
     * {@code select()} fails.
     *
     * @param args command-line arguments (unused)
     */
    public static void main (String args[]) {
        // One buffer is enough: all channels are serviced by this single thread.
        ByteBuffer sharedBuffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
        Selector selector = null;
        ServerSocketChannel serverSocketChannel = null;
        try {
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
            selector = Selector.open();
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        }
        catch (IOException e) {
            System.err.println("Unable to setup environment");
            e.printStackTrace();
            System.exit(-1);
        }
        try {
            while (true) {
                // nothing to process
                if (selector.select() == 0) {
                    continue;
                }
                Iterator<SelectionKey> itor = selector.selectedKeys().iterator();
                while (itor.hasNext()) {
                    SelectionKey selectionKey = itor.next();
                    // The selector never clears the selected-key set itself.
                    itor.remove();
                    if (selectionKey.isValid() && selectionKey.isAcceptable()) {
                        acceptConnection(selector, selectionKey);
                    }
                    // Re-check validity: the accept path may have cancelled the key, and
                    // the readiness queries throw CancelledKeyException on a cancelled key.
                    if (selectionKey.isValid() && selectionKey.isReadable()) {
                        echoAndClose(selectionKey, sharedBuffer);
                    }
                    System.out.println(tag() + "Next...");
                }
            }
        }
        catch (IOException e) {
            System.err.println(tag() + "Error during select()");
            e.printStackTrace();
        }
        finally {
            // Release the selector and the listening socket once the loop has ended.
            if (selector != null) {
                try {
                    selector.close();
                }
                catch (IOException e) {
                    e.printStackTrace();
                }
            }
            if (serverSocketChannel != null) {
                try {
                    serverSocketChannel.close();
                }
                catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /** Returns the "[thread-name] " prefix used on every log line. */
    private static String tag() {
        return "[" + Thread.currentThread().getName() + "] ";
    }

    /**
     * Accepts one pending connection and registers it for read readiness.
     *
     * @param selector selector the new connection is registered with
     * @param acceptKey key of the listening channel; cancelled if accepting fails
     */
    private static void acceptConnection(Selector selector, SelectionKey acceptKey) {
        System.out.println(tag() + "Got acceptable key");
        SocketChannel channel = null;
        try {
            // Accept through the channel rather than its ServerSocket adaptor: in
            // non-blocking mode this returns null when nothing is pending instead of
            // throwing an unchecked IllegalBlockingModeException.
            channel = ((ServerSocketChannel) acceptKey.channel()).accept();
            if (channel != null) {
                System.out.println(tag() + "Connection from: " + channel.socket());
            }
        }
        catch (IOException e) {
            System.err.println(tag() + "Unable to accept channel");
            e.printStackTrace();
            acceptKey.cancel();
        }
        if (channel == null) {
            return;
        }
        try {
            System.out.println(tag() + "Watch for something to read");
            channel.configureBlocking(false);
            channel.register(selector, SelectionKey.OP_READ);
        }
        catch (IOException e) {
            System.err.println(tag() + "Unable to use channel");
            e.printStackTrace();
            // Only this connection is unusable: drop it and keep the listening key,
            // so the server can still accept other clients.
            try {
                channel.close();
            }
            catch (IOException closeError) {
                closeError.printStackTrace();
            }
        }
    }

    /**
     * Echoes the bytes currently available on a readable connection back to the
     * peer, then closes the connection.
     *
     * @param readKey key whose channel reported read readiness
     * @param buffer scratch buffer; its previous contents are discarded
     */
    private static void echoAndClose(SelectionKey readKey, ByteBuffer buffer) {
        System.out.println(tag() + "Reading channel");
        SocketChannel socketChannel = (SocketChannel) readKey.channel();
        buffer.clear();
        try {
            // read() yields 0 when nothing more is available right now and -1 at
            // end of stream; either one ends the echo loop.
            while (socketChannel.read(buffer) > 0) {
                System.out.println(tag() + "Reading...");
                buffer.flip();
                // A non-blocking write may be partial, so drain the buffer fully.
                while (buffer.hasRemaining()) {
                    System.out.println(tag() + "Writing...");
                    socketChannel.write(buffer);
                }
                buffer.clear();
            }
        }
        catch (IOException e) {
            System.err.println(tag() + "Error writing back bytes");
            e.printStackTrace();
            readKey.cancel();
        }
        try {
            System.out.println(tag() + "Closing...");
            socketChannel.close();
        }
        catch (IOException e) {
            e.printStackTrace();
            readKey.cancel();
        }
    }
}
package test;
import java.net.*;
import java.io.*;
/**
 * Test client for the echo server: repeatedly connects to localhost, sends one
 * line, and prints every line received until the server closes the connection.
 */
public class Connection {
    /** Number of connections opened one after another. */
    private static final int LOOP_COUNT = 100;
    /** Pause between two connections, in milliseconds. */
    private static final int SLEEP_TIME = 500;
    /** Port the client connects to. */
    private static final int PORT = 9876;

    /**
     * Runs the connect / send / read-until-EOF cycle {@code LOOP_COUNT} times.
     *
     * @param args command-line arguments (unused)
     * @throws IOException if connecting, writing or reading fails
     * @throws InterruptedException if the pause between connections is interrupted
     */
    public static void main(String args[]) throws IOException, InterruptedException {
        for (int i = 0; i < LOOP_COUNT; i++) {
            Socket socket = new Socket("localhost", PORT);
            try {
                InputStream is = socket.getInputStream();
                OutputStream os = socket.getOutputStream();
                Writer writer = new OutputStreamWriter(os, "US-ASCII");
                // autoflush=true: println() pushes the line out immediately.
                PrintWriter out = new PrintWriter(writer, true);
                out.println("Hello, World");
                BufferedReader in = new BufferedReader(new InputStreamReader(is, "US-ASCII"));
                String line;
                // readLine() returns null once the peer has closed the connection.
                while ((line = in.readLine()) != null) {
                    System.out.println(i + ": " + line);
                }
            }
            finally {
                // Close the socket even when an I/O call above throws.
                socket.close();
            }
            Thread.sleep(SLEEP_TIME);
        }
    }
}