이 예제는 SocketChannel을 이용한 non-blocking 서버와 클라이언트입니다.
서버는 여러 개의 클라이언트 접속을 처리합니다.
클라이언트는 서버에 메시지를 전달할 수 있고 서버는 전달받은 메시지를 화면에 출력합니다.
실행 예)
- 서버측 -
Server :: started
Server :: waiting for accept
Server :: accepted - client [java.nio.channels.SocketChannel[connected local=/127.0.0.1:9891 remote=/127.0.0.1:61334]]
Client :: started
From Client : Hello
From Client : Hello World
- 클라이언트 측 -
Client :: started
Client :: connected
From Server : Welcome
Hello
[write :: text : Hello / len : 5]
Hello World
[write :: text : Hello World / len : 11]
폴더 구조:
소스 코드:
Abortable.java : 간단하게 종료를 나타내는 클래스를 만들었습니다.
package com.tcpsample;
/**
 * Cooperative stop flag shared between a controlling thread and its workers.
 *
 * <p>The controller sets {@link #done} to {@code true}; workers poll
 * {@link #isDone()} inside their loops and exit when it returns {@code true}.
 */
public class Abortable {
    /**
     * {@code true} once termination has been requested.
     *
     * <p>Declared {@code volatile} because it is written by one thread and read
     * by others; without it a worker is not guaranteed to ever observe the
     * update. The field stays public because existing callers assign it directly.
     */
    public volatile boolean done = false;

    /** Creates a flag in the "not done" state. */
    public Abortable() {
        init();
    }

    /** Resets the flag to "not done" so the instance can be reused for a new run. */
    public void init() {
        done = false;
    }

    /**
     * Reports whether termination has been requested.
     *
     * @return {@code true} if {@link #done} has been set
     */
    public boolean isDone() {
        return done;
    }
}
Server.java
- ServerThread 클래스는 accept 처리하는 스레드입니다.
- ClientHandlerThread 클래스는 접속한 client 를 처리하는 스레드입니다.
package com.tcpsample;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/**
*
*/
/**
 * Sample TCP server built on non-blocking NIO channels.
 *
 * <p>{@link ServerThread} accepts connections on {@link #PORT_NUMBER} and starts one
 * {@link ClientHandlerThread} per accepted client. Each handler sends a welcome
 * message and then prints every message it receives to standard output.
 */
public class Server {
    /** TCP port the server listens on. */
    public static final int PORT_NUMBER = 9891;

    /** Charset used for every byte/char conversion on the wire. */
    private static final Charset WIRE_CHARSET = Charset.forName("UTF-8");

    /** Longest time a select() may block before the stop conditions are re-checked. */
    private static final long SELECT_TIMEOUT_MILLIS = 3000L;

    /** Longest time to wait for a single handler thread to finish during shutdown. */
    private static final long HANDLER_JOIN_MILLIS = 1000L;

    /** Size of the per-connection receive buffer. */
    private static final int READ_BUFFER_BYTES = 1024;

    private Abortable abortable;
    private ServerThread serverThread;

    /** Creates a server that is not yet listening; call {@link #start()} to run it. */
    public Server() {
        abortable = new Abortable();
    }

    /**
     * Starts the server, waits for one line on standard input, then stops it.
     *
     * @param args unused
     * @throws Exception if sleeping is interrupted or reading standard input fails
     */
    public static void main(String[] args) throws Exception {
        Server server = new Server();
        server.start();
        Thread.sleep(500);
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        reader.readLine();
        server.stop();
        System.out.println("BYE");
    }

    /**
     * Resets the stop flag and launches the accept thread unless one is already alive.
     */
    public void start() {
        abortable.init();
        if (serverThread == null || !serverThread.isAlive()) {
            serverThread = new ServerThread(abortable);
            serverThread.start();
        }
    }

    /**
     * Requests shutdown: raises the stop flag and interrupts the accept thread so it
     * wakes from a pending select. Does not wait for the thread to finish.
     */
    public void stop() {
        abortable.done = true;
        if (serverThread != null && serverThread.isAlive()) {
            serverThread.interrupt();
        }
    }

    /**
     * Closes a resource, reporting (but not propagating) any failure.
     *
     * @param resource resource to close; {@code null} is ignored
     */
    private static void closeQuietly(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Writes the whole buffer to a non-blocking channel.
     *
     * <p>A single non-blocking write may accept only part of the data, so the call is
     * repeated until the buffer is drained.
     *
     * @param channel connected channel to write to
     * @param data    bytes to send; fully consumed on return
     * @throws IOException if the channel reports an I/O error
     */
    private static void writeFully(SocketChannel channel, ByteBuffer data) throws IOException {
        while (data.hasRemaining()) {
            if (channel.write(data) == 0) {
                // Send buffer is full; give other threads a turn before retrying.
                Thread.yield();
            }
        }
    }

    /**
     * Creates a decoder that replaces undecodable bytes instead of failing, so it can be
     * fed a byte stream incrementally.
     */
    private static java.nio.charset.CharsetDecoder newStreamingDecoder() {
        return WIRE_CHARSET.newDecoder()
                .onMalformedInput(java.nio.charset.CodingErrorAction.REPLACE)
                .onUnmappableCharacter(java.nio.charset.CodingErrorAction.REPLACE);
    }

    /**
     * Thread that listens for incoming connections and hands each one to a new
     * {@link ClientHandlerThread}.
     */
    public class ServerThread extends Thread {
        private Abortable abortable;
        private List<Thread> clientList = new ArrayList<Thread>();

        /**
         * @param abortable stop flag polled by the accept loop and passed on to handlers
         */
        public ServerThread(Abortable abortable) {
            this.abortable = abortable;
        }

        /**
         * Accepts clients until the thread is interrupted or the stop flag is set, then
         * shuts down the listening socket and all handler threads.
         */
        @Override
        public void run() {
            ServerSocketChannel server = null;
            Selector selector = null;
            try {
                System.out.println("Server :: started");
                server = ServerSocketChannel.open();
                // An empty host name is documented to resolve to the loopback address
                // (see InetAddress.getByName), so only local clients can connect.
                server.socket().bind(new InetSocketAddress("", PORT_NUMBER));
                server.configureBlocking(false);
                selector = Selector.open();
                server.register(selector, SelectionKey.OP_ACCEPT);
                System.out.println("Server :: waiting for accept");

                while (!Thread.interrupted() && !abortable.isDone()) {
                    selector.select(SELECT_TIMEOUT_MILLIS);
                    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        // The selector never clears its selected-key set itself; a key
                        // left in place would be re-processed on every later wake-up.
                        iter.remove();
                        if (key.isValid() && key.isAcceptable()) {
                            acceptClient(server);
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                closeQuietly(selector);
                closeQuietly(server);
                stopClientHandlers();
                System.out.println("Server :: done");
            }
        }

        /**
         * Accepts one pending connection, if any, and starts a handler thread for it.
         *
         * @param server listening channel in non-blocking mode
         * @throws IOException if accepting fails
         */
        private void acceptClient(ServerSocketChannel server) throws IOException {
            SocketChannel client = server.accept();
            if (client == null) {
                // Non-blocking accept: the connection was no longer pending.
                return;
            }
            System.out.printf("Server :: accepted - client [%s]\n", client);
            forgetFinishedHandlers();
            Thread handler = new ClientHandlerThread(abortable, client);
            handler.start();
            clientList.add(handler);
        }

        /** Drops handler threads that have already terminated so the list stays bounded. */
        private void forgetFinishedHandlers() {
            Iterator<Thread> it = clientList.iterator();
            while (it.hasNext()) {
                if (!it.next().isAlive()) {
                    it.remove();
                }
            }
        }

        /**
         * Interrupts every handler, then waits up to {@link #HANDLER_JOIN_MILLIS} for each.
         * If this thread is interrupted while waiting, waiting stops and the interrupt
         * status is restored.
         */
        private void stopClientHandlers() {
            // Interrupt all handlers first so they shut down in parallel.
            for (Thread handler : clientList) {
                handler.interrupt();
            }
            for (Thread handler : clientList) {
                try {
                    handler.join(HANDLER_JOIN_MILLIS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
    }

    /**
     * Thread that serves a single connected client: sends a welcome message, then prints
     * everything the client sends until the peer closes, the thread is interrupted, or
     * the stop flag is set.
     */
    public class ClientHandlerThread extends Thread {
        private Abortable abortable;
        private SocketChannel client;

        /**
         * @param abortable stop flag polled by the read loop
         * @param client    accepted channel; closed by this thread when it finishes
         */
        public ClientHandlerThread(Abortable abortable, SocketChannel client) {
            this.abortable = abortable;
            this.client = client;
        }

        /** Runs the welcome/read loop and always closes the channel and selector on exit. */
        @Override
        public void run() {
            Selector selector = null;
            boolean done = false;
            try {
                System.out.println("Client :: started");
                client.configureBlocking(false);
                selector = Selector.open();
                client.register(selector, SelectionKey.OP_READ);

                // Send the welcome message in the same charset the peer decodes with.
                writeFully(client, WIRE_CHARSET.encode("Welcome"));

                java.nio.charset.CharsetDecoder decoder = newStreamingDecoder();
                ByteBuffer buffer = ByteBuffer.allocate(READ_BUFFER_BYTES);
                // UTF-8 never yields more chars than bytes, so this cannot overflow.
                CharBuffer text = CharBuffer.allocate(READ_BUFFER_BYTES);

                while (!Thread.interrupted() && !abortable.isDone() && !done) {
                    selector.select(SELECT_TIMEOUT_MILLIS);
                    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                    while (!abortable.isDone() && iter.hasNext() && !done) {
                        SelectionKey key = iter.next();
                        // Remove the key so a later timeout does not replay a stale event.
                        iter.remove();
                        if (!key.isValid() || !key.isReadable()) {
                            continue;
                        }
                        int len = client.read(buffer);
                        if (len < 0) {
                            // Peer closed the connection.
                            done = true;
                            break;
                        }
                        if (len == 0) {
                            continue;
                        }
                        buffer.flip();
                        // endOfInput=false leaves an incomplete trailing multi-byte
                        // sequence in the buffer instead of turning it into garbage.
                        decoder.decode(buffer, text, false);
                        // Keep those leftover bytes at the front for the next read.
                        buffer.compact();
                        text.flip();
                        if (text.hasRemaining()) {
                            System.out.println("From Client : " + text);
                        }
                        text.clear();
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                closeQuietly(selector);
                closeQuietly(client);
                System.out.println("Client :: bye");
            }
        }
    }
}
Client.java
- ClientThread 클래스는 서버에 접속하고 통신하는 스레드입니다.
package com.tcpsample;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
/**
*
*/
/**
 * Sample TCP client built on a non-blocking {@link SocketChannel}.
 *
 * <p>{@link ClientThread} connects to the server and prints everything it receives;
 * {@link #sayToServer(String)} sends a line of text from the calling thread.
 */
public class Client {
    /** Charset used for every byte/char conversion on the wire. */
    private static final Charset WIRE_CHARSET = Charset.forName("UTF-8");

    /** Longest time a select() may block before the stop conditions are re-checked. */
    private static final long SELECT_TIMEOUT_MILLIS = 3000L;

    /** Pause between polls while the non-blocking connect is still in progress. */
    private static final long CONNECT_POLL_MILLIS = 10L;

    /** Size of the receive buffer. */
    private static final int READ_BUFFER_BYTES = 1024;

    private Abortable abortable = new Abortable();
    private ClientThread clientThread;

    /**
     * Connects to the local server and forwards each line typed on standard input until
     * the user enters {@code quit}, standard input ends, or sending fails.
     *
     * @param args unused
     * @throws Exception if sleeping is interrupted or reading standard input fails
     */
    public static void main(String[] args) throws Exception {
        Client client = new Client();
        client.start("127.0.0.1", Server.PORT_NUMBER);
        Thread.sleep(500);
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        while (true) {
            String line = reader.readLine();
            // readLine() returns null at end of input; treat it like "quit".
            if (line == null || line.equals("quit")) {
                break;
            }
            try {
                client.sayToServer(line);
            } catch (Exception e) {
                e.printStackTrace();
                break;
            }
        }
        client.stop();
        System.out.println("BYE");
    }

    /**
     * Resets the stop flag and launches the connection thread unless one is already alive.
     *
     * @param host server host name or address
     * @param port server TCP port
     */
    public void start(String host, int port) {
        abortable.init();
        if (clientThread == null || !clientThread.isAlive()) {
            clientThread = new ClientThread(abortable, host, port);
            clientThread.start();
        }
    }

    /**
     * Requests shutdown: raises the stop flag and interrupts the connection thread so it
     * wakes from a pending select. Does not wait for the thread to finish.
     */
    public void stop() {
        abortable.done = true;
        if (clientThread != null && clientThread.isAlive()) {
            clientThread.interrupt();
        }
    }

    /**
     * Sends {@code text} to the server.
     *
     * @param text message to send
     * @throws IOException if the client has not been started, is not connected, or the
     *                     write fails
     */
    public void sayToServer(String text) throws IOException {
        ClientThread worker = clientThread;
        if (worker == null) {
            throw new IOException("client has not been started");
        }
        worker.sayToServer(text);
    }

    /**
     * Closes a resource, reporting (but not propagating) any failure.
     *
     * @param resource resource to close; {@code null} is ignored
     */
    private static void closeQuietly(java.io.Closeable resource) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Thread that connects to the server and prints every message it receives until the
     * server closes the connection, the thread is interrupted, or the stop flag is set.
     */
    public class ClientThread extends Thread {
        private Abortable abortable;
        private String host;
        private int port;

        /**
         * Channel to the server. Assigned by this thread in {@link #run()} and read by
         * whichever thread calls {@link #sayToServer(String)}, hence {@code volatile}.
         */
        private volatile SocketChannel client;

        /**
         * @param abortable stop flag polled by the connect and read loops
         * @param host      server host name or address
         * @param port      server TCP port
         */
        public ClientThread(Abortable abortable, String host, int port) {
            this.abortable = abortable;
            this.host = host;
            this.port = port;
        }

        /**
         * Encodes {@code text} as UTF-8, writes all of it to the server, and logs the text
         * together with the number of bytes sent.
         *
         * @param text message to send
         * @throws IOException if the channel is not connected yet or the write fails
         */
        public void sayToServer(String text) throws IOException {
            SocketChannel channel = client;
            if (channel == null || !channel.isConnected()) {
                throw new IOException("not connected to " + host + ":" + port);
            }
            ByteBuffer data = WIRE_CHARSET.encode(text);
            int len = 0;
            // A non-blocking write may accept only part of the data; keep going until
            // the whole message has been handed to the socket.
            while (data.hasRemaining()) {
                int written = channel.write(data);
                if (written == 0) {
                    // Send buffer is full; give other threads a turn before retrying.
                    Thread.yield();
                }
                len += written;
            }
            System.out.printf("[write :: text : %s / len : %d]\n", text, len);
        }

        /** Connects, runs the read loop, and always closes the channel and selector on exit. */
        @Override
        public void run() {
            boolean done = false;
            Selector selector = null;
            try {
                System.out.println("Client :: started");
                client = SocketChannel.open();
                client.configureBlocking(false);
                boolean connected = client.connect(new InetSocketAddress(host, port));
                selector = Selector.open();
                client.register(selector, SelectionKey.OP_READ);

                // Poll until the non-blocking connect completes or a stop is requested.
                while (!connected && !Thread.interrupted() && !abortable.isDone()) {
                    connected = client.finishConnect();
                    if (!connected) {
                        Thread.sleep(CONNECT_POLL_MILLIS);
                    }
                }
                if (!connected) {
                    // Stop was requested before the connection was established.
                    return;
                }
                System.out.println("Client :: connected");

                java.nio.charset.CharsetDecoder decoder = WIRE_CHARSET.newDecoder()
                        .onMalformedInput(java.nio.charset.CodingErrorAction.REPLACE)
                        .onUnmappableCharacter(java.nio.charset.CodingErrorAction.REPLACE);
                ByteBuffer buffer = ByteBuffer.allocate(READ_BUFFER_BYTES);
                // UTF-8 never yields more chars than bytes, so this cannot overflow.
                CharBuffer text = CharBuffer.allocate(READ_BUFFER_BYTES);

                while (!Thread.interrupted() && !abortable.isDone() && !done) {
                    selector.select(SELECT_TIMEOUT_MILLIS);
                    Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                    while (!Thread.interrupted() && !abortable.isDone() && !done && iter.hasNext()) {
                        SelectionKey key = iter.next();
                        // The selector never clears its selected-key set itself; remove the
                        // key so a later timeout does not replay a stale event.
                        iter.remove();
                        if (!key.isValid() || !key.isReadable()) {
                            continue;
                        }
                        int len = client.read(buffer);
                        if (len < 0) {
                            System.out.println("Client :: server closed");
                            done = true;
                            break;
                        }
                        if (len == 0) {
                            continue;
                        }
                        buffer.flip();
                        // endOfInput=false leaves an incomplete trailing multi-byte
                        // sequence in the buffer instead of turning it into garbage.
                        decoder.decode(buffer, text, false);
                        // Keep those leftover bytes at the front for the next read.
                        buffer.compact();
                        text.flip();
                        if (text.hasRemaining()) {
                            System.out.println("From Server : " + text);
                        }
                        text.clear();
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                closeQuietly(selector);
                closeQuietly(client);
                System.out.println("Client :: done");
            }
        }
    }
}