spring boot 消息队列,springboot 队列

  spring boot 消息队列,springboot 队列

  geBody.getClass()); messageModel.setBodyContent(JSON.toJSONString(messageBody)); if (extraParam != null) { for (String key:extraParam.keySet()) { messageModel.getExtraParam().put(key,extraParam.get(key)); } } if(systemConfig.getMessageChannel().equals("redis")){ redisUtil.sendMessage("message", JSON.toJSON(messageModel)); }else{ jmsMessagingTemplate.convertAndSend("message",JSON.toJSONString(messageModel)); } }}

 

  

集成Redis消息队列

pom配置

 

  

 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <version>2.0.1.RELEASE</version> </dependency>

连接配置

 

  

spring: redis: host: localhost port: 6379 password:

 

  

操作工具类

 

  

 @Autowired private RedisTemplate redisTemplate; public void sendMessage(String channel, Object message) { redisTemplate.convertAndSend(channel, message); }

消息处理

 

  

@Component@ConditionalOnProperty(name = "system.messageChannel", havingValue = "redis", matchIfMissing = true)public class RedisMessageReceiver { public void receiveMessage(String message) { MessageModel messageModel = JSON.parseObject(message, MessageModel.class); IMessageReceiver receiver = SpringBootBeanUtil.getBean(messageModel.getHandleClazz()); receiver.handleMessage(JSON.parseObject(messageModel.getBodyContent(), messageModel.getBodyClass()), messageModel.getExtraParam()); }}

配置注册

 

  

@Configurationpublic class MessageCenter { @Bean @ConditionalOnProperty(name = "system.messageChannel", havingValue = "redis", matchIfMissing = true) RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(connectionFactory); // 可以添加多个 messageListener,配置不同的交换机 container.addMessageListener(listenerAdapter, new PatternTopic("message")); return container; } /** * 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法 * * @param receiver * @return */ @Bean @ConditionalOnProperty(name = "system.messageChannel", havingValue = "redis", matchIfMissing = true) MessageListenerAdapter listenerAdapter(RedisMessageReceiver receiver) { return new MessageListenerAdapter(receiver, "receiveMessage"); }}

 

  

集成ActiveMQ消息队列

pom配置

 

  

 <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.15.0</version> </dependency>

注意:jdk1.8对应版本5.15.0

 

  连接配置

  

spring: activemq: broker-url: tcp://127.0.0.1:61616 #MQ服务器地址 user: admin password: admin pool: enabled: true

 

  

消息处理

 

  

@Component@ConditionalOnProperty(name = "system.messageChannel", havingValue = "activemq", matchIfMissing = false)public class ActiveMQMessageReceiver { @JmsListener(destination = "message", containerFactory = "customQueueListener") public void handleMessage(String message) { MessageModel messageModel = JSON.parseObject(message, MessageModel.class); IMessageReceiver receiver = SpringBootBeanUtil.getBean(messageModel.getHandleClazz()); receiver.handleMessage(JSON.parseObject(messageModel.getBodyContent(), messageModel.getBodyClass()), messageModel.getExtraParam()); }}

配置注册

 

  

@Configuration@EnableJmspublic class MessageCenter { @Bean(name = "customQueueListener") @ConditionalOnProperty(name = "system.messageChannel", havingValue = "activemq", matchIfMissing = false) public JmsListenerContainerFactory<?> customQueueListener(ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setPubSubDomain(false); factory.setConnectionFactory(connectionFactory); //重连间隔时间 factory.setRecoveryInterval(1000L); factory.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE); //连接数 factory.setConcurrency("5-10"); //指定任务线程池 factory.setTaskExecutor(new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy())); return factory; }}

 

  

使用示例

消息处理

 

  

@Servicepublic class RequestLogMessageReceiver implements IMessageReceiver{ @Autowired private F_RequestLogService requestLogService; @Override public void handleMessage(Object bodyObject, HashMap extraParam) { F_RequestLogDO requestLogDO = (F_RequestLogDO)bodyObject; requestLogService.insert(requestLogDO); }}

发送消息

 

  

@AutoWiredprivate MessageUtil messageUtil;messageUtil.sendMessage(requestLogDO,RequestLogMessageReceiver.class,null);

到此这篇关于详解SpringBoot集成消息队列的案例应用的文章就介绍到这了,更多相关SpringBoot消息队列内容请搜索盛行IT以前的文章或继续浏览下面的相关文章希望大家以后多多支持盛行IT!

 

郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。

留言与评论(共有 条评论)
   
验证码: