I understand that you can send messages directly to a queue using channel.sendToQueue, and this creates a tasks-and-workers situation, where only one consumer will handle each task.
I also understand that you can use channel.publish with a topic-based exchange, and messages will be routed to queues based on the routing key. To my understanding, though, this will always broadcast to all subscribers on any matching queues.
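To make the routing-key matching concrete, here is a minimal sketch of the topic rules as I understand them: words are separated by dots, `*` stands for exactly one word, and `#` stands for zero or more words. The function name `topicMatches` is my own invention for illustration. It is a simplified model, not RabbitMQ's actual implementation and not part of amqplib.

```javascript
// Simplified model of AMQP topic matching (hypothetical helper, not a library API).
// '*' matches exactly one word, '#' matches zero or more words.
function topicMatches(pattern, routingKey) {
  const p = pattern.split('.');
  const k = routingKey.split('.');

  // Compare pattern word i against routing-key word j, recursively.
  function match(i, j) {
    if (i === p.length) return j === k.length; // pattern used up: key must be too
    if (p[i] === '#') {
      // Let '#' absorb 0, 1, 2, ... of the remaining words.
      for (let n = j; n <= k.length; n++) {
        if (match(i + 1, n)) return true;
      }
      return false;
    }
    if (j === k.length) return false; // key used up but pattern still has words
    if (p[i] === '*' || p[i] === k[j]) return match(i + 1, j + 1);
    return false;
  }

  return match(0, 0);
}

console.log(topicMatches('usa.*', 'usa.90210'));    // true
console.log(topicMatches('usa.*', 'uk.ec1a'));      // false
console.log(topicMatches('usa.#', 'usa.ca.90210')); // true
console.log(topicMatches('usa.*', 'usa.ca.90210')); // false
```

Under this model, every queue whose binding pattern matches the routing key gets its own copy of the message.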
I would essentially like to use the topic-based exchange, but only have one consumer handle each task. I've been through the documentation, and I don't see a way to do this.
My use-case:
I have instances of a microservice set up in multiple locations. There might be two in California, three in London, one in Singapore, etc. When a task is created, the only thing that matters is that it's handled by one of the instances in a given location.
Of course, I can create hundreds of queues named "usa-90210", "uk-ec1a", etc. It just seems like using topics would be much cleaner. It would also be more flexible, given the ability to use wildcards.
If this isn't a feature of RabbitMQ, I'm open to other thoughts or ideas as well.
UPDATE
As per istepaniuk's suggestion, I've tried creating two workers, each binding their own queue to the exchange:
const connectionA = await amqplib.connect('amqp://guest:[email protected]:5672');
const channelA = await connectionA.createChannel();
await channelA.assertExchange('test_exchange', 'topic', { durable: false });
await channelA.assertQueue('test_queue_a', { exclusive: true });
await channelA.bindQueue('test_queue_a', 'test_exchange', 'usa.*');
await channelA.consume('test_queue_a', () => { console.log('worker a'); }, { noAck: true });
const connectionB = await amqplib.connect('amqp://guest:[email protected]:5672');
const channelB = await connectionB.createChannel();
await channelB.assertExchange('test_exchange', 'topic', { durable: false });
await channelB.assertQueue('test_queue_b', { exclusive: true });
await channelB.bindQueue('test_queue_b', 'test_exchange', 'usa.*');
await channelB.consume('test_queue_b', () => { console.log('worker b'); }, { noAck: true });
const pubConnection = await amqplib.connect('amqp://guest:[email protected]:5672');
const pubChannel = await pubConnection.createChannel();
await pubChannel.assertExchange('test_exchange', 'topic', { durable: false });
pubChannel.publish('test_exchange', 'usa.90210', Buffer.from(''));
Unfortunately, both consumers are still receiving the message.
worker a
worker b
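To reason about this result without a broker, here is a toy in-memory model of the delivery rules as I understand them: the exchange copies a message to every queue whose binding matches, and each queue hands a given message to only one of its consumers. The class `ToyExchange`, the helper `run`, and the queue name `usa_tasks` are all hypothetical names made up for this sketch. Nothing here is amqplib code, and the matcher only supports the `*` wildcard.

```javascript
// Toy model only: mirrors the delivery rules, not RabbitMQ or amqplib themselves.
// Supports '*' (exactly one word) in binding patterns; '#' is not handled.
const matches = (pattern, key) => {
  const p = pattern.split('.');
  const k = key.split('.');
  return p.length === k.length && p.every((w, i) => w === '*' || w === k[i]);
};

class ToyExchange {
  constructor() {
    // queue name -> { patterns: Set, consumers: [], next: index of next consumer }
    this.queues = new Map();
  }

  queue(name) {
    if (!this.queues.has(name)) {
      this.queues.set(name, { patterns: new Set(), consumers: [], next: 0 });
    }
    return this.queues.get(name);
  }

  bindQueue(name, pattern) {
    this.queue(name).patterns.add(pattern);
  }

  consume(name, handler) {
    this.queue(name).consumers.push(handler);
  }

  publish(routingKey, message) {
    // Every matching queue gets a copy of the message...
    for (const q of this.queues.values()) {
      const bound = [...q.patterns].some((pattern) => matches(pattern, routingKey));
      if (!bound || q.consumers.length === 0) continue;
      // ...but within a queue only one consumer receives it (round-robin).
      const handler = q.consumers[q.next % q.consumers.length];
      q.next += 1;
      handler(message);
    }
  }
}

// Attach worker a and worker b to the given queue names, publish once,
// and report which workers saw the message.
function run(queueNames) {
  const exchange = new ToyExchange();
  const received = [];
  queueNames.forEach((name, i) => {
    exchange.bindQueue(name, 'usa.*');
    exchange.consume(name, () => received.push(`worker ${'ab'[i]}`));
  });
  exchange.publish('usa.90210', '');
  return received;
}

const separateQueues = run(['test_queue_a', 'test_queue_b']);
const sharedQueue = run(['usa_tasks', 'usa_tasks']);

console.log(separateQueues); // [ 'worker a', 'worker b' ]
console.log(sharedQueue);    // [ 'worker a' ]
```

If this model is right, the two deliveries above come from the two separate queues rather than from the topic exchange itself, and having all workers in one location consume from a single shared queue bound with the topic pattern would give one delivery per task. With a real broker, that shared queue presumably could not be declared with exclusive: true, since an exclusive queue is tied to the connection that declared it.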
question from:
https://stackoverflow.com/questions/65860805/is-it-possible-for-workers-to-consume-based-on-topic