RocketMQ common issues, RocketMQ technology

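To keep its communication with clients simple, the NameServer does not notify clients immediately when it detects that a broker has failed. The fault avoidance mechanism addresses the resulting problem: a broker fails, the producer does not find out in time, and message sending fails. The mechanism is disabled by default (sendLatencyFaultEnable = false). When it is enabled and a send fails, the failed broker is temporarily excluded from the list of queues the producer selects from.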
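A minimal sketch of the exclusion idea, assuming a simplified model in which each queue is just a broker name plus a queue id, and a failed broker is recorded with the time after which it may be used again. The names `FaultAwareSelector`, `Queue`, `markFailed`, and `select` are hypothetical and not RocketMQ API; the real logic, including the `lastBrokerName` handling, is in `MQFaultStrategy`. The current time is passed in explicitly so the behavior is easy to test (records require Java 16 or later).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not the RocketMQ API: round-robin queue selection
// that temporarily skips brokers recorded as failed.
final class FaultAwareSelector {
    // A message queue, identified by broker name and queue id.
    record Queue(String broker, int id) {}

    private final List<Queue> queues;
    // Broker name -> time (ms) from which the broker may be used again.
    private final Map<String, Long> availableAfter = new HashMap<>();
    private int next = 0; // shared round-robin cursor

    FaultAwareSelector(List<Queue> queues) {
        this.queues = queues;
    }

    // Exclude a broker from selection for durationMs, starting at nowMs.
    void markFailed(String broker, long nowMs, long durationMs) {
        availableAfter.put(broker, nowMs + durationMs);
    }

    private boolean isAvailable(String broker, long nowMs) {
        Long t = availableAfter.get(broker);
        return t == null || nowMs >= t; // never failed, or window has passed
    }

    // Try each queue once in round-robin order, skipping excluded brokers;
    // if every broker is excluded, fall back to plain round-robin.
    Queue select(long nowMs) {
        for (int i = 0; i < queues.size(); i++) {
            Queue q = queues.get(Math.floorMod(next++, queues.size()));
            if (isAvailable(q.broker(), nowMs)) {
                return q;
            }
        }
        return queues.get(Math.floorMod(next++, queues.size()));
    }

    public static void main(String[] args) {
        FaultAwareSelector s = new FaultAwareSelector(List.of(
                new Queue("broker-a", 0), new Queue("broker-b", 0)));
        s.markFailed("broker-a", 0L, 30_000L);
        System.out.println(s.select(1_000L));  // broker-a is still excluded
        System.out.println(s.select(31_000L)); // broker-a's window has passed
    }
}
```

Running `main` prints `Queue[broker=broker-b, id=0]` while broker-a is excluded, then `Queue[broker=broker-a, id=0]` once its 30 s window has passed.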

The MQFaultStrategy class:

    public class MQFaultStrategy {
        private final static InternalLogger log = ClientLogger.getLog();
        private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();

        private boolean sendLatencyFaultEnable = false;

        // Latency thresholds (ms) and the matching unavailability periods (ms)
        private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
        private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

        public long[] getNotAvailableDuration() {
            return notAvailableDuration;
        }

        public void setNotAvailableDuration(final long[] notAvailableDuration) {
            this.notAvailableDuration = notAvailableDuration;
        }

        public long[] getLatencyMax() {
            return latencyMax;
        }

        public void setLatencyMax(final long[] latencyMax) {
            this.latencyMax = latencyMax;
        }

        public boolean isSendLatencyFaultEnable() {
            return sendLatencyFaultEnable;
        }

        public void setSendLatencyFaultEnable(final boolean sendLatencyFaultEnable) {
            this.sendLatencyFaultEnable = sendLatencyFaultEnable;
        }

        public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
            // Is the latency-based fault avoidance mechanism enabled?
            if (this.sendLatencyFaultEnable) {
                try {
                    // Round-robin over the queues, starting from the shared counter
                    int index = tpInfo.getSendWhichQueue().getAndIncrement();
                    for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                        int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                        if (pos < 0)
                            pos = 0;
                        MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                        // Check whether the queue's broker is available
                        if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                            if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                                return mq;
                        }
                    }

                    // No suitable queue found: fall back to a broker from the fault list
                    final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                    int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                    if (writeQueueNums > 0) {
                        final MessageQueue mq = tpInfo.selectOneMessageQueue();
                        if (notBestBroker != null) {
                            mq.setBrokerName(notBestBroker);
                            mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                        }
                        return mq;
                    } else {
                        latencyFaultTolerance.remove(notBestBroker);
                    }
                } catch (Exception e) {
                    log.error("Error occurred when selecting message queue", e);
                }

                return tpInfo.selectOneMessageQueue();
            }

            // Default: round-robin, avoiding the broker used in the previous attempt
            return tpInfo.selectOneMessageQueue(lastBrokerName);
        }

        public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
            if (this.sendLatencyFaultEnable) {
                long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
                this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
            }
        }

        private long computeNotAvailableDuration(final long currentLatency) {
            // Scan from the largest threshold down; the first one reached picks the period
            for (int i = latencyMax.length - 1; i >= 0; i--) {
                if (currentLatency >= latencyMax[i])
                    return this.notAvailableDuration[i];
            }
            return 0;
        }
    }

When the producer looks up the route and selects a message queue, the key steps are:

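1. Select a message queue with the round-robin algorithm.
2. Check the fault list to determine whether that queue's broker is available.

How LatencyFaultToleranceImpl checks availability:
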
    @Override
    public boolean isAvailable(final String name) {
        final FaultItem faultItem = this.faultItemTable.get(name);
        if (faultItem != null) {
            return faultItem.isAvailable();
        }
        // Not in the fault list: the broker is available
        return true;
    }

    // In the nested FaultItem class:
    public boolean isAvailable() {
        return (System.currentTimeMillis() - startTimestamp) >= 0;
    }

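The check first looks up the broker in the fault list; a broker that is not in the list is available. If it is in the list, it is available only when the current time is greater than or equal to its fault-avoidance timestamp, startTimestamp. updateFaultItem() sets startTimestamp to the current time plus the unavailability duration, so despite its name it marks the moment the avoidance ends.

updateFaultItem() is called both after a send completes and when a send throws an exception, to keep the fault list up to date. computeNotAvailableDuration() derives the fault period from the response time: the longer the response time, the longer the period. Network exceptions, broker exceptions, and client exceptions are all treated as a fixed response time of 30 s (isolation = true), which maps to a fault period of 10 minutes. For a successful send or a thread-interruption exception, the measured response time is used instead; a response within 100 ms (in fact anything under 550 ms with the default tables) gives a fault period of 0.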
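The latency-to-period mapping in computeNotAvailableDuration() can be checked on its own. This sketch copies the default thresholds from RocketMQ's MQFaultStrategy, where the last unavailability entry is 600000L (10 minutes). The class name `NotAvailableDurationDemo` and the helper `forSend` are hypothetical; `forSend` only mirrors the `isolation ? 30000 : currentLatency` choice made in updateFaultItem().

```java
// Hypothetical demo of the default latency -> unavailability mapping.
final class NotAvailableDurationDemo {
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    // Scan from the largest threshold down; the first threshold the latency
    // reaches selects the matching fault period.
    static long compute(long currentLatency) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatency >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0L;
    }

    // An "isolation" failure (network, broker, or client exception) is treated
    // as a 30 s latency; otherwise the measured latency is used.
    static long forSend(boolean isolation, long currentLatency) {
        return compute(isolation ? 30000L : currentLatency);
    }

    public static void main(String[] args) {
        long[] samples = {80L, 549L, 550L, 1500L, 5000L};
        for (long latency : samples) {
            System.out.println(latency + " ms -> " + compute(latency) + " ms");
        }
        System.out.println("isolation -> " + forSend(true, 0L) + " ms");
    }
}
```

With the default tables this prints a period of 0 for 80 ms and 549 ms, 30000 for 550 ms, 60000 for 1500 ms, 180000 for 5000 ms, and 600000 (10 minutes) for an isolation failure.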

The updateFaultItem method of the LatencyFaultToleranceImpl class:

    @Override
    public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
        FaultItem old = this.faultItemTable.get(name);
        if (null == old) {
            final FaultItem faultItem = new FaultItem(name);
            faultItem.setCurrentLatency(currentLatency);
            faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            // Add to the fault list
            old = this.faultItemTable.putIfAbsent(name, faultItem);
            if (old != null) {
                // Another thread added an entry first: update that one instead
                old.setCurrentLatency(currentLatency);
                old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            }
        } else {
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    }

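FaultItem stores the broker name, the response time (currentLatency), and the fault-avoidance timestamp (startTimestamp). The timestamp is the most important of the three, because it is what decides whether the broker's queues are available.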
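A simplified re-creation of the fault table, assuming an injectable clock (`LongSupplier`) instead of `System.currentTimeMillis()` so the timing can be tested. `FaultTable` is a hypothetical name, and `ConcurrentHashMap.computeIfAbsent` is swapped in for the get/putIfAbsent sequence in updateFaultItem(); it sketches the behavior and is not RocketMQ's code.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.LongSupplier;

// Hypothetical, simplified model of LatencyFaultToleranceImpl's fault table.
final class FaultTable {
    static final class FaultItem {
        final String name;
        volatile long currentLatency;
        // Time from which the broker is usable again (now + notAvailableDuration).
        volatile long startTimestamp;

        FaultItem(String name) {
            this.name = name;
        }

        boolean isAvailable(long now) {
            return now - startTimestamp >= 0;
        }
    }

    private final ConcurrentHashMap<String, FaultItem> table = new ConcurrentHashMap<>();
    private final LongSupplier clock; // injected clock (assumption for testing)

    FaultTable(LongSupplier clock) {
        this.clock = clock;
    }

    // Create the entry on first use, then refresh latency and avoidance deadline.
    void updateFaultItem(String name, long currentLatency, long notAvailableDuration) {
        FaultItem item = table.computeIfAbsent(name, FaultItem::new);
        item.currentLatency = currentLatency;
        item.startTimestamp = clock.getAsLong() + notAvailableDuration;
    }

    // A broker missing from the table is available.
    boolean isAvailable(String name) {
        FaultItem item = table.get(name);
        return item == null || item.isAvailable(clock.getAsLong());
    }

    public static void main(String[] args) {
        long[] now = {0L};
        FaultTable t = new FaultTable(() -> now[0]);
        t.updateFaultItem("broker-a", 30000L, 600000L); // isolation failure: 10 minutes
        System.out.println(t.isAvailable("broker-a"));  // false: still being avoided
        now[0] = 600000L;
        System.out.println(t.isAvailable("broker-a"));  // true: deadline reached
        System.out.println(t.isAvailable("broker-b"));  // true: never recorded
    }
}
```

A later successful send with a period of 0 sets the timestamp to the current time, so the broker becomes available again immediately.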

This concludes the article on RocketMQ design: the fault avoidance mechanism.

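Notice: This article was posted by a user and does not represent the views of 盛行IT. Copyright belongs to the original author; it is reposted only to share information. In case of infringement, please contact us and we will revise or remove it promptly. Thank you.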
