- 資訊首頁(yè) > 開(kāi)發(fā)技術(shù) >
- Java進(jìn)階之高并發(fā)核心Selector詳解
筆者下載得是openjdk8的源碼, 畫(huà)出類(lèi)圖
比較清晰得看到,openjdk中Selector的實(shí)現是SelectorImpl,然后SelectorImpl又將職責委托給了具體的平臺
,比如圖中框出的
EpollSelectorImpl
WindowsSelectorImpl
KQueueSelectorImpl
從名字也可以猜到,openjdk肯定在底層還是用epoll,kqueue,iocp這些技術(shù)來(lái)實(shí)現的I/O多路復用
。
眾所周知,Selector.open()可以得到一個(gè)Selector實(shí)例,怎么實(shí)現的呢?
// Selector.java public static Selector open() throws IOException { // 首先找到provider,然后再打開(kāi)Selector return SelectorProvider.provider().openSelector(); }
// java.nio.channels.spi.SelectorProvider public static SelectorProvider provider() { synchronized (lock) { if (provider != null) return provider; return AccessController.doPrivileged( new PrivilegedAction<SelectorProvider>() { public SelectorProvider run() { if (loadProviderFromProperty()) return provider; if (loadProviderAsService()) return provider; // 這里就是打開(kāi)Selector的真正方法 provider = sun.nio.ch.DefaultSelectorProvider.create(); return provider; } }); } }
在openjdk中,每個(gè)操作系統都有一個(gè)sun.nio.ch.DefaultSelectorProvider實(shí)現
,以solaris為例:
/** * Returns the default SelectorProvider. */ public static SelectorProvider create() { // 獲取OS名稱(chēng) String osname = AccessController .doPrivileged(new GetPropertyAction("os.name")); // 根據名稱(chēng)來(lái)創(chuàng )建不同的Selctor if (osname.equals("SunOS")) return createProvider("sun.nio.ch.DevPollSelectorProvider"); if (osname.equals("Linux")) return createProvider("sun.nio.ch.EPollSelectorProvider"); return new sun.nio.ch.PollSelectorProvider(); }
如果系統名稱(chēng)是Linux的話(huà),真正創(chuàng )建的是sun.nio.ch.EPollSelectorProvider
。如果不是SunOS也不是Linux,就使用sun.nio.ch.PollSelectorProvider
, 關(guān)于PollSelector有興趣的讀者自行了解下, 本文僅以實(shí)際常用的EpollSelector為例探討。
打開(kāi)sun.nio.ch.EPollSelectorProvider查看openSelector方法
public AbstractSelector openSelector() throws IOException { return new EPollSelectorImpl(this); }
很直觀(guān),這樣我們在Linux平臺就得到了最終的Selector實(shí)現:sun.nio.ch.EPollSelectorImpl
epoll系統調用主要分為3個(gè)函數
epoll_create: 創(chuàng )建一個(gè)epollfd,并開(kāi)辟epoll自己的內核高速cache區,建立紅黑樹(shù),分配好想要的size的內存對象,建立一個(gè)list鏈表,用于存儲準備就緒的事件。epoll_wait: 等待內核返回IO事件epoll_ctl: 對新舊事件進(jìn)行新增修改或者刪除
EPollSelectorImpl的構造器代碼如下:
EPollSelectorImpl(SelectorProvider sp) throws IOException { super(sp); // makePipe返回管道的2個(gè)文件描述符,編碼在一個(gè)long類(lèi)型的變量中 // 高32位代表讀 低32位代表寫(xiě) // 使用pipe為了實(shí)現Selector的wakeup邏輯 long pipeFds = IOUtil.makePipe(false); fd0 = (int) (pipeFds >>> 32); fd1 = (int) pipeFds; // 新建一個(gè)EPollArrayWrapper pollWrapper = new EPollArrayWrapper(); pollWrapper.initInterrupt(fd0, fd1); fdToKey = new HashMap<>(); }
再看EPollArrayWrapper的初始化過(guò)程
EPollArrayWrapper() throws IOException { // creates the epoll file descriptor // 創(chuàng )建epoll fd epfd = epollCreate(); // the epoll_event array passed to epoll_wait int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT; pollArray = new AllocatedNativeObject(allocationSize, true); pollArrayAddress = pollArray.address(); // eventHigh needed when using file descriptors > 64k if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE) eventsHigh = new HashMap<>(); } private native int epollCreate();
在初始化過(guò)程中調用了epollCreate
方法,這是個(gè)native方法。
打開(kāi)
jdk/src/solaris/native/sun/nio/ch/EPollArrayWrapper.c
EPollArrayWrapper() throws IOException { // creates the epoll file descriptor // 創(chuàng )建epoll fd epfd = epollCreate(); // the epoll_event array passed to epoll_wait int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT; pollArray = new AllocatedNativeObject(allocationSize, true); pollArrayAddress = pollArray.address(); // eventHigh needed when using file descriptors > 64k if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE) eventsHigh = new HashMap<>(); } private native int epollCreate();
可以看到最后還是使用了操作系統的api: epoll_create函數
調用Selector.select()
,最后會(huì )委托給各個(gè)實(shí)現的doSelect
方法,限于篇幅不貼出太詳細的,這里看下EpollSelectorImpl的doSelect
方法
protected int doSelect(long timeout) throws IOException { if (closed) throw new ClosedSelectorException(); processDeregisterQueue(); try { begin(); // 真正的實(shí)現是這行 pollWrapper.poll(timeout); } finally { end(); } processDeregisterQueue(); int numKeysUpdated = updateSelectedKeys(); // 以下基本都是異常處理 if (pollWrapper.interrupted()) { // Clear the wakeup pipe pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0); synchronized (interruptLock) { pollWrapper.clearInterrupted(); IOUtil.drain(fd0); interruptTriggered = false; } } return numKeysUpdated; }
然后我們去看pollWrapper.poll
, 打開(kāi)jdk/src/solaris/classes/sun/nio/ch/EPollArrayWrapper.java
:
int poll(long timeout) throws IOException { updateRegistrations(); // 這個(gè)epollWait是不是有點(diǎn)熟悉呢? updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd); for (int i=0; i<updated; i++) { if (getDescriptor(i) == incomingInterruptFD) { interruptedIndex = i; interrupted = true; break; } } return updated; } private native int epollWait(long pollAddress, int numfds, long timeout, int epfd) throws IOException;
epollWait也是個(gè)native方法,打開(kāi)c代碼一看:
JNIEXPORT jint JNICALL Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this, jlong address, jint numfds, jlong timeout, jint epfd) { struct epoll_event *events = jlong_to_ptr(address); int res; if (timeout <= 0) { /* Indefinite or no wait */ // 發(fā)起epoll_wait系統調用等待內核事件 RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res); } else { /* Bounded wait; bounded restarts */ res = iepoll(epfd, events, numfds, timeout); } if (res < 0) { JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed"); } return res; } =
可以看到,最后還是發(fā)起的epoll_wait
系統調用.
JDK中對于注冊到Selector上的IO事件關(guān)系是使用SelectionKey來(lái)表示
,代表了Channel感興趣的事件,如Read,Write,Connect,Accept
.
調用Selector.register()
時(shí)均會(huì )將事件存儲到EpollArrayWrapper
的成員變量eventsLow和eventsHigh中
// events for file descriptors with registration changes pending, indexed // by file descriptor and stored as bytes for efficiency reasons. For // file descriptors higher than MAX_UPDATE_ARRAY_SIZE (unlimited case at // least) then the update is stored in a map. // 使用數組保存事件變更, 數組的最大長(cháng)度是MAX_UPDATE_ARRAY_SIZE, 最大64*1024 private final byte[] eventsLow = new byte[MAX_UPDATE_ARRAY_SIZE]; // 超過(guò)數組長(cháng)度的事件會(huì )緩存到這個(gè)map中,等待下次處理 private Map<Integer,Byte> eventsHigh; /** * Sets the pending update events for the given file descriptor. This * method has no effect if the update events is already set to KILLED, * unless {@code force} is {@code true}. */ private void setUpdateEvents(int fd, byte events, boolean force) { // 判斷fd和數組長(cháng)度 if (fd < MAX_UPDATE_ARRAY_SIZE) { if ((eventsLow[fd] != KILLED) || force) { eventsLow[fd] = events; } } else { Integer key = Integer.valueOf(fd); if (!isEventsHighKilled(key) || force) { eventsHigh.put(key, Byte.valueOf(events)); } } }
上面看到EpollArrayWrapper.poll()
的時(shí)候, 首先會(huì )調用updateRegistrations
/** * Returns the pending update events for the given file descriptor. */ private byte getUpdateEvents(int fd) { if (fd < MAX_UPDATE_ARRAY_SIZE) { return eventsLow[fd]; } else { Byte result = eventsHigh.get(Integer.valueOf(fd)); // result should never be null return result.byteValue(); } } /** * Update the pending registrations. */ private void updateRegistrations() { synchronized (updateLock) { int j = 0; while (j < updateCount) { int fd = updateDescriptors[j]; // 從保存的eventsLow和eventsHigh里取出事件 short events = getUpdateEvents(fd); boolean isRegistered = registered.get(fd); int opcode = 0; if (events != KILLED) { // 判斷操作類(lèi)型以傳給epoll_ctl // 沒(méi)有指定EPOLLET事件類(lèi)型 if (isRegistered) { opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL; } else { opcode = (events != 0) ? EPOLL_CTL_ADD : 0; } if (opcode != 0) { // 熟悉的epoll_ctl epollCtl(epfd, opcode, fd, events); if (opcode == EPOLL_CTL_ADD) { registered.set(fd); } else if (opcode == EPOLL_CTL_DEL) { registered.clear(fd); } } } j++; } updateCount = 0; } } private native void epollCtl(int epfd, int opcode, int fd, int events);
在獲取到事件之后將操作委托給了epollCtl
,這又是個(gè)native方法,打開(kāi)相應的c代碼一看:
JNIEXPORT void JNICALL Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd, jint opcode, jint fd, jint events) { struct epoll_event event; int res; event.events = events; event.data.fd = fd; // 發(fā)起epoll_ctl調用來(lái)進(jìn)行IO事件的管理 RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res); /* * A channel may be registered with several Selectors. When each Selector * is polled a EPOLL_CTL_DEL op will be inserted into its pending update * list to remove the file descriptor from epoll. The "last" Selector will * close the file descriptor which automatically unregisters it from each * epoll descriptor. To avoid costly synchronization between Selectors we * allow pending updates to be processed, ignoring errors. The errors are * harmless as the last update for the file descriptor is guaranteed to * be EPOLL_CTL_DEL. */ if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) { JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed"); } }
原來(lái)還是我們的老朋友epoll_ctl.
有個(gè)小細節是jdk沒(méi)有指定ET(邊緣觸發(fā))還是LT(水平觸發(fā)),所以默認會(huì )用LT:)
在AbstractSelectorImpl中有3個(gè)set保存事件
// Public views of the key sets // 注冊的所有事件 private Set<SelectionKey> publicKeys; // Immutable // 內核返回的IO事件封裝,表示哪些fd有數據可讀可寫(xiě) private Set<SelectionKey> publicSelectedKeys; // Removal allowed, but not addition // 取消的事件 private final Set<SelectionKey> cancelledKeys = new HashSet<SelectionKey>();
在EpollArrayWrapper.poll調用完成之后, 會(huì )調用updateSelectedKeys
來(lái)更新上面的仨set
private int updateSelectedKeys() { int entries = pollWrapper.updated; int numKeysUpdated = 0; for (int i=0; i<entries; i++) { int nextFD = pollWrapper.getDescriptor(i); SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD)); // ski is null in the case of an interrupt if (ski != null) { int rOps = pollWrapper.getEventOps(i); if (selectedKeys.contains(ski)) { if (ski.channel.translateAndSetReadyOps(rOps, ski)) { numKeysUpdated++; } } else { ski.channel.translateAndSetReadyOps(rOps, ski); if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) { selectedKeys.add(ski); numKeysUpdated++; } } } } return numKeysUpdated;
代碼很直白,拿出事件對set比對操作。
重點(diǎn)注意四個(gè)方法
這是一個(gè)阻塞方法,調用該方法,會(huì )阻塞,直到返回一個(gè)有事件發(fā)生的selectionKey集合
獲取不到有事件發(fā)生的selectionKey集合,也會(huì )立即返回
如果沒(méi)有獲取到有事件發(fā)生的selectionKey集合,阻塞指定的long時(shí)間
可以理解:selector一直在監聽(tīng)select()
Server代碼:
public class NIOServer { public static void main(String[] args) throws Exception{ //創(chuàng )建ServerSocketChannel -> ServerSocket ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //得到一個(gè)Selecor對象 Selector selector = Selector.open(); //綁定一個(gè)端口6666, 在服務(wù)器端監聽(tīng) serverSocketChannel.socket().bind(new InetSocketAddress(6666)); //設置為非阻塞 serverSocketChannel.configureBlocking(false); //把 serverSocketChannel 注冊到 selector 關(guān)心 事件為 OP_ACCEPT serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("注冊后的selectionkey 數量=" + selector.keys().size()); // 1 //循環(huán)等待客戶(hù)端連接 while (true) { //這里我們等待1秒,如果沒(méi)有事件發(fā)生, 返回 if(selector.select(1000) == 0) { //沒(méi)有事件發(fā)生 System.out.println("服務(wù)器等待了1秒,無(wú)連接"); continue; } //如果返回的>0, 就獲取到相關(guān)的 selectionKey集合 //1.如果返回的>0, 表示已經(jīng)獲取到關(guān)注的事件 //2. selector.selectedKeys() 返回關(guān)注事件的集合 // 通過(guò) selectionKeys 反向獲取通道 Set<SelectionKey> selectionKeys = selector.selectedKeys(); System.out.println("selectionKeys 數量 = " + selectionKeys.size()); //遍歷 Set<SelectionKey>, 使用迭代器遍歷 Iterator<SelectionKey> keyIterator = selectionKeys.iterator(); while (keyIterator.hasNext()) { //獲取到SelectionKey SelectionKey key = keyIterator.next(); //根據key 對應的通道發(fā)生的事件做相應處理 if(key.isAcceptable()) { //如果是 OP_ACCEPT, 有新的客戶(hù)端連接 //該該客戶(hù)端生成一個(gè) SocketChannel SocketChannel socketChannel = serverSocketChannel.accept(); System.out.println("客戶(hù)端連接成功 生成了一個(gè) socketChannel " + socketChannel.hashCode()); //將 SocketChannel 設置為非阻塞 socketChannel.configureBlocking(false); //將socketChannel 注冊到selector, 關(guān)注事件為 OP_READ, 同時(shí)給socketChannel //關(guān)聯(lián)一個(gè)Buffer socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024)); System.out.println("客戶(hù)端連接后 ,注冊的selectionkey 數量=" + selector.keys().size()); //2,3,4.. } if(key.isReadable()) { //發(fā)生 OP_READ //通過(guò)key 反向獲取到對應channel SocketChannel channel = (SocketChannel)key.channel(); //獲取到該channel關(guān)聯(lián)的buffer ByteBuffer buffer = (ByteBuffer)key.attachment(); channel.read(buffer); System.out.println("form 客戶(hù)端 " + new String(buffer.array())); } //手動(dòng)從集合中移動(dòng)當前的selectionKey, 防止重復操作 keyIterator.remove(); } } } }
Client代碼
public class NIOClient { public static void main(String[] args) throws Exception{ //得到一個(gè)網(wǎng)絡(luò )通道 SocketChannel socketChannel = SocketChannel.open(); //設置非阻塞 socketChannel.configureBlocking(false); //提供服務(wù)器端的ip 和 端口 InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666); //連接服務(wù)器 if (!socketChannel.connect(inetSocketAddress)) { while (!socketChannel.finishConnect()) { System.out.println("因為連接需要時(shí)間,客戶(hù)端不會(huì )阻塞,可以做其它工作.."); } } //...如果連接成功,就發(fā)送數據 String str = "hello, 尚硅谷~"; //Wraps a byte array into a buffer ByteBuffer buffer = ByteBuffer.wrap(str.getBytes()); //發(fā)送數據,將 buffer 數據寫(xiě)入 channel socketChannel.write(buffer); System.in.read(); } }
jdk中Selector是對操作系統的IO多路復用調用的一個(gè)封裝,在Linux中就是對epoll的封裝。epoll實(shí)質(zhì)上是將event loop交給了內核,因為網(wǎng)絡(luò )數據都是首先到內核的,直接內核處理可以避免無(wú)謂的系統調用和數據拷貝, 性能是最好的。
jdk中對IO事件的封裝是SelectionKey, 保存Channel關(guān)心的事件。
到此這篇關(guān)于Java進(jìn)階之高并發(fā)核心Selector詳解的文章就介紹到這了,更多相關(guān)高并發(fā)核心Selector內容請搜索腳本之家以前的文章或繼續瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng )、來(lái)自本網(wǎng)站內容采集于網(wǎng)絡(luò )互聯(lián)網(wǎng)轉載等其它媒體和分享為主,內容觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如侵犯了原作者的版權,請告知一經(jīng)查實(shí),將立刻刪除涉嫌侵權內容,聯(lián)系我們QQ:712375056,同時(shí)歡迎投稿傳遞力量。
Copyright ? 2009-2022 56dr.com. All Rights Reserved. 特網(wǎng)科技 特網(wǎng)云 版權所有 特網(wǎng)科技 粵ICP備16109289號
域名注冊服務(wù)機構:阿里云計算有限公司(萬(wàn)網(wǎng)) 域名服務(wù)機構:煙臺帝思普網(wǎng)絡(luò )科技有限公司(DNSPod) CDN服務(wù):阿里云計算有限公司 百度云 中國互聯(lián)網(wǎng)舉報中心 增值電信業(yè)務(wù)經(jīng)營(yíng)許可證B2
建議您使用Chrome、Firefox、Edge、IE10及以上版本和360等主流瀏覽器瀏覽本網(wǎng)站