`
youyu4
  • 浏览: 424900 次
社区版块
存档分类
最新评论

java Netty 之 字符串消息收发(ChannelBuffer)

 
阅读更多

java Netty 之 字符串消息收发(ChannelBuffer)

 

 

ChannelBuffer



 

      Netty中的消息传递,都必须以字节的形式,以ChannelBuffer为载体传递。简单的说,就是你想直接写个字符串过去,对不起,抛异常。虽然,Netty定义的writer的接口参数是Object的,这可能也是会给新上手的朋友容易造成误会的地方。Netty源码中,是这样判断的:

 

SendBuffer acquire(Object message) {
	if (message instanceof ChannelBuffer) {
		return acquire((ChannelBuffer) message);
	} else if (message instanceof FileRegion) {
		return acquire((FileRegion) message);
	}
 
	throw new IllegalArgumentException("unsupported message type: " + message.getClass());
}

 

 

所以,我们要想传递字符串,那么就必须转换成ChannelBuffer。明确了这一点,接下来我们上代码:

 

 

public class MessageServer {
 
	public static void main(String args[]) {
		// Server服务启动器
		ServerBootstrap bootstrap = new ServerBootstrap(
			new NioServerSocketChannelFactory(
			Executors.newCachedThreadPool(),
			Executors.newCachedThreadPool()));

		// 设置一个处理客户端消息和各种消息事件的类(Handler)
		bootstrap.setPipelineFactory(new ChannelPipelineFactory() {

			@Override
			public ChannelPipeline getPipeline() throws Exception {
				return Channels.pipeline(new MessageServerHandler());
			}
		});

		// 开放8000端口供客户端访问。
		bootstrap.bind(new InetSocketAddress(8000));
	}
 
	private static class MessageServerHandler extends SimpleChannelHandler {
 
		/**
		* 用户接受客户端发来的消息,在有客户端消息到达时触发
		*
		*/
		@Override
		public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
			ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
			System.out.println(buffer.toString(Charset.defaultCharset()));
		}
		
	}
}


public class MessageClient {
 
	public static void main(String args[]) {
		// Client服务启动器
		ClientBootstrap bootstrap = new ClientBootstrap(
			new NioClientSocketChannelFactory(
			Executors.newCachedThreadPool(),
			Executors.newCachedThreadPool()));
	
		// 设置一个处理服务端消息和各种消息事件的类(Handler)
		bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
			@Override
			public ChannelPipeline getPipeline() throws Exception {
				return Channels.pipeline(new MessageClientHandler());
			}
		});

		// 连接到本地的8000端口的服务端
		bootstrap.connect(new InetSocketAddress("127.0.0.1", 8000));
	}
 
	private static class MessageClientHandler extends SimpleChannelHandler {
 
		/**
		* 当绑定到服务端的时候触发,给服务端发消息。
		*/
		@Override
		public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
			
			// 将字符串,构造成ChannelBuffer,传递给服务端
			String msg = "Hello, I'm client.";
			ChannelBuffer buffer = ChannelBuffers.buffer(msg.length());
			buffer.writeBytes(msg.getBytes());
			e.getChannel().write(buffer);
		}
	}
}
 

 

       与 前面“Hello World” 样例代码不同的是,客户端在channel连通后,不是在本地打印,而是将消息转换成ChannelBuffer传递给服务端,服务端接受到ChannelBuffer后,解码成字符串打印出来。

 

 

 

 

数据的读和写

 

      在TCP/IP这种基于流传递的协议中。他识别的不是你每一次发送来的消息,不是分包的。而是,只认识一个整体的流,即使分三次分别发送三段话:ABC、DEF、GHI。在传递的过程中,他就是一个具有整体长度的流。在读流的过程中,如果我一次读取的长度选择的不是三个,我可以收到类似AB、CDEFG、H、I这样的信息。这显然是我们不想看到的。所以说,在你写的消息收发的系统里,需要预先定义好这种解析机制,规定每帧(次)读取的长度。通过代码来理解一下:

 

 

public class ServerBufferHandler extends SimpleChannelHandler {
 
	/**
	* 用户接受客户端发来的消息,在有客户端消息到达时触发
	*
	*/
	@Override
	public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
		
		ChannelBuffer buffer = (ChannelBuffer) e.getMessage();

		// 五位读取
		while (buffer.readableBytes() >= 5) {
			ChannelBuffer tempBuffer = buffer.readBytes(5);
			System.out.println(tempBuffer.toString(Charset.defaultCharset()));
		}

		// 读取剩下的信息
		System.out.println(buffer.toString(Charset.defaultCharset()));
	}
}
 

 

 

public class ClientBufferHandler extends SimpleChannelHandler {
 
	/**
	* 当绑定到服务端的时候触发,给服务端发消息。
	*
	*/
	@Override
	public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {

		// 分段发送信息
		sendMessageByFrame(e);
	}
 
	/**
	* 将<b>"Hello, I'm client."</b>分成三份发送。 
	* Hello, 
	* I'm
	* client.
	*
	* @param e	Netty事件
	*/
	private void sendMessageByFrame(ChannelStateEvent e) {
		String msgOne = "Hello, ";
		String msgTwo = "I'm ";
		String msgThree = "client.";
		e.getChannel().write(tranStr2Buffer(msgOne));
		e.getChannel().write(tranStr2Buffer(msgTwo));
		e.getChannel().write(tranStr2Buffer(msgThree));
	}
 
	/**
	* 将字符串转换成{@link ChannelBuffer},私有方法不进行字符串的非空判断。
	*
	* @param str	待转换字符串,要求非null
	*            
	* @return 转换后的ChannelBuffer
	*/
	private ChannelBuffer tranStr2Buffer(String str) {
		ChannelBuffer buffer = ChannelBuffers.buffer(str.length());
		buffer.writeBytes(str.getBytes());
		return buffer;
	}
}
 

 

输出结果:

Hello

, I'm

 clie

nt.

 

 

注意:

       这里其实,服务端是否分段发送并不会影响输出结果,也就是说,你一次性的把"Hi, I'm client."这段信息发送过来,输出的结果也是一样的。这就是开头说的,传输的是流,不分包。而只在于你如何分段读写。

 

 

 

 

ChannelBuffer的结构

 

ChannelBuffer是Netty中比较常用的一个类,其功能类似于字符数组,可以对其进行读写操作。

 

ChannelBuffer的模型图如下:



 

 

如上图所示,一个ChannelBuffer被划分为三个部分:

 

  • discardable:表示已经读过的内容缓冲区
  • readable:表示可读的内容缓冲区
  • writable:表示可写的内容缓冲区

 

 

ChannelBuffer的这三个缓冲区由2个内部控制指针来控制:

 

  • readerIndex:控制读缓冲区首地址
  • writerIndex:控制写缓冲区首地址

 

因此,ChannelBuffer提供的大部分操作都是围绕readerIndex和writerIndex来进行的。

 

 

 

 

ChannelBuffer的常用方法

 

1、

 

read()/skip()

从readerIndex读出或跳过指定数目的字节,同时readerIndex = readerIndex + byteNumber.如果readerIndex > capacity,表示读取下标越界,会抛出IndexOutOfBoundsException异常

 

 

readable():boolean

如果buffer有可读内容(此时writerIndex > readerIndex),则返回true,否则返回false

 

 

readerIndex():int

返回readerIndex

 

readableBytes():int

返回可读的字节数目(writerIndex - readerIndex)

 

 

2、

 

write();

写入指定数目的字节,同时writerIndex = writerIndex + byteNumber. 如果writerIndex > capacity,表示写入下标越界,会抛出IndexOutOfBoundsException异常

 

 

writable():boolean

如果buffer有可写入空间(此时writerIndex < capacity),则返回true,否则返回false。

 

 

writerIndex(): int

返回writerIndex

 

 

writeableIndex():int

返回可写入的字节数(capacity - writerIndex)。

 

 

3、

 

discardReadBytes()

丢弃已读的内容。其执行过程如下:



 

 

4、

 

clear()

丢弃所有的数据,并将readerIndex和writerIndex重置为0。



 

 

5、

 

markReaderIndex()

markWriterIndex()

 

保存readerIndex或writerIndex的状态

 

 

6、

 

resetReaderIndex()

resetWriterIndex()

 

重置readerIndex或writerIndex为最后一次保存的状态,如果没有保存过,则置为0

 

 

7、

 

duplicate()

slice()

 

拷贝和源目标共享buffer的数据区域,但是拷贝有自己的readerIndex和writerIndex以及markIndex,实际上只是拷贝了控制指针,数据区还是与源buffer共享。

 

 

8、

 

copy()

拷贝整个buffer,包括控制指针和数据区

 

 

 

测试代码:

 

 

package test;

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;

public class ChannelBufferTest {

    public static void main( String[] args ) {
        // TODO Auto-generated method stub
        ChannelBuffer buffer = ChannelBuffers.buffer( 10 );
        System.out.println("readable bytes: " + buffer.readableBytes( ));
        System.out.println("readable index: " + buffer.readerIndex( ));
        System.out.println("writable bytes: " + buffer.writableBytes( ));
        System.out.println("writable index: " + buffer.writerIndex( ));
        
        buffer.writeInt( 10 );
        System.out.println("after write one integer");
        System.out.println("readable bytes: " + buffer.readableBytes( ));
        System.out.println("readable index: " + buffer.readerIndex( ));
        System.out.println("writable bytes: " + buffer.writableBytes( ));
        System.out.println("writable index: " + buffer.writerIndex( ));

        buffer.writeInt( 10 );
        System.out.println("after write two integer");
        System.out.println("readable bytes: " + buffer.readableBytes( ));
        System.out.println("readable index: " + buffer.readerIndex( ));
        System.out.println("writable bytes: " + buffer.writableBytes( ));
        System.out.println("writable index: " + buffer.writerIndex( ));
        
        int i = buffer.readInt( );
        System.out.println("after read one integer: " + i);
        System.out.println("readable bytes: " + buffer.readableBytes( ));
        System.out.println("readable index: " + buffer.readerIndex( ));
        System.out.println("writable bytes: " + buffer.writableBytes( ));
        System.out.println("writable index: " + buffer.writerIndex( ));
        
        buffer.discardReadBytes( );
        System.out.println("after discard read bytes");
        System.out.println("readable bytes: " + buffer.readableBytes( ));
        System.out.println("readable index: " + buffer.readerIndex( ));
        System.out.println("writable bytes: " + buffer.writableBytes( ));
        System.out.println("writable index: " + buffer.writerIndex( ));
        
        buffer.resetReaderIndex( );
        System.out.println("after reset reader index");
        System.out.println("readable bytes: " + buffer.readableBytes( ));
        System.out.println("readable index: " + buffer.readerIndex( ));
        System.out.println("writable bytes: " + buffer.writableBytes( ));
        System.out.println("writable index: " + buffer.writerIndex( ));
        
        buffer.resetWriterIndex( );
        System.out.println("after reset writer index");
        System.out.println("readable bytes: " + buffer.readableBytes( ));
        System.out.println("readable index: " + buffer.readerIndex( ));
        System.out.println("writable bytes: " + buffer.writableBytes( ));
        System.out.println("writable index: " + buffer.writerIndex( ));
    }
}
 

 

 

结果:

 

readable bytes: 0

readable index: 0

writable bytes: 10

writable index: 0

after write one integer

readable bytes: 4

readable index: 0

writable bytes: 6

writable index: 4

after write two integer

readable bytes: 8

readable index: 0

writable bytes: 2

writable index: 8

after read one integer: 10

readable bytes: 4

readable index: 4

writable bytes: 2

writable index: 8

after discard read bytes

readable bytes: 4

readable index: 0

writable bytes: 6

writable index: 4

after reset reader index

readable bytes: 4

readable index: 0

writable bytes: 6

writable index: 4

after reset writer index

readable bytes: 0

readable index: 0

writable bytes: 10

writable index: 0

  • 大小: 9.3 KB
  • 大小: 6.8 KB
  • 大小: 17.5 KB
  • 大小: 15.6 KB
分享到:
评论
1 楼 liaodongdakai 2019-05-20  
Java读源码之Netty深入剖析
网盘地址:https://pan.baidu.com/s/1UZqs2ViSURgY_DEM8Kh8Uw
提取码:6ko0

相关推荐

Global site tag (gtag.js) - Google Analytics