`

Java学习——rabbitmq(work queue)

 
阅读更多

Work Queue

 

/* Work Queue Model
            / -> consumers one
P -> [...]Q
            \ -> consumers two
*/

接下来用Rabbit实现一个工作任务分发,work queue常被用于web应用程序的场景。

 

生产者Send.java(发送多个消息)

package test;

import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send {

	public static void main(String[] args) throws IOException {
		// 创建一个连接连接服务器
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		//factory.setPort(1987);
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();
		
		// 声明一个队列,可以对队列做配置,如持久化等。然后往队列发送数据
		channel.queueDeclare("queue1", false, false, false, null);
		
		for(int i = 1; i < 10; i ++){
			String message = "message " + i;
			channel.basicPublish("", "queue1", null, message.getBytes());
			System.out.println(" [x] Sent '" + message +"'");
		}
		channel.close();
		connection.close();
	}
}

 

Recv代码不变,开启多个Recv,运行效果如下

 

root # java -cp .:* Send 
 [x] Sent 'message 1'
 [x] Sent 'message 2'
 [x] Sent 'message 3'
 [x] Sent 'message 4'
 [x] Sent 'message 5'
 [x] Sent 'message 6'
 [x] Sent 'message 7'
 [x] Sent 'message 8'
 [x] Sent 'message 9'

root # java -cp .:* Recv 
 [x] Received 'message 1'
 [x] Received 'message 3'
 [x] Received 'message 5'
 [x] Received 'message 7'
 [x] Received 'message 9'
 
root # java -cp .:* Recv 
 [x] Received 'message 2'
 [x] Received 'message 4'
 [x] Received 'message 6'
 [x] Received 'message 8'

 默认情况下,RabbitMQ会把消息依次发送给各个consumer,平均情况下各个消费者分配的任务差不多,这种分配方式也称为round-robin轮询。

 

为了更加智能地按照工作量均分任务,在Recv.java中添加如下代码

 

int prefetchCount = 1;
channel.basicQos(prefetchCount);

 

上面的模型比较简单,我们考虑如下场景:

 

有时候完成一个任务需要耗时很久,加入某个consumer执行任务失败(比如进程挂掉),消息队列中也没有这个消息了,这时这条消息就会丢失。

为了保证消息不丢失,RabbitMQ引入ack保证消息被完全处理,队列才会把消息删除掉。rabbitmq默认是开启这个配置的,如果需要关闭这个通知,可以再Recv.java中修改下面代码

 

channel.basicConsume("queue1", false, consumer);

 

注:我们也要防止某个队列不能及时收到ack而不断重复发送消息导致耗尽内存,可以用rabbitmqctl查看messages_unacknowledged

 

此外配置队列和消息持久(生产者消费者均需要配置)

 

boolean durable = true;
channel.queueDeclare("queue1", durable, false, false, null);
//发布持久消息
channel.basicPublish("", "queue1", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics