翔子 发表于 2014-10-2 21:54:00

NETTY2-事件驱动的NIO框架(使用范例)

原文: http://www.blogjava.net/lihuaxajh/articles/5278.html

本文将告诉你如何使用Netty2来编一个网络应用程序(包括客户端和服务端)。我会介绍一个简单的SumUp协议,用来对整数求和。通过源代码的一步步讲解,你会了解到Netty2的每个特性。

SumUp 协议
SumUp服务会加总从客户端送来的ADD消息中的所有值,并且为每个ADD消息返回一个RESULT消息。所有消息都是由header和body两部分组成:

netty_s1.png

header包含type和sequence两个字段。type表示消息的类型(0是RESULT消息,1是ADD消息)。sequence用来表示一组对应的ADD和RESULT(也就是说,服务器回应ADD消息时,应在RESULT中使用与ADD一样的sequence值)。

ADD 消息
ADD消息包含了要被求和的值。

netty_s2.png

RESULT消息
RESULT具有不固定长度的消息体。当计算没问题时,body内容是加总的值(4bytes),如果有错误或溢位,则是2bytes. 见下图:

netty_s3.png

netty_s4.png

实现MessageRecognizer
MessageRecognizer从送来的数据中重组出Message对象。这儿我们实现了一个SumUpMessageRecognizer,用于客户端和服务端的信息重组。

public class SumUpMessageRecognizer implements MessageRecognizer {

   public static final int CLIENT_MODE = 1;

   public static final int SERVER_MODE = 2;

   private int mode;

   public SumUpMessageRecognizer(int mode) {
      switch (mode) {
            case CLIENT_MODE:
                  case SERVER_MODE:
               this.mode = mode;
               break;
                     default:
               throw new IllegalArgumentException("invalid mode: " + mode);
      }
   }

   public Message recognize(ByteBuffer buf) throws MessageParseException {
      // return null if message type is not arrived yet.
      if (buf.remaining() < Constants.TYPE_LEN)
             return null;
   
      int type = buf.getShort();
      switch (mode) {
            // 如果是server模式,只让它接收ADD消息.
            case SERVER_MODE:
               switch (type) {
                     case Constants.ADD:
                      return new AddMessage();
                     default:
                        throw new MessageParseException("unknown type: " + type);
               }
            //如果是客户端模式,只让它接收RESULT消息.
            case CLIENT_MODE:
                         switch (type) {
                     case Constants.RESULT:
                        return new ResultMessage();
                     default:
                              throw new MessageParseException("unknown type: " + type);
               }
            default:
               throw new InternalError(); // this cannot happen
      }
   }
}
实现ADD和RESULT消息
netty_s5.png

我们必须实现ADD和RESULT消息: ADD和RESULT。 它们都有公共的header,最好的方式是实现一个AbstractMessage,并且从它继承出Add和Result消息。

源代码:

AbstractMessage.java
AddMessage.java
ResultMessage.java
实现协议处理流程
实现了Messagerecognizer和Message之后,要实现Server和Client是非常容易的事情,通过下面的代码,你会很容易理解如何去实现协议的处理流程。

实现Server
实现服务端两个主要的类,一个是Server类,另一个是ServerSessionListener. Server类负责启动主程序并监听连接。而ServerSessionListener用于处理和发送消息。

public class Server {

private static final int SERVER_PORT = 8080;
private static final int DISPATCHER_THREAD_POOL_SIZE = 16;

public static void main(String[] args) throws Throwable {
// 初始化 I/O processor 和 event dispatcher
IoProcessor ioProcessor = new IoProcessor();
ThreadPooledEventDispatcher eventDispatcher = new OrderedEventDispatcher();

// 启动缺省数量的I/O工作线程
ioProcessor.start();

// 启动指定数量的event dispatcher 线程
eventDispatcher.setThreadPoolSize(DISPATCHER_THREAD_POOL_SIZE);
eventDispatcher.start();

// 准备 message recognizer
MessageRecognizer recognizer = new SumUpMessageRecognizer(
SumUpMessageRecognizer.SERVER_MODE);

// 准备session监听器,用于处理通讯过程.
ServerSessionListener listener = new ServerSessionListener();

// 开启server socket通道
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress(SERVER_PORT));

// 监听连接,并开始通讯
System.out.println("listening on port " + SERVER_PORT);
for (;;) {
// 接受connection
SocketChannel channel = ssc.accept();

// 建立新的session
Session session = new Session(ioProcessor, channel, recognizer, eventDispatcher);

// 添加session监听器
session.addSessionListener(listener);

// 开始通讯
session.start();
}
}
}


public class ServerSessionListener implements SessionListener {

      public ServerSessionListener() {
      }

      public void connectionEstablished(Session session) {
               System.out.println(session.getSocketAddress() + " connected");
               // 设置空闲时间为60秒
               session.getConfig().setIdleTime(60);
               // 设置sum的初始值为0。
               session.setAttachment(new Integer(0));
      }

      public void connectionClosed(Session session) {
               System.out.println(session.getSocketAddress() + " closed");
      }

      // 当收到client发来的消息时,此方法被调用
      public void messageReceived(Session session, Message message) {
               System.out.println(session.getSocketAddress() + " RCVD: " + message);
               // client端只发送AddMessage. 其它情况要另作处理
               // 在这里只是简单的进行类型转换处理
               AddMessage am = (AddMessage) message;
               // 将收到的消息里的值加上当前sum的值.
               int sum = ((Integer) session.getAttachment()).intValue();
               int value = am.getValue();
               long expectedSum = (long) sum + value;
               if (expectedSum > Integer.MAX_VALUE || expectedSum < Integer.MIN_VALUE) {
                     // 如果溢位返回错误消息
                     ResultMessage rm = new ResultMessage();
                     rm.setSequence(am.getSequence()); // 从送来的Add消息中得到sequence值。
                     rm.setOk(false);
                     session.write(rm);
               } else {
                     //加总
                     sum = (int) expectedSum;
                     session.setAttachment(new Integer(sum));
                     // 返回结果消息
                     ResultMessage rm = new ResultMessage();
                     rm.setSequence(am.getSequence()); // 从送来的Add消息中得到sequence值。
                     rm.setOk(true);
                     rm.setValue(sum);
                     session.write(rm);
               }
      }



      public void messageSent(Session session, Message message) {
               System.out.println(session.getSocketAddress() + " SENT: " + message);
      }



      public void sessionIdle(Session session) {
               System.out.println(session.getSocketAddress()
                               + " disconnecting the idle");
               // 关闭空闲的会话。
               session.close();
      }

      // 异常发生时,将调用此方法
      public void exceptionCaught(Session session, Throwable cause) {
               System.out.println(Thread.currentThread().getName()
                               + session.getSocketAddress() + " exception:");
               cause.printStackTrace(System.out);

               if (cause instanceof MessageParseException) {
                     // 印出错误信息内容,便于调试
                     MessageParseException mpe = (MessageParseException) cause;
                     ByteBuffer buf = mpe.getBuffer();
                     System.out.println(buf);
                     System.out.print("Buffer Content: ");
                     while (buf.remaining() > 0) {
                               System.out.print(buf.get() & 0xFF);
                               System.out.print(' ');
                     }

                     System.out.println();
               }
               // 关闭会话
               session.close();
      }
}

服务端运行后,其输出的内容示例如下:

listening on port 8080
/127.0.0.1:4753 connected
/127.0.0.1:4753 RCVD: 0:ADD(4)
/127.0.0.1:4753 RCVD: 1:ADD(6)
/127.0.0.1:4753 RCVD: 2:ADD(2)
/127.0.0.1:4753 RCVD: 3:ADD(7)
/127.0.0.1:4753 RCVD: 4:ADD(8)
/127.0.0.1:4753 RCVD: 5:ADD(1)
/127.0.0.1:4753 SENT: 0:RESULT(4)
/127.0.0.1:4753 SENT: 1:RESULT(10)
/127.0.0.1:4753 SENT: 2:RESULT(12)
/127.0.0.1:4753 SENT: 3:RESULT(19)
/127.0.0.1:4753 SENT: 4:RESULT(27)
/127.0.0.1:4753 SENT: 5:RESULT(28)
/127.0.0.1:4753 closed
实现客户端
跟服务端对应,主要由Client和ClientSessionListener组成。

public class Client {      
private static final String HOSTNAME = "localhost";         
private static final int PORT = 8080;         
private static final int CONNECT_TIMEOUT = 30; // seconds         
private static final int DISPATCHER_THREAD_POOL_SIZE = 4;         

public static void main(String[] args) throws Throwable {               
        // 预备要加总的值。               
        int[] values = new int;               
        for (int i = 0; i < args.length; i++) {                     
                values = Integer.parseInt(args);               
        }               
        // 初始化 I/O processor 和 event dispatcher               
        IoProcessor ioProcessor = new IoProcessor();               
        ThreadPooledEventDispatcher eventDispatcher = new OrderedEventDispatcher();               
        // 开始缺省数量的I/O工作线程               
        ioProcessor.start();               
        // 启动指定数量的event dispatcher线程      
        eventDispatcher.setThreadPoolSize(DISPATCHER_THREAD_POOL_SIZE               
        eventDispatcher.start();               
        // 准备 message recognizer               
        MessageRecognizer recognizer = new SumUpMessageRecognizer(                              
        SumUpMessageRecognizer.CLIENT_MODE);               
        // 准备客户端会话。               
        Session session = new Session(ioProcessor, new InetSocketAddress(                              
        HOSTNAME, PORT), recognizer, eventDispatcher);                                             
        session.getConfig().setConnectTimeout(CONNECT_TIMEOUT);                              
        // 开始会话,并使用ClientSessionListener监听。               
        ClientSessionListener listener = new ClientSessionListener(values);               
        session.addSessionListener(listener);               
        session.start();                              
        // 一直等到加总完成               
        while ( !listener.isComplete() ) {                     
                Thread.sleep(1000);               
        }                              
        // 停止 I/O processor 和 event dispatcher               
        eventDispatcher.stop();               
        ioProcessor.stop();      
        }
}
public class ClientSessionListener implements SessionListener {
      private final int[] values;
      private boolean complete;
      public ClientSessionListener(int[] values) {
               this.values = values;
      }
      public boolean isComplete() {
               return complete;
      }
      // 当连接建立好后会调用此方法。
      public void connectionEstablished(Session session) {
               System.out.println("connected to " + session.getSocketAddress());
               // 发送加总请求。
               for (int i = 0; i < values.length; i++) {
                     AddMessage m = new AddMessage();
                     m.setSequence(i);
                     m.setValue(values);
                     session.write(m);
               }
      }
      public void connectionClosed(Session session) {
               System.out.println("disconnected from " + session.getSocketAddress());
      }
      // 当收到server的回应信息时,会调用此方法
      public void messageReceived(Session session, Message message) {
               System.out.println("RCVD: " + message);

               // 服务端只发送ResultMessage. 其它情况下
               // 要通过instanceOf来判断它的类型.
               ResultMessage rm = (ResultMessage) message;
               if (rm.isOk()) {
                     // 如果ResultMessage是OK的.
                     // 根据ResultMessage的sequence值来判断如果,
                     // 一次消息的sequence值,则
                     if (rm.getSequence() == values.length - 1) {
                               // 打印出结果.
                               System.out.println("The sum: " + rm.getValue());
                               // 关闭会话
                               session.close();
                               complete = true;
                     }
               } else {
                     // 如有错误,则打印错误信息,并结束会话.
                     System.out.println("server error, disconnecting...");
                     session.close();
                     complete = true;
               }
      }

      public void messageSent(Session session, Message message) {
                  System.out.println("SENT: " + message);
      }

      public void sessionIdle(Session session) {
      }

         public void exceptionCaught(Session session, Throwable cause) {
               cause.printStackTrace(System.out);
               if (cause instanceof ConnectException) {
                     // 如果连接server失败, 则间隔5秒重试连接.
                     System.out.println("sleeping...");
                     try {
                               Thread.sleep(5000);
                     } catch (InterruptedException e) {
                     }
                     System.out.println("reconnecting... " + session.getSocketAddress());
                     session.start();
               } else {
                     session.close();
               }
      }
}
通过上面的例子,你也许会发现实现一个自定义的协议原来如此简单。你如果用Netty试着去实现自己的smtp或pop协议,我想也不会是一件难事了。

Netty2的首页在http://gleamynode.net/dev/projects/netty2/index.html,你可以在这找到本文的全部源码。
页: [1]
查看完整版本: NETTY2-事件驱动的NIO框架(使用范例)