rocketmq同步刷盘和异步刷盘,rocketmq同步刷盘性能

  rocketmq同步刷盘和异步刷盘,rocketmq同步刷盘性能

  上一篇消息队列设计之同步刷盘

  异步刷盘方式:在返回写成功状态时,消息可能只是被写入了内存的页面缓存,写操作的返回快,吞吐量大;当内存里的消息量积累到一定程度时,统一触发写磁盘操作,快速写入

  消息队列默认采用异步刷盘,异步刷盘两种策略:开启缓冲池,不开启缓冲池

  CommitLog的handleDiskFlush方法:

  公共void句柄磁盘刷新(AppendMessageResult result,PutMessageResult PutMessageResult,MessageExt messageExt) { //同步flush if (FlushDiskType .SYNC _ FLUSH==这个。defaultmessagestore。getmessagestoreconfig().getFlushDiskType()){ final GroupCommitService service service=(GroupCommitService)this。flushcommitlogserviceif(messageext。iswaitstoremsgok()){ GroupCommitRequest=new GroupCommitRequest(result。getwrotefoffset()结果。getwrotebytes());service.putRequest(请求);布尔冲洗ok=请求。等待冲洗(这个。defaultmessagestore。getmessagestoreconfig().getSyncFlushTimeout());如果(!冲洗好){日志。错误(执行组提交,等待刷新失败,主题: messageext。gettopic() tags : messageext。gettags()客户端地址: messageext。getbornhoststring());putmessageresult。setputmessagestatus(PutMessageStatus .刷新磁盘超时);} } else { service。醒来();} } //异步刷新else { if(!这个。defaultmessagestore。getmessagestoreconfig().isTransientStorePoolEnable()){ flushcomitlogservice。醒来();} else { commitlogservice。醒来();} } }不开启缓冲池:默认不开启,刷盘线程FlushRealTimeService会每间隔500毫秒尝试去刷盘。

  类FlushRealTimeService扩展了flushcomitlogservice { private long lastflustimestamp=0;私有长打印时间=0;public void run(){ commit log。日志。信息(这个。获取服务名()服务已启动);而(!这个。is stopped()){ boolean flushcomitlogtimed=提交日志。这个。defaultmessagestore。getmessagestoreconfig().isFlushCommitLogTimed();//每次脸红间隔500毫秒int interval=提交日志。这个。defaultmessagestore。getmessagestoreconfig().getflushtintervalcommit

  Log();            //每次Flush最少4页内存数据(16KB)            int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();               //距离上次刷盘时间阈值为10秒            int flushPhysicQueueThoroughInterval =                CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();            boolean printFlushProgress = false;            // Print flush progress            long currentTimeMillis = System.currentTimeMillis();            if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {                this.lastFlushTimestamp = currentTimeMillis;                flushPhysicQueueLeastPages = 0;                printFlushProgress = (printTimes++ % 10) == 0;            }            try {                if (flushCommitLogTimed) {                    Thread.sleep(interval);                } else {                    this.waitForRunning(interval);                }                if (printFlushProgress) {                    this.printFlushProgress();                }                long begin = System.currentTimeMillis();                CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);                long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();                if (storeTimestamp > 0) {                    CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);                }                long past = System.currentTimeMillis() - begin;                if (past > 500) {                    log.info("Flush data to disk costs {} ms", past);                }            } catch (Throwable e) {                CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);                this.printFlushProgress();            }        }        // Normal shutdown, to ensure that all the flush before exit        boolean result = false;        for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {            result = CommitLog.this.mappedFileQueue.flush(0);            CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));        }        this.printFlushProgress();        CommitLog.log.info(this.getServiceName() + " service end");    }    @Override    public String getServiceName() {        return FlushRealTimeService.class.getSimpleName();    }    private void printFlushProgress() {        // CommitLog.log.info("how much disk fall behind memory, "        // + CommitLog.this.mappedFileQueue.howMuchFallBehind());    }    @Override    public long getJointime() {        return 1000 * 60 * 5;    }}判断是否超过10秒没刷盘了,如果超过强制刷盘等待Flush间隔500ms通过MappedFile刷盘设置StoreCheckpoint刷盘时间点超过500ms的刷盘记录日志Broker正常停止前,把内存page中的数据刷盘开启缓冲池:

  

class CommitRealTimeService extends FlushCommitLogService {    private long lastCommitTimestamp = 0;    @Override    public String getServiceName() {        return CommitRealTimeService.class.getSimpleName();    }    @Override    public void run() {        CommitLog.log.info(this.getServiceName() + " service started");        while (!this.isStopped()) {            //每次提交间隔200毫秒            int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();            //每次提交最少4页内存数据(16KB)            int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();            //距离上次提交时间阈值为200毫秒            int commitDataThoroughInterval =                CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();            long begin = System.currentTimeMillis();            if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {                this.lastCommitTimestamp = begin;                commitDataLeastPages = 0;            }            try {                boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);                long end = System.currentTimeMillis();                if (!result) {                    this.lastCommitTimestamp = end; // result = false means some data committed.                    //now wake up flush thread.                    flushCommitLogService.wakeup();                }                if (end - begin > 500) {                    log.info("Commit data to file costs {} ms", end - begin);                }                this.waitForRunning(interval);            } catch (Throwable e) {                CommitLog.log.error(this.getServiceName() + " service has exception. ", e);            }        }        boolean result = false;        for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {            result = CommitLog.this.mappedFileQueue.commit(0);            CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));        }        CommitLog.log.info(this.getServiceName() + " service end");    }}
RocketMQ申请一块和CommitLog文件相同大小的堆外内存来做缓冲池,数据会先写入缓冲池,提交线程CommitRealTimeService也每间隔500毫秒尝试提交到文件通道等待刷盘,刷盘最终由FlushRealTimeService来完成,和不开启缓冲池的处理一致。使用缓冲池的目的是多条数据合并写入,从而提高io性能。

  判断是否超过200毫秒没提交,需要强制提交提交到MappedFile,此时还未刷盘然后唤醒刷盘线程在Broker正常停止前,提交内存page中的数据到此这篇关于RocketMQ设计之异步刷盘的文章就介绍到这了,更多相关RocketMQ异步刷盘内容请搜索盛行IT以前的文章或继续浏览下面的相关文章希望大家以后多多支持盛行IT!

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: