2012년 9월 13일 목요일

SocketChannel 이용한 non-blocking server/client 예제

예제는 SocketChannel 을 이용한 non-blocking 서버와 클라이언트 입니다.
서버는 여러개의 클라이언트 접속을 처리합니다.
클라이언트는 서버에 메시지를 전달할 수 있고 서버는 전달받은 메시지를 화면에 출력합니다.

실행 예)

- 서버측 -

Server :: started
Server :: waiting for accept
Server :: accepted - client [java.nio.channels.SocketChannel[connected local=/127.0.0.1:9891 remote=/127.0.0.1:61334]]
Client :: started
From Client : Hello
From Client : Hello World

- 클라이언트 측 -

Client :: started
Client :: connected
From Server : Welcome
Hello
[write :: text : Hello / len : 5]
Hello World
[write :: text : Hello World / len : 11]

폴더 구조:


소스 코드:


Abortable.java : 간단하게 종료를 나타내는 클래스를 만들었습니다.
package com.tcpsample;

public class Abortable {
    public boolean done = false;
    
    public Abortable() {
        init();
    }
    
    public void init() {
        done = false;
    }
    
    public boolean isDone() {
        return done;
    }
}

Server.java
- ServerThread 클래스는 accept 처리하는 스레드입니다.
- ClientHandlerThread 클래스는 접속한 client 를 처리하는 스레드입니다.
package com.tcpsample;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 *
 */
public class Server {
    
    public static final int PORT_NUMBER = 9891;
    
    private Abortable abortable;
    private ServerThread serverThread;
    
    /**
     * 
     */
    public Server() {
        abortable = new Abortable();
    }
    
    /**
     * 
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        
        
        Server server = new Server();
        server.start();
        
        Thread.sleep(500);
        
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        reader.readLine();
        
        server.stop();

        System.out.println("BYE");
    }
    
    /**
     * start server
     */
    public void start() {
        
        abortable.init();
        
        if (serverThread == null || !serverThread.isAlive()) {
            serverThread = new ServerThread(abortable);
            serverThread.start();
        }
    }
    
    /**
     * stop server
     */
    public void stop() {
        
        abortable.done = true;
        
        if (serverThread != null && serverThread.isAlive()) {
            serverThread.interrupt();
        }
        
    }
    
    /**
     *
     */
    public class ServerThread extends Thread {
        
        private Abortable abortable;
        private List<Thread> clientList = new ArrayList<Thread>();
        
        public ServerThread(Abortable abortable) {
            this.abortable = abortable;
        }

        @Override
        public void run() {
            super.run();
            
            ServerSocketChannel server = null;
            Selector selector = null;
            
            try {
                
                System.out.println("Server :: started");
                
                server = ServerSocketChannel.open();
                server.socket().bind(new InetSocketAddress("", PORT_NUMBER));
                server.configureBlocking(false);
                
                selector = Selector.open();
                server.register(selector, SelectionKey.OP_ACCEPT);
                
                System.out.println("Server :: waiting for accept");
                
                while (!Thread.interrupted() && !abortable.isDone()) {
                    selector.select(3000);
                    
                    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                    while (iter.hasNext()) {
                        
                        SelectionKey key = iter.next();
                        if (key.isAcceptable()) {
                            
                            SocketChannel client = server.accept();
                            
                            if (client != null) {
                                System.out.printf("Server :: accepted - client [%s]\n", client);
                                Thread t = new ClientHandlerThread(abortable, client);
                                t.start();
                                clientList.add(t);
                            }
                        }
                    }
                }
                
            } catch (Exception e) {
                
                e.printStackTrace();
                
            } finally {
                
                for (Thread t : clientList) {
                    
                    if (t != null && t.isAlive())
                        t.interrupt();
                    
                    try {
                        t.join(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                
                if (server != null) {
                    
                    try {
                        server.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                
                System.out.println("Server :: done");
                
            }
        }
    }

    /**
     *
     */
    public class ClientHandlerThread extends Thread {
        
        private Abortable abortable;
        private SocketChannel client;
        
        public ClientHandlerThread(Abortable abortable, SocketChannel client) {
            this.abortable = abortable;
            this.client = client;
        }

        @Override
        public void run() {
            super.run();
            
            Selector selector = null;
            Charset cs = Charset.forName("UTF-8");
            
            boolean done = false;
            
            try {
                
                System.out.println("Client :: started");
                
                client.configureBlocking(false);
                selector = Selector.open();
                
                client.register(selector, SelectionKey.OP_READ);
                
                // send welcome message
                client.write(ByteBuffer.wrap(new String("Welcome").getBytes()));
                
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                
                while (!Thread.interrupted() && !abortable.isDone() && !done) {
                    selector.select(3000);
                    
                    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                    while (!abortable.isDone() && iter.hasNext() && !done) {
                        SelectionKey key = iter.next();
                        if (key.isReadable()) {
                            int len = client.read(buffer);
                            if (len < 0) {
                                done = true;
                                break;
                            } else if (len == 0) {
                                continue;
                            }
                            buffer.flip();
                            
                            CharBuffer cb = cs.decode(buffer);
                            
                            System.out.printf("From Client : ");
                            while (cb.hasRemaining()) {
                                System.out.printf("%c", cb.get());
                            }
                            System.out.println();
                            
                            buffer.compact();
                        }
                    }
                }
                
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                
                if (client != null) {
                    try {
                        client.socket().close();
                        client.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                
                System.out.println("Client :: bye");
            }
        }
    }
}

Client.java
- ClientThread 클래스는 서버에 접속하고 통신하는 스레드입니다.
package com.tcpsample;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;

/**
 *
 */
public class Client {
    
    private Abortable abortable = new Abortable();
    private ClientThread clientThread;
    
    /**
     * 
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        
        Client client = new Client();
        client.start("127.0.0.1", Server.PORT_NUMBER);
        
        Thread.sleep(500);

        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        
        while (true) {
            String line = reader.readLine();
            
            if (line.equals("quit"))
                break;
            
            try {
                client.sayToServer(line);
            } catch (Exception e) {
                e.printStackTrace();
                break;
            }
            
        }
        
        client.stop();
         
        System.out.println("BYE");
    }
    
    /**
     * start client
     * 
     * @param host
     * @param port
     */
    public void start(String host, int port) {
        
        abortable.init();
        
        
        if (clientThread == null || !clientThread.isAlive()) {
            clientThread = new ClientThread(abortable, host, port);
            clientThread.start();
        }
    }
    
    /**
     * stop client
     */
    public void stop() {
        
        abortable.done = true;
        
        if (clientThread != null && clientThread.isAlive()) {
            clientThread.interrupt();
        }
        
    }
    
    /**
     * 
     * @param text
     * @throws IOException
     */
    public void sayToServer(String text) throws IOException {
        clientThread.sayToServer(text);
    }
    
    /**
     *
     */
    public class ClientThread extends Thread {
        
        private Abortable abortable;
        private String host;
        private int port;
        private SocketChannel client;
        
        /**
         * 
         * @param abortable
         * @param host
         * @param port
         */
        public ClientThread(Abortable abortable, String host, int port) {
            this.abortable = abortable;
            this.host = host;
            this.port = port;
        }

        /**
         * 
         * @param text
         * @throws IOException 
         */
        public void sayToServer(String text) throws IOException {
            int len = client.write(ByteBuffer.wrap(text.getBytes()));
            System.out.printf("[write :: text : %s / len : %d]\n", text, len);
        }

        @Override
        public void run() {
            super.run();
            
            boolean done = false;
            Selector selector = null;
            Charset cs = Charset.forName("UTF-8");
            
            try {
                
                System.out.println("Client :: started");
                
                client = SocketChannel.open();
                client.configureBlocking(false);
                client.connect(new InetSocketAddress(host, port));
                
                selector = Selector.open();
                client.register(selector, SelectionKey.OP_READ);
                
                while (!Thread.interrupted() && !abortable.isDone() && !client.finishConnect()) {
                    Thread.sleep(10);
                }
                
                System.out.println("Client :: connected");
                
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                
                while (!Thread.interrupted() && !abortable.isDone() && !done) {
                    
                    selector.select(3000);
                    
                    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                    while (!Thread.interrupted() && !abortable.isDone() && !done && iter.hasNext()) {
                        SelectionKey key = iter.next();
                        if (key.isReadable()) {
                            int len = client.read(buffer);
                            if (len < 0) {
                                System.out.println("Client :: server closed");
                                done = true;
                                break;
                            } else if (len == 0) {
                                continue;
                            }
                            buffer.flip();
                            
                            CharBuffer cb = cs.decode(buffer);
                            
                            System.out.printf("From Server : ");
                            while (cb.hasRemaining()) {
                                System.out.printf("%c", cb.get());
                            }
                            System.out.println();
                            
                            buffer.compact();
                        }
                    }
                }
                
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                
                if (client != null) {
                    try {
                        client.socket().close();
                        client.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                
                System.out.println("Client :: done");
            }
            
        }
    }
}


댓글 2개:

  1. 164 줄의 if (len lt; 0) { 부분이 오류가 있네요
    제가 아직초보라 여부분좀 수정부탁 드립니다

    답글삭제
    답글
    1. 수정했습니다.
      len lt; 0 => len < 0

      len 값이 -1 인 경우 소켓 연결이 종료 되었음을 의미 합니다.

      감사합니다.

      삭제