java Netty 之 字符串消息收发(ChannelBuffer)
ChannelBuffer
Netty中的消息传递,都必须以字节的形式,以ChannelBuffer为载体传递。简单的说,就是你想直接写个字符串过去,对不起,抛异常。虽然,Netty定义的writer的接口参数是Object的,这可能也是会给新上手的朋友容易造成误会的地方。Netty源码中,是这样判断的:
SendBuffer acquire(Object message) { if (message instanceof ChannelBuffer) { return acquire((ChannelBuffer) message); } else if (message instanceof FileRegion) { return acquire((FileRegion) message); } throw new IllegalArgumentException("unsupported message type: " + message.getClass()); }
所以,我们要想传递字符串,那么就必须转换成ChannelBuffer。明确了这一点,接下来我们上代码:
public class MessageServer { public static void main(String args[]) { // Server服务启动器 ServerBootstrap bootstrap = new ServerBootstrap( new NioServerSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // 设置一个处理客户端消息和各种消息事件的类(Handler) bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { return Channels.pipeline(new MessageServerHandler()); } }); // 开放8000端口供客户端访问。 bootstrap.bind(new InetSocketAddress(8000)); } private static class MessageServerHandler extends SimpleChannelHandler { /** * 用户接受客户端发来的消息,在有客户端消息到达时触发 * */ @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { ChannelBuffer buffer = (ChannelBuffer) e.getMessage(); System.out.println(buffer.toString(Charset.defaultCharset())); } } } public class MessageClient { public static void main(String args[]) { // Client服务启动器 ClientBootstrap bootstrap = new ClientBootstrap( new NioClientSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // 设置一个处理服务端消息和各种消息事件的类(Handler) bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { return Channels.pipeline(new MessageClientHandler()); } }); // 连接到本地的8000端口的服务端 bootstrap.connect(new InetSocketAddress("127.0.0.1", 8000)); } private static class MessageClientHandler extends SimpleChannelHandler { /** * 当绑定到服务端的时候触发,给服务端发消息。 */ @Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) { // 将字符串,构造成ChannelBuffer,传递给服务端 String msg = "Hello, I'm client."; ChannelBuffer buffer = ChannelBuffers.buffer(msg.length()); buffer.writeBytes(msg.getBytes()); e.getChannel().write(buffer); } } }
与 前面“Hello World” 样例代码不同的是,客户端在channel连通后,不是在本地打印,而是将消息转换成ChannelBuffer传递给服务端,服务端接受到ChannelBuffer后,解码成字符串打印出来。
数据的读和写
在TCP/IP这种基于流传递的协议中。他识别的不是你每一次发送来的消息,不是分包的。而是,只认识一个整体的流,即使分三次分别发送三段话:ABC、DEF、GHI。在传递的过程中,他就是一个具有整体长度的流。在读流的过程中,如果我一次读取的长度选择的不是三个,我可以收到类似AB、CDEFG、H、I这样的信息。这显然是我们不想看到的。所以说,在你写的消息收发的系统里,需要预先定义好这种解析机制,规定每帧(次)读取的长度。通过代码来理解一下:
public class ServerBufferHandler extends SimpleChannelHandler { /** * 用户接受客户端发来的消息,在有客户端消息到达时触发 * */ @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { ChannelBuffer buffer = (ChannelBuffer) e.getMessage(); // 五位读取 while (buffer.readableBytes() >= 5) { ChannelBuffer tempBuffer = buffer.readBytes(5); System.out.println(tempBuffer.toString(Charset.defaultCharset())); } // 读取剩下的信息 System.out.println(buffer.toString(Charset.defaultCharset())); } }
public class ClientBufferHandler extends SimpleChannelHandler { /** * 当绑定到服务端的时候触发,给服务端发消息。 * */ @Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) { // 分段发送信息 sendMessageByFrame(e); } /** * 将<b>"Hello, I'm client."</b>分成三份发送。 * Hello, * I'm * client. * * @param e Netty事件 */ private void sendMessageByFrame(ChannelStateEvent e) { String msgOne = "Hello, "; String msgTwo = "I'm "; String msgThree = "client."; e.getChannel().write(tranStr2Buffer(msgOne)); e.getChannel().write(tranStr2Buffer(msgTwo)); e.getChannel().write(tranStr2Buffer(msgThree)); } /** * 将字符串转换成{@link ChannelBuffer},私有方法不进行字符串的非空判断。 * * @param str 待转换字符串,要求非null * * @return 转换后的ChannelBuffer */ private ChannelBuffer tranStr2Buffer(String str) { ChannelBuffer buffer = ChannelBuffers.buffer(str.length()); buffer.writeBytes(str.getBytes()); return buffer; } }
输出结果:
Hello
, I'm
clie
nt.
注意:
这里其实,服务端是否分段发送并不会影响输出结果,也就是说,你一次性的把"Hi, I'm client."这段信息发送过来,输出的结果也是一样的。这就是开头说的,传输的是流,不分包。而只在于你如何分段读写。
ChannelBuffer的结构
ChannelBuffer是Netty中比较常用的一个类,其功能类似于字符数组,可以对其进行读写操作。
ChannelBuffer的模型图如下:
如上图所示,一个ChannelBuffer被划分为三个部分:
- discardable:表示已经读过的内容缓冲区
- readable:表示可读的内容缓冲区
- writable:表示可写的内容缓冲区
ChannelBuffer的这三个缓冲区由2个内部控制指针来控制:
- readerIndex:控制读缓冲区首地址
- writerIndex:控制写缓冲区首地址
因此,ChannelBuffer提供的大部分操作都是围绕readerIndex和writerIndex来进行的。
ChannelBuffer的常用方法
1、
read()/skip()
从readerIndex读出或跳过指定数目的字节,同时readerIndex = readerIndex + byteNumber.如果readerIndex > capacity,表示读取下标越界,会抛出IndexOutOfBoundsException异常
readable():boolean
如果buffer有可读内容(此时writerIndex > readerIndex),则返回true,否则返回false
readerIndex():int
返回readerIndex
readableBytes():int
返回可读的字节数目(writerIndex - readerIndex)
2、
write();
写入指定数目的字节,同时writerIndex = writerIndex + byteNumber. 如果writerIndex > capacity,表示写入下标越界,会抛出IndexOutOfBoundsException异常
writable():boolean
如果buffer有可写入空间(此时writerIndex < capacity),则返回true,否则返回false。
writerIndex(): int
返回writerIndex
writeableIndex():int
返回可写入的字节数(capacity - writerIndex)。
3、
discardReadBytes()
丢弃已读的内容。其执行过程如下:
4、
clear()
丢弃所有的数据,并将readerIndex和writerIndex重置为0。
5、
markReaderIndex()
markWriterIndex()
保存readerIndex或writerIndex的状态
6、
resetReaderIndex()
resetWriterIndex()
重置readerIndex或writerIndex为最后一次保存的状态,如果没有保存过,则置为0
7、
duplicate()
slice()
拷贝和源目标共享buffer的数据区域,但是拷贝有自己的readerIndex和writerIndex以及markIndex,实际上只是拷贝了控制指针,数据区还是与源buffer共享。
8、
copy()
拷贝整个buffer,包括控制指针和数据区
测试代码:
package test; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; public class ChannelBufferTest { public static void main( String[] args ) { // TODO Auto-generated method stub ChannelBuffer buffer = ChannelBuffers.buffer( 10 ); System.out.println("readable bytes: " + buffer.readableBytes( )); System.out.println("readable index: " + buffer.readerIndex( )); System.out.println("writable bytes: " + buffer.writableBytes( )); System.out.println("writable index: " + buffer.writerIndex( )); buffer.writeInt( 10 ); System.out.println("after write one integer"); System.out.println("readable bytes: " + buffer.readableBytes( )); System.out.println("readable index: " + buffer.readerIndex( )); System.out.println("writable bytes: " + buffer.writableBytes( )); System.out.println("writable index: " + buffer.writerIndex( )); buffer.writeInt( 10 ); System.out.println("after write two integer"); System.out.println("readable bytes: " + buffer.readableBytes( )); System.out.println("readable index: " + buffer.readerIndex( )); System.out.println("writable bytes: " + buffer.writableBytes( )); System.out.println("writable index: " + buffer.writerIndex( )); int i = buffer.readInt( ); System.out.println("after read one integer: " + i); System.out.println("readable bytes: " + buffer.readableBytes( )); System.out.println("readable index: " + buffer.readerIndex( )); System.out.println("writable bytes: " + buffer.writableBytes( )); System.out.println("writable index: " + buffer.writerIndex( )); buffer.discardReadBytes( ); System.out.println("after discard read bytes"); System.out.println("readable bytes: " + buffer.readableBytes( )); System.out.println("readable index: " + buffer.readerIndex( )); System.out.println("writable bytes: " + buffer.writableBytes( )); System.out.println("writable index: " + buffer.writerIndex( )); buffer.resetReaderIndex( ); System.out.println("after reset reader index"); System.out.println("readable bytes: " + buffer.readableBytes( )); System.out.println("readable index: " + buffer.readerIndex( )); System.out.println("writable bytes: " + buffer.writableBytes( )); System.out.println("writable index: " + buffer.writerIndex( )); buffer.resetWriterIndex( ); System.out.println("after reset writer index"); System.out.println("readable bytes: " + buffer.readableBytes( )); System.out.println("readable index: " + buffer.readerIndex( )); System.out.println("writable bytes: " + buffer.writableBytes( )); System.out.println("writable index: " + buffer.writerIndex( )); } }
结果:
readable bytes: 0
readable index: 0
writable bytes: 10
writable index: 0
after write one integer
readable bytes: 4
readable index: 0
writable bytes: 6
writable index: 4
after write two integer
readable bytes: 8
readable index: 0
writable bytes: 2
writable index: 8
after read one integer: 10
readable bytes: 4
readable index: 4
writable bytes: 2
writable index: 8
after discard read bytes
readable bytes: 4
readable index: 0
writable bytes: 6
writable index: 4
after reset reader index
readable bytes: 4
readable index: 0
writable bytes: 6
writable index: 4
after reset writer index
readable bytes: 0
readable index: 0
writable bytes: 10
writable index: 0
相关推荐
通过Netty4 获取串口数据并且下发数据到串口,是一个封装不错的框架
java netty接收串口数据 开启windows串口工具 发送串口数据调试助手
java实现基于netty 的utp字节数据接收服务,服务具体实现代码。样例java实现基于netty 的utp字节数据接收服务,服务具体实现代码。样例
java netty权威指南完整版带目录java netty权威指南完整版带目录java netty权威指南完整版带目录java netty权威指南完整版带目录java netty权威指南完整版带目录
实现Java服务端和C#客户端联通 Java使用Netty 开发环境为IDEA C#使用DotNetty 开发环境为VS2017 运行时先开启Java服务端 再开启客户端
基于 Java Netty实现的可用于内网穿透的代理工具.zip基于 Java Netty实现的可用于内网穿透的代理工具.zip基于 Java Netty实现的可用于内网穿透的代理工具.zip基于 Java Netty实现的可用于内网穿透的代理工具.zip基于...
JAVA版基于netty的物联网高并发智能网关 JAVA版基于netty的物联网高并发智能网关 JAVA版基于netty的物联网高并发智能网关 JAVA版基于netty的物联网高并发智能网关 JAVA版基于netty的物联网高并发智能网关 JAVA...
SCANFISH-II 型声呐系统数据接口协议,对接tcp转发app,json封装
基于tcp通讯,涉及java的netty服务器的推送功能和c++socket的封装以及protobuf在java和c++中的使用。
java netty需要的包。。。。。。。。。。。。java netty需要的包,netty-3.2.10.Final.jar一个就行
JAVA netty完整示例代码。里面包括整个项目和所需的JAR包。示例以:TCP/IP自定义报文协议进行解析分析,基于帧头HEAD_DATA=0x76解析过程的示例代码,并对数据进行粘包分离的处理。粘包处理方式有两种:1.自定义报文...
做Java开发,现在很多场合需要分布式应用,很多通信框架的底层实现都包含Netty技术,为了更好了解,值得有兴趣的人研究一下
高性能的网络服务器,Java 开发学习进阶
根据给定的消息协议,自己定义一个消息类,编写好服务端与客户端,自定义好编解码器,在客户端发送此消息,服务端获取消息并向客户端发送同样格式的消息;导入Eclipse maven项目运行即可跑通。
JAVA采用Netty库实现基于以DTU传输的TCP服务器 ,可以支持多端口通讯 ,同时也支持 多协议解析
Java + Netty 实现的高并发高可用MQTT服务broker,轻松支持10万并发,已用于生产环境 技术体系:(使用 netty 实现通信及协议解析,使用 nutzboot 提供依赖注入及属性配置,使用 redis 实现消息缓存,集群,使用 ...
刚学netty ,写了一个基于netty的服务器客户端收发消息代码,功能非常简单,服务器每3秒向服务器发消息,服务器再把消息反给你。简单收1分,希望大家谅解。
在java中,netty通信业务。代码实现
java应用netty服务端和客户端示例,客户端和服务端的model对象目录必须一致
java Netty MMO 回合制网络游戏;基于 ioGame 网络编程框架开发的 MMO 类型的回合制网络游戏项目.zip java Netty MMO 回合制网络游戏;基于 ioGame 网络编程框架开发的 MMO 类型的回合制网络游戏项目.zip java Netty...