异步批处理教程(异步处理程序)

  本篇文章为你整理了异步批处理教程(异步处理程序)的详细内容,包含有批量异步 异步处理程序 异步处理通用方法 异步处理是什么意思 异步批处理教程,希望能帮助你了解 异步批处理教程。

  书接上回 大数据量、高并发业务怎么优化?(一) 文章中介绍了异步批处理的三种方式,本文继续深入针对前两种进行讲解,并给出代码示例:

  一 普通版本,采用阻塞队列ArrayBlockingQueue

  使用普通方式能够直接基于JDK中现成的并发包ArrayBlockingQueue 提供的 offer(E e, long timeout, TimeUnit unit)(添加元素到队列尾部,如果队列已满则等待参数指定时间后返回false)方法 和 poll(long timeout, TimeUnit unit)(从队列头部获取元素,如果队列为空则等待参数指定时间后返回null)方法,来达到异步批处理效果

  生产者代码:由于采用内存队列,最好在创建 ArrayBlockingQueue 时指定队列大小,防止队列无界,导致内存溢出

  

/**

 

   * 生产者

  @Component

  @Slf4j

  public class MonitorQueue {

   private BlockingQueue List NodeCollectDTO queue = new ArrayBlockingQueue (10000000);

   public void put(List NodeCollectDTO list) {

   try {

   queue.put(list);

   } catch (InterruptedException e) {

   log.error(String.format("队列put异常:%s", e.getMessage()), e);

   public void offer(List NodeCollectDTO list, long timeout, TimeUnit unit) throws InterruptedException {

   queue.offer(list, timeout, unit);

   public List NodeCollectDTO poll(long timeout, TimeUnit unit) throws InterruptedException {

   return queue.poll(timeout, unit);

  

 

  消费者代码:在创建生产者时开启一个子线程在死循环中一直读取队列元素,直到队列元素超过我们的 maxNum 时,将临时列表元素插入数据库中

  

/**

 

   * 消费者

  @Slf4j

  @Component

  public class MonitorConsumer implements Runnable {

   @Autowired

   private MonitorQueue queue;

   @Autowired

   private MonitorService monitorService;

   @PostConstruct

   public void init() {

   new Thread(this, "monitor-collect").start();

   // 临时列表大小限制

   private int maxNum = 2000;

   @SuppressWarnings("InfiniteLoopStatement")

   @Override

   public void run() {

   while (true) {

   handler();

   private void handler() {

   try {

   List NodeCollectDTO temp = new ArrayList (maxNum);

   while (temp.size() = maxNum) {

   List NodeCollectDTO list = queue.poll(20, TimeUnit.SECONDS);

   if (CollectionUtil.isNotEmpty(list)) {

   temp.addAll(list);

   } else {

   break;

   if (CollectionUtil.isEmpty(temp)) {

   return;

   int i = monitorService.batchSave(temp);

   log.debug("----------------------------batchSave num:{}, collect.size:{}", i, collect.size());

   } catch (Exception e) {

   log.error(String.format("消费者异常: %s", e.getMessage()), e);

  

 

  可以看到采用该种方式实现的异步批量入库代码比较简单,便于理解,在性能上,基本都能够满足日常普通业务存在的批量入库场景

  二 进阶版,采用 Disruptor 队列,本文基于 Disruptor 最新4.0版本

  先给出 Disruptor 官网简介

  Disruptor 是一个提供并发环形缓冲区数据结构的库。它旨在在异步事件处理架构中提供低延迟、高吞吐量的工作队列。
 

  为了理解 Disruptor 的好处,我们可以将它与一些很好理解且目的非常相似的东西进行比较。在 Disruptor 的情况下,这将是 Java 的 BlockingQueue。与队列一样,Disruptor 的目的是在同一进程内的线程之间移动数据(例如消息或事件)。然而,Disruptor 提供的一些关键特性使其有别于队列。他们是:
 

  向消费者多播事件,带有消费者依赖图。
 

  为事件预分配内存。
 

  可选无锁

  Disruptor 给我们在项目中实现异步批处理提供了另一种方式,一种无锁、延迟更低、吞吐量更高、提供消费者多播等等的内存队列

  下面介绍如何使用

  2.1 依赖安装

  

 dependency 

 

   groupId com.lmax /groupId

   artifactId disruptor /artifactId

   version 4.0.0.RC1 /version

   /dependency

  

 

  2.2 Disruptor 使用代码如下:

  

public class LongEvent{

 

   private long value;

   public void set(long value){

   this.value = value;

   @Override

   public String toString(){

   return "LongEvent{" + "value=" + value + };

  @Slf4j

  public class LongEventMain {

   public static void handleEvent(LongEvent event, long sequence, boolean endOfBatch) {

   log.info("event: " + event + ", sequence:" + sequence + ", endOfBatch:" + endOfBatch);

   public static void translate(LongEvent event, long sequence, ByteBuffer buffer) {

   event.set(buffer.getLong(0));

   public static void main(String[] args) throws Exception {

   int bufferSize = 128;

   // 1. 创建Disruptor对象

   Disruptor LongEvent disruptor =

   new Disruptor (LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE, ProducerType.SINGLE, new BlockingWaitStrategy());

   // 2. 添加事件处理类(消费者)

   disruptor.handleEventsWith(LongEventMain::handleEvent);

   // 3. 开启事件处理线程

   disruptor.start();

   // 4. 获取ringBuffer

   RingBuffer LongEvent ringBuffer = disruptor.getRingBuffer();

   ByteBuffer bb = ByteBuffer.allocate(8);

   for (long l = 0; true; l++) {

   bb.putLong(0, l);

   // 5. 发布事件(生产者)

   ringBuffer.publishEvent(LongEventMain::translate, bb);

   Thread.sleep(1);

  

 

  2.3 上面代码完成了一个事件发布后,事件处理类就能够收到对应事件信息的功能,但是我们想要的是能在消费者线程中批量处理生产者数据的逻辑,还得再修改一下事件处理类代码,如下:

  

@Slf4j

 

  public class LongEventBatch implements EventHandler LongEvent {

   private static final int MAX_BATCH_SIZE = 20;

   private final List LongEvent batch = new ArrayList ();

   public LongEventBatch() {

   // 虚拟机关闭处理

   Runtime.getRuntime().addShutdownHook(new Thread(() - {

   log.info("------------------ShutdownHook-DataEventHandler,上报tempList");

   if (batch.size() 0) {

   // 批量入库伪代码

   int i = xxxService.batchSave(temp);

   }));

   @Override

   public void onEvent(final LongEvent event, final long sequence, final boolean endOfBatch) {

   log.info("event: " + event + ", sequence:" + sequence + ", endOfBatch:" + endOfBatch);

   batch.add(event);

   if (batch.size() = MAX_BATCH_SIZE) {

   processBatch(batch);

   private void processBatch(final List LongEvent batch) {

   // 批量入库伪代码

   int i = xxxService.batchSave(temp);

   // 记得清空batch列表

   batch.clear();

  

 

  由此,我们就实现了基于 Disruptor 的异步批处理逻辑,该方式会比普通版本性能高出一个数量级,大家在工作中可以尝试使用一番

  附博主 github 地址 https://github.com/wayn111

  以上就是异步批处理教程(异步处理程序)的详细内容,想要了解更多 异步批处理教程的内容,请持续关注盛行IT软件开发工作室。

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: