- 資訊首頁(yè) > 開(kāi)發(fā)技術(shù) > 編程語(yǔ)言 >
- Java多線(xiàn)程之Disruptor入門(mén)
Disruptor目前是世界上最快的單機消息隊列,由英國外匯交易公司LMAX開(kāi)發(fā),研發(fā)的初衷是解決內存隊列的延遲問(wèn)題(在性能測試中發(fā)現竟然與I/O操作處于同樣的數量級)?;贒isruptor開(kāi)發(fā)的系統單線(xiàn)程能支撐每秒600萬(wàn)訂單,2010年在QCon演講后,獲得了業(yè)界關(guān)注。2011年,企業(yè)應用軟件專(zhuān)家Martin Fowler專(zhuān)門(mén)撰寫(xiě)長(cháng)文介紹。同年它還獲得了Oracle官方的Duke大獎。目前,包括Apache Storm、Camel、Log4j 2在內的很多知名項目都應用了Disruptor以獲取高性能。
Disruptor維護了一個(gè)環(huán)形隊列RingBuffer,這個(gè)隊列本質(zhì)上是一個(gè)首位相連的數組。相比于LinkedBlockdingQueue,RingBuffer的數組結構在查找方面效率更高。此外,LinkedBlockingQueue需要維護一個(gè)頭節點(diǎn)指針head和一個(gè)尾節點(diǎn)指針tail,而RingBuffer只需要維護一個(gè)sequence指向下一個(gè)可用的位置即可。所以從這兩點(diǎn)來(lái)說(shuō),RingBuffer比LinkedBlockingQueue要快。
<dependency> <groupId>com.lmax</groupId> <artifactId>disruptor</artifactId> <version>3.4.3</version> </dependency>
Disruptor是基于事件的生產(chǎn)者消費者模型。其RingBuffer中存放的其實(shí)是將消息封裝成的事件。這里定義了一個(gè)LongEvent,表示消息隊列中存放的是long類(lèi)型的數據。
public class LongEvent { private long value; public void set(long value) { this.value = value; } @Override public String toString() { return "LongEvent{" + "value=" + value + '}'; } }
實(shí)現EventFactory接口,定義Event工廠(chǎng),用于填充隊列。Event工廠(chǎng)其實(shí)是為了提高Disruptor的效率,初始化的時(shí)候,會(huì )調用Event工廠(chǎng),對RingBuffer進(jìn)行內存的提前分配,GC的頻率會(huì )降低。
import com.lmax.disruptor.EventFactory; public class LongEventFactory implements EventFactory<LongEvent> { public LongEvent newInstance() { return new LongEvent(); } }
實(shí)現EventHandler接口,定義EventHandler(消費者),處理容器中的元素。
import com.lmax.disruptor.EventHandler; public class LongEventHandler implements EventHandler<LongEvent> { public void onEvent(LongEvent event, long sequence, boolean endOfBatch) { System.out.println("Event: " + event + ", sequence: " + sequence); } }
import cn.flying.space.disruptor.demo.LongEvent; import com.lmax.disruptor.RingBuffer; import java.nio.ByteBuffer; /** * 定義一個(gè)生產(chǎn)者,往Disruptor中投遞消息 */ public class LongEventProducer { private RingBuffer<LongEvent> ringBuffer; public LongEventProducer(RingBuffer<LongEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void onData(ByteBuffer byteBuffer) { // 定位到下一個(gè)可存放的位置 long sequence = ringBuffer.next(); try { // 拿到該位置的event LongEvent event = ringBuffer.get(sequence); // 設置event的值 event.set(byteBuffer.getLong(0)); } finally { // 發(fā)布 ringBuffer.publish(sequence); } } } import cn.flying.space.disruptor.demo.LongEvent; import cn.flying.space.disruptor.demo.LongEventFactory; import cn.flying.space.disruptor.demo.LongEventHandler; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import java.nio.ByteBuffer; import java.util.concurrent.Executors; public class TestMain { public static void main(String[] args) throws InterruptedException { // 定義event工廠(chǎng) LongEventFactory factory = new LongEventFactory(); // ringBuffer長(cháng)度 int bufferSize = 1024; // 構造一個(gè)Disruptor Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, Executors.defaultThreadFactory()); // 綁定handler disruptor.handleEventsWith(new LongEventHandler()); // 啟動(dòng)Disruptor disruptor.start(); RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); LongEventProducer producer = new LongEventProducer(ringBuffer); ByteBuffer byteBuffer = ByteBuffer.allocate(8); for (long i = 0; true; i++) { byteBuffer.clear(); byteBuffer.putLong(i); // 投遞消息 producer.onData(byteBuffer); Thread.sleep(1000); } } }
import cn.flying.space.disruptor.demo.LongEvent; import com.lmax.disruptor.EventTranslatorOneArg; import com.lmax.disruptor.RingBuffer; import java.nio.ByteBuffer; public class LongEventProducerUsingTranslator { private RingBuffer<LongEvent> ringBuffer; public LongEventProducerUsingTranslator(RingBuffer<LongEvent> ringBuffer) { this.ringBuffer = ringBuffer; } private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<LongEvent, ByteBuffer>() { @Override public void translateTo(LongEvent longEvent, long l, ByteBuffer byteBuffer) { longEvent.set(byteBuffer.getLong(0)); } }; public void onData(ByteBuffer byteBuffer) { ringBuffer.publishEvent(TRANSLATOR, byteBuffer); } } import cn.flying.space.disruptor.demo.LongEvent; import cn.flying.space.disruptor.demo.LongEventFactory; import cn.flying.space.disruptor.demo.LongEventHandler; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.util.DaemonThreadFactory; import java.nio.ByteBuffer; /** * @author ZhangSheng * @date 2021-4-26 14:23 */ public class TestMain { public static void main(String[] args) throws InterruptedException { LongEventFactory factory = new LongEventFactory(); int bufferSize = 1024; Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE); disruptor.handleEventsWith(new LongEventHandler()); disruptor.start(); RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer(); LongEventProducerUsingTranslator producer = new LongEventProducerUsingTranslator(ringBuffer); ByteBuffer byteBuffer = ByteBuffer.allocate(8); for (long i = 0L; true; i++) { byteBuffer.putLong(0, i); // 發(fā)布 producer.onData(byteBuffer); Thread.sleep(1000); } } }
到此這篇關(guān)于Java多線(xiàn)程之Disruptor入門(mén)的文章就介紹到這了,更多相關(guān)Java Disruptor入門(mén)內容請搜索腳本之家以前的文章或繼續瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng )、來(lái)自本網(wǎng)站內容采集于網(wǎng)絡(luò )互聯(lián)網(wǎng)轉載等其它媒體和分享為主,內容觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如侵犯了原作者的版權,請告知一經(jīng)查實(shí),將立刻刪除涉嫌侵權內容,聯(lián)系我們QQ:712375056,同時(shí)歡迎投稿傳遞力量。
Copyright ? 2009-2022 56dr.com. All Rights Reserved. 特網(wǎng)科技 特網(wǎng)云 版權所有 特網(wǎng)科技 粵ICP備16109289號
域名注冊服務(wù)機構:阿里云計算有限公司(萬(wàn)網(wǎng)) 域名服務(wù)機構:煙臺帝思普網(wǎng)絡(luò )科技有限公司(DNSPod) CDN服務(wù):阿里云計算有限公司 百度云 中國互聯(lián)網(wǎng)舉報中心 增值電信業(yè)務(wù)經(jīng)營(yíng)許可證B2
建議您使用Chrome、Firefox、Edge、IE10及以上版本和360等主流瀏覽器瀏覽本網(wǎng)站