2012년 9월 13일 목요일

Pool 예제

솔직히 Pool 에 대해 제가 정확히 이해했는지 자신 없습니다. 아래 내용은 제 사견입니다.

Pool 은 사용하려는 양만큼의 객체를 미리 할당해 버퍼에 넣어 두고, 필요할 때 꺼내 사용한 뒤 다 쓴 객체는 다시 버퍼에 넣어 재활용하는 방식입니다.

이런 방식의 장점으로는 버퍼를 재활용함으로 인해 객체 할당 및 제거에 드는 시간을 없앨 수 있다는 점이 있겠습니다.

Pool.java
 - queue 가 두개 필요합니다. 유휴 버퍼와 사용 가능 버퍼로 나눕니다.
 - 유휴 버퍼는 미리 빈 객체들로 원하는 개수만큼 채워 넣습니다.
 - 사용 가능 버퍼는 유휴 버퍼에서 꺼내 값을 채운 객체로 하나씩 채워집니다.
 - 사용 가능 버퍼에서 꺼낸 객체를 다 사용하고 나면 다시 유휴 버퍼로 옮겨 넣음으로써 나중에 다시 사용할 수 있게 됩니다.

소스 코드 :
package pool.test;

import java.util.concurrent.LinkedBlockingQueue;

/**
 * A simple object pool built on two thread-safe queues.
 *
 * <p>The {@code free} queue holds idle objects that were created up front by
 * the constructor. The {@code work} queue holds objects that have been filled
 * in and are waiting to be processed. The intended cycle is
 * {@link #acquire()} &rarr; {@link #enqueue(Object)} &rarr; {@link #dequeue()}
 * &rarr; {@link #release(Object)}.
 *
 * <p>None of the methods block: when a queue is empty the corresponding
 * getter returns {@code null}.
 *
 * @param <T> type of the pooled objects
 */
public class Pool<T> {

    /** Idle objects that are ready to be acquired. */
    private final LinkedBlockingQueue<T> free = new LinkedBlockingQueue<T>();

    /** Filled objects that are waiting to be dequeued and processed. */
    private final LinkedBlockingQueue<T> work = new LinkedBlockingQueue<T>();

    /**
     * Creates the pool and pre-populates the free queue.
     *
     * @param creator factory invoked once per pooled object
     * @param count   number of objects to create; zero or a negative value
     *                yields an empty pool
     */
    public Pool(PoolObjectCreator<T> creator, int count) {
        for (int i = 0; i < count; i++) {
            free.add(creator.create());
        }
    }

    /**
     * Takes an idle object out of the free queue.
     *
     * @return an idle object, or {@code null} when none is available
     */
    public final T acquire() {
        // poll() is a single atomic operation; a separate size() check followed
        // by remove() can throw NoSuchElementException when threads race.
        return free.poll();
    }

    /**
     * Appends a filled object to the work queue.
     *
     * @param item object to hand over for processing
     */
    public final void enqueue(T item) {
        work.add(item);
    }

    /**
     * Takes the oldest object out of the work queue.
     *
     * @return the next object to process, or {@code null} when the work queue
     *         is empty
     */
    public final T dequeue() {
        // Atomic for the same reason as acquire().
        return work.poll();
    }

    /**
     * Puts an object back into the free queue so it can be acquired again.
     *
     * @param item object that is no longer in use
     */
    public final void release(T item) {
        free.add(item);
    }

    /**
     * Factory used by the pool constructor to create the pooled objects.
     *
     * @param <T> type of the created objects
     */
    public static interface PoolObjectCreator<T> {

        /**
         * Creates one new object for the pool.
         *
         * @return the new object
         */
        public T create();
    }
}

아래 내용은 Pool 사용 예제 입니다.
 - Producer 스레드에서는 1초 마다 pool 에 데이터를 넣습니다.
 - Consumer 스레드에서는 pool 에서 데이터를 받아서 화면에 값을 출력합니다.

Main.java :
package pool.test;

import java.io.BufferedReader;
import java.io.InputStreamReader;

/**
 *
 */
public class Main {
    
    private Consumer consumer;
    private Producer producer;
    
    /**
     * 
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        
        Main main = new Main();
        main.start();
        
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        reader.readLine();
        
        main.stop();
        
        System.out.println("Done");
    }
    
    /**
     * 
     */
    public void start() {
        
        Pool<Container> pool = new Pool<Container>(new Pool.PoolObjectCreator<Container>() {
            @Override
            public Container create() {
                return new Container();
            }
        } , 10);
        
        
        producer = new Producer(pool);
        consumer = new Consumer(pool);
        
        producer.start();
        consumer.start();
    }
    
    /**
     * 
     */
    public void stop() {
        
        producer.interrupt();
        consumer.interrupt();
        
    }
    
    /**
     * 
     */
    public class Container {
        public int x;
    }
    
    
    
    /**
     *
     */
    public class Producer extends Thread {
        
        private Pool<Container> pool;
        
        public Producer(Pool<Container> pool) {
            this.pool = pool;
        }

        @Override
        public void run() {
            super.run();
            
            try {
                
                System.out.println("Producer :: start");
                
                int count = 0;
                
                Container container = null;
                while (!Thread.interrupted()) {
                    
                    if ((container = pool.acquire()) != null) {
                        
                        container.x = count++;
                        
                        pool.enqueue(container);
                        
                        Thread.sleep(1000);
                        
                    } else {
                        Thread.sleep(10);
                    }
                }
                
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                System.out.println("Producer :: done");
            }
        }
    }
    
    
    /**
     *
     */
    public class Consumer extends Thread {
        
        private Pool<Container> pool;
        
        public Consumer(Pool<Container> pool) {
            this.pool = pool;
        }

        @Override
        public void run() {
            super.run();
            
            try {
                
                System.out.println("Consumer :: start");
                
                Container container = null;
                while (!Thread.interrupted()) {
                    
                    if ((container = pool.dequeue()) != null) {
                        
                        System.out.printf("x : %d\n", container.x);
                        
                        pool.release(container);
                        
                    } else {
                        Thread.sleep(10);
                    }
                }
                
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                System.out.println("Consumer :: done");
            }
        }
    }
    
}


SocketChannel 이용한 non-blocking server/client 예제

예제는 SocketChannel 을 이용한 non-blocking 서버와 클라이언트 입니다.
서버는 여러개의 클라이언트 접속을 처리합니다.
클라이언트는 서버에 메시지를 전달할 수 있고 서버는 전달받은 메시지를 화면에 출력합니다.

실행 예)

- 서버측 -

Server :: started
Server :: waiting for accept
Server :: accepted - client [java.nio.channels.SocketChannel[connected local=/127.0.0.1:9891 remote=/127.0.0.1:61334]]
Client :: started
From Client : Hello
From Client : Hello World

- 클라이언트 측 -

Client :: started
Client :: connected
From Server : Welcome
Hello
[write :: text : Hello / len : 5]
Hello World
[write :: text : Hello World / len : 11]

폴더 구조:


소스 코드:


Abortable.java : 간단하게 종료를 나타내는 클래스를 만들었습니다.
package com.tcpsample;

/**
 * Shared stop flag that one thread sets and worker threads poll.
 */
public class Abortable {

    /**
     * {@code true} once a stop has been requested. Declared {@code volatile}
     * because it is written by one thread and read by others; without it a
     * polling thread is not guaranteed to ever observe the update.
     */
    public volatile boolean done = false;

    /**
     * Creates a flag in the "not done" state.
     */
    public Abortable() {
        init();
    }

    /**
     * Resets the flag to "not done".
     */
    public void init() {
        done = false;
    }

    /**
     * @return {@code true} if a stop has been requested
     */
    public boolean isDone() {
        return done;
    }
}

Server.java
- ServerThread 클래스는 accept 처리하는 스레드입니다.
- ClientHandlerThread 클래스는 접속한 client 를 처리하는 스레드입니다.
package com.tcpsample;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/**
 *
 */
public class Server {
    
    public static final int PORT_NUMBER = 9891;
    
    private Abortable abortable;
    private ServerThread serverThread;
    
    /**
     * 
     */
    public Server() {
        abortable = new Abortable();
    }
    
    /**
     * 
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        
        
        Server server = new Server();
        server.start();
        
        Thread.sleep(500);
        
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        reader.readLine();
        
        server.stop();

        System.out.println("BYE");
    }
    
    /**
     * start server
     */
    public void start() {
        
        abortable.init();
        
        if (serverThread == null || !serverThread.isAlive()) {
            serverThread = new ServerThread(abortable);
            serverThread.start();
        }
    }
    
    /**
     * stop server
     */
    public void stop() {
        
        abortable.done = true;
        
        if (serverThread != null && serverThread.isAlive()) {
            serverThread.interrupt();
        }
        
    }
    
    /**
     *
     */
    public class ServerThread extends Thread {
        
        private Abortable abortable;
        private List<Thread> clientList = new ArrayList<Thread>();
        
        public ServerThread(Abortable abortable) {
            this.abortable = abortable;
        }

        @Override
        public void run() {
            super.run();
            
            ServerSocketChannel server = null;
            Selector selector = null;
            
            try {
                
                System.out.println("Server :: started");
                
                server = ServerSocketChannel.open();
                server.socket().bind(new InetSocketAddress("", PORT_NUMBER));
                server.configureBlocking(false);
                
                selector = Selector.open();
                server.register(selector, SelectionKey.OP_ACCEPT);
                
                System.out.println("Server :: waiting for accept");
                
                while (!Thread.interrupted() && !abortable.isDone()) {
                    selector.select(3000);
                    
                    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                    while (iter.hasNext()) {
                        
                        SelectionKey key = iter.next();
                        if (key.isAcceptable()) {
                            
                            SocketChannel client = server.accept();
                            
                            if (client != null) {
                                System.out.printf("Server :: accepted - client [%s]\n", client);
                                Thread t = new ClientHandlerThread(abortable, client);
                                t.start();
                                clientList.add(t);
                            }
                        }
                    }
                }
                
            } catch (Exception e) {
                
                e.printStackTrace();
                
            } finally {
                
                for (Thread t : clientList) {
                    
                    if (t != null && t.isAlive())
                        t.interrupt();
                    
                    try {
                        t.join(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                
                if (server != null) {
                    
                    try {
                        server.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                
                System.out.println("Server :: done");
                
            }
        }
    }

    /**
     *
     */
    public class ClientHandlerThread extends Thread {
        
        private Abortable abortable;
        private SocketChannel client;
        
        public ClientHandlerThread(Abortable abortable, SocketChannel client) {
            this.abortable = abortable;
            this.client = client;
        }

        @Override
        public void run() {
            super.run();
            
            Selector selector = null;
            Charset cs = Charset.forName("UTF-8");
            
            boolean done = false;
            
            try {
                
                System.out.println("Client :: started");
                
                client.configureBlocking(false);
                selector = Selector.open();
                
                client.register(selector, SelectionKey.OP_READ);
                
                // send welcome message
                client.write(ByteBuffer.wrap(new String("Welcome").getBytes()));
                
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                
                while (!Thread.interrupted() && !abortable.isDone() && !done) {
                    selector.select(3000);
                    
                    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                    while (!abortable.isDone() && iter.hasNext() && !done) {
                        SelectionKey key = iter.next();
                        if (key.isReadable()) {
                            int len = client.read(buffer);
                            if (len < 0) {
                                done = true;
                                break;
                            } else if (len == 0) {
                                continue;
                            }
                            buffer.flip();
                            
                            CharBuffer cb = cs.decode(buffer);
                            
                            System.out.printf("From Client : ");
                            while (cb.hasRemaining()) {
                                System.out.printf("%c", cb.get());
                            }
                            System.out.println();
                            
                            buffer.compact();
                        }
                    }
                }
                
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                
                if (client != null) {
                    try {
                        client.socket().close();
                        client.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                
                System.out.println("Client :: bye");
            }
        }
    }
}

Client.java
- ClientThread 클래스는 서버에 접속하고 통신하는 스레드입니다.
package com.tcpsample;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;

/**
 *
 */
public class Client {
    
    private Abortable abortable = new Abortable();
    private ClientThread clientThread;
    
    /**
     * 
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        
        Client client = new Client();
        client.start("127.0.0.1", Server.PORT_NUMBER);
        
        Thread.sleep(500);

        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        
        while (true) {
            String line = reader.readLine();
            
            if (line.equals("quit"))
                break;
            
            try {
                client.sayToServer(line);
            } catch (Exception e) {
                e.printStackTrace();
                break;
            }
            
        }
        
        client.stop();
         
        System.out.println("BYE");
    }
    
    /**
     * start client
     * 
     * @param host
     * @param port
     */
    public void start(String host, int port) {
        
        abortable.init();
        
        
        if (clientThread == null || !clientThread.isAlive()) {
            clientThread = new ClientThread(abortable, host, port);
            clientThread.start();
        }
    }
    
    /**
     * stop client
     */
    public void stop() {
        
        abortable.done = true;
        
        if (clientThread != null && clientThread.isAlive()) {
            clientThread.interrupt();
        }
        
    }
    
    /**
     * 
     * @param text
     * @throws IOException
     */
    public void sayToServer(String text) throws IOException {
        clientThread.sayToServer(text);
    }
    
    /**
     *
     */
    public class ClientThread extends Thread {
        
        private Abortable abortable;
        private String host;
        private int port;
        private SocketChannel client;
        
        /**
         * 
         * @param abortable
         * @param host
         * @param port
         */
        public ClientThread(Abortable abortable, String host, int port) {
            this.abortable = abortable;
            this.host = host;
            this.port = port;
        }

        /**
         * 
         * @param text
         * @throws IOException 
         */
        public void sayToServer(String text) throws IOException {
            int len = client.write(ByteBuffer.wrap(text.getBytes()));
            System.out.printf("[write :: text : %s / len : %d]\n", text, len);
        }

        @Override
        public void run() {
            super.run();
            
            boolean done = false;
            Selector selector = null;
            Charset cs = Charset.forName("UTF-8");
            
            try {
                
                System.out.println("Client :: started");
                
                client = SocketChannel.open();
                client.configureBlocking(false);
                client.connect(new InetSocketAddress(host, port));
                
                selector = Selector.open();
                client.register(selector, SelectionKey.OP_READ);
                
                while (!Thread.interrupted() && !abortable.isDone() && !client.finishConnect()) {
                    Thread.sleep(10);
                }
                
                System.out.println("Client :: connected");
                
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                
                while (!Thread.interrupted() && !abortable.isDone() && !done) {
                    
                    selector.select(3000);
                    
                    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                    while (!Thread.interrupted() && !abortable.isDone() && !done && iter.hasNext()) {
                        SelectionKey key = iter.next();
                        if (key.isReadable()) {
                            int len = client.read(buffer);
                            if (len < 0) {
                                System.out.println("Client :: server closed");
                                done = true;
                                break;
                            } else if (len == 0) {
                                continue;
                            }
                            buffer.flip();
                            
                            CharBuffer cb = cs.decode(buffer);
                            
                            System.out.printf("From Server : ");
                            while (cb.hasRemaining()) {
                                System.out.printf("%c", cb.get());
                            }
                            System.out.println();
                            
                            buffer.compact();
                        }
                    }
                }
                
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                
                if (client != null) {
                    try {
                        client.socket().close();
                        client.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                
                System.out.println("Client :: done");
            }
            
        }
    }
}