public class SumUpMessageRecognizer implements MessageRecognizer {
public static final int CLIENT_MODE = 1;
public static final int SERVER_MODE = 2;
private int mode;
public SumUpMessageRecognizer(int mode) {
switch (mode) {
case CLIENT_MODE:
case SERVER_MODE:
this.mode = mode;
break;
default:
throw new IllegalArgumentException("invalid mode: " + mode);
}
}
public Message recognize(ByteBuffer buf) throws MessageParseException {
// return null if message type is not arrived yet.
if (buf.remaining() < Constants.TYPE_LEN)
return null;
int type = buf.getShort();
switch (mode) {
// 如果是server模式,只让它接收ADD消息.
case SERVER_MODE:
switch (type) {
case Constants.ADD:
return new AddMessage();
default:
throw new MessageParseException("unknown type: " + type);
}
//如果是客户端模式,只让它接收RESULT消息.
case CLIENT_MODE:
switch (type) {
case Constants.RESULT:
return new ResultMessage();
default:
throw new MessageParseException("unknown type: " + type);
}
default:
throw new InternalError(); // this cannot happen
}
}
}
实现ADD和RESULT消息
netty_s5.png
public class Client {
private static final String HOSTNAME = "localhost";
private static final int PORT = 8080;
private static final int CONNECT_TIMEOUT = 30; // seconds
private static final int DISPATCHER_THREAD_POOL_SIZE = 4;
public static void main(String[] args) throws Throwable {
// 预备要加总的值。
int[] values = new int[args.length];
for (int i = 0; i < args.length; i++) {
values = Integer.parseInt(args);
}
// 初始化 I/O processor 和 event dispatcher
IoProcessor ioProcessor = new IoProcessor();
ThreadPooledEventDispatcher eventDispatcher = new OrderedEventDispatcher();
// 开始缺省数量的I/O工作线程
ioProcessor.start();
// 启动指定数量的event dispatcher线程
eventDispatcher.setThreadPoolSize(DISPATCHER_THREAD_POOL_SIZE
eventDispatcher.start();
// 准备 message recognizer
MessageRecognizer recognizer = new SumUpMessageRecognizer(
SumUpMessageRecognizer.CLIENT_MODE);
// 准备客户端会话。
Session session = new Session(ioProcessor, new InetSocketAddress(
HOSTNAME, PORT), recognizer, eventDispatcher);
session.getConfig().setConnectTimeout(CONNECT_TIMEOUT);
// 开始会话,并使用ClientSessionListener监听。
ClientSessionListener listener = new ClientSessionListener(values);
session.addSessionListener(listener);
session.start();
// 一直等到加总完成
while ( !listener.isComplete() ) {
Thread.sleep(1000);
}
// 停止 I/O processor 和 event dispatcher
eventDispatcher.stop();
ioProcessor.stop();
}
}
public class ClientSessionListener implements SessionListener {
private final int[] values;
private boolean complete;
public ClientSessionListener(int[] values) {
this.values = values;
}
public boolean isComplete() {
return complete;
}
// 当连接建立好后会调用此方法。
public void connectionEstablished(Session session) {
System.out.println("connected to " + session.getSocketAddress());
// 发送加总请求。
for (int i = 0; i < values.length; i++) {
AddMessage m = new AddMessage();
m.setSequence(i);
m.setValue(values);
session.write(m);
}
}
public void connectionClosed(Session session) {
System.out.println("disconnected from " + session.getSocketAddress());
}
// 当收到server的回应信息时,会调用此方法
public void messageReceived(Session session, Message message) {
System.out.println("RCVD: " + message);