国产成人精品18p,天天干成人网,无码专区狠狠躁天天躁,美女脱精光隐私扒开免费观看

Java進(jìn)階之高并發(fā)核心Selector詳解

發(fā)布時(shí)間:2021-07-06 11:13 來(lái)源:腳本之家 閱讀:0 作者:cristianoxm 欄目: 開(kāi)發(fā)技術(shù) 歡迎投稿:712375056

目錄

        一、Selector設計

        筆者下載得是openjdk8的源碼, 畫(huà)出類(lèi)圖


        比較清晰得看到,openjdk中Selector的實(shí)現是SelectorImpl,然后SelectorImpl又將職責委托給了具體的平臺,比如圖中框出的

        • linux2.6以后才有的EpollSelectorImpl
        • Windows平臺是WindowsSelectorImpl
        • MacOSX平臺是KQueueSelectorImpl

        從名字也可以猜到,openjdk肯定在底層還是用epoll,kqueue,iocp這些技術(shù)來(lái)實(shí)現的I/O多路復用。

        二、獲取Selector

        眾所周知,Selector.open()可以得到一個(gè)Selector實(shí)例,怎么實(shí)現的呢?

        // Selector.java
        public static Selector open() throws IOException {
            // 首先找到provider,然后再打開(kāi)Selector
            return SelectorProvider.provider().openSelector();
        }
        
        // java.nio.channels.spi.SelectorProvider
            public static SelectorProvider provider() {
            synchronized (lock) {
                if (provider != null)
                    return provider;
                return AccessController.doPrivileged(
                    new PrivilegedAction<SelectorProvider>() {
                        public SelectorProvider run() {
                                if (loadProviderFromProperty())
                                    return provider;
                                if (loadProviderAsService())
                                    return provider;
                                    // 這里就是打開(kāi)Selector的真正方法
                                provider = sun.nio.ch.DefaultSelectorProvider.create();
                                return provider;
                            }
                        });
            }
        }
        

        在openjdk中,每個(gè)操作系統都有一個(gè)sun.nio.ch.DefaultSelectorProvider實(shí)現,以solaris為例:

        /**
         * Returns the default SelectorProvider.
         */
        public static SelectorProvider create() {
            // 獲取OS名稱(chēng)
            String osname = AccessController
                .doPrivileged(new GetPropertyAction("os.name"));
            // 根據名稱(chēng)來(lái)創(chuàng  )建不同的Selctor
            if (osname.equals("SunOS"))
                return createProvider("sun.nio.ch.DevPollSelectorProvider");
            if (osname.equals("Linux"))
                return createProvider("sun.nio.ch.EPollSelectorProvider");
            return new sun.nio.ch.PollSelectorProvider();
        }
        

        如果系統名稱(chēng)是Linux的話(huà),真正創(chuàng )建的是sun.nio.ch.EPollSelectorProvider。如果不是SunOS也不是Linux,就使用sun.nio.ch.PollSelectorProvider, 關(guān)于PollSelector有興趣的讀者自行了解下, 本文僅以實(shí)際常用的EpollSelector為例探討。

        打開(kāi)sun.nio.ch.EPollSelectorProvider查看openSelector方法

        public AbstractSelector openSelector() throws IOException {
            return new EPollSelectorImpl(this);
        }

        很直觀(guān),這樣我們在Linux平臺就得到了最終的Selector實(shí)現:sun.nio.ch.EPollSelectorImpl

        三、EPollSelector如何進(jìn)行select

        epoll系統調用主要分為3個(gè)函數

        epoll_create: 創(chuàng )建一個(gè)epollfd,并開(kāi)辟epoll自己的內核高速cache區,建立紅黑樹(shù),分配好想要的size的內存對象,建立一個(gè)list鏈表,用于存儲準備就緒的事件。epoll_wait: 等待內核返回IO事件epoll_ctl: 對新舊事件進(jìn)行新增修改或者刪除

        3.1 Epoll fd的創(chuàng )建

        EPollSelectorImpl的構造器代碼如下:

        EPollSelectorImpl(SelectorProvider sp) throws IOException {
            super(sp);
            // makePipe返回管道的2個(gè)文件描述符,編碼在一個(gè)long類(lèi)型的變量中
            // 高32位代表讀 低32位代表寫(xiě)
            // 使用pipe為了實(shí)現Selector的wakeup邏輯
            long pipeFds = IOUtil.makePipe(false);
            fd0 = (int) (pipeFds >>> 32);
            fd1 = (int) pipeFds;
            // 新建一個(gè)EPollArrayWrapper
            pollWrapper = new EPollArrayWrapper();
            pollWrapper.initInterrupt(fd0, fd1);
            fdToKey = new HashMap<>();
        }
        

        再看EPollArrayWrapper的初始化過(guò)程

        EPollArrayWrapper() throws IOException {
            // creates the epoll file descriptor
            // 創(chuàng  )建epoll fd
            epfd = epollCreate();
        
            // the epoll_event array passed to epoll_wait
            int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
            pollArray = new AllocatedNativeObject(allocationSize, true);
            pollArrayAddress = pollArray.address();
        
            // eventHigh needed when using file descriptors > 64k
            if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)
                eventsHigh = new HashMap<>();
        }
        private native int epollCreate();
        

        在初始化過(guò)程中調用了epollCreate方法,這是個(gè)native方法。
        打開(kāi)

        jdk/src/solaris/native/sun/nio/ch/EPollArrayWrapper.c

        EPollArrayWrapper() throws IOException {
            // creates the epoll file descriptor
            // 創(chuàng  )建epoll fd
            epfd = epollCreate();
        
            // the epoll_event array passed to epoll_wait
            int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
            pollArray = new AllocatedNativeObject(allocationSize, true);
            pollArrayAddress = pollArray.address();
        
            // eventHigh needed when using file descriptors > 64k
            if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)
                eventsHigh = new HashMap<>();
        }
        private native int epollCreate();
        

        可以看到最后還是使用了操作系統的api: epoll_create函數

        3.2 Epoll wait等待內核IO事件

        調用Selector.select(),最后會(huì )委托給各個(gè)實(shí)現的doSelect方法,限于篇幅不貼出太詳細的,這里看下EpollSelectorImpl的doSelect方法

        protected int doSelect(long timeout) throws IOException {
            if (closed)
                throw new ClosedSelectorException();
            processDeregisterQueue();
            try {
                begin();
                // 真正的實(shí)現是這行
                pollWrapper.poll(timeout);
            } finally {
                end();
            }
            processDeregisterQueue();
            int numKeysUpdated = updateSelectedKeys();
        
            // 以下基本都是異常處理
            if (pollWrapper.interrupted()) {
                // Clear the wakeup pipe
                pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
                synchronized (interruptLock) {
                    pollWrapper.clearInterrupted();
                    IOUtil.drain(fd0);
                    interruptTriggered = false;
                }
            }
            return numKeysUpdated;
        }
        

        然后我們去看pollWrapper.poll, 打開(kāi)jdk/src/solaris/classes/sun/nio/ch/EPollArrayWrapper.java:

        int poll(long timeout) throws IOException {
            updateRegistrations();
            // 這個(gè)epollWait是不是有點(diǎn)熟悉呢?
            updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
            for (int i=0; i<updated; i++) {
                if (getDescriptor(i) == incomingInterruptFD) {
                    interruptedIndex = i;
                    interrupted = true;
                    break;
                }
            }
            return updated;
        }
        
        private native int epollWait(long pollAddress, int numfds, long timeout,
                                     int epfd) throws IOException;
        

        epollWait也是個(gè)native方法,打開(kāi)c代碼一看:

        JNIEXPORT jint JNICALL
        Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this,
                                                    jlong address, jint numfds,
                                                    jlong timeout, jint epfd)
        {
            struct epoll_event *events = jlong_to_ptr(address);
            int res;
        
            if (timeout <= 0) {           /* Indefinite or no wait */
                // 發(fā)起epoll_wait系統調用等待內核事件
                RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res);
            } else {                      /* Bounded wait; bounded restarts */
                res = iepoll(epfd, events, numfds, timeout);
            }
        
            if (res < 0) {
                JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");
            }
            return res;
        }
        =

        可以看到,最后還是發(fā)起的epoll_wait系統調用.

        3.3 epoll control以及openjdk對事件管理的封裝

        JDK中對于注冊到Selector上的IO事件關(guān)系是使用SelectionKey來(lái)表示,代表了Channel感興趣的事件,如Read,Write,Connect,Accept.

        調用Selector.register()時(shí)均會(huì )將事件存儲到EpollArrayWrapper的成員變量eventsLow和eventsHigh中

        // events for file descriptors with registration changes pending, indexed
        // by file descriptor and stored as bytes for efficiency reasons. For
        // file descriptors higher than MAX_UPDATE_ARRAY_SIZE (unlimited case at
        // least) then the update is stored in a map.
        // 使用數組保存事件變更, 數組的最大長(cháng)度是MAX_UPDATE_ARRAY_SIZE, 最大64*1024
        private final byte[] eventsLow = new byte[MAX_UPDATE_ARRAY_SIZE];
        // 超過(guò)數組長(cháng)度的事件會(huì )緩存到這個(gè)map中,等待下次處理
        private Map<Integer,Byte> eventsHigh;
        
        
        /**
         * Sets the pending update events for the given file descriptor. This
         * method has no effect if the update events is already set to KILLED,
         * unless {@code force} is {@code true}.
         */
        private void setUpdateEvents(int fd, byte events, boolean force) {
            // 判斷fd和數組長(cháng)度
            if (fd < MAX_UPDATE_ARRAY_SIZE) {
                if ((eventsLow[fd] != KILLED) || force) {
                    eventsLow[fd] = events;
                }
            } else {
                Integer key = Integer.valueOf(fd);
                if (!isEventsHighKilled(key) || force) {
                    eventsHigh.put(key, Byte.valueOf(events));
                }
            }
        }
        

        上面看到EpollArrayWrapper.poll()的時(shí)候, 首先會(huì )調用updateRegistrations

        /**
         * Returns the pending update events for the given file descriptor.
         */
        private byte getUpdateEvents(int fd) {
            if (fd < MAX_UPDATE_ARRAY_SIZE) {
                return eventsLow[fd];
            } else {
                Byte result = eventsHigh.get(Integer.valueOf(fd));
                // result should never be null
                return result.byteValue();
            }
        }
        
        /**
         * Update the pending registrations.
         */
        private void updateRegistrations() {
            synchronized (updateLock) {
                int j = 0;
                while (j < updateCount) {
                    int fd = updateDescriptors[j];
                    // 從保存的eventsLow和eventsHigh里取出事件
                    short events = getUpdateEvents(fd);
                    boolean isRegistered = registered.get(fd);
                    int opcode = 0;
        
                    if (events != KILLED) {
                        // 判斷操作類(lèi)型以傳給epoll_ctl
                        // 沒(méi)有指定EPOLLET事件類(lèi)型
                        if (isRegistered) {
                            opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
                        } else {
                            opcode = (events != 0) ? EPOLL_CTL_ADD : 0;
                        }
                        if (opcode != 0) {
                            // 熟悉的epoll_ctl
                            epollCtl(epfd, opcode, fd, events);
                            if (opcode == EPOLL_CTL_ADD) {
                                registered.set(fd);
                            } else if (opcode == EPOLL_CTL_DEL) {
                                registered.clear(fd);
                            }
                        }
                    }
                    j++;
                }
                updateCount = 0;
            }
        }
        private native void epollCtl(int epfd, int opcode, int fd, int events);
        

        在獲取到事件之后將操作委托給了epollCtl,這又是個(gè)native方法,打開(kāi)相應的c代碼一看:

        JNIEXPORT void JNICALL
        Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd,
                                                   jint opcode, jint fd, jint events)
        {
            struct epoll_event event;
            int res;
        
            event.events = events;
            event.data.fd = fd;
        
            // 發(fā)起epoll_ctl調用來(lái)進(jìn)行IO事件的管理
            RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res);
        
            /*
             * A channel may be registered with several Selectors. When each Selector
             * is polled a EPOLL_CTL_DEL op will be inserted into its pending update
             * list to remove the file descriptor from epoll. The "last" Selector will
             * close the file descriptor which automatically unregisters it from each
             * epoll descriptor. To avoid costly synchronization between Selectors we
             * allow pending updates to be processed, ignoring errors. The errors are
             * harmless as the last update for the file descriptor is guaranteed to
             * be EPOLL_CTL_DEL.
             */
            if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {
                JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed");
            }
        }
        

        原來(lái)還是我們的老朋友epoll_ctl.
        有個(gè)小細節是jdk沒(méi)有指定ET(邊緣觸發(fā))還是LT(水平觸發(fā)),所以默認會(huì )用LT:)

        AbstractSelectorImpl中有3個(gè)set保存事件

        // Public views of the key sets
        // 注冊的所有事件
        private Set<SelectionKey> publicKeys;             // Immutable
        // 內核返回的IO事件封裝,表示哪些fd有數據可讀可寫(xiě)
        private Set<SelectionKey> publicSelectedKeys;     // Removal allowed, but not addition
        
        // 取消的事件
        private final Set<SelectionKey> cancelledKeys = new HashSet<SelectionKey>();
        

        EpollArrayWrapper.poll調用完成之后, 會(huì )調用updateSelectedKeys來(lái)更新上面的仨set

        private int updateSelectedKeys() {
            int entries = pollWrapper.updated;
            int numKeysUpdated = 0;
            for (int i=0; i<entries; i++) {
                int nextFD = pollWrapper.getDescriptor(i);
                SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
                // ski is null in the case of an interrupt
                if (ski != null) {
                    int rOps = pollWrapper.getEventOps(i);
                    if (selectedKeys.contains(ski)) {
                        if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
                            numKeysUpdated++;
                        }
                    } else {
                        ski.channel.translateAndSetReadyOps(rOps, ski);
                        if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
                            selectedKeys.add(ski);
                            numKeysUpdated++;
                        }
                    }
                }
            }
            return numKeysUpdated;
        

        代碼很直白,拿出事件對set比對操作。

        四、Selector類(lèi)的相關(guān)方法

        重點(diǎn)注意四個(gè)方法

        • select(): 這是一個(gè)阻塞方法,調用該方法,會(huì )阻塞,直到返回一個(gè)有事件發(fā)生的selectionKey集合
        • selectNow() :非阻塞方法,獲取不到有事件發(fā)生的selectionKey集合,也會(huì )立即返回
        • select(long):阻塞方法,如果沒(méi)有獲取到有事件發(fā)生的selectionKey集合,阻塞指定的long時(shí)間
        • selectedKeys(): 返回全部selectionKey集合,不管是否有事件發(fā)生

        可以理解:selector一直在監聽(tīng)select()

        五、Selector、SelectionKey、ServerScoketChannel、ScoketChannel的關(guān)系

         

        Server代碼:

        public class NIOServer {
            public static void main(String[] args) throws Exception{
        
                //創(chuàng  )建ServerSocketChannel -> ServerSocket
        
                ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        
                //得到一個(gè)Selecor對象
                Selector selector = Selector.open();
        
                //綁定一個(gè)端口6666, 在服務(wù)器端監聽(tīng)
                serverSocketChannel.socket().bind(new InetSocketAddress(6666));
                //設置為非阻塞
                serverSocketChannel.configureBlocking(false);
        
                //把 serverSocketChannel 注冊到  selector 關(guān)心 事件為 OP_ACCEPT
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        
                System.out.println("注冊后的selectionkey 數量=" + selector.keys().size()); // 1
        
        
        
                //循環(huán)等待客戶(hù)端連接
                while (true) {
        
                    //這里我們等待1秒,如果沒(méi)有事件發(fā)生, 返回
                    if(selector.select(1000) == 0) { //沒(méi)有事件發(fā)生
                        System.out.println("服務(wù)器等待了1秒,無(wú)連接");
                        continue;
                    }
        
                    //如果返回的>0, 就獲取到相關(guān)的 selectionKey集合
                    //1.如果返回的>0, 表示已經(jīng)獲取到關(guān)注的事件
                    //2. selector.selectedKeys() 返回關(guān)注事件的集合
                    //   通過(guò) selectionKeys 反向獲取通道
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    System.out.println("selectionKeys 數量 = " + selectionKeys.size());
        
                    //遍歷 Set<SelectionKey>, 使用迭代器遍歷
                    Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
        
                    while (keyIterator.hasNext()) {
                        //獲取到SelectionKey
                        SelectionKey key = keyIterator.next();
                        //根據key 對應的通道發(fā)生的事件做相應處理
                        if(key.isAcceptable()) { //如果是 OP_ACCEPT, 有新的客戶(hù)端連接
                            //該該客戶(hù)端生成一個(gè) SocketChannel
                            SocketChannel socketChannel = serverSocketChannel.accept();
                            System.out.println("客戶(hù)端連接成功 生成了一個(gè) socketChannel " + socketChannel.hashCode());
                            //將  SocketChannel 設置為非阻塞
                            socketChannel.configureBlocking(false);
                            //將socketChannel 注冊到selector, 關(guān)注事件為 OP_READ, 同時(shí)給socketChannel
                            //關(guān)聯(lián)一個(gè)Buffer
                            socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024));
                            System.out.println("客戶(hù)端連接后 ,注冊的selectionkey 數量=" + selector.keys().size()); //2,3,4..
        
                        }
                        if(key.isReadable()) {  //發(fā)生 OP_READ
        
                            //通過(guò)key 反向獲取到對應channel
                            SocketChannel channel = (SocketChannel)key.channel();
        
                            //獲取到該channel關(guān)聯(lián)的buffer
                            ByteBuffer buffer = (ByteBuffer)key.attachment();
                            channel.read(buffer);
                            System.out.println("form 客戶(hù)端 " + new String(buffer.array()));
        
                        }
                        //手動(dòng)從集合中移動(dòng)當前的selectionKey, 防止重復操作
                        keyIterator.remove();
        
                    }
                }
            }
        }
        

        Client代碼

        public class NIOClient {
            public static void main(String[] args) throws Exception{
        
                //得到一個(gè)網(wǎng)絡(luò )通道
                SocketChannel socketChannel = SocketChannel.open();
                //設置非阻塞
                socketChannel.configureBlocking(false);
                //提供服務(wù)器端的ip 和 端口
                InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666);
                //連接服務(wù)器
                if (!socketChannel.connect(inetSocketAddress)) {
        
                    while (!socketChannel.finishConnect()) {
                        System.out.println("因為連接需要時(shí)間,客戶(hù)端不會(huì )阻塞,可以做其它工作..");
                    }
                }
        
                //...如果連接成功,就發(fā)送數據
                String str = "hello, 尚硅谷~";
                //Wraps a byte array into a buffer
                ByteBuffer buffer = ByteBuffer.wrap(str.getBytes());
                //發(fā)送數據,將 buffer 數據寫(xiě)入 channel
                socketChannel.write(buffer);
                System.in.read();
        
            }
        }
        
        

        六、總結


        jdk中Selector是對操作系統的IO多路復用調用的一個(gè)封裝,在Linux中就是對epoll的封裝。epoll實(shí)質(zhì)上是將event loop交給了內核,因為網(wǎng)絡(luò )數據都是首先到內核的,直接內核處理可以避免無(wú)謂的系統調用和數據拷貝, 性能是最好的。jdk中對IO事件的封裝是SelectionKey, 保存Channel關(guān)心的事件。

        到此這篇關(guān)于Java進(jìn)階之高并發(fā)核心Selector詳解的文章就介紹到這了,更多相關(guān)高并發(fā)核心Selector內容請搜索腳本之家以前的文章或繼續瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

        免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng )、來(lái)自本網(wǎng)站內容采集于網(wǎng)絡(luò )互聯(lián)網(wǎng)轉載等其它媒體和分享為主,內容觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如侵犯了原作者的版權,請告知一經(jīng)查實(shí),將立刻刪除涉嫌侵權內容,聯(lián)系我們QQ:712375056,同時(shí)歡迎投稿傳遞力量。

        在线视频免费观看高清| 国产精品无码MV在线观看| 午夜亚洲WWW湿好大| 久久综合88熟人妻| 国产精品刮毛| 九九久久精品国产AV片国产|