本篇文章为你整理了使用java代码调用rabbitmq接口进行新增编辑mq用户、虚拟机vhost、动态创建交换机exchange、队列queue以及设置权限,绑定vhost与exchange等操作()的详细内容,包含有 使用java代码调用rabbitmq接口进行新增编辑mq用户、虚拟机vhost、动态创建交换机exchange、队列queue以及设置权限,绑定vhost与exchange等操作,希望能帮助你了解 使用java代码调用rabbitmq接口进行新增编辑mq用户、虚拟机vhost、动态创建交换机exchange、队列queue以及设置权限,绑定vhost与exchange等操作。
使用java代码操作rabbitmq时,首先需要一个有创建用户等权限的管理员账号,需要在rabbitmq的后台管理页面手动创建这个账号,系统推荐的这几个tag可以让账号有rabbitmq后台管理页面的访问权限
图一
管理账号创建完成后就可以在代码中操作新增编辑mq账号及vhost等等了,点击rabbitmq后台管理页面左下角的HTTP API(见上文图一左下角)可以查看所有API接口
以下代码中,rabbitmqUsername 为管理员账号的用户名,rabbitmqPassword为管理员账号的密码,rabbitmqUrl为rabbitmq服务器接口地址(例:http://127.0.0.1:15672/api/)
1. 新增用户或修改用户密码
1.1 API
1.2 代码示例
以下代码中,yourUsername为新增账号的用户名,yourPassword为新增账号的密码,guest为新增账号的tag,可以自定义,也可以使用rabbitmq提供的tag(见上文图一),该接口也可以用来修改已有账号的密码
//add user
String enc = new String( Base64.encodeBase64((rabbitmqUsername + ":" + rabbitmqPassword).getBytes() ) );
HttpPut putCriaUsuario = new HttpPut( rabbitmqUrl+"users/"+yourUserName );
// RabbitMQ requires a user with create permission, create it mannually first
putCriaUsuario.addHeader( "Authorization", "Basic " + enc );
putCriaUsuario.addHeader( "content-type", "application/json" );
putCriaUsuario.setEntity( new StringEntity( "{\"password\":\""+yourPassword+"\",\"tags\":\"guest\"}" ) );
CloseableHttpClient client = HttpClients.createDefault();
client.execute( putCriaUsuario );
2. 新增vhost
2.1 API
2.2 代码示例
以下代码中 yourVhost 为新增vhost的名称,guest为自定义的tag
// 管理员账号用户名密码
String enc = new String( Base64.encodeBase64((rabbitmqUsername + ":" + rabbitmqPassword).getBytes() ) );
//add vhost
HttpPut putVhost = new HttpPut( rabbitmqUrl+"vhosts/"+yourVhost );
putVhost.addHeader( "Authorization", "Basic " + enc );
putVhost.addHeader( "content-type", "application/json" );
putVhost.setEntity( new StringEntity( "{\"tags\":\"guest\"}" ) );
CloseableHttpClient putVhostClient = HttpClients.createDefault();
putVhostClient.execute( putVhost );
3. mq用户绑定vhost并设置权限
3.1 API
3.2 代码示例
以下代码中,yourVhost 与yourUsername为绑定的mq用户与vhost的名称,代码示例中该用户对该vhost只开启了read权限,如果需要开启全部的configure(配置),write(写入),read(读取)权限,参数需要写成:
"{\"configure\":\".*\",\"write\":\".*\",\"read\":\".*\"}"
//管理员账号用户名密码
String enc = new String( Base64.encodeBase64((rabbitmqUsername + ":" + rabbitmqPassword).getBytes() ) );
//add permissions and bind user vhost
HttpPut putPermissions = new HttpPut( rabbitmqUrl+"permissions/"+yourVhost+"/"+yourUsername);
putPermissions.addHeader( "Authorization", "Basic " + enc );
putPermissions.addHeader( "content-type", "application/json" );
putPermissions.setEntity( new StringEntity( "{\"configure\":\"\",\"write\":\"\",\"read\":\".*\"}" ) );
CloseableHttpClient putPermissionsClient = HttpClients.createDefault();
putPermissionsClient.execute( putPermissions );
4. 动态创建exchange交换机和queue队列,并绑定指定vhost虚拟机
//add exchange, queue and bind vhost
RabbitModuleInfo rabbitModuleInfo = new RabbitModuleInfo();
rabbitModuleInfo.setVhost(vhost);
RabbitModuleInfo.Queue queue = new RabbitModuleInfo.Queue();
Map String, Object arguments = new HashMap ();
//消息过期时间
arguments.put("x-message-ttl",3600000);
queue.setName(queueName);
queue.setArguments(arguments);
rabbitModuleInfo.setQueue(queue);
RabbitModuleInfo.Exchange exchange = new RabbitModuleInfo.Exchange();
exchange.setName(exchangeName);
rabbitModuleInfo.setExchange(exchange);
rabbitModuleInfo.setRoutingKey(queueName);
rabbitModuleInitializer.declareRabbitModule(rabbitModuleInfo);
public RabbitModuleInitializer(AmqpAdmin amqpAdmin,RealtimePushProducer realtimePushProducer) {
this.amqpAdmin = amqpAdmin;
this.realtimePushProducer = realtimePushProducer;
public void declareRabbitModule(RabbitModuleInfo rabbitModuleInfo) {
configParamValidate(rabbitModuleInfo);
// 队列
Queue queue = convertQueue(rabbitModuleInfo.getQueue());
// 交换机
Exchange exchange = convertExchange(rabbitModuleInfo.getExchange());
// 绑定关系
String routingKey = rabbitModuleInfo.getRoutingKey();
String queueName = rabbitModuleInfo.getQueue().getName();
String exchangeName = rabbitModuleInfo.getExchange().getName();
Binding binding = new Binding(queueName, Binding.DestinationType.QUEUE, exchangeName, routingKey, null);
realtimePushProducer.bindVhostExchangeQueue(rabbitModuleInfo.getVhost(),exchange,queue,binding);
* RabbitMQ动态配置参数校验
* @param rabbitModuleInfo
public void configParamValidate(RabbitModuleInfo rabbitModuleInfo) {
String routingKey = rabbitModuleInfo.getRoutingKey();
Assert.isTrue(StrUtil.isNotBlank(routingKey), "RoutingKey 未配置");
Assert.isTrue(rabbitModuleInfo.getExchange() != null, "routingKey:{}未配置exchange", routingKey);
Assert.isTrue(StrUtil.isNotBlank(rabbitModuleInfo.getExchange().getName()), "routingKey:{}未配置exchange的name属性", routingKey);
Assert.isTrue(rabbitModuleInfo.getQueue() != null, "routingKey:{}未配置queue", routingKey);
Assert.isTrue(StrUtil.isNotBlank(rabbitModuleInfo.getQueue().getName()), "routingKey:{}未配置exchange的name属性", routingKey);
* 转换生成RabbitMQ队列
* @param queue
* @return
public Queue convertQueue(RabbitModuleInfo.Queue queue) {
Map String, Object arguments = queue.getArguments();
// 转换ttl的类型为long
if (arguments != null arguments.containsKey("x-message-ttl")) {
arguments.put("x-message-ttl", Convert.toLong(arguments.get("x-message-ttl")));
// 是否需要绑定死信队列
String deadLetterExchange = queue.getDeadLetterExchange();
String deadLetterRoutingKey = queue.getDeadLetterRoutingKey();
if (StrUtil.isNotBlank(deadLetterExchange) StrUtil.isNotBlank(deadLetterRoutingKey)) {
if (arguments == null) {
arguments = new HashMap (4);
arguments.put("x-dead-letter-exchange", deadLetterExchange);
arguments.put("x-dead-letter-routing-key", deadLetterRoutingKey);
return new Queue(queue.getName(), queue.isDurable(), queue.isExclusive(), queue.isAutoDelete(), arguments);
String exchangeName = exchangeInfo.getName();
boolean isDurable = exchangeInfo.isDurable();
boolean isAutoDelete = exchangeInfo.isAutoDelete();
Map String, Object arguments = exchangeInfo.getArguments();
switch (exchangeType) {
case DIRECT:// 直连交换机
exchange = new DirectExchange(exchangeName, isDurable, isAutoDelete, arguments);
break;
case TOPIC: // 主题交换机
exchange = new TopicExchange(exchangeName, isDurable, isAutoDelete, arguments);
break;
case FANOUT: //扇形交换机
exchange = new FanoutExchange(exchangeName, isDurable, isAutoDelete, arguments);
break;
case HEADERS: // 头交换机
exchange = new HeadersExchange(exchangeName, isDurable, isAutoDelete, arguments);
break;
return exchange;
}
@Override
public void bindVhostExchangeQueue(String vhost, Exchange exchange, Queue queue, Binding binding) {
ConnectionFactory factory = queueConfig.pushConnectionFactory(rabbitProperties,vhost);
RabbitAdmin rabbitAdmin = new RabbitAdmin(factory);
log.debug("bind vhost={},exchange={},queue={}",vhost,exchange.getName(),queue.getName());
// 创建队列
rabbitAdmin.declareQueue(queue);
// 创建交换机
rabbitAdmin.declareExchange(exchange);
// 队列 绑定 交换机
rabbitAdmin.declareBinding(binding);
}
public ConnectionFactory pushConnectionFactory(RabbitProperties rabbitProperties, String vhost) {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setHost(rabbitProperties.getHost());
cachingConnectionFactory.setPort(rabbitProperties.getPort());
cachingConnectionFactory.setUsername(rabbitProperties.getUsername());
cachingConnectionFactory.setPassword(rabbitProperties.getPassword());
cachingConnectionFactory.setVirtualHost(vhost);
return cachingConnectionFactory;
}
以上就是使用java代码调用rabbitmq接口进行新增编辑mq用户、虚拟机vhost、动态创建交换机exchange、队列queue以及设置权限,绑定vhost与exchange等操作()的详细内容,想要了解更多 使用java代码调用rabbitmq接口进行新增编辑mq用户、虚拟机vhost、动态创建交换机exchange、队列queue以及设置权限,绑定vhost与exchange等操作的内容,请持续关注盛行IT软件开发工作室。
郑重声明:本文由网友发布,不代表盛行IT的观点,版权归原作者所有,仅为传播更多信息之目的,如有侵权请联系,我们将第一时间修改或删除,多谢。