rocketmq同步复制原理,rocketmq同步双写

  rocketmq同步复制原理,rocketmq同步双写

  00-1010一、主从副本二。读写分离

  

目录
RocketMQ为了提高消费的高可用性,需要避免代理的单点故障导致代理上的消息不能及时消费,同时需要避免单机上的硬盘损坏导致消费数据丢失。

  RocketMQ采用代理数据的主从复制机制。当消息发送到主服务器时,它会将消息同步到从服务器。如果主服务器关闭,消息消费者可以继续从从服务器获取消息。

  有两种方法可以将消息从主服务器复制到从服务器:同步复制SYNC_MASTER和异步复制ASYNC_MASTER。

  通过配置文件conf/broker.conf文件配置:

  #根据一个或多个# contributor许可协议授权给Apache Software Foundation (ASF)。有关版权所有权的其他信息,请参见# this work附带的通知文件。ASF根据Apache许可证版本2.0#(下称“许可证”)将此文件许可给您;除非符合#许可证,否则您不得使用此文件。您可以在# # http://www.apache.org/licenses/LICENSE-2.0# #获得许可证的副本,除非适用法律要求或书面同意,根据许可证分发的软件#按“原样”分发,#不附带任何种类的明示或暗示的担保或条件。#请参阅许可协议,了解许可协议下管理权限的特定语言和#限制。broker cluster name=DefaultClusterbrokerName=broker-abrokerId=0 delete when=04 filerereservedtime=48 broker role=asynn C _ MASTERflushDiskType=ASYNC _ FLUSH对brokerRole参数进行设置:

  在向客户端返回成功写入的状态之前,同步复制:和从属都成功写入。

  优点:的主服务器坏了,从服务器有所有数据的备份,很容易恢复到主服务器。缺点:有一个额外的同步等待步骤,这会增加数据写入延迟并降低系统吞吐量。只有当异步复制:的主服务器写成功时,它才能向客户端返回写成功的状态。

  优点:没有同步等待步骤,延迟低,吞吐量高。如果缺点:主服务器出现故障,一些数据可能无法写入从服务器,未同步的数据可能会丢失。在实际应用中,需要结合业务场景合理设置磁盘刷机模式和主从复制模式。不建议使用同步刷盘,因为频繁触发写盘操作,性能下降明显。* *通常情况下,主从设置为异步刷机和同步拷贝,以保证数据不丢失。* *这样,即使一台服务器出现故障,仍然可以保证数据不会丢失。

  00-1010读写分离机制是高性能和高可用性架构中的常见设计。比如Mysql就实现了读写分离机制。客户端只能从主服务器写入数据,可以从主服务器和从服务器读取数据。

  RocketMQ的消费者拉取消息时,代理会判断主服务器的消息累积情况,以确定消费者是否从从服务器拉取消息进行消费。首先,默认情况下,组消息是从主服务器上获取的。如果主服务器的消息累积超过物理内存的40%,则将消息结果返回给消费者并通知消费者。下一次,消息将从其他从属服务器中提取。

  RocketMQ有自己的一套读写分离逻辑。

  辑,会判断主服务器的消息堆积量来决定消费者是否向从服务器拉取消息消费。

  Consumer在向 Broker 发送消息拉取请求时,会根据筛选出来的消息队列,判定是从Master,还是从Slave拉取消息,默认是Master。

  Broker 接收到消息消费者拉取请求,在获取本地堆积的消息量后,会计算服务器的消息堆积量是否大于物理内存的一定值,如果是,则标记下次从 Slave服务器拉取,计算 Slave服务器的 Broker Id,并响应给消费者。

  Consumer在接收到 Broker的响应后,会把消息队列与建议下一次拉取节点的 Broker Id 关联起来,并缓存在内存中,以便下次拉取消息时,确定从哪个节点发送请求。

  

public class GetMessageResult {    private final List<SelectMappedBufferResult> messageMapedList =        new ArrayList<SelectMappedBufferResult>(100);    private final List<ByteBuffer> messageBufferList = new ArrayList<ByteBuffer>(100);    private GetMessageStatus status;    private long nextBeginOffset;    private long minOffset;    private long maxOffset;    private int bufferTotalSize = 0;    // 标识是否通过Slave拉拉取消息    private boolean suggestPullingFromSlave = false;    private int msgCount4Commercial = 0;}// 针对消息堆积量过大会切换到Slave进行查询。// maxOffsetPy 为当前最大物理偏移量,maxPhyOffsetPulling 为本次消息拉取最大物理偏移量,他们的差即可表示消息堆积量。// TOTAL_PHYSICAL_MEMORY_SIZE 表示当前系统物理内存,accessMessageInMemoryMaxRatio 的默认值为 40,// 以上逻辑即可算出当前消息堆积量是否大于物理内存的 40%,如果大于则将 suggestPullingFromSlave 设置为 true。long diff = maxOffsetPy - maxPhyOffsetPulling;long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE    * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));getResult.setSuggestPullingFromSlave(diff > memory);
决定消费者是否向从服务器拉取消息消费的值存在 GetMessageResult 类中。suggestPullingFromSlave的默认值为 false,即默认消费者不会消费从服务器,但它会在消费者发送消息拉取请求时,动态改变该值,Broker 接收、处理消费者拉取消息请求。针对本MessageQueue消息堆积量过大会切换到Slave进行查询,maxOffsetPy 为当前最大物理偏移量,maxPhyOffsetPulling 为本次消息拉取最大物理偏移量,他们的差即可表示消息堆积量,当前消息堆积量是否大于物理内存的 40%就会切换到Slave进行查询。
public class PullMessageResponseHeader implements CommandCustomHeader {    // suggestWhichBrokerId标识从哪个broker进行查询    private Long suggestWhichBrokerId;    private Long nextBeginOffset;    private Long minOffset;    private Long maxOffset;}public class PullMessageProcessor implements NettyRequestProcessor {    private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)        throws RemotingCommandException {        RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);        final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();        final PullMessageRequestHeader requestHeader =            (PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);        response.setOpaque(request.getOpaque());        final GetMessageResult getMessageResult =            this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),                requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);        if (getMessageResult != null) {            response.setRemark(getMessageResult.getStatus().name());            responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());            responseHeader.setMinOffset(getMessageResult.getMinOffset());            responseHeader.setMaxOffset(getMessageResult.getMaxOffset());            // 建议从slave消费消息            if (getMessageResult.isSuggestPullingFromSlave()) {                // 从slave查询                responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());            } else {                // 从master查询                responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);            }            switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {                case ASYNC_MASTER:                case SYNC_MASTER:                    break;                case SLAVE:                    // 针对SLAVE需要判断是否可读,不可读的情况下读MASTER                    if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {                        response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);                        responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);                    }                    break;            }            if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {                // consume too slow ,redirect to another machine                if (getMessageResult.isSuggestPullingFromSlave()) {                    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());                }                // consume ok                else {                    responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());                }            } else {                responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);            }        }        return response;    }}
PullMessageResponseHeadersuggestWhichBrokerId标识某个MessageQueue的消息从具体的brokerId进行查询。针对Slave不可读的情况会设置为从MASTER_ID进行查询。

  

public class PullAPIWrapper {    private final InternalLogger log = ClientLogger.getLog();    private final MQClientInstance mQClientFactory;    private final String consumerGroup;    private final boolean unitMode;    private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =        new ConcurrentHashMap<MessageQueue, AtomicLong>(32);    private volatile boolean connectBrokerByUser = false;    private volatile long defaultBrokerId = MixAll.MASTER_ID;    private Random random = new Random(System.currentTimeMillis());    private ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();    public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,        final SubscriptionData subscriptionData) {        PullResultExt pullResultExt = (PullResultExt) pullResult;        // 处理MessageQueue对应拉取的brokerId        this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());        // 省略相关代码        pullResultExt.setMessageBinary(null);        return pullResult;    }    public void updatePullFromWhichNode(final MessageQueue mq, final long brokerId) {        // 保存在pullFromWhichNodeTable对象中        AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);        if (null == suggest) {            this.pullFromWhichNodeTable.put(mq, new AtomicLong(brokerId));        } else {            suggest.set(brokerId);        }    }}
Consumer收到拉取响应回来的数据后,会将下次建议拉取的 brokerId缓存起来。

  

public class PullAPIWrapper {    private final InternalLogger log = ClientLogger.getLog();    private final MQClientInstance mQClientFactory;    private final String consumerGroup;    private final boolean unitMode;    private ConcurrentMap<MessageQueue, AtomicLong/* brokerId */> pullFromWhichNodeTable =        new ConcurrentHashMap<MessageQueue, AtomicLong>(32);    private volatile boolean connectBrokerByUser = false;    private volatile long defaultBrokerId = MixAll.MASTER_ID;    private Random random = new Random(System.currentTimeMillis());    private ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();    public PullResult pullKernelImpl(        final MessageQueue mq,        final String subExpression,        final String expressionType,        final long subVersion,        final long offset,        final int maxNums,        final int sysFlag,        final long commitOffset,        final long brokerSuspendMaxTimeMillis,        final long timeoutMillis,        final CommunicationMode communicationMode,        final PullCallback pullCallback    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {        // 查找MessageQueue应该从brokerName的哪个节点查询        FindBrokerResult findBrokerResult =            this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),                this.recalculatePullFromWhichNode(mq), false);        if (null == findBrokerResult) {            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());            findBrokerResult =                this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),                    this.recalculatePullFromWhichNode(mq), false);        }        if (findBrokerResult != null) {            {                // check version                if (!ExpressionType.isTagType(expressionType)                    && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {                    throw new MQClientException("The broker[" + mq.getBrokerName() + ", "                        + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);                }            }            int sysFlagInner = sysFlag;            if (findBrokerResult.isSlave()) {                sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);            }            PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();            requestHeader.setConsumerGroup(this.consumerGroup);            requestHeader.setTopic(mq.getTopic());            requestHeader.setQueueId(mq.getQueueId());            requestHeader.setQueueOffset(offset);            requestHeader.setMaxMsgNums(maxNums);            requestHeader.setSysFlag(sysFlagInner);            requestHeader.setCommitOffset(commitOffset);            requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);            requestHeader.setSubscription(subExpression);            requestHeader.setSubVersion(subVersion);            requestHeader.setExpressionType(expressionType);            String brokerAddr = findBrokerResult.getBrokerAddr();            if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {                brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);            }            PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(                brokerAddr,                requestHeader,                timeoutMillis,                communicationMode,                pullCallback);            return pullResult;        }        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);    }    public long recalculatePullFromWhichNode(final MessageQueue mq) {        if (this.isConnectBrokerByUser()) {            return this.defaultBrokerId;        }        AtomicLong suggest = this.pullFromWhichNodeTable.get(mq);        if (suggest != null) {            return suggest.get();        }        return MixAll.MASTER_ID;    }}
Consumer拉取消息的时候会从 pullFromWhichNodeTable 中取出拉取 brokerId确定去具体的broker进行查询。

  到此这篇关于RocketMQ设计之主从复制和读写分离的文章就介绍到这了,更多相关RocketMQ从复制和读写分离内容请搜索盛行IT以前的文章或继续浏览下面的相关文章希望大家以后多多支持盛行IT!

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: