rocketmq同步刷盘和异步刷盘,rocketmq异步刷盘会丢数据吗

  rocketmq同步刷盘和异步刷盘,rocketmq异步刷盘会丢数据吗

  同步刷盘方式:在返回写成功状态时,消息已经被写入磁盘。具体流程是,消息写入内存的页面缓存后,立刻通知刷盘线程刷盘,然后等待刷盘完成,刷盘线程执行完成后唤醒等待的线程,返回消息写成功的状态。

  在同步刷盘模式下,当消息写到内存后,会等待数据写到磁盘的CommitLog文件。

  CommitLog的handleDiskFlush方法:

  公共void句柄磁盘刷新(AppendMessageResult result,PutMessageResult PutMessageResult,MessageExt messageExt) { //同步flush if (FlushDiskType .SYNC _ FLUSH==这个。defaultmessagestore。getmessagestoreconfig().getFlushDiskType()){ final GroupCommitService service service=(GroupCommitService)this。flushcommitlogserviceif(messageext。iswaitstoremsgok()){ GroupCommitRequest=new GroupCommitRequest(result。getwrotefoffset()结果。getwrotebytes());service.putRequest(请求);布尔冲洗ok=请求。等待冲洗(这个。defaultmessagestore。getmessagestoreconfig().getSyncFlushTimeout());如果(!冲洗好){日志。错误(执行组提交,等待刷新失败,主题: messageext。gettopic() tags : messageext。gettags()客户端地址: messageext。getbornhoststring());putmessageresult。setputmessagestatus(PutMessageStatus .刷新磁盘超时);} } else { service。醒来();} } //异步刷新else { if(!这个。defaultmessagestore。getmessagestoreconfig().isTransientStorePoolEnable()){ flushcomitlogservice。醒来();} else { commitlogservice。醒来();} } }类GroupCommitService扩展flushcomitlogservice { private volatile ListGroupCommitRequest write=new ArrayListGroupCommitRequest();private volatile ListGroupCommitRequest requests read=new ArrayListGroupCommitRequest();//提交刷盘任务到任务列表公共同步void put请求(final GroupCommitRequest){ synchronized(this。requestswrite){ this。requestswrite。添加(请求);}如果(已通知。compareandset(false,true)){等待点。倒计时();//notify } } private void swapRequests(){

           List<GroupCommitRequest> tmp = this.requestsWrite;            this.requestsWrite = this.requestsRead;            this.requestsRead = tmp;        }        private void doCommit() {            synchronized (this.requestsRead) {                if (!this.requestsRead.isEmpty()) {                    for (GroupCommitRequest req : this.requestsRead) {                        // There may be a message in the next file, so a maximum of                        // two times the flush                        boolean flushOK = false;                        for (int i = 0; i < 2 && !flushOK; i++) {                            flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();                            if (!flushOK) {                                CommitLog.this.mappedFileQueue.flush(0);                            }                        }                        req.wakeupCustomer(flushOK);                    }                    long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();                    if (storeTimestamp > 0) {                        CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);                    }                    this.requestsRead.clear();                } else {                    // Because of individual messages is set to not sync flush, it                    // will come to this process                    CommitLog.this.mappedFileQueue.flush(0);                }            }        }        public void run() {            CommitLog.log.info(this.getServiceName() + " service started");            while (!this.isStopped()) {                try {                    this.waitForRunning(10);                    this.doCommit();                } catch (Exception e) {                    CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);                }            }            // Under normal circumstances shutdown, wait for the arrival of the            // request, and then flush            try {                Thread.sleep(10);            } catch (InterruptedException e) {                CommitLog.log.warn("GroupCommitService Exception, ", e);            }            synchronized (this) {                this.swapRequests();            }            this.doCommit();            CommitLog.log.info(this.getServiceName() + " service end");        }        @Override        protected void onWaitEnd() {            this.swapRequests();        }        @Override        public String getServiceName() {            return GroupCommitService.class.getSimpleName();        }        @Override        public long getJointime() {            return 1000 * 60 * 5;        }    }GroupCommitRequest是刷盘任务,提交刷盘任务后,会在刷盘队列中等待刷盘,而刷盘线程

  GroupCommitService每隔10毫秒写一批数据到磁盘。之所以不直接写是磁盘io压力大,写入性能低,每隔10毫秒写一次可以提升磁盘io效率和写入性能。

  putRequest(request) 提交刷盘任务到任务列表request.waitForFlush同步等待GroupCommitService将任务列表中的任务刷盘完成。两个队列读写分离,requestsWrite是写队列,用户保存添加进来的刷盘任务,requestsRead是读队列,在刷盘之前会把写队列的数据放入读队列。

  CommitLog的doCommit方法:

  

private void doCommit() {            synchronized (this.requestsRead) {                if (!this.requestsRead.isEmpty()) {                    for (GroupCommitRequest req : this.requestsRead) {                        // There may be a message in the next file, so a maximum of                        // two times the flush                        boolean flushOK = false;                        for (int i = 0; i < 2 && !flushOK; i++) {                            //根据offset确定是否已经刷盘                            flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();                            if (!flushOK) {                                CommitLog.this.mappedFileQueue.flush(0);                            }                        }                        req.wakeupCustomer(flushOK);                    }                    long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();                    if (storeTimestamp > 0) {                        CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);                    }                    //清空已刷盘的列表                    this.requestsRead.clear();                } else {                    // Because of individual messages is set to not sync flush, it                    // will come to this process                    CommitLog.this.mappedFileQueue.flush(0);                }            }        }
刷盘的时候依次读取requestsRead中的数据写入磁盘,写入完成后清空requestsRead。读写分离设计的目的是在刷盘时不影响任务提交到列表。

  CommitLog.this.mappedFileQueue.flush(0);是刷盘操作:

  

public boolean flush(final int flushLeastPages) {    boolean result = true;    MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);    if (mappedFile != null) {        long tmpTimeStamp = mappedFile.getStoreTimestamp();        int offset = mappedFile.flush(flushLeastPages);        long where = mappedFile.getFileFromOffset() + offset;        result = where == this.flushedWhere;        this.flushedWhere = where;        if (0 == flushLeastPages) {            this.storeTimestamp = tmpTimeStamp;        }    }    return result;}
通过MappedFile映射的CommitLog文件写入磁盘

  这就是RocketMQ高可用设计之同步刷盘的基本情况了,大体思路就是一个读写分离的队列来刷盘,同步刷盘任务提交后会在刷盘队列中等待刷盘完成后再返回,而GroupCommitService每隔10毫秒写一批数据到磁盘。

  到此这篇关于RocketMQ设计之同步刷盘的文章就介绍到这了,更多相关RocketMQ同步刷盘内容请搜索盛行IT以前的文章或继续浏览下面的相关文章希望大家以后多多支持盛行IT!

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: