RocketMQ原理,rocketmq原理和最佳实践

  RocketMQ原理,rocketmq原理和最佳实践

  学任何技术都是两步骤:

  搭建环境编译我也不例外,直接搞起来。

  一、RocketMQ的安装1、文档官方网站

  http://rocketmq.apache.org

  开源代码库

  https://github.com/apache/rocketmq

  2、下载wget https://镜子。有点。edu。cn/阿帕奇/rocket MQ/4。7 .0/火箭MQ-all-4。7 .0-bin-释放。活力我们是基于Centos8来的,面向官方文档学习,所以下载地址自然也是官方的。

  去官方网站找合适的版本进行下载,目前我这里最新的是4.7.0版本。

  http://rocketmq.apache.org/dowloading/releases/

  https://www.apache.org/dyn/closer.cgi?path=rocket MQ/4。7 .0/火箭MQ-all-4。7 .0-bin-释放。活力

  3、准备工作3.1、解压解压火箭MQ-all-4。7 .0-bin-释放。邮编3.2,安装jdksudo yum安装java-1.8.0-openjdk-devel4、启动4.1、启动vcd火箭MQ-all-4。7 .0-bin-释放/bin。/mqnamesrv4.2,启动经纪人CD火箭MQ-所有-4。7 .0-bin-释放/bin。/mqbroker -n本地主机:9876常见错误以及解决方案:

  常见错误:启动经纪人失败无法分配内存

  [root@node-113b bin]# ./mqbroker -n本地主机:9876

  Java热点(TM) 64位服务器虚拟机警告:信息:OS:commit _ memory(0x 000000005 c 0000000,8589934592,0)失败

  ;错误="无法分配内存"(errno=12)#

  #内存不足,Java运行时环境无法继续。

  #本机内存分配(mmap)无法映射8589934592字节以提交保留的内存。

  #包含更多信息的错误报告文件另存为:

  #/usr/local/rocket MQ/bin/hs _ err _ PID 1997。原木解决方案:

  是由于默认内存分配的太大了,超出了本机内存,直接伯父了。

  修改bin/目录下的如下两个脚本

  runbroker.sh

  runserver.sh在这两个脚本里都搜索-server -Xms,将其内存分配小点,自己玩的话512MB就足够了,够够的了!

  4.3、启动成功标识名称城郊多用途车启动成功标识:

  经纪人启动成功标识:

  二、RocketMQ控制台的安装控制台目前获取方式有如下两种:

  第三方网站去下载现成的,比如世纪乐知等。官方源码包自己编译而成,官方没有现成的。我们这里当然采取官方方式。

  1、官方文档开源代码库仓库

  https://github.com/apache/rocketmq-externals

  中文指南

  https://github。com/Apache/rocket MQ-externals/blob/master/rocket MQ-console/doc/1 _ 0 _ 0/user guide _ cn。医学博士

  2、下载源码https://github。com/Apache/rocket MQ-externals/tree/release-rocket MQ-console-1。0 .0

  3、修改配置(可选)我们下载完解压后的文件目录如下:

  修改rocket MQ-console/src/main/resources/application。性能文件的服务器。端口就欧了。默认8080。

  4、编译打包进入火箭MQ-控制台,然后用专家进行编译打包,如果没有mvn,安装它:好吃安装玛文,然后:

  传送非清洁包装-DskipTests打包完会在目标下生成我们板簧罩的冲突程序,直接爪哇罐启动完事。

  5、启动控制台将编译打包好的跳羚程序扔到服务器上,执行如下命令进行启动

  Java-jar火箭MQ-控制台-ng-1。0 .1 .罐子-火箭MQ。配置。namesrvaddr=127。0 .0 .1:9876如果想后台启动就后台运行

  访问一下看看效果:

  三、测试消息队列给我们提供了测试工具和测试类,可以在安装完很方便的进行测试。

  0、准备工作消息队列给我们提供的默认测试工具在容器目录下,叫工具. sh。我们测试前需要配置这个脚本,为他指定名称城郊多用途车地址才可以,否则测试发送/消费消息的时候会出现如下错误连接到空失败:

  22:49:02.470[main]调试I . n . u . I . l . internalloggerfactory-使用SLF4J作为默认日志记录框架

  RocketMQLog:警告找不到记录器(io。妮蒂。util。内部。平台相关0)的附加程序。

  RocketMQLog:警告请正确初始化日志系统。

  Java。郎。illegalstateexception:org。阿帕奇。火箭MQ。远程处理。例外。远程处理连接异常:连接到空失败配置如下:

  vim工具. sh

  #添加以下代码以导出JAVA_HOME

  名称导出SRV _ addr=localhost: 98761,发送消息。/tools . shorg . Apache . rocket MQ . example . quick start . producer如果成功会看到喷日志,因为这个类会向TopicTest的主题发送1000条消息。

  2.消费消息。/tools . sh org . Apache . rocket MQ . example . quick start . consumer如果成功就会看到swoosh日志,因为这个类会消耗TopicTest下的所有消息。发送的前1000件将被消耗。

  3.控制台发送成功后,我们自然可以来到控制台查看新闻、消费等信息。

  四。架构图和角色1。架构图

  2.角色2.1。Broker理解为RocketMQ本身。代理主要用于生产者和消费者接收和发送消息。Broker会定期向nameserver提交自己的信息,name server是消息中间件的消息存储转发服务器。启动时,每个代理节点将遍历名称服务器列表。与每个名称服务器建立长连接,注册自己的信息,然后定期向2.2汇报。nameserver明白zookeeper的作用,但是他没有用zk,而是写了一个nameserver来代替zk。底层由netty实现,提供路由管理、服务注册和服务发现功能。名称服务器是一个无状态节点和服务发现者。集群中的每个角色(生产者、代理、消费者等。)需要定期向名称服务器报告它们的状态,以便它们可以发现彼此。如果超时后他们没有报告,域名服务器会将其从列表中删除。名称服务器可以部署多个名称。当存在多个名称服务器时,其他角色同时向它们报告信息。为了确保高可用性,名称服务器集群不相互通信,并且没有活动和备用名称服务器内存存储的概念。nameserver中的broker、topic等信息默认不会持久化,所以他是无状态节点2.3和生产者消息的生产者,随机选择其中一个Nameserver节点建立长连接。获取主题路由信息(包括主题下的队列,这些队列分布在哪个代理上,等等。)然后和提供主题服务的master建立长连接(因为rocketmq只能从master写消息)。而定期向Master发送heartbeat 2.4和consumer消息的消费者通过NameServer集群获取主题的路由信息,并连接到相应的Broker来消费消息,因为主从都可以读取消息,因此,消费者会与主从都建立连接来消费消息。3.核心流程代理在名称服务器上注册。当生产者发送消息时,它将从名称服务器获得消息的主题信息。制作人会和所有提供服务的师傅建立长久的联系。并定时向主人发送心跳。消费者通过名称服务器集群获得主题的路由信息。消费者将与所有主设备和从设备建立连接,以监控新消息。5.核心理念1。消息消息载体。发送或使用消息时必须指定主题。Message有一个可选的标记项用于过滤消息,您还可以添加额外的键值对。

  2.主题信息的逻辑分类。在发送消息之前,必须指定一个主题来发送,也就是将该消息发送到该主题。指定在使用消息时使用的主题。就是逻辑分类。

  3.queue1话题会被分成N个队列,数量是可配置的。消息本身实际上存储在队列中,使用者使用队列中的消息。说多了,比如一个话题有四个队列,五个消费者在消费这个话题,那么一个消费者就浪费了。由于负载平衡策略,每个消费者消耗一个队列,5 ^ 4,溢出一个,这样是不行的。

  4.TagTag是Topic的进一步细分。顾名思义,标签。每条消息在发送的时候都可以进行标签化,在消费的时候可以根据标签进行筛选,选择性消费。

  5.消息模型消息模型:集群和广播。

  6.消息顺序消息顺序:顺序和并发。

  7.生产者组消息生产者组

  8.用户群消息用户群

  六、确认首先要明确一点:确认机制是发生在消费者端的,不是在生产者端的。也就是说消费者消费完消息后要进行命令正确应答确认,如果未确认则代表是消费失败,这时候经纪人会进行重试策略(仅集群模式会重试)。命令正确应答的意思就是:消费者说:好的,我消费成功了。这条消息给我标记成已消费吧。

  七、消费模式1、集群模式(聚类)1.1、图解

  1.2、特点每条消息只需要被处理一次,经纪人只会把消息发送给消费集群中的一个消费者在消息重投时,不能保证路由到同一台机器上消费状态由经纪人维护2、广播模式(广播)2.1,图解

  2.2、特点消费进度由消费者维护保证每个消费者都消费一次消息消费失败的消息不会重投八、Java API说明:

  消息队列服务端版本为目前最新版:4.7.0Java客户端版本采取的目前最新版:4 .7 .0磅如下

  属国

  groupId组织。阿帕奇。rocket MQ/groupId

  人造火箭MQ-客户端/artifactId

  版本4 .7 .0/版本

  /依赖关系1、生产者发消息肯定要必备如下几个条件:

  指定生产组名(不能用默认的,会报错)配置名称城郊多用途车地址(必须)指定主题名称(必须)指定标签/钥匙(可选)验证消息是否发送成功:消息发送完后可以启动消费者进行消费,也可以去管控台上看消息是否存在。

  1.1、发送(同步)公共类生产者{

  公共静态void main(String[] args)引发异常{

  //指定生产组名为我的制片人

  DefaultMQProducer producer=new DefaultMQProducer( my-producer );

  //配置名称城郊多用途车地址

  制片人。setnamesrvaddr( 124。57 .180 .156:9876 );

  //启动生产者

  producer.start()。

  //创建消息对象,主题为:myTopic001,消息内容为:你好世界

  消息消息=新消息( myTopic001 , hello world ).getBytes());

  //发送消息到mq,同步的

  发送结果结果=制作人。发送(msg);

  System.out.println(发送消息成功!结果是:结果);

  //关闭生产者

  生产。关闭();

  System.out.println(生产者"关机!");

  }

  }输出结果:

  发送消息成功!结果为:SendResult [sendStatus=SEND_OK,msgId=a9fe 854140 f 418 B4 AAC 26 f 7973910000,offset msgId=7b 39 b49 d 000000000589 be,message queue=message queue[topic=我的主题001,brokerName=broker-a,queueId=0],queueOffset=7]

  生产者关机!1.2、发送(批量)公共类ProducerMultiMsg {

  公共静态void main(String[] args)引发异常{

  //指定生产组名为我的制片人

  DefaultMQProducer producer=new DefaultMQProducer( my-producer );

  //配置名称城郊多用途车地址

  制片人。setnamesrvaddr( 124。57 .180 .156:9876 );

  //启动生产者

  producer.start()。

  String topic= myTopic001

  //创建消息对象,主题为:myTopic001,消息内容为:hello world1/2/3

  消息msg1=新消息(主题“你好,世界1号”.getBytes());

  消息msg2=新消息(主题,“你好,世界2”.getBytes());

  消息msg3=新消息(主题,“你好,世界3”.getBytes());

  //创建消息对象的集合,用于批量发送

  list Message msgs=new ArrayList();

  msgs。添加(msg 1);

  msgs。添加(msg 2);

  msgs。添加(msg 3);

  //批量发送的美国石油学会(美国石油协会)的也是send(),只是他的重载方法支持列出消息,同样是同步发送。

  发送结果结果=制作人。发送(短信);

  System.out.println(发送消息成功!结果是:结果);

  //关闭生产者

  生产。关闭();

  System.out.println(生产者"关机!");

  }

  }JAVA全屏

  输出结果:

  发送消息成功!结果为:SendResult [sendStatus=SEND_OK,msgId=a9 Fe 854139 c 418 B4 AAC 26 f 7d 13770000,a9 Fe 854139 c 418 B4 AAC 26 f 7d 13770001,a9 Fe 854139 c 418 B4 AAC 26 f 7d 13770002,offsetMsgId=7b 39 b 49d 00002 a 9 f 0000

  生产者关机!从结果中可以看到只有一个msgId,所以可以发现虽然是三条消息对象,但是却只发送了一次,大大节省了客户与计算机网络服务器的开销。

  错误情况:

  批量发送的主题必须是同一个,如果消息对象指定不同的题目,那么批量发送的时候会报错:

  线程“主要”组织。阿帕奇。火箭MQ。客户。例外。mqclientexception中出现异常:无法启动消息批处理

  欲了解更多信息,请访问网址,http://rocket MQ . Apache . org/docs/FAQ/

  位于org。阿帕奇。火箭MQ。客户。制片人。defaultmq生产者。批处理(defaultmq生成器。Java:950)

  位于org。阿帕奇。火箭MQ。客户。制片人。defaultmq生产者。发送(defaultmq生产者。Java:898)

  在com。陈同伟。MQ。火箭MQ。producermultimsg。main(producermultimsg。Java:29)

  原因:Java。郎。不支持操作异常:一批消息的主题应该相同

  位于org。阿帕奇。火箭MQ。常见。消息。消息批次。从列表生成(消息批次。Java:58)

  位于org。阿帕奇。火箭MQ。客户。制片人。defaultmq生产者。批处理(defaultmq生成器。Java:942)

  .2 more1.3、sendCallBack(异步)公共类生产者同步{

  公共静态void main(String[] args)引发异常{

  //指定生产组名为我的制片人

  DefaultMQProducer producer=new DefaultMQProducer( my-producer );

  //配置名称城郊多用途车地址

  制片人。setnamesrvaddr( 124。57 .180 .156:9876 );

  //启动生产者

  producer.start()。

  //创建消息对象,主题为:myTopic001,消息内容为:你好世界异步

  消息消息=新消息( myTopic001 , hello world async .getBytes());

  //进行异步发送,通过发送回拨接口来得知发送的结果

  producer.send(msg,new SendCallback() {

  //发送成功的回调接口

  @覆盖

  成功时公开作废(发送结果发送结果){

  System.out.println(发送消息成功!结果为:发送结果);

  }

  //发送失败的回调接口

  @覆盖

  异常时的公共void(Throwable Throwable){

  可投掷的。printstacktrace();

  System.out.println(发送消息失败!结果是:可投掷。getmessage());

  }

  });

  生产。关闭();

  System.out.println(生产者"关机!");

  }

  }输出结果:

  生产者关机!

  Java。郎。illegalstateexception:org。阿帕奇。火箭MQ。远程处理。例外。remotingconnectexception:连接到[124.57.180.156:9876]失败

  位于org。阿帕奇。火箭MQ。客户。impl。工厂。MQ客户端实例。updatetopicrouteinfofromnameserver(MQ客户端实例。Java:681)

  位于org。阿帕奇。火箭MQ。客户。impl。工厂。MQ客户端实例。updatetopicrouteinfofromnameserver(MQ客户端实例。Java:511)

  位于org。阿帕奇。火箭MQ。客户。impl。制片人。defaultmqproducerimpl。trytofindtopicpublishinfo(defaultmqproducerimpl。Java:692)

  位于org。阿帕奇。火箭MQ。客户。impl。制片人。defaultmqproducerimpl。senddefaultimpl(defaultmqproducerimpl。Java:556)

  在org。阿帕奇。火箭MQ。客户。impl。制片人。defaultmqproducerimpl。访问$ 300(defaultmqproducerimpl。Java:97)

  位于org。阿帕奇。火箭MQ。客户。impl。制片人。defaultmqproducerimpl$4。运行(defaultmqproducerimpl。Java:510)

  在爪哇。util。并发。executors$runnable适配器。调用(执行人。Java:511)

  在爪哇。util。并发。未来任务。运行(未来任务。Java:266)

  在爪哇。util。并发。threadpoolexecutor。运行工作线程(threadpoolexecutor。Java:1142)

  在爪哇。util。并发。threadpoolexecutor$worker。运行(threadpoolexecutor。Java:617)

  位于Java。郎。线程。运行(线程。Java:745)

  原因:org。阿帕奇。火箭MQ。远程处理。例外。远程处理连接异常:连接到[124.57.180.156:9876]失败

  位于org。阿帕奇。火箭MQ。远程处理。妮蒂。nettyremotingclient。getandcreatenameserverchannel(nettyremotingclient。Java:441)

  位于org。阿帕奇。火箭MQ。远程处理。妮蒂。nettyremotingclient。getandcreatechannel(nettyremotingclient。Java:396)

  位于org。阿帕奇。火箭MQ。远程处理。妮蒂。nettyremotingclient。调用sync(nettyremotingclient。Java:365)

  位于org。阿帕奇。火箭MQ。客户。impl。mqclientapiimpl。gettopicrouteinfofromnameserver(mqclientapiimpl。Java:1371)

  位于org。阿帕奇。火箭MQ。客户。impl。mqclientapiimpl。gettopicrouteinfofromnameserver(mqclientapiimpl。Java:1361)

  位于org。阿帕奇。火箭MQ。客户。impl。工厂。MQ客户端实例。updatetopicrouteinfofromnameserver(MQ客户端实例。Java:624)

  .还有10个

  发送消息失败!结果是:org。阿帕奇。火箭MQ。远程处理。例外。远程处理连接异常:连接到[124.57.180.156:9876]失败为啥报错了?很简单,他是异步的,从结果就能看出来,由于是异步的,我还没发送到(法属)马提尼克岛(马提尼克岛的简写)呢,你就先给我关机了。肯定不行,所以我们在关机前面睡眠1s在看效果

  公共类生产者同步{

  公共静态void main(String[] args)引发异常{

  //指定生产组名为我的制片人

  DefaultMQProducer producer=new DefaultMQProducer( my-producer );

  //配置名称城郊多用途车地址

  制片人。setnamesrvaddr( 124。57 .180 .156:9876 );

  //启动生产者

  producer.start()。

  //创建消息对象,主题为:myTopic001,消息内容为:你好世界异步

  消息消息=新消息( myTopic001 , hello world async .getBytes());

  //进行异步发送,通过发送回拨接口来得知发送的结果

  producer.send(msg,new SendCallback() {

  //发送成功的回调接口

  @覆盖

  成功时公开作废(发送结果发送结果){

  System.out.println(发送消息成功!结果为:发送结果);

  }

  //发送失败的回调接口

  @覆盖

  异常时的公共void(Throwable Throwable){

  可投掷的。printstacktrace();

  System.out.println(发送消息失败!结果是:可投掷。getmessage());

  }

  });

  线程。睡眠(1000);

  生产。关闭();

  System.out.println(生产者"关机!");

  }

  }输出结果:

  发送消息成功!结果为:SendResult [sendStatus=SEND_OK,msgId=a9fe 854106 e 418 B4 AAC 26 f 8719 b 20000,offset msgId=7b 39 b 49d 00000002 a 9 f 0000000058 CFC,message queue=message queue[topic=my topic 001,brokerName=broker-a,queueId=1],queueOffset=2]

  生产者关机!1.4、发送方式公开公共类生产者通道{

  公共静态void main(String[] args)引发异常{

  //指定生产组名为我的制片人

  DefaultMQProducer producer=new DefaultMQProducer( my-producer );

  //配置名称城郊多用途车地址

  制片人。setnamesrvaddr( 124。57 .180 .156:9876 );

  //启动生产者

  producer.start()。

  //创建消息对象,主题为:myTopic001,消息内容为:hello world oneway

  消息消息=新消息( myTopic001 , hello world oneway ).getBytes());

  //效率最高,因为单向的不关心是否发送成功,我就投递一下我就不管了。所以返回是空的

  制片人。sendoneway(msg);

  System.out.println(投递消息成功!注意这里是投递成功,而不是发送消息成功哦!因为我sendOneway也不知道到底成没成功,我没返回值的。);

  生产。关闭();

  System.out.println(生产者"关机!");

  }

  }输出结果:

  投递消息成功!注意这里是投递成功,而不是发送消息成功哦!因为我sendOneway也不知道到底成没成功,我没返回值的。

  生产者关机!1.5、效率对比发送单向发送回调发送批量派遣单条

  很容易理解,sendOneway不求结果,我就负责投递,我不管你失败还是成功,相当于中转站,来了我就扔出去,我不进行任何其他处理。所以最快。

  而发送回拨是异步发送肯定比同步的效率高。

  派遣批量和派遣单条的效率也是分情况的,如果只有一条味精要发,那还搞毛批量,直接派遣单条完事。

  2、消费者每个消费者只能关注一个话题。

  发消息肯定要必备如下几个条件:

  指定消费组名(不能用默认的,会报错)配置名称城郊多用途车地址(必须)指定主题名称(必须)指定标签/钥匙(可选)2.1、聚类集群模式,默认。

  比如启动五个消费者,生产者生产一条消息后,经纪人会选择五个消费者中的其中一个进行消费这条消息,所以他属于点对点消费模式。

  公共类别消费者{

  公共静态void main(String[] args)引发异常{

  //指定消费组名为我的消费者

  DefaultMQPushConsumer消费者=new DefaultMQPushConsumer(我的消费者);

  //配置名称城郊多用途车地址

  消费者。setnamesrvaddr( 124。57 .180 .156:9876 );

  //订阅主题:我的主题001下的全部消息(因为是*,*指定的是标签标签,代表全部消息,不进行任何过滤)

  consumer.subscribe(myTopic001 , * );

  //注册监听器,进行消息消息。

  消费者。registermessagelistener(new message listener concurrent(){

  @覆盖

  公共ConsumeConcurrentlyStatus消费消息(List MessageExt msgs,ConsumeConcurrentlyContext ConsumeConcurrentlyContext){

  for (MessageExt msg : msgs) {

  字符串str=新字符串(味精。getbody());

  //输出消息内容

  系统。出去。println(str);

  }

  //默认情况下,这条消息只会被一个消费者消费,这叫点对点消费模式。也就是集群模式。

  //ack确认

  返回ConsumeConcurrentlyStatus .消费_成功;

  }

  });

  //启动消费者

  消费者。start();

  System.out.println(消费者启动);

  }

  }2.2、广播广播模式。

  比如启动五个消费者,生产者生产一条消息后,经纪人会把这条消息广播到五个消费者中,这五个消费者分别消费一次,每个都消费一次。

  //代码里只需要添加如下这句话即可:

  消费者。setmessagemodel(消息模型.广播);2.3、两种模式对比集群默认是默认的,广播模式是需要手动配置。一条消息:集群模式下的多个消费者只会有一个消费者消费。广播模式下的每一个消费者都会消费这条消息。广播模式下,发送一条消息后,会被当前被广播的所有消费者消费,但是后面新加入的消费者不会消费这条消息,很好理解:村里面大喇叭喊了全村来领鸡蛋,第二天你们村新来个人,那个人肯定听不到昨天大喇叭喊的消息呀3、标签键发送/消费消息的时候可以指定标签/钥匙来进行过滤消息,支持通配符。*代表消费此主题下的全部消息,不进行过滤。

  看下org。阿帕奇。火箭MQ。常见。消息。消息源码可以发现发消息的时候可以指定标签和按键:

  公共消息(字符串主题、字符串标签、字符串关键字、byte[] body) {

  this(topic,tags,key,0,body,true);

  }比如:

  公共类生产者标签键{

  公共静态void main(String[] args)引发异常{

  //指定生产组名为我的制片人

  DefaultMQProducer producer=new DefaultMQProducer( my-producer );

  //配置名称城郊多用途车地址

  制片人。setnamesrvaddr( 124。57 .180 .156:9876 );

  //启动生产者

  producer.start()。

  //创建消息对象,主题为:myTopic001,消息内容为:hello world,且标签为:测试标签、密钥为测试键

  消息消息=新消息( myTopic001 ,测试标签,测试键,‘你好世界’).getBytes());

  //发送消息到mq,同步的

  发送结果结果=制作人。发送(msg);

  System.out.println(发送消息成功!结果是:结果);

  //关闭生产者

  生产。关闭();

  System.out.println(生产者"关机!");

  }

  }输出结果:

  发送消息成功!结果为:SendResult [sendStatus=SEND_OK,msgId=a9fe 854149 DC 18 B4 AAC 26 fa 4b 7200000,offsetMsgId=7b 39 b49 d 0000002 a 9 f 000000058 da 6,message queue=message queue[topic=我的主题001,brokerName=broker-a,queueId=3],queueOffset=3]

  生产者关机!查看管控台,可以发现标签和键已经生效了:

  消费的时候如果指定*那就是此主题下的全部消息,我们可以指定前缀通配符,比如:

  //这样就只会消费myTopic001下的标签为测试-*开头的消息。

  consumer.subscribe(myTopic001 , test-* );

  //代表订阅主题为myTopic001下的标签为塔加楼或TagB的所有消息

  consumer.subscribe(myTopic001 , TagA TagB );还支持结构化查询语言表达式过滤,不是很常用。不倍黑了。

  4、常见错误4.1、sendDefaultImpl调用超时4.1.1、异常线程“主要”组织。阿帕奇。火箭MQ。远程处理。例外。远程处理toomuchrequestexception中出现异常:sendDefaultImpl调用超时

  位于org。阿帕奇。火箭MQ。客户。impl。制片人。defaultmqproducerimpl。senddefaultimpl(defaultmqproducerimpl。Java:666)

  位于org。阿帕奇。火箭MQ。客户。impl。制片人。defaultmqproducerimpl。send(defaultmqproducerimpl。Java:1342)

  位于org。阿帕奇。火箭MQ。客户。impl。制片人。defaultmqproducerimpl。send(defaultmqproducerimpl。Java:1288)

  位于org。阿帕奇。火箭MQ。客户。制片人。defaultmq生产者。发送(defaultmq生产者。Java:324)

  在com。陈同伟。MQ。火箭MQ。制片人。主(制作人。Java:18)4。1 .2、解决1.如果你是云服务器,首先检查安全组是否允许9876这个端口访问,是否开启了防火墙,如果开启了的话是否将9876映射了出去。

  2.修改配置文件broker.conf,加上:

  brokerIP1=我用的是阿里云服务器,这里是我的公网互联网协议(互联网协议)启动名称城郊多用途车和经纪人的时候加上本机IP(我用的是阿里云服务器,这里是我的公网IP):/bin/mqnamesrv -n IP:9876。/bin/MQ broker-n IP:9876-c conf/broker。会议4.2,没有此主题的路由信息4.2.1、异常线程“主要”组织。阿帕奇。火箭MQ。客户。例外。mqclientexception中出现异常:没有此主题的路由信息:我的主题001

  详情请见http://rocketmq.apache.org/docs/faq/。

  位于org。阿帕奇。火箭MQ。客户。impl。制片人。defaultmqproducerimpl。senddefaultimpl(defaultmqproducerimpl。Java:684)

  位于org。阿帕奇。火箭MQ。客户。impl。制片人。defaultmqproducerimpl。send(defaultmqproducerimpl。Java:1342)

  位于org。阿帕奇。火箭MQ。客户。impl。制片人。defaultmqproducerimpl。send(defaultmqproducerimpl。Java:1288)

  位于org。阿帕奇。火箭MQ。客户。制片人。defaultmq生产者。发送(defaultmq生产者。Java:324)

  在com。陈同伟。MQ。火箭MQ。制片人。主(制作人。Java:18)4。2 .2、解决很明显发送成功了,不再是刚才的超时了,但是告诉我们没有这个话题。那不能每次都手动创建呀,所以启动经纪人的时候可以指定参数让经纪人为我们自动创建。如下。/bin/MQ broker-n IP:9876-c conf/broker。conf autoCreateTopicEnable=true

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: