NIO学习笔记
在IO流中,大致可以分为普通IO(阻塞式IO)、NIO(非阻塞式IO)、AIO(异步IO)三种。
普通IO就是我们通常说的IO流,按照流对象可以分为字节流和字符流,按照的流的方向可以分为输入流和输出流。这里只是按照这两种方式进行划分,对于IO流还可以划分的更加的详细,这里就不加多说。
下面这个表格有助于我们更好的了解IO流的结构:
字节流 | 字符流 | |
---|---|---|
输入流 | InputStream | Reader |
输出流 | OutputStream | Writer |
字节流与字符流之间的桥梁:InputStreamReader
和OutputStreamWriter
针对普通IO就说这么多,下面开始学习NIO
NIO:(New IO)也可以理解为NON BOLOCKING IO,即非阻塞式IO
先看一个表格,再来更好的学习NIO
IO | NIO | AIO | |
---|---|---|---|
面向对象 | 面向流 | 面向缓冲区 | |
阻塞/非阻塞 | 阻塞 | 非阻塞 | |
同/异步 | 同步 | 同步 | 异步 |
单/双向 | 单向 | 双向 |
1. Buffer_缓冲区
- 重要属性
- position:当前操作的位置,默认0
- limit:极限,默认缓冲区的大小,只能操作limit之前的数据
- capacity:容量,缓冲区的大小
- mark:标记,用来记录position位置,调用mark方法时position回到标记的位置
- 重要方法
- allocate(int capacity) :初始化大小
- flip():切换模式,从写模式转换为读模式,调用该方法后,limit = position,position = 0。
- rewind():重复读,limit不变,positon = 0。
- clear():清空缓冲区,但是数据未进行清空,只不过所有的指针都指向了最初的位置。
- mark():记录position位置,mark = position。
- reset():将positon = mark
- 模型
- 初始化,假设初始化大小为1024,
ByteBuffer.allocate(1024)
- 存入数据,
buf.put("abcd".getBytes())
- 读取数据前调用
flip()
方法
- 读取数据
- 调用
rewind
方法
- 调用
clear
方法
- 初始化,假设初始化大小为1024,
- 代码
package buffer;
import org.junit.Test;
import java.nio.ByteBuffer;
/** * @ClassName BufferTest * @Description: TODO * @Author Bobo * @Mail tqb820965236@163.com * @Date 2019/7/23 11:28 * @Version v1.0 */
public class BufferTest {
@Test
public void test1(){
ByteBuffer buf = ByteBuffer.allocate(1024);
System.out.println("allocate.............");
System.out.println(buf.position());
System.out.println(buf.limit());
System.out.println(buf.capacity());
buf.put("adcd".getBytes());
System.out.println("put...........");
System.out.println(buf.position());
System.out.println(buf.limit());
System.out.println(buf.capacity());
buf.flip();
System.out.println("flip.......");
System.out.println(buf.position());
System.out.println(buf.limit());
System.out.println(buf.capacity());
byte[] by = new byte[buf.limit()];
buf.get(by);
System.out.println(new String(by, 0, by.length));
System.out.println("get.........");
System.out.println(buf.position());
System.out.println(buf.limit());
System.out.println(buf.capacity());
buf.rewind();
System.out.println("rewind.........");
System.out.println(buf.position());
System.out.println(buf.limit());
System.out.println(buf.capacity());
byte[] by1 = new byte[buf.limit()];
buf.get(by1);
System.out.println(new String(by1, 0, by1.length));
System.out.println("rewind...get.........");
System.out.println(buf.position());
System.out.println(buf.limit());
System.out.println(buf.capacity());
buf.clear();
System.out.println("clear.........");
System.out.println(buf.position());
System.out.println(buf.limit());
System.out.println(buf.capacity());
}
/** * @Author Bobo * @Description 进行mark属性与reset方法的测试 * @Date 2019/7/23 12:48 * @Param [] * @return void */
@Test
public void test2(){
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put("abcd".getBytes());
// 读取数据
buffer.flip();
// 进行标记,记录position当前位置
buffer.mark();
byte[] by1 = new byte[buffer.limit()];
buffer.get(by1);
System.out.println(new String(by1, 0, by1.length ));
// position位置进行重置 position = mark
buffer.reset();
byte[] by2 = new byte[buffer.limit()];
buffer.get(by2);
System.out.println(new String(by2, 0, by2.length ));
}
}
2. 直接缓冲区与非直接缓冲区
- 非直接缓冲区:在上文我们讲的缓冲区,其实就是非直接缓冲区,我们是通过调用allocate方法进行创建的。那什么是非直接缓冲区呢?在进行我们上边的操作时,我们其实并没有对内存进行直接的操作,我们对内存的操作是间接的。那当我们将一个流写入到磁盘文件中,过程是怎么样的呢?首先我们会在JVM的虚拟内存中开辟一个缓冲区的空间,正如我们所说,它内部是采用数组实现的,在堆中开辟我们需要的空间。我们写入的数据首先会写入到用户地址空间中,内核地址空间再从用户地址空间进行copy数据,接着磁盘才从内核地址空间进行读取储存。这样没有直接对物理内存进行操作的缓冲区称为非直接缓冲区。我们看下面这个图和调用allocate方法的源码实现方式:
源码:public static ByteBuffer allocate(int capacity) { if (capacity < 0) throw new IllegalArgumentException(); return new HeapByteBuffer(capacity, capacity); } -------------------------------分隔符------------------------------------------------------ HeapByteBuffer(int cap, int lim) { super(-1, 0, lim, cap, new byte[cap], 0); }
- 直接缓冲区:针对上边的解释,我们可能已经知道了什么是直接缓冲区了,其实就是可以直接对内存进行操作,减少了不必要的copy操作,通过物理内存映射文件直接将应用程序和磁盘联系了起来。我们知道,在内存中操作数据的效率是非常高的,但是,直接对内存进行操作也带了一些不可控的问题,如:内存空间消耗过大,不安全等问题。在Buffer中,Java也提供了这种高效率的缓冲区,通过调用
allocateDirect()
方法就可创建直接缓冲区。这里我们也看一下该方法的源码是如何实现的。public static ByteBuffer allocateDirect(int capacity) { return new DirectByteBuffer(capacity); } ----------------------------------------------------------------------------- DirectByteBuffer(int cap) { // package-private super(-1, 0, cap, cap); boolean pa = VM.isDirectMemoryPageAligned(); int ps = Bits.pageSize(); long size = Math.max(1L, (long)cap + (pa ? ps : 0)); Bits.reserveMemory(size, cap); long base = 0; try { base = unsafe.allocateMemory(size); } catch (OutOfMemoryError x) { Bits.unreserveMemory(size, cap); throw x; } unsafe.setMemory(base, size, (byte) 0); if (pa && (base % ps != 0)) { // Round up to page boundary address = base + ps - (base & (ps - 1)); } else { address = base; } cleaner = Cleaner.create(this, new Deallocator(base, size, cap)); att = null; }
3. 通道Channel
- 什么是通道:用于 I/O 操作的连接。 通道表示到实体,如硬件设备、文件、网络套接字或可以执行一个或多个不同 I/O 操作(如读取或写入)的程序组件的开放的连接。 通道可处于打开或关闭状态。创建通道时它处于打开状态,一旦将其关闭,则保持关闭状态。一旦关闭了某个通道,试图对其调用 I/O 操作就会导致 ClosedChannelException 被抛出。通过调用通道的 isOpen 方法可测试通道是否处于打开状态。(Java文档)
Channel本质上就是一个连接通道(连接),用于目标节点与源节点之间的连接,负责缓冲区中数据的传输,Channel是不存储数据的,Buffer才存储,因此使用Channel时需要Buffer! - 通道的实现类:
- FileChannel:文件通道
- SocketChannel:TCP传输的客户端
- ServerSocketChannel:TCP传输的服务端
- DatagramChannel:UDP传输
- 通道的获取:
- 对支持通道的类提供了getChannel()方法
- FileInputStream/FileOutputStream
- RandomAccessFile
- Socket
- ServerSocket
- DatagramSocket
- 对通道提供了open方法,用于获取通道对象(JDK1.7 NIO.2)
- Files类的newByteChannel()方法(JDK1.7 NIO.2)
- 对支持通道的类提供了getChannel()方法
- 使用通道的不同方式实现文件的复制
- 使用非直接缓冲区(
getChannel()
方法)package channel; import org.junit.Test; import java.io.FileInputStream; import java.io.FileOutputStream; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; /** * @ClassName ChannelDemo1 * @Description: 使用getChannel获取通道 * @Author Bobo * @Mail tqb820965236@163.com * @Date 2019/7/23 15:27 * @Version v1.0 */ public class ChannelDemo1 { @Test public void test1() throws Exception{ FileInputStream fis = new FileInputStream("1.jpg"); FileOutputStream fos = new FileOutputStream("2.jpg"); FileChannel inChannel = fis.getChannel(); FileChannel outChannel = fos.getChannel(); ByteBuffer buffer = ByteBuffer.allocate(1024); while (inChannel.read(buffer) != -1){ buffer.flip();// 转换为读模式 outChannel.write(buffer); buffer.clear();// 清空缓冲区 } inChannel.close(); outChannel.close(); fis.close(); fos.close(); } }
- 使用
open()
获取通道,且通过直接缓冲区进行操作package channel; import org.junit.Test; import java.io.FileInputStream; import java.io.FileOutputStream; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; /** * @ClassName ChannelDemo1 * @Description: 使用open获取通道,且通过直接缓冲区进行操作 * @Author Bobo * @Mail tqb820965236@163.com * @Date 2019/7/23 15:27 * @Version v1.0 */ public class ChannelDemo2 { @Test public void test1() throws Exception{ FileChannel in = FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ); FileChannel out = FileChannel.open(Paths.get("3.jpg"), StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE); // 创建内存映射文件的类对象 MappedByteBuffer map_IN = in.map(FileChannel.MapMode.READ_ONLY, 0, in.size()); MappedByteBuffer map_OUT = out.map(FileChannel.MapMode.READ_WRITE, 0, in.size()); // 创建字节数组 byte[] buf = new byte[map_IN.limit()]; // 调用get、put操作进行数据的传输 map_IN.get(buf); map_OUT.put(buf); in.close(); out.close(); } }
- 使用
open()
获取通道,且通过直接缓冲区进行操作(transferTo()
方法)package channel; import org.junit.Test; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; /** * @ClassName ChannelDemo1 * @Description: 使用open获取通道,且通过直接缓冲区进行操作(transferTo方法) * @Author Bobo * @Mail tqb820965236@163.com * @Date 2019/7/23 15:27 * @Version v1.0 */ public class ChannelDemo3 { @Test public void test1() throws Exception{ FileChannel in = FileChannel.open(Paths.get("1.jpg"), StandardOpenOption.READ); FileChannel out = FileChannel.open(Paths.get("4.jpg"), StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE); // 此方法依旧使用的是直接缓冲区 in.transferTo(0, in.size(), out); in.close(); out.close(); } }
- 使用非直接缓冲区(
4. 分散读取与聚集写入
- 分散读取:从单个通道Channel中读取数据时,采用多个Buffer,即Buffer数组进行数据的读取工作。
- 聚集写入:将多个Buffer的内容写入单个Channel中。
示例:
package gatherscatter;
import org.junit.Test;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
/** * @ClassName Demo * @Description: 分散读取与聚集写入 * @Author 田清波 * @Mail tqb820965236@163.com * @Date 2019/7/25 11:04 * @Version v1.0 */
public class Demo {
/** * 分散读取与聚集写入 */
@Test
public void scatherAndgather() {
RandomAccessFile ras = null;
try {
ras = new RandomAccessFile("1.txt", "rw");
} catch (FileNotFoundException e) {
e.printStackTrace();
}
FileChannel channel = ras.getChannel();
ByteBuffer buffer1 = ByteBuffer.allocate(100);
ByteBuffer buffer2 = ByteBuffer.allocate(1024);
ByteBuffer[] buf = {buffer1, buffer2};
try {
channel.read(buf);
} catch (IOException e) {
e.printStackTrace();
}
// 转换模式
buffer1.flip();
buffer2.flip();
System.out.println(new String(buffer1.array(), 0, buffer1.limit()));
System.out.println("----------------------分隔符------------------------------");
System.out.println(new String(buffer2.array(), 0, buffer2.limit()));
// 聚集写入
RandomAccessFile fas1 = null;
try {
fas1 = new RandomAccessFile("2.txt", "rw");
} catch (FileNotFoundException e) {
e.printStackTrace();
}
FileChannel channel2 = fas1.getChannel();
try {
channel2.write(buf);
} catch (IOException e) {
e.printStackTrace();
}
try {
channel.close();
channel2.close();
fas1.close();
ras.close();
} catch (Exception e) {
}
}
}
5. TCP通信中的阻塞与非阻塞
在之前了解的TCP通信中,使用到的Socket类有:ServerSocket、Socket,使用这两个类进行TCP通信的时候,采用的是阻塞式通信,如果要使用非阻塞的方式进行TCP通信,那就得使用ServerSocketChannel和SocketChannel这两个类。
在非阻塞通信中,有三个重要的类或者接口是需要熟悉的:
- Channel:是个接口,上边了解到,其实就是个连接对象,用于Buffer的数据传输,下面是部分实现类。
- SocketChannel
- ServerSocketChannel
- DatagramChannel
- Buffer:缓冲区
- Seletor:SelectableChannel 对象的多路复用器。 选择器,在非阻塞通信中,用于将Channel注册到选择器中,并且指定注册事件,在事件完成时,才执行具体的操作。
这里需要了解什么是SelectableChannel 对象,从Java文档中了解到,上边写到的三个Channel以及后边讲的Pipe.SinkChannel和SourceChannel都是SelectableChannel 对象,它们继承AbstractSelectableChannel,而AbstractSelectableChannel又继承SelectableChannel。
而SelectionKey则就是在进行注册时必须要指定的注册事件,用于后续监听,被称为选择键,有下面几个选择键类型以及常用方法
下面用代码的方式实现一下SocketChannel和ServerSocketChannel的阻塞与非阻塞方式的通信:
阻塞式的通信方式(其实默认为阻塞方式,若要实现非阻塞得调用configureBlocking()方法进行配置)
package nio_blocking;
import org.junit.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.time.LocalDateTime;
import java.util.Date;
import java.util.Locale;
import java.util.Scanner;
/** * @ClassName Demo * @Description: NIO的阻塞式 * @Author 田清波 * @Mail tqb820965236@163.com * @Date 2019/7/25 17:29 * @Version v1.0 */
public class Demo {
@Test
public void client() throws Exception {
// 创建客户端Channel
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9999));
// 创建缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 向缓冲区中存放数据
String str = "hello world";
buffer.put((LocalDateTime.now() + "\n" + str).getBytes());
// 切换模式
buffer.flip();
socketChannel.write(buffer);
buffer.clear();
socketChannel.close();
}
@Test
public void server() throws IOException {
// 创建服务器端的channel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(9999));
// 创建缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 获取客户端socket连接对象
SocketChannel socketChannel = serverSocketChannel.accept();
System.out.println(socketChannel.getLocalAddress() + "连接至服务器");
// 读取数据
socketChannel.read(buffer);
// 切换到读模式
buffer.flip();
System.out.println(new String(buffer.array(), 0, buffer.limit()));
}
}
非阻塞式的通信方式
package nio_nonblocking;
import org.junit.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.time.LocalDateTime;
import java.util.Iterator;
/** * @ClassName Demo1 * @Description: NIO的非阻塞通信方式 * @Author 田清波 * @Mail tqb820965236@163.com * @Date 2019/7/25 19:53 * @Version v1.0 */
public class Demo1 {
@Test
public void client() throws IOException {
// 创建客户端Channel
SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("localhost", 9999));
// 设置Channel为非阻塞模式
sChannel.configureBlocking(false);
// 创建缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 发送数据
buffer.put(LocalDateTime.now().toString().getBytes());
buffer.flip();
sChannel.write(buffer);
buffer.clear();
// 关闭通道
sChannel.close();
}
@Test
public void server() throws IOException {
ServerSocketChannel ssChannel = ServerSocketChannel.open();
// non-blocking
ssChannel.configureBlocking(false);
// bind port
ssChannel.bind(new InetSocketAddress(9999));
// 创建缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 创建选择器
Selector selector = Selector.open();
// 注册
ssChannel.register(selector, SelectionKey.OP_ACCEPT);
// 轮询注册事件
while (selector.select() > 0) {
Iterator<SelectionKey> it = selector.selectedKeys().iterator();
// 进行判断事件是否为Accept
while (it.hasNext()) {
SelectionKey s = it.next();
if (s.isAcceptable()){
SocketChannel sChannel = ssChannel.accept();
sChannel.configureBlocking(false);
sChannel.register(selector, SelectionKey.OP_READ);
}else if (s.isReadable()){
SocketChannel sChannel = (SocketChannel) s.channel();
sChannel.read(buffer);
buffer.flip();
System.out.println(new String(buffer.array(), 0, buffer.limit()));
buffer.clear();
}
}
// 取消选择键selectionKeys
it.remove();
}
// ssChannel.close();
}
}
6. UDP的非阻塞式通信
上边针对TCP方式进行了讲解,UDP非阻塞式通信的代码和TCP差的不多,但是是两种完全不同的通信方式,不要混淆。
package udp_nonblocking;
import org.junit.Test;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.time.LocalDateTime;
import java.util.Iterator;
import java.util.Scanner;
/** * @ClassName Demo * @Description: UDP通信的NIO方式 * @Author 田清波 * @Mail tqb820965236@163.com * @Date 2019/7/26 9:15 * @Version v1.0 */
public class Demo {
// @Test
/*public void send() throws IOException { }*/
// IDEA @Test 在控制台无法输入,有解决办法的可以告诉一下我
public static void main(String[] args) throws IOException {
DatagramChannel dChannel = DatagramChannel.open();
// 创建缓冲区
// NIO
dChannel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(1024);
Scanner sc = new Scanner(System.in);
if (sc.hasNext()) {
String str = sc.nextLine();
buffer.put((LocalDateTime.now() + ":\n" + str).getBytes());
buffer.flip();
dChannel.send(buffer, new InetSocketAddress("127.0.0.1", 9999));
buffer.clear();
}
dChannel.close();
}
@Test
public void receive() throws IOException {
DatagramChannel reChannel = DatagramChannel.open();
reChannel.bind(new InetSocketAddress("127.0.0.1", 9999));
reChannel.configureBlocking(false);
Selector selector = Selector.open();
// 注册
reChannel.register(selector, SelectionKey.OP_READ);
while (selector.select() > 0) {
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(1024);
reChannel.receive(buffer);
buffer.flip();
System.out.println(new String(buffer.array(), 0, buffer.limit()));
buffer.clear();
}
}
iterator.remove();
}
}
}
7. 管道通信
了解
package pipe;
import org.junit.Test;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
/** * @ClassName Demo * @Description: 管道方式通讯 * @Author 田清波 * @Mail tqb820965236@163.com * @Date 2019/7/26 9:45 * @Version v1.0 */
public class Demo {
@Test
public void test1() throws IOException {
Pipe pipe = Pipe.open();
// 创建管道写入Channel
Pipe.SinkChannel sinkChannel = pipe.sink();
// 创建缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put("单向管道通信".getBytes());
buffer.flip();
// 向管道中写入数据
sinkChannel.write(buffer);
// 创建管道读取Channel
Pipe.SourceChannel sourceChannel = pipe.source();
buffer.flip();
int len = sourceChannel.read(buffer);
System.out.println(new String(buffer.array(), 0, len));
buffer.clear();
}
}