달력

1

« 2025/1 »

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
2006. 10. 30. 19:26

nio Selector을 이용한 Multiplexed I/O Java2006. 10. 30. 19:26

새로운 소켓 컨넥션(채널)마다 새로운 쓰레드를 생성하는 대신 채널을 multiplexing하면 하나 이상의 쓰레드가 모든 채널을 처리할 수 있다.
특정 오퍼레이션에 관심사항을 등록한다. 그러면 가용한 쓰레드가1G사용해서 키의 상태를 검사할 수 있다. 이러한 메소드들은 어떤 키를 기다리는 오퍼레이션이 처리될 것인지 알려준다.

  if (selectionKey.isAcceptable()) {
    socket = serverSocket.accept();
    channel = socket.getChannel();
    if (channel != null) {
      channel.configureBlocking(false);
      channel.register(selector, SelectionKey.OP_READ);
    }
  }

만일 exception이 발생하면 아래와 같이 key를 cancel할 수 있다.

  } catch (SomeException e) {
    selectionKey.cancel();
  }

위의 코드는 key와 관련된 컨넥션을 무효화한다.

다음은 지금까지 설명한 예로 작성한 서버와 클라이언트이다.

package test;

import java.io.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.util.*;

public class SelectorTest {
  private static int PORT = 9876;
  private static int BUFFER_SIZE = 1024;
 
  public static void main (String args[]) {
       ByteBuffer sharedBuffer = ByteBuffer.allocateDirect(BUFFER_SIZE);
       Selector selector = null;
       ServerSocket serverSocket = null;
      
       try {
           ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
           serverSocketChannel.configureBlocking(false);
          
           serverSocket = serverSocketChannel.socket();
           InetSocketAddress inetSocketAddress = new InetSocketAddress(PORT);
           serverSocket.bind(inetSocketAddress);
          
           selector = Selector.open();
           serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
       }
       catch (IOException e) {
           System.err.println("Unable to setup environment\");
           System.exit(-1);
       }
      
       try {
           while (true) {
               int count = selector.select();
              
               // nothing to process
               if (count == 0) {
                   continue;
               }
              
               Set keySet = selector.selectedKeys();
               Iterator itor = keySet.iterator();

               while (itor.hasNext()) {
                   SelectionKey selectionKey = (SelectionKey) itor.next();
                   itor.remove();
                  
                   Socket socket = null;
                   SocketChannel channel = null;
                  
                   if (selectionKey.isAcceptable()) {
                       System.out.println("["+Thread.currentThread().getName()+"] Got acceptable key\");
                       try {
                           socket = serverSocket.accept();
                           System.out.println("["+Thread.currentThread().getName()+"] Connection from: " + socket);
                           channel = socket.getChannel();
                       }
                       catch (IOException e) {
                           System.err.println("["+Thread.currentThread().getName()+"] Unable to accept channel");
                           e.printStackTrace();
                           selectionKey.cancel();
                       }
                       if (channel != null) {
                           try {
                               System.out.println("["+Thread.currentThread().getName()+"] Watch for something to read");
                               channel.configureBlocking(false);
                               channel.register(selector, SelectionKey.OP_READ);
                           }
                           catch (IOException e) {
                               System.err.println("["+Thread.currentThread().getName()+"] Unable to use channel\");
                               e.printStackTrace();
                               selectionKey.cancel();
                           }
                       }
                   }
                  
                   if (selectionKey.isReadable()) {
                       System.out.println("["+Thread.currentThread().getName()+"] Reading channel");
                       SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                       sharedBuffer.clear();
                      
                       int bytes = -1;
                       try {
                           while ((bytes = socketChannel.read(sharedBuffer)) > 0) {
                               System.out.println("["+Thread.currentThread().getName()+"] Reading...\");
                               sharedBuffer.flip();
                               while (sharedBuffer.hasRemaining()) {
                                   System.out.println("["+Thread.currentThread().getName()+"] Writing...");
                                   socketChannel.write(sharedBuffer);
                               }
                               sharedBuffer.clear();
                           }
                       }
                       catch (IOException e) {
                           System.err.println("["+Thread.currentThread().getName()+"] Error writing back bytes\");
                           e.printStackTrace();
                           selectionKey.cancel();
                       }
                      
                       try {
                           System.out.println("["+Thread.currentThread().getName()+"] Closing...\");
                           socketChannel.close();
                       }
                       catch (IOException e) {
                           e.printStackTrace();
                           selectionKey.cancel();
                       }
                   }
                   System.out.println("["+Thread.currentThread().getName()+"] Next...\");
               }
           }
       }
       catch (IOException e) {
           System.err.println("["+Thread.currentThread().getName()+"] Error during select()\");
           e.printStackTrace();
       }
  }
}


package test;

import java.net.*;
import java.io.*;

public class Connection {
  private static final int LOOP_COUNT = 100;
  private static final int SLEEP_TIME = 500;
  private static final int PORT = 9876;
 
  public static void main(String args[]) throws IOException, InterruptedException {
       for (int i=0; i<LOOP_COUNT; i++) {
           Socket socket = new Socket("localhost", PORT);
           InputStream is = socket.getInputStream();
           OutputStream os = socket.getOutputStream();
           Writer writer = new OutputStreamWriter(os, "US-ASCII");
           PrintWriter out = new PrintWriter(writer, true);
           out.println("Hello, World");
           BufferedReader in = new BufferedReader(new InputStreamReader(is, "US-ASCII"));
           String line;
           while ((line = in.readLine()) != null) {
               System.out.println(i + \": \" + line);
           }
           socket.close();
           Thread.sleep(SLEEP_TIME);
       }
  }
}

:
Posted by codetemplate