Bus
RabbitMQ bus module built with appolo-rabbit

Installation

Options

key | Description | Type | Default |
---|---|---|---|
id | injection id | string | busProvider |
connection | AMQP connection string | string | null |
autoListen | true to auto initialize busProvider and start listen to events | boolean | true |
handleEvents | true to register queue event handlers | boolean | true |
exchange | name of the exchange or exchange options | string or object | {} |
queue | queue options | object | {} |
requestQueue | request queue options | object | {} |
replayQueue | reply queue options | object | {} |
appendEnv | append env name to queueName and exchangeName | boolean | true |

Exchange Options

key | Description | Type | Default |
---|---|---|---|
type | exchange type, e.g. topic, direct or fanout | string | topic |
autoDelete | delete when consumer count goes to 0 | boolean | false |
durable | survive broker restarts | boolean | true |
persistent | persistent delivery, messages saved to disk | boolean | true |
alternate | define an alternate exchange | string | |
publishTimeout | timeout in milliseconds for publish calls to this exchange | number (up to 2^32) | |
replyTimeout | timeout in milliseconds to wait for a reply | number (up to 2^32) | |
limit | number of unpublished messages to cache while waiting for a connection | number (up to 2^16) | |
noConfirm | prevents rabbot from creating the exchange in confirm mode | boolean | false |

Queue Options

key | Description | Type | Default |
---|---|---|---|
autoDelete | delete when consumer count goes to 0 | boolean | false |
durable | survive broker restarts | boolean | true |
subscribe | auto-start the subscription | boolean | false |
limit | max number of unacked messages allowed for a consumer | number (up to 2^16) | 1 |
noAck | the server will remove messages from the queue as soon as they are delivered | boolean | false |
noBatch | causes ack, nack & reject to take place immediately | boolean | false |
noCacheKeys | disable cache of matched routing keys to prevent unbounded memory growth | boolean | false |
queueLimit | max number of ready messages a queue can hold | number (up to 2^32) | |
messageTtl | time in ms before a message expires on the queue | number (up to 2^32) | |
expires | time in ms before a queue with 0 consumers expires | number (up to 2^32) | |

The module is configured in config/modules/all.ts.
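
As a rough sketch of how the options fit together, the following models a subset of the tables above as TypeScript types and fills in the documented defaults. The type names (`BusOptions`, `ExchangeOptions`, `QueueOptions`), the `withDefaults` helper, the connection string and the exchange name are illustrative assumptions, not exports or values of the module; only the keys and default values come from the tables.

```typescript
// Illustrative types only: the names are assumptions, the keys mirror the tables above.
interface ExchangeOptions {
  type?: string;
  autoDelete?: boolean;
  durable?: boolean;
  persistent?: boolean;
}

interface QueueOptions {
  autoDelete?: boolean;
  durable?: boolean;
  subscribe?: boolean;
  limit?: number;
  noAck?: boolean;
}

interface BusOptions {
  id?: string;
  connection: string | null;
  autoListen?: boolean;
  handleEvents?: boolean;
  exchange?: string | ExchangeOptions;
  queue?: QueueOptions;
  requestQueue?: QueueOptions;
  appendEnv?: boolean;
}

// Hypothetical helper: user-supplied values override the documented defaults.
function withDefaults(options: BusOptions): Required<BusOptions> {
  return {
    id: options.id ?? "busProvider",
    connection: options.connection,
    autoListen: options.autoListen ?? true,
    handleEvents: options.handleEvents ?? true,
    exchange: options.exchange ?? {},
    queue: options.queue ?? {},
    requestQueue: options.requestQueue ?? {},
    appendEnv: options.appendEnv ?? true,
  };
}

const resolved = withDefaults({
  connection: "amqp://guest:guest@localhost:5672", // placeholder connection string
  exchange: "events",                              // exchange given by name (example value)
  queue: { durable: true, limit: 10 },             // override two queue options
});

console.log(resolved.id, resolved.autoListen, resolved.queue.limit);
```

Keys left out of the input (here `id`, `autoListen`, `handleEvents`, `requestQueue` and `appendEnv`) fall back to the defaults listed in the Options table.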

Usage

Publisher

Inject BusProvider to publish messages.
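
A publisher might look roughly like the sketch below. To stay self-contained it does not import the module: `EventPublisher` is a local interface copied from the documented `publish` signature, `InMemoryBus` is a hypothetical stand-in for the injected BusProvider, and constructor injection stands in for whatever injection mechanism the application uses. The event name is an example only.

```typescript
// Local interface mirroring the documented publish() signature.
interface EventPublisher {
  publish(type: string, data: any, expire?: number): Promise<void>;
}

interface SentEvent {
  type: string;
  data: any;
  expire: number | undefined;
}

// Hypothetical stand-in for the injected BusProvider: records events instead of sending them.
class InMemoryBus implements EventPublisher {
  public readonly sent: SentEvent[] = [];

  public async publish(type: string, data: any, expire?: number): Promise<void> {
    this.sent.push({ type, data, expire });
  }
}

// Application service that receives the bus and publishes a domain event.
class UserPublisher {
  constructor(private readonly bus: EventPublisher) {}

  public userCreated(userId: string): Promise<void> {
    // "user.created" is an example event name, not one defined by the module.
    return this.bus.publish("user.created", { userId });
  }
}

const bus = new InMemoryBus();
const publisher = new UserPublisher(bus);
const published = publisher.userCreated("42");
```

Depending only on the narrow `EventPublisher` interface keeps the service easy to test without a running broker.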

Handler

If you do not call msg.ack() or msg.nack() yourself, msg.ack() is called when the handler returns and msg.nack() is called on error.
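
The automatic acknowledgement rule can be pictured with a small wrapper. This is a sketch of the behavior described above, not the module's implementation: `AckableMessage`, its `body` field, `withAutoAck` and `fakeMessage` are all hypothetical names introduced for illustration.

```typescript
// Minimal message shape for this sketch; the `body` field name is an assumption.
interface AckableMessage<T> {
  body: T;
  ack(): void;
  nack(): void;
}

type Handler<T> = (msg: AckableMessage<T>) => Promise<void> | void;

// Wraps a handler so the message is settled once:
// ack on normal return, nack on a thrown error, nothing extra if the handler settled it itself.
function withAutoAck<T>(handler: Handler<T>): (msg: AckableMessage<T>) => Promise<void> {
  return async (msg) => {
    let settled = false;
    const tracked: AckableMessage<T> = {
      body: msg.body,
      ack: () => { settled = true; msg.ack(); },
      nack: () => { settled = true; msg.nack(); },
    };
    try {
      await handler(tracked);
      if (!settled) msg.ack();
    } catch {
      // The error is swallowed in this sketch; real code would likely log it.
      if (!settled) msg.nack();
    }
  };
}

// Test double that records which settlement calls were made.
function fakeMessage<T>(body: T): { msg: AckableMessage<T>; calls: string[] } {
  const calls: string[] = [];
  const msg: AckableMessage<T> = {
    body,
    ack: () => { calls.push("ack"); },
    nack: () => { calls.push("nack"); },
  };
  return { msg, calls };
}

const ok = fakeMessage({ id: 1 });
const failing = fakeMessage({ id: 2 });
const manual = fakeMessage({ id: 3 });

const done = Promise.all([
  withAutoAck<{ id: number }>(() => { /* returns normally */ })(ok.msg),
  withAutoAck<{ id: number }>(() => { throw new Error("boom"); })(failing.msg),
  withAutoAck<{ id: number }>((m) => { m.nack(); })(manual.msg),
]);
```

In the third case the handler rejects the message itself, so the wrapper does not add a second acknowledgement.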

Request

A request can be awaited for its response. An optional expire timeout can be set; if it elapses before a reply arrives, a timeout error is thrown.
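
The timeout behavior can be sketched with a plain promise wrapper. Everything here is illustrative: `withExpire`, `RequestTimeoutError` and `replyAfter` are hypothetical names, the error class thrown by the module may differ, and the sketch assumes `expire` is given in milliseconds.

```typescript
// Error type used by this sketch; the module's own timeout error may differ.
class RequestTimeoutError extends Error {
  constructor(type: string, expire: number) {
    super(`request "${type}" timed out after ${expire} ms`);
    this.name = "RequestTimeoutError";
  }
}

// Rejects with RequestTimeoutError if `pending` has not settled within `expire` ms (unit assumed).
function withExpire<T>(type: string, pending: Promise<T>, expire: number): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(() => reject(new RequestTimeoutError(type, expire)), expire);
    pending.then(
      (value) => { clearTimeout(timer); resolve(value); },
      (error) => { clearTimeout(timer); reject(error); },
    );
  });
}

// Stand-in for a reply that arrives after `ms` milliseconds.
function replyAfter<T>(value: T, ms: number): Promise<T> {
  return new Promise<T>((resolve) => setTimeout(() => resolve(value), ms));
}

// The reply arrives well inside the timeout.
const fast = withExpire("user.get", replyAfter({ name: "Ada" }, 10), 200);

// The reply arrives too late, so the caller sees a timeout error instead.
const slow = withExpire("user.get", replyAfter({ name: "Ada" }, 200), 10);
slow.catch((err: Error) => console.log(err.name));
```

Callers would typically wrap the awaited request in try/catch and treat the timeout error as a distinct failure from an error reply.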

Reply

A reply handler defines the answer that is sent back to the requester.

IMessage

Each handler and reply handler is called with a message object.

ack

message.ack()

Enqueues the message for acknowledgement.

nack

message.nack()

Enqueues the message for rejection. This will re-enqueue the message.

reject

message.reject()

Rejects the message without re-queueing it. Please use with caution and consider having a dead-letter-exchange assigned to the queue before using this feature.

reply

message.reply(data: any)

Acknowledges the message and sends the reply back to the requester.

replySuccess

message.replySuccess(data: T)

Replies with the JSON object {success: true, data}.

replyError

message.replyError(e: RequestError<T>)

Replies with the JSON object {success: false, message: e.message, data: e.data}.
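
The two reply envelopes can be modelled as a discriminated union. This is a sketch of the shapes described above only: `SuccessReply`, `ErrorReply`, `ReplyError`, `toSuccess`, `toError` and `unwrap` are hypothetical names, and `ReplyError` is a local stand-in modelled on the `RequestError<T>` named in the signature, not the module's class.

```typescript
// Envelope shapes as described for replySuccess and replyError.
interface SuccessReply<T> {
  success: true;
  data: T;
}

interface ErrorReply<T> {
  success: false;
  message: string;
  data: T | undefined;
}

// Local stand-in for an error that carries extra data.
class ReplyError<T> extends Error {
  constructor(message: string, public readonly data?: T) {
    super(message);
  }
}

function toSuccess<T>(data: T): SuccessReply<T> {
  return { success: true, data };
}

function toError<T>(e: ReplyError<T>): ErrorReply<T> {
  return { success: false, message: e.message, data: e.data };
}

// Requester side: the `success` flag tells the two envelopes apart.
function unwrap<T, E>(reply: SuccessReply<T> | ErrorReply<E>): T {
  if (reply.success) {
    return reply.data;
  }
  throw new Error(reply.message);
}

const okReply = toSuccess({ id: 7 });
const errReply = toError(new ReplyError("not found", { id: 7 }));

console.log(JSON.stringify(okReply));  // {"success":true,"data":{"id":7}}
console.log(JSON.stringify(errReply)); // {"success":false,"message":"not found","data":{"id":7}}
```

Checking `success` before reading `data` lets the requester handle both outcomes without relying on exceptions crossing the bus.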

BusProvider

initialize

initialize()

Initializes busProvider and starts listening to events. Call it manually when autoListen is disabled.

publish

publish(type: string, data: any, expire?: number): Promise<void>

Publishes an event.
- type - event name
- data - any data
- expire - time until the message expires in the queue

request

request<T>(type: string, data: any, expire?: number): Promise<T>

Requests data by event and returns a promise that resolves with the event response.
- type - event name
- data - any data
- expire - timeout until the request is rejected

close

close<T>(): Promise<void>

Closes the connection and cleans up all handlers.

getQueueMessagesCount

getQueueMessagesCount(): Promise<number>

Returns the number of pending events in the queue.
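
The methods above are typically called in a fixed order: initialize, publish or request, then close. The sketch below walks through that order against an in-memory stand-in so it runs without a broker. `BusLifecycle`, `FakeBus` and `runOnce` are hypothetical names, the `Promise<void>` return type of `initialize` is an assumption (the reference shows only `initialize()`), and the fake's behavior is invented for illustration.

```typescript
// Signatures copied from the reference above; initialize's return type is assumed.
interface BusLifecycle {
  initialize(): Promise<void>;
  publish(type: string, data: any, expire?: number): Promise<void>;
  getQueueMessagesCount(): Promise<number>;
  close(): Promise<void>;
}

// Hypothetical in-memory stand-in; it only tracks state, nothing is sent anywhere.
class FakeBus implements BusLifecycle {
  private listening = false;
  private pending: string[] = [];

  public async initialize(): Promise<void> {
    this.listening = true;
  }

  public async publish(type: string, _data: any, _expire?: number): Promise<void> {
    if (!this.listening) throw new Error("bus is not initialized");
    this.pending.push(type);
  }

  public async getQueueMessagesCount(): Promise<number> {
    return this.pending.length;
  }

  public async close(): Promise<void> {
    this.listening = false;
    this.pending = [];
  }
}

// Order of calls when the bus is initialized manually.
async function runOnce(bus: BusLifecycle): Promise<number> {
  await bus.initialize();
  try {
    // "report.ready" is an example event name.
    await bus.publish("report.ready", { reportId: 1 });
    return await bus.getQueueMessagesCount();
  } finally {
    await bus.close(); // always release the connection, even if publishing fails
  }
}

const pendingCount = runOnce(new FakeBus());
```

Closing in a `finally` block keeps the connection from leaking when a publish or request fails.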