我刚刚开始使用RabbitMQ和AMQP。

我有一个消息队列 我有多个消费者,我希望用相同的消息做不同的事情。

大多数RabbitMQ文档似乎都关注于循环,即单个消息由单个消费者消费,负载分布在每个消费者之间。这确实是我亲眼所见的行为。

举个例子:生产者只有一个队列,每2秒发送一次消息:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
var count = 1;

connection.on('ready', function () {
  var sendMessage = function(connection, queue_name, payload) {
    var encoded_payload = JSON.stringify(payload);  
    connection.publish(queue_name, encoded_payload);
  }

  setInterval( function() {    
    var test_message = 'TEST '+count
    sendMessage(connection, "my_queue_name", test_message)  
    count += 1;
  }, 2000) 


})

这是一位消费者:

var amqp = require('amqp');
var connection = amqp.createConnection({ host: "localhost", port: 5672 });
connection.on('ready', function () {
  connection.queue("my_queue_name", function(queue){
    queue.bind('#'); 
    queue.subscribe(function (message) {
      var encoded_payload = unescape(message.data)
      var payload = JSON.parse(encoded_payload)
      console.log('Recieved a message:')
      console.log(payload)
    })
  })
})

如果我启动消费者两次,我可以看到每个消费者都在循环行为中使用备用消息。例如,我将在一个终端上看到消息1,3,5,在另一个终端上看到消息2,4,6。

我的问题是:

我能让每个消费者收到相同的消息吗?也就是说,两个消费者都得到消息1,2,3,4,5,6 ?这在AMQP/RabbitMQ中被称为什么?它通常是如何配置的? 这种做法普遍吗?我是否应该让交换器将消息路由到两个单独的队列中,只有一个消费者?


当前回答

显然你想要的是扇形散开。扇出

阅读rabbitMQ教程: https://www.rabbitmq.com/tutorials/tutorial-three-javascript.html

以下是我的例子:

Publisher.js:

amqp.connect('amqp://<user>:<pass>@<host>:<port>', async (error0, connection) => {
    if (error0) {
      throw error0;
    }
    console.log('RabbitMQ connected')
    try {
      // Create exchange for queues
      channel = await connection.createChannel()
      await channel.assertExchange(process.env.EXCHANGE_NAME, 'fanout', { durable: false });
      await channel.publish(process.env.EXCHANGE_NAME, '', Buffer.from('msg'))
    } catch(error) {
      console.error(error)
    }
})

Subscriber.js:

amqp.connect('amqp://<user>:<pass>@<host>:<port>', async (error0, connection) => {
    if (error0) {
      throw error0;
    }
    console.log('RabbitMQ connected')
    try {
      // Create/Bind a consumer queue for an exchange broker
      channel = await connection.createChannel()
      await channel.assertExchange(process.env.EXCHANGE_NAME, 'fanout', { durable: false });
      const queue = await channel.assertQueue('', {exclusive: true})
      channel.bindQueue(queue.queue, process.env.EXCHANGE_NAME, '')

      console.log(" [*] Waiting for messages in %s. To exit press CTRL+C");
      channel.consume('', consumeMessage, {noAck: true});
    } catch(error) {
      console.error(error)
    }
});

这是我在网上找到的一个例子。也许还能帮上忙。 https://www.codota.com/code/javascript/functions/amqplib/Channel/assertExchange

其他回答

如果你碰巧像我一样使用amqplib库,他们有一个发布/订阅RabbitMQ教程的实现示例,你可能会觉得很方便。

发送模式是一对一的关系。如果你想“发送”给多个接收者,你应该使用“发布/订阅”模式。详情见http://www.rabbitmq.com/tutorials/tutorial-three-python.html。

在这种情况下,有一个有趣的选项,我在这里没有找到答案。

您可以在一个使用者中使用“requeue”特性对消息进行Nack,以便在另一个使用者中处理它们。 一般来说,这不是一种正确的方式,但也许它对某人来说已经足够好了。

https://www.rabbitmq.com/nack.html

注意循环(当所有消费者nack+requeue消息时)!

我认为你应该检查一下用扇出交换器发送信息。这样你就会收到不同消费者的相同消息,在表下面,RabbitMQ正在为每个新的消费者/订阅者创建不同的队列。

这是javascript教程示例的链接 https://www.rabbitmq.com/tutorials/tutorial-one-javascript.html

最后几个答案几乎是正确的——我有大量的应用程序,它们生成的消息需要最终传递给不同的消费者,所以这个过程非常简单。

如果希望同一消息有多个使用者,请执行以下步骤。

创建多个队列,用于接收消息的每个应用程序,在每个队列属性中,用amq“绑定”一个路由标记。直接交换。更改您的发布应用程序发送到amq。直接使用路由标记(不是队列)。然后AMQP将消息复制到具有相同绑定的每个队列中。工作就像一个魅力:)

例子:假设我有一个我生成的JSON字符串,我将它发布到“amq。使用路由标签“new-sales-order”直接交换,我有一个order_printer应用程序打印订单的队列,我有一个计费系统的队列,它将发送订单的副本和发票给客户端,我有一个web存档系统,我存档订单的历史/合规原因,我有一个客户端web界面,订单被跟踪,因为其他信息来自订单。

因此,我的队列是:order_printer、order_billing、order_archive和order_tracking 它们都绑定了“new-sales-order”绑定标签,所有4个都将获得JSON数据。

这是一种理想的发送数据的方式,而发布应用程序不知道或关心接收应用程序。