Work Queue
/* Work Queue Model / -> consumers one P -> [...]Q \ -> consumers two */
接下来用Rabbit实现一个工作任务分发,work queue常被用于web应用程序的场景。
生产者Send.java(发送多个消息)
package test; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Send { public static void main(String[] args) throws IOException { // 创建一个连接连接服务器 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); //factory.setPort(1987); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 声明一个队列,可以对队列做配置,如持久化等。然后往队列发送数据 channel.queueDeclare("queue1", false, false, false, null); for(int i = 1; i < 10; i ++){ String message = "message " + i; channel.basicPublish("", "queue1", null, message.getBytes()); System.out.println(" [x] Sent '" + message +"'"); } channel.close(); connection.close(); } }
Recv代码不变,开启多个Recv,运行效果如下
root # java -cp .:* Send [x] Sent 'message 1' [x] Sent 'message 2' [x] Sent 'message 3' [x] Sent 'message 4' [x] Sent 'message 5' [x] Sent 'message 6' [x] Sent 'message 7' [x] Sent 'message 8' [x] Sent 'message 9' root # java -cp .:* Recv [x] Received 'message 1' [x] Received 'message 3' [x] Received 'message 5' [x] Received 'message 7' [x] Received 'message 9' root # java -cp .:* Recv [x] Received 'message 2' [x] Received 'message 4' [x] Received 'message 6' [x] Received 'message 8'
默认情况下,RabbitMQ会把消息依次发送给各个consumer,平均情况下各个消费者分配的任务差不多,这种分配方式也称为round-robin轮询。
为了更加智能地按照工作量均分任务,在Recv.java中添加如下代码
int prefetchCount = 1; channel.basicQos(prefetchCount);
上面的模型比较简单,我们考虑如下场景:
有时候完成一个任务需要耗时很久,加入某个consumer执行任务失败(比如进程挂掉),消息队列中也没有这个消息了,这时这条消息就会丢失。
为了保证消息不丢失,RabbitMQ引入ack保证消息被完全处理,队列才会把消息删除掉。rabbitmq默认是开启这个配置的,如果需要关闭这个通知,可以再Recv.java中修改下面代码
channel.basicConsume("queue1", false, consumer);
注:我们也要防止某个队列不能及时收到ack而不断重复发送消息导致耗尽内存,可以用rabbitmqctl查看messages_unacknowledged
此外配置队列和消息持久(生产者消费者均需要配置)
boolean durable = true; channel.queueDeclare("queue1", durable, false, false, null); //发布持久消息 channel.basicPublish("", "queue1", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
相关推荐
基于RabbitMQ的工作队列实现,包括消息确认机制、消息持久化机制、消息的公平调度等。
学习RabbitMQ的学习笔记
RabbitMQ客户连接池的Java实现。我们刚开始也是采用这种方式来实现的,但做压力测试时,发现这种每次新建Connection和新建Channel是非常耗时的,在大并发下,一般都要8毫秒左右,慢的话,好多都是几十毫秒。因此我们...
rabbitMQ实战java版-rabbitMQ-demo
rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ...
Java全能学习面试手册——Java面试题库.zip 01 7道消息队列ActiveMQ面试题!.pdf 02 10道Java高级必备的Netty面试题!.pdf 03 10道Java面试必备的设计模式面试题!.pdf 04 10个Java经典的List面试题!.pdf 05 10个...
javaAPI SpringMVC 集成rabbitMQ 很全的例子,实现了生产消费,重复消费等功能
RabbitMQ 三种Exchange.wps————————三种exchange解释及代码 rabbitmq结构.wps————————rabbitmq架构简介 rabbitmq入门.pdf——————入门的文档 RabbitMQ研究与应用.pdf——————简单的研究
java 队列源码 #rabbitMQ repository 主要记录个人学习reabbit的相关demo ...rabbitmq-spring-work-queue spring boot使用rabbitmq的队列示例 rabbitmq-spring-fanout spring boot使用的rabbitmq的发布订阅示例 rabb
2. 工作队列模式(Work Queue Mode): - 工作队列模式用于将任务分配给多个消费者进行处理。多个消费者共享一个队列,消息发送到队列中,然后按顺序被不同的消费者接收并处理。它可以通过负载平衡来提高任务处理的...
Java使用RabbitMq的一个简单demo,自留。
封装了RabbitMQ的订阅者线程和发布者线程(还有个初始化工厂的连接工具类),另外附加一个安卓的使用demo
rabbitmq_queue_to_queue
在学校简单了解过,从未在生产环境使用过。...现在MQ的基本学习告一段落。感觉这文档还是挺有用的哈,,“应付”我们的日常开发暂时够用了。当然写的有点潦草,有一些细节知识点也未指出。欢迎大家批评错误点
Java 客户端库 RabbitMQ 遵循AMQP协议,那是一个开放的,并且通用的消息协议。java Android RabbitMQ可以用来发送和接收消息
2. 掌握RabbitMQ、RocketMQ、Kafka这三款主流的消息中间件的架构、模型和使用(开发、 安装、集群部署、运维、监控等) 3. 掌握消息的可靠性、幂等性、顺序消息、延迟消息、事务消息等进阶的知识,以及大规模生产 ...
javaAPI SpringMVC 集成rabbitMQ 很全的例子,实现了生产消费,重复消费等功能
rabbitmq 发布/订阅 java 实现
java版本RabbitMQ实例.rar.rar
RabbitMQ入门 1. Java客户端——基础代码 2. Java客户端——Spring AMQP