- 資訊首頁(yè) > 開(kāi)發(fā)技術(shù) > 編程語(yǔ)言 >
- java基礎之NIO介紹及使用
java.nio全稱(chēng)java non-blocking IO,是指jdk1.4 及以上版本里提供的新api(New IO) ,為所有的原始類(lèi)型(boolean類(lèi)型除外)提供緩存支持的數據容器,使用它可以提供非阻塞式的高伸縮性網(wǎng)絡(luò )。
NIO三大組件:Channel、Buffer、Selector
1.Channel 和Buffer
Channel是一個(gè)對象,可以通過(guò)它讀取和寫(xiě)入數據。拿 NIO 與原來(lái)的 I/O 做個(gè)比較,通道就像是流,而且他們面向緩沖區(Buffer)的。所有數據都通過(guò)Buffer對象來(lái)處理,永遠不會(huì )將字節直接寫(xiě)入通道中,而是將數據寫(xiě)入包含一個(gè)或多個(gè)字節的緩沖區。也不會(huì )直接從通道中讀取字節,而是將數據從通道讀入緩沖區,再從緩沖區獲取這個(gè)字節。
Channel是讀寫(xiě)數據的雙向通道,可以從Channel將數據讀取Buffer,也可將Buffer的數據寫(xiě)入Channel,而之前的Stream要么是輸入(InputStream)、要么是輸出(OutputStream),只在一個(gè)方向上流通。 而通道(Channel)可以用于讀、寫(xiě)或者同時(shí)用于讀寫(xiě)
常見(jiàn)的Channel
1.FileChannel
2.DatagramChannel
3.SocketChannel
4.ServerSocketChannel
Buffer緩沖區用來(lái)讀寫(xiě)數據,常見(jiàn)的Buffer
1.ByteBuffer
2.ShortBuffer
3.IntBuffer
4.LongBuffer
5.FloatBuffer
6.DoubleBuffer
7.CharBuffer
2.Selector
在多線(xiàn)程模式下,阻塞IO時(shí),一個(gè)線(xiàn)程只能處理一個(gè)請求,比如http請求,當請求響應式關(guān)閉連接,釋放線(xiàn)程資源。Selector選擇器的作用就是配合一個(gè)線(xiàn)程來(lái)管理多個(gè)Channel,獲取這些Channel上發(fā)生的事件,這些Channel工作在非阻塞模式下,不會(huì )讓線(xiàn)程一直在一個(gè)Channel上,適合連接數特別多,但流量低的場(chǎng)景。
調用Selector的select()方法會(huì )阻塞直到Channel發(fā)送了讀寫(xiě)就緒事件,這些事件發(fā)生,select()方法就會(huì )
返回這些事件交給thread來(lái)處理。
屬性:
Buffer的讀寫(xiě)操作就是通過(guò)改變position,mark,limit的值來(lái)實(shí)現。注意他們之間的關(guān)系可以很輕松的讀、寫(xiě)、覆蓋。
方法:
Buffer的方法全是根據改變position的值進(jìn)行操作的。
private static void buffer1() { String data = "abc"; //byte[] bytes = new byte[1024]; //創(chuàng )建一個(gè)字節緩沖區,1024byte ByteBuffer byteBuffer = ByteBuffer.allocate(15); System.out.println(byteBuffer.toString()); // 標記當前下標position位置,mark = position ,返回當前對象 System.out.println(byteBuffer.mark()); //對于寫(xiě)入模式,表示當前可以寫(xiě)入的數組大小,默認為數組的最大長(cháng)度,對于讀取模式,表示當前最多可以讀取的數據的位置下標 System.out.println(byteBuffer.limit()); // 對于寫(xiě)入模式,表示當前可寫(xiě)入數據的下標,對于讀取模式,表示接下來(lái)可以讀取的數據的下標 System.out.println(byteBuffer.position()); //capacity 表示當前數組的容量大小 System.out.println(byteBuffer.capacity()); //將字節數據寫(xiě)入 byteBuffer byteBuffer.put(data.getBytes()); //保存了當前寫(xiě)入的數據 for(Byte b : byteBuffer.array()){ System.out.print(b + " "); } System.out.println(); System.out.println(new String(byteBuffer.array())); //讀寫(xiě)模式切換 改變 limit,position ,mark的值 byteBuffer.flip(); //切換為寫(xiě)模式,將第一個(gè)字節覆蓋 byteBuffer.put("n".getBytes()); // abc 改為 nbc System.out.println(new String(byteBuffer.array())); //讓position為之前標記的值,調用mark()方法是將mark = position,這里將position = mark,mark為初始值拋出異常 byteBuffer.mark(); byteBuffer.reset(); //將position 和 mark設為初始值,調用這個(gè)可以一直讀取內容或者一直寫(xiě)入覆蓋之前內容,從第一位開(kāi)始讀/寫(xiě) byteBuffer.rewind(); for(Byte b : byteBuffer.array()){ System.out.print(b + " "); } System.out.println(); System.out.println(byteBuffer.toString()); //找到可寫(xiě)入的開(kāi)始位置,不覆蓋之前的數據 byteBuffer.compact(); System.out.println(byteBuffer.toString()); }
寫(xiě)入讀取完整操作
private static void buffer(){ //寫(xiě)入的數據 String data = "1234"; //創(chuàng )建一個(gè)字節緩沖區,1024byte ByteBuffer byteBuffer = ByteBuffer.allocate(15); //寫(xiě)入數據 byteBuffer.put(data.getBytes()); //打輸出 49 50 51 52 0 0 0 0 0 0 0 0 0 0 0 println(byteBuffer.array()); byteBuffer.put((byte) 5); //追加寫(xiě)入 輸出: 49 50 51 52 5 0 0 0 0 0 0 0 0 0 0 println(byteBuffer.array()); //覆蓋寫(xiě)入 byteBuffer.flip(); //將寫(xiě)入下標的 position = 0 byteBuffer.put((byte) 1); byteBuffer.put((byte) 2); byteBuffer.put((byte) 3); byteBuffer.put((byte) 4); byteBuffer.put((byte) 5); // 打印輸出 : 1 2 3 4 5 0 0 0 0 0 0 0 0 0 0 println(byteBuffer.array()); //讀取數據操作 byteBuffer.flip();//從頭開(kāi)始讀 while (byteBuffer.position() != byteBuffer.limit()){ System.out.println(byteBuffer.get()); } //此時(shí) position 和 數據數 limit相等 System.out.println(byteBuffer.toString()); //批量讀取 byteBuffer.flip(); //將 position 置位0,從頭開(kāi)始操作 //創(chuàng )建一個(gè)byte數組,數組大小為可讀取的大小 byte[] bytes = new byte[byteBuffer.limit()]; byteBuffer.get(bytes); //輸出bytes 1 2 3 4 5 println(bytes); } private static void println(byte[] bytes){ for(Byte b : bytes){ System.out.print(b + " "); } System.out.println(); }
字符串跟ByteBuffer之間的轉換
public static void main(String[] args) { /*======================字符串轉buffer===========================*/ //1.字符串 轉為buytebuffer 需要轉為讀模式再進(jìn)行讀取操作 ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.put("nio".getBytes()); //2.charset 自動(dòng)轉為讀模式 ByteBuffer buffer1 = StandardCharsets.UTF_8.encode("nio"); //3, warp 自動(dòng)轉為讀模式 ByteBuffer buffer2 = ByteBuffer.wrap("nio".getBytes()); /*======================buffer轉字符串===========================*/ String str = StandardCharsets.UTF_8.decode(buffer1).toString(); System.out.println(str); }
文件編程FileChannel
FileChannel只能工作在阻塞模式下,不能配合在Selector使用,Channel是雙向通道,但是FileChannel根據獲取源頭判定可讀或可寫(xiě)
** * 文件流對象打開(kāi)Channel,FileChannel是阻塞的 * @throws Exception */ private static void channel() throws Exception{ FileInputStream in = new FileInputStream("C:\\Users\\zqq\\Desktop\\123.txt"); FileOutputStream out = new FileOutputStream("C:\\Users\\zqq\\Desktop\\321.txt"); //通過(guò)文件輸入流創(chuàng )建通道channel FileChannel channel = in.getChannel(); //獲取FileChannel FileChannel outChannel = out.getChannel(); //創(chuàng )建緩沖區byteBuffer ByteBuffer byteBuffer = ByteBuffer.allocate(1024); //將管道channel中數據讀取緩存區byteBuffer中,channel只能與Buffer交互 while (channel.read(byteBuffer) != -1){ //position設置為0,從頭開(kāi)始讀 byteBuffer.flip(); outChannel.write(byteBuffer); //byteBuffer 屬性還原 byteBuffer.clear(); } //關(guān)閉各種資源 channel.close(); out.flush(); out.close(); in.close(); out.close(); }
/** * 靜態(tài)方法打開(kāi)Channel * @throws Exception */ public static void channel1() throws Exception{ // StandardOpenOption.READ :讀取一個(gè)已存在的文件,如果不存在或被占用拋出異常 // StandardOpenOption.WRITE :以追加到文件頭部的方式,寫(xiě)入一個(gè)已存在的文件,如果不存在或被占用拋出異常 // StandardOpenOption.APPEND:以追加到文件尾部的方式,寫(xiě)入一個(gè)已存在的文件,如果不存在或被占用拋出異常 // StandardOpenOption.CREATE:創(chuàng )建一個(gè)空文件,如果文件存在則不創(chuàng )建 // StandardOpenOption.CREATE_NEW:創(chuàng )建一個(gè)空文件,如果文件存在則報錯 // StandardOpenOption.DELETE_ON_CLOSE:管道關(guān)閉時(shí)刪除文件 //創(chuàng )建讀通道 FileChannel inChannel = FileChannel.open(Paths.get("C:\\Users\\zqq\\Desktop\\123.txt"), StandardOpenOption.READ); FileChannel outChannel = FileChannel.open(Paths.get("C:\\Users\\zqq\\Desktop\\321.txt"), StandardOpenOption.READ,StandardOpenOption.WRITE,StandardOpenOption.CREATE); //內存映射 MappedByteBuffer inByteBuffer = inChannel.map(FileChannel.MapMode.READ_ONLY,0,inChannel.size()); MappedByteBuffer outByteBuffer = outChannel.map(FileChannel.MapMode.READ_WRITE,0,inChannel.size()); //直接對緩沖區數據讀寫(xiě) byte[] bytes = new byte[inByteBuffer.limit()]; inByteBuffer.get(bytes);//讀取inByteBuffer內的數據,讀到bytes數組中 outByteBuffer.put(bytes);//寫(xiě)入到outByteBuffer inChannel.close(); outChannel.close(); }
RandomAccessFile打開(kāi)通道Channel
/** * 通過(guò)RandomAccessFile獲取雙向通道 * @throws Exception */ private static void random() throws Exception{ try (RandomAccessFile randomAccessFile = new RandomAccessFile("D:\\workspace\\Demo\\test.txt","rw")){ //獲取Channel FileChannel fileChannel = randomAccessFile.getChannel(); //截取字節 //fileChannel.truncate(10); //創(chuàng )建ByteBuffer,注意文件大小 ByteBuffer byteBuffer = ByteBuffer.allocate(1024); fileChannel.read(byteBuffer); System.out.println(new String(byteBuffer.array(),"GBK")); byteBuffer.clear(); String data = "this is data\r"; byteBuffer.put(data.getBytes()); byteBuffer.flip();//讀寫(xiě)切換 while (byteBuffer.hasRemaining()){ fileChannel.write(byteBuffer); } //將通道數據強制寫(xiě)到磁盤(pán) fileChannel.force(true); } }
FileChannel數據傳輸transferTo
/** * 一個(gè)文件向另一個(gè)文件傳輸(copy) */ private static void transferTo() { try ( FileChannel inChannel = new FileInputStream("C:\\Users\\44141\\Desktop\\demo\\in.txt").getChannel(); FileChannel outChannel = new FileOutputStream("C:\\Users\\44141\\Desktop\\demo\\out.txt").getChannel() ){ //底層使用操作系統零拷貝進(jìn)行優(yōu)化,效率高。限制2g inChannel.transferTo(0,inChannel.size(),outChannel); }catch (Exception e){ e.printStackTrace(); } } /** * 大于2g數據 */ private static void transferToGt2g() { try ( FileChannel inChannel = new FileInputStream("C:\\Users\\44141\\Desktop\\demo\\in.txt").getChannel(); FileChannel outChannel = new FileOutputStream("C:\\Users\\44141\\Desktop\\demo\\out1.txt").getChannel() ){ //記錄inChannel初始化大小 long size = inChannel.size(); //多次傳輸 for(long rsize = size; rsize > 0;){ //transferTo返回位移的字節數 rsize -= inChannel.transferTo((size-rsize),rsize,outChannel); } }catch (Exception e){ e.printStackTrace(); } }
阻塞模式
阻塞模式服務(wù)端每個(gè)方法都會(huì )阻塞等待客戶(hù)端操作。比如accept()方法阻塞等待客戶(hù)端連接,read()方法阻塞等待客戶(hù)端傳送數據,并發(fā)訪(fǎng)問(wèn)下沒(méi)法高效的進(jìn)行工作。
1.服務(wù)端代碼
private static void server() throws Exception{ //1.創(chuàng )建服務(wù)器 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //2.綁定監聽(tīng)端口 serverSocketChannel.bind(new InetSocketAddress(8080)); while (true){ System.out.println("開(kāi)始監聽(tīng)連接=============" ); //4.accept 監聽(tīng)進(jìn)來(lái)的連接 返回SocketChannel對象,accept默認阻塞 SocketChannel socketChannel = serverSocketChannel.accept(); System.out.println("有連接連入===============" ); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); // read()是阻塞方法,等客戶(hù)端發(fā)送數據才會(huì )執行 socketChannel.read(byteBuffer); byteBuffer.flip(); String str = StandardCharsets.UTF_8.decode(byteBuffer).toString(); System.out.println("接收到數據=============:" + str); } }
2.客戶(hù)端代碼
private static void client() throws Exception { //1.創(chuàng )建客戶(hù)端 SocketChannel socketChannel = SocketChannel.open(); //連接服務(wù)端 socketChannel.connect(new InetSocketAddress("localhost",8080)); //2 秒后寫(xiě)入數據 Thread.sleep(2 * 1000); socketChannel.write(StandardCharsets.UTF_8.encode("nio")); System.out.println(); }
非阻塞模式
服務(wù)端設置成非阻塞模式??蛻?hù)端不用動(dòng)。
private static void server() throws Exception{ ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //設置成非阻塞模式 serverSocketChannel.configureBlocking(false); serverSocketChannel.bind(new InetSocketAddress(8080)); while (true){ SocketChannel socketChannel = serverSocketChannel.accept(); //非阻塞模式下,SocketChannel會(huì )返回為null if(socketChannel != null){ //非阻塞模式 socketChannel.configureBlocking(false); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); socketChannel.read(byteBuffer); byteBuffer.flip(); String str = StandardCharsets.UTF_8.decode(byteBuffer).toString(); System.out.println("接收到數據=============:" + str); } } }
Selector選擇器的作用就是配合一個(gè)線(xiàn)程來(lái)管理多個(gè)Channel,被Selector管理的Channel必須是非阻塞模式下的,所以Selector沒(méi)法與FileChannel(FileChannel只有阻塞模式)一起使用。
創(chuàng )建Selector
創(chuàng )建Selector只需要調用一個(gè)靜態(tài)方法
//創(chuàng )建Selector Selector selector = Selector.open();
Selector可以用來(lái)監聽(tīng)Channel的事件,總共有四個(gè)事件:
Selector與Channel綁定
//設置成非阻塞模式 serverSocketChannel.configureBlocking(false); SelectionKey selectionKey = serverSocketChannel.register(selector,0,null); //綁定關(guān)注的事件 selectionKey.interestOps(SelectionKey.OP_ACCEPT);
1.客戶(hù)端代碼 SocketChannel
public static void main(String[] args) throws Exception { client(); } private static void client() throws Exception { //1.創(chuàng )建客戶(hù)端 SocketChannel socketChannel = SocketChannel.open(); //連接服務(wù)端 socketChannel.connect(new InetSocketAddress("localhost",8080)); //2 秒后寫(xiě)入數據 Thread.sleep(2 * 1000); socketChannel.write(StandardCharsets.UTF_8.encode("nio")); //3.讀取服務(wù)端返回數據 ByteBuffer byteBuffer = ByteBuffer.allocate(1024); socketChannel.read(byteBuffer); byteBuffer.flip(); System.out.println("服務(wù)端返回數據=======:" + StandardCharsets.UTF_8.decode(byteBuffer).toString()); //斷開(kāi)連接 socketChannel.close(); }
2.服務(wù)端代碼
public static void main(String[] args) throws Exception{ server(); } private static void server() throws Exception{ //創(chuàng )建Selector Selector selector = Selector.open(); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //設置成非阻塞模式 serverSocketChannel.configureBlocking(false); //將Channel注冊到selector上(綁定關(guān)系) //當事件發(fā)生后可以根據SelectionKey知道哪個(gè)事件和哪個(gè)Channel的事件 SelectionKey selectionKey = serverSocketChannel.register(selector,0,null); //selectionKey 關(guān)注ACCEPT事件(也可以在上面注冊時(shí)用第二個(gè)參數設置) selectionKey.interestOps(SelectionKey.OP_ACCEPT); serverSocketChannel.bind(new InetSocketAddress(8080)); while (true){ System.out.println("阻塞===================="); // select方法沒(méi)有事件發(fā)生時(shí)阻塞線(xiàn)程,有事件線(xiàn)程會(huì )恢復運行 selector.select(); System.out.println("開(kāi)始處理事件================="); // 處理事件 Iterator<SelectionKey> iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()){ SelectionKey sk = iterator.next(); //獲取到SelectionKey對象之后,將集合內的引用刪掉(Selecotr不會(huì )自動(dòng)刪除) iterator.remove(); //取消事件,不操作(不處理或者不取消事件,selector.select()方法不會(huì )阻塞) //sk.cancel(); //區分不同的事件做不同的動(dòng)作 if(sk.isAcceptable()){ //有連接請求事件 //通過(guò)SelectionKey 獲取對應的channel ServerSocketChannel channel = (ServerSocketChannel) sk.channel(); SocketChannel socketChannel = channel.accept(); socketChannel.configureBlocking(false); //讀取數據內容,bytebuffer大小注意消息邊界(一個(gè)字符串被分割讀取多次出來(lái)結果不準確) ByteBuffer byteBuffer = ByteBuffer.allocate(1024); //將socketChannel綁定Selector SelectionKey socketKey = socketChannel.register(selector,0,byteBuffer); //關(guān)注可讀事件 socketKey.interestOps(SelectionKey.OP_READ); }else if(sk.isReadable()){//可讀事件 try { //取到Channel SocketChannel socketChannel = (SocketChannel) sk.channel(); //獲取到綁定的ByteBuffer ByteBuffer byteBuffer = (ByteBuffer) sk.attachment(); int read = socketChannel.read(byteBuffer); //如果是正常斷開(kāi) read = -1 if(read == -1){ //取消事件 sk.cancel(); continue; } byteBuffer.flip(); String str = StandardCharsets.UTF_8.decode(byteBuffer).toString(); System.out.println("服務(wù)端讀取到數據===========:" + str); //寫(xiě)數據回客戶(hù)端 ByteBuffer writeBuffer = StandardCharsets.UTF_8.encode("this is result"); socketChannel.write(writeBuffer); //如果數據一次沒(méi)寫(xiě)完關(guān)注可寫(xiě)事件進(jìn)行再次寫(xiě)入(大數據一次寫(xiě)不完的情況) if(writeBuffer.hasRemaining()){ //關(guān)注可寫(xiě)事件,添加事件,用interestOps()方法獲取到之前的事件加上可寫(xiě)事件(類(lèi)似linux系統的賦權限 777) sk.interestOps(sk.interestOps() + SelectionKey.OP_WRITE); sk.attach(writeBuffer); //位運算符也可以 //sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE); } }catch (IOException e){ e.printStackTrace(); //客戶(hù)端異常斷開(kāi)連接 ,取消事件 sk.cancel(); } }else if(sk.isWritable()){ //取到Channel SocketChannel socketChannel = (SocketChannel) sk.channel(); //獲取到綁定的ByteBuffer ByteBuffer writeBuffer = (ByteBuffer) sk.attachment(); socketChannel.write(writeBuffer); //如果全部寫(xiě)完,取消可寫(xiě)事件綁定,解除writeBuffer綁定 if(!writeBuffer.hasRemaining()){ sk.attach(null); //取消可寫(xiě)事件 sk.interestOps(sk.interestOps() - SelectionKey.OP_WRITE); } } } } }
到此這篇關(guān)于java基礎之NIO介紹及使用的文章就介紹到這了,更多相關(guān)Java NIO詳解內容請搜索腳本之家以前的文章或繼續瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng )、來(lái)自互聯(lián)網(wǎng)轉載和分享為主,文章觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權請聯(lián)系站長(cháng)郵箱:ts@56dr.com進(jìn)行舉報,并提供相關(guān)證據,一經(jīng)查實(shí),將立刻刪除涉嫌侵權內容。
Copyright ? 2009-2021 56dr.com. All Rights Reserved. 特網(wǎng)科技 版權所有 珠海市特網(wǎng)科技有限公司 粵ICP備16109289號
域名注冊服務(wù)機構:阿里云計算有限公司(萬(wàn)網(wǎng)) 域名服務(wù)機構:煙臺帝思普網(wǎng)絡(luò )科技有限公司(DNSPod) CDN服務(wù):阿里云計算有限公司 中國互聯(lián)網(wǎng)舉報中心 增值電信業(yè)務(wù)經(jīng)營(yíng)許可證B2