disruptor和线程池,
00-1010前言碎片化核心概念?实践干扰者结论
目录
Disruptor是一个开源的高性能并发框架,用于英国LMAX公司线程间的消息传递。它与jdk中的BlockingQueue非常相似,但性能与BlockingQueue无法相比。下面是官方给出的测试报告,可以直观的看到两者的性能差异:
颠覆者项目地址:https://github.com/LMAX-Exchange/disruptor
00-1010表演这么破的框架一定要玩。在试用之前,我们先了解一下颠覆者的主要概念,然后结合楼主的weblog项目(之前用的BlockingQueue)进行实践。
RingBuffer:环形缓冲区,消息事件信息的载体。RingBuffer曾经是Disruptor中的主要对象,但是从3.0版本开始,它的职责被简化为只负责存储和更新通过Disruptor交换的数据(事件)。在一些更高级的应用场景中,环形缓冲区完全可以被用户自定义实现所替代。
事件:定义生产者和消费者之间交换的数据类型。
EventFactory:创建事件的工厂类接口,由用户实现,提供特定事件。
EventHandler:事件处理接口,由用户实现,用于处理事件。
到目前为止,我们只需要知道上面的核心内容。更多细节,我们可以转到维基文档:https://github.com/LMAX-Exchange/disruptor.
核心架构图:
00-1010转换boot-websocket-log项目,这是一个典型的生产者-消费者模型的例子。然后用Disruptor替换BlockingQueue来完成功能。有兴趣的可以对比一下。
第一步是定义事件类型。
/* * *由Klon 2018/8/24创建。* Content :进程日志事件内容载体*/Public Class Logger事件{Private Logger消息日志;public logger message getLog(){ return log;} public void set log(logger message log){ this . log=log;}}第二步,定义事件工厂。
/* * *由Klon 2018/8/24创建。* Content 3360进程日志事件工厂类*/公共类LoggerEvent Factory实现事件工厂{ @ override Public logger Event new instance(){ return new logger Event();}}第三步,定义数据处理器。
/* * *由Klon 2018/8/24创建。* Content :进程日志事件处理程序*/@组件公共类记录器Event Handler实现EventHandler { @ auto wired Private simple Messaging Template消息传递模板;@ Override public void onEvent(LoggerEvent stringEvent,long l,boolean b){ messaging template . convertandsend(/topic/pull logger ,stringEvent . getlog());}}第四步,创建干扰器实用类,定义事件发布方法,发布事件。
/* * *由Klon 2018/8/24创建。* Content 3360 Disruptor循环队列*/@ Component公共类LoggerDisruptor队列{ Private Executor executors=executors . new cached thread pool();//f
actory for the event private LoggerEventFactory factory = new LoggerEventFactory(); private FileLoggerEventFactory fileLoggerEventFactory = new FileLoggerEventFactory(); // Specify the size of the ring buffer, must be power of 2. private int bufferSize = 2 * 1024; // Construct the Disruptor private Disruptordisruptor = new Disruptor<>(factory, bufferSize, executor);; private DisruptorfileLoggerEventDisruptor = new Disruptor<>(fileLoggerEventFactory, bufferSize, executor);; private static RingBufferringBuffer; private static RingBufferfileLoggerEventRingBuffer; @Autowired LoggerDisruptorQueue(LoggerEventHandler eventHandler,FileLoggerEventHandler fileLoggerEventHandler) { disruptor.handleEventsWith(eventHandler); fileLoggerEventDisruptor.handleEventsWith(fileLoggerEventHandler); this.ringBuffer = disruptor.getRingBuffer(); this.fileLoggerEventRingBuffer = fileLoggerEventDisruptor.getRingBuffer(); disruptor.start(); fileLoggerEventDisruptor.start(); } public static void publishEvent(LoggerMessage log) { long sequence = ringBuffer.next(); // Grab the next sequence try { LoggerEvent event = ringBuffer.get(sequence); // Get the entry in the Disruptor // for the sequence event.setLog(log); // Fill with data } finally { ringBuffer.publish(sequence); } } public static void publishEvent(String log) { if(fileLoggerEventRingBuffer == null) return; long sequence = fileLoggerEventRingBuffer.next(); // Grab the next sequence try { FileLoggerEvent event = fileLoggerEventRingBuffer.get(sequence); // Get the entry in the Disruptor // for the sequence event.setLog(log); // Fill with data } finally { fileLoggerEventRingBuffer.publish(sequence); } }}
文末结语
以上四步已经完成了Disruptor的使用,启动项目后就会不断的发布日志事件,处理器会将事件内容通过websocket传送到前端页面上展示,
boot-websocket-log项目地址:https://gitee.com/kailing/boot-websocket-log
Disruptor是高性能的进程内线程间的数据交换框架,特别适合日志类的处理。Disruptor也是从https://github.com/alipay/sofa-tracer了解到的,这是蚂蚁金服 团队开源的分布式链路追踪项目,其中日志处理部分就是使用了Disruptor。
以上就是浅析Disruptor高性能线程消息传递并发框架的详细内容,更多关于Disruptor线程消息传递并发框架的资料请关注盛行IT其它相关文章!
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。