RabbitMQ provider (fork of adonis-rabbit)
adonis-rabbit is a RabbitMQ provider for Adonis.
Install adonis-rabbit:
yarn add @alldebug/adonis-rabbit
Then:
node ace invoke @alldebug/adonis-rabbit
This will create config/rabbit.ts and add the following fields to your .env:
RABBITMQ_HOSTNAME=
RABBITMQ_USER=
RABBITMQ_PASSWORD=
RABBITMQ_PORT=
RABBITMQ_PROTOCOL="amqp://" # or "amqps://"
Make sure to set the correct values for these environment variables so adonis-rabbit can connect.
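To make the role of each variable concrete, here is a minimal sketch of how the fields could be combined into a single AMQP connection URL. The `RabbitEnv` type and the `buildAmqpUrl` helper are hypothetical names used for illustration; the provider may assemble its connection settings differently.

```typescript
// Hypothetical shape mirroring the .env fields; not the provider's real config type.
interface RabbitEnv {
  RABBITMQ_PROTOCOL: string // "amqp://" or "amqps://"
  RABBITMQ_HOSTNAME: string
  RABBITMQ_USER?: string
  RABBITMQ_PASSWORD?: string
  RABBITMQ_PORT?: string
}

// Hypothetical helper: joins the fields into one connection URL.
function buildAmqpUrl(env: RabbitEnv): string {
  // Credentials are percent-encoded so characters like "@" do not break the URL.
  const credentials = env.RABBITMQ_USER
    ? `${encodeURIComponent(env.RABBITMQ_USER)}:${encodeURIComponent(env.RABBITMQ_PASSWORD ?? '')}@`
    : ''
  const port = env.RABBITMQ_PORT ? `:${env.RABBITMQ_PORT}` : ''
  return `${env.RABBITMQ_PROTOCOL}${credentials}${env.RABBITMQ_HOSTNAME}${port}`
}

const url = buildAmqpUrl({
  RABBITMQ_PROTOCOL: 'amqp://',
  RABBITMQ_HOSTNAME: 'localhost',
  RABBITMQ_USER: 'guest',
  RABBITMQ_PASSWORD: 'p@ss',
  RABBITMQ_PORT: '5672',
})
console.log(url) // amqp://guest:p%40ss@localhost:5672
```

A missing hostname or a protocol without the trailing `://` would produce an unusable URL in this sketch, which is why every field should be filled in.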
import Rabbit from '@ioc:Adonis/Addons/Rabbit'
import Route from '@ioc:Adonis/Core/Route'
Route.get('/', async () => {
  // Ensures the queue exists
  await Rabbit.assertQueue('my_queue')

  // Sends a message to the queue
  await Rabbit.sendToQueue('my_queue', 'This message was sent by adonis-rabbit')
})
Note that it doesn't really make sense to subscribe to a queue inside a controller; usually this is done through a preload file.
- In the CLI, type: node ace make:prldfile rabbit
- Select ( ) During HTTP server
This will create start/rabbit.ts.
Inside start/rabbit.ts:
import Rabbit from '@ioc:Adonis/Addons/Rabbit'
async function listen() {
  await Rabbit.assertQueue('my_queue')

  await Rabbit.consumeFrom('my_queue', (message) => {
    console.log(message.content)
  })
}

listen()
This will log every message sent to the my_queue queue.
import Rabbit from '@ioc:Adonis/Addons/Rabbit'
await Rabbit.assertQueue('myQueue')
Asserts that the queue is created.
Parameters:
- queueName: the name of the queue
- options?: the queue options
await Rabbit.assertExchange('myExchange', 'type')
Asserts that the exchange is created.
Parameters:
- exchangeName: the name of the exchange
- type: the type of the exchange
- options?: the exchange options
await Rabbit.bindQueue('myQueue', 'myExchange', '')
Binds a queue to an exchange.
Parameters:
- queueName: the name of the queue
- exchangeName: the name of the exchange
- pattern?: the pattern (defaults to '')
await Rabbit.sendToQueue('myQueue', 'content')
Parameters:
- queueName: the name of the queue
- content: the content to be sent to the queue
- options: the options
Note that the content parameter doesn't need to be a Buffer; Adonis RabbitMQ will automatically convert it to a Buffer if it isn't one already.
You also don't have to JSON.stringify an object; Adonis RabbitMQ will do that for you (it is serialized to JSON, then converted to a Buffer).
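The conversion described above can be pictured with a small sketch. The `toBuffer` helper is a hypothetical name and only approximates the behaviour the text describes; the provider's actual conversion code may handle edge cases differently.

```typescript
// Hypothetical helper illustrating the described content conversion.
function toBuffer(content: unknown): Buffer {
  if (Buffer.isBuffer(content)) {
    return content // already a Buffer: passed through untouched
  }
  if (typeof content === 'object' && content !== null) {
    return Buffer.from(JSON.stringify(content)) // objects: JSON first, then Buffer
  }
  return Buffer.from(String(content)) // strings, numbers, etc.: encoded as UTF-8 text
}

console.log(toBuffer('hello').toString()) // hello
console.log(toBuffer({ id: 1 }).toString()) // {"id":1}
```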
await Rabbit.sendToExchange('myExchange', 'myRoutingKey', 'content')
Parameters:
- exchangeName: the name of the exchange
- routingKey: the routing key
- content: the content to send to the exchange
- options: the options
Note that the content parameter doesn't need to be a Buffer; Adonis RabbitMQ will automatically convert it to a Buffer if it isn't one already.
You also don't have to JSON.stringify an object; Adonis RabbitMQ will do that for you (it is serialized to JSON, then converted to a Buffer).
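The exchange-related calls are typically used together: declare the exchange and queue, bind them, then publish. The sketch below shows that order. `RabbitLike` is a hypothetical structural type covering only the calls used here, and `publishOrderCreated`, the exchange name, queue name and routing key are made up for illustration; in an Adonis app the real provider comes from `@ioc:Adonis/Addons/Rabbit`.

```typescript
// Hypothetical minimal type listing only the methods this sketch calls.
interface RabbitLike {
  assertExchange(exchangeName: string, type: string): Promise<unknown>
  assertQueue(queueName: string): Promise<unknown>
  bindQueue(queueName: string, exchangeName: string, pattern?: string): Promise<unknown>
  sendToExchange(exchangeName: string, routingKey: string, content: unknown): Promise<unknown>
}

// Hypothetical helper: declares the topology, then publishes one event.
async function publishOrderCreated(rabbit: RabbitLike, order: { id: number }): Promise<void> {
  await rabbit.assertExchange('orders', 'direct')
  await rabbit.assertQueue('orders.created')
  // Messages published with routing key "created" end up in this queue.
  await rabbit.bindQueue('orders.created', 'orders', 'created')
  // A plain object is passed as-is, relying on the automatic JSON conversion.
  await rabbit.sendToExchange('orders', 'created', order)
}
```

Asserting the exchange and queue before every publish is safe but repetitive; in practice the declarations could live in a preload file and only the publish call in the request path.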
await Rabbit.consumeFrom('myQueue', (message) => {
  console.log(message.content)
  message.ack()
})
Consumes messages from a queue.
Parameters:
- queueName: the name of the queue
- onMessage: the callback executed when a message is received
The onMessage callback receives a Message instance as its parameter.
await Rabbit.ackAll()
Acknowledges all the messages.
await Rabbit.nackAll()
Rejects all the messages.
Parameters:
- requeue?: adds the rejected messages to the queue again
await Rabbit.getConnection()
Retrieves the amqplib Connection instance. If there is no connection yet, one is created.
await Rabbit.getChannel()
Retrieves the amqplib Channel instance. If there is no connection yet, one is created. If there is no channel yet, one is created too.
await Rabbit.closeChannel()
Closes the channel.
await Rabbit.closeConnection()
Closes the connection.
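The "created if it doesn't exist yet" behaviour of getConnection() and getChannel() is a lazy-initialization pattern. The sketch below illustrates the idea with a generic, hypothetical `Lazy` class and stand-in objects; it is not the provider's implementation and does not use amqplib.

```typescript
// Generic lazy holder: the factory runs on first access and the result is reused.
class Lazy<T> {
  private value?: Promise<T>
  private readonly factory: () => Promise<T>

  constructor(factory: () => Promise<T>) {
    this.factory = factory
  }

  get(): Promise<T> {
    if (!this.value) {
      this.value = this.factory()
    }
    return this.value
  }

  // Forget the cached value so the next get() creates a fresh one (e.g. after closing).
  reset(): void {
    this.value = undefined
  }
}

// Stand-ins for a real connection and channel (hypothetical, for illustration only).
let connectionsOpened = 0
const connection = new Lazy(async () => ({ id: ++connectionsOpened }))
// Asking for the channel implicitly creates the connection when needed.
const channel = new Lazy(async () => ({ connection: await connection.get() }))
```

Caching the promise rather than the resolved value means concurrent callers share one in-flight creation instead of opening several connections.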
When consuming messages through consumeFrom, you'll receive a Message instance in the callback.
This is slightly different from the amqplib approach. For example:
Rabbit.consumeFrom('queue', (message) => {
  // Acknowledges the message
  message.ack()

  // Rejects the message
  message.reject()

  // The message content
  console.log(message.content)

  // If you're expecting a JSON, this will return the parsed message
  console.log(message.jsonContent)
})
message.content
Returns the message content.
message.jsonContent
If the message is expected to be in JSON format, you can use message.jsonContent to get the message parsed as an object.
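As a rough picture of what parsing the content involves, the sketch below decodes a Buffer and parses it as JSON. The `parseJsonContent` helper is a hypothetical name; how the Message class actually computes jsonContent is an assumption here.

```typescript
// Hypothetical helper showing one way jsonContent could be derived from content.
function parseJsonContent(content: Buffer): unknown {
  // JSON.parse throws a SyntaxError when the payload is not valid JSON.
  return JSON.parse(content.toString('utf8'))
}

const raw = Buffer.from(JSON.stringify({ orderId: 42 }))
const parsed = parseJsonContent(raw) as { orderId: number }
console.log(parsed.orderId) // 42
```

Because invalid JSON makes the parse step throw in this sketch, a consumer that cannot guarantee the payload format would want to guard the access with try/catch.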
message.fields
The message fields.
message.properties
The message properties.
message.ack()
Acknowledges the message.
Parameters:
- allUpTo?: acknowledges all the messages up to this one
message.nack()
Rejects the message.
Parameters:
- allUpTo?: rejects all the messages up to this one
- requeue?: adds the rejected messages to the queue again
message.reject()
Rejects the message. Equivalent to nack, but works in older versions of RabbitMQ where nack does not.
Parameters:
- requeue?: adds the rejected messages to the queue again
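A common way to combine ack() and nack() is to acknowledge only after the work succeeds and reject otherwise. The sketch below shows that pattern. `MessageLike` is a hypothetical type listing only the members used, and `withAck` is an illustrative helper, not part of the provider.

```typescript
// Hypothetical minimal type mirroring the Message members used below.
interface MessageLike {
  jsonContent: unknown
  ack(allUpTo?: boolean): void
  nack(allUpTo?: boolean, requeue?: boolean): void
}

// Hypothetical wrapper: acknowledge on success, reject without requeue on failure.
function withAck(work: (payload: unknown) => Promise<void>) {
  return async (message: MessageLike): Promise<void> => {
    try {
      // Reading jsonContent inside the try also covers payloads that fail to parse.
      await work(message.jsonContent)
      message.ack()
    } catch {
      // requeue = false avoids endlessly redelivering a message that keeps failing.
      message.nack(false, false)
    }
  }
}
```

Assuming the consumeFrom callback accepts such a function, it could be passed as `Rabbit.consumeFrom('my_queue', withAck(async (payload) => { /* handle payload */ }))`. Whether failed messages should be requeued or dropped depends on the application.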
- Add SSL options in config/rabbit.ts
- Tests