Bus

RabbitMQ bus module built with appolo-rabbit

Installation

npm i @appolo/bus

Options

| key | Description | Type | Default |
| --- | --- | --- | --- |
| id | injection id | string | busProvider |
| connection | AMQP connection string | string | null |
| autoListen | true to automatically initialize busProvider and start listening to events | boolean | true |
| handleEvents | true to register queue event handlers | boolean | true |
| exchange | name of the exchange or exchange options | string | {} |
| queue | queue options | object | {} |
| requestQueue | request queue options | object | {} |
| replayQueue | reply queue options | object | {} |
| appendEnv | append env name to queueName and exchangeName | boolean | true |
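To make the effect of appendEnv concrete, here is a minimal sketch of one plausible reading of "append env name to queueName and exchangeName". The helper `withEnv` is hypothetical and not part of @appolo/bus, and the "-" separator is an assumption; check the names the module actually creates on your broker.

```typescript
// Hypothetical helper, not part of @appolo/bus.
// Assumption: the env name is joined to the base name with "-".
function withEnv(name: string, env: string, appendEnv: boolean): string {
    // With appendEnv disabled, or without an env name, the name is left unchanged.
    if (!appendEnv || env === "") {
        return name;
    }
    return `${name}-${env}`;
}

const queueName = withEnv("someQueue", "production", true);   // "someQueue-production"
const exchangeName = withEnv("exchange", "production", false); // "exchange"
console.log(queueName, exchangeName);
```

Under this reading, two environments that share one broker end up with separate queues and exchanges, which is the usual reason for such an option.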

Exchange Options

| key | Description | Type | Default |
| --- | --- | --- | --- |
| type | exchange type | string | topic |
| autoDelete | delete when consumer count goes to 0 | boolean | false |
| durable | survive broker restarts | boolean | true |
| persistent | persistent delivery, messages saved to disk | boolean | true |
| alternate | define an alternate exchange | string | |
| publishTimeout | timeout in milliseconds for publish calls to this exchange | 2^32 | |
| replyTimeout | timeout in milliseconds to wait for a reply | 2^32 | |
| limit | the number of unpublished messages to cache while waiting on connection | 2^16 | |
| noConfirm | prevents rabbot from creating the exchange in confirm mode | boolean | false |

Queue Options

| key | Description | Type | Default |
| --- | --- | --- | --- |
| autoDelete | delete when consumer count goes to 0 | boolean | false |
| durable | survive broker restarts | boolean | true |
| subscribe | auto-start the subscription | boolean | false |
| limit | max number of unacked messages allowed for consumer | 2^16 | 1 |
| noAck | the server will remove messages from the queue as soon as they are delivered | boolean | false |
| noBatch | causes ack, nack & reject to take place immediately | boolean | false |
| noCacheKeys | disable cache of matched routing keys to prevent unbounded memory growth | boolean | false |
| queueLimit | max number of ready messages a queue can hold | 2^32 | |
| messageTtl | time in ms before a message expires on the queue | 2^32 | |
| expires | time in ms before a queue with 0 consumers expires | 2^32 | |

In config/modules/all.ts:

import {App} from '@appolo/engine'; // assumed source of the App type
import {BusModule} from '@appolo/bus';

export = async function (app: App) {
    app.module.use(BusModule.for({
        connection: "amqp://connection-string",
        exchange: "exchange",
        queue: "someQueue"
    }));
}

Usage

Publisher

Inject BusProvider to publish messages.

import {define, singleton, inject} from '@appolo/inject'
import {BusProvider} from "@appolo/bus";

@define()
@singleton()
export class SomePublisher {

    @inject() busProvider: BusProvider

    publish(data: any): Promise<any> {
        return this.busProvider.publish({routingKey: "test", data})
    }
}

Handler

If you do not call msg.ack() or msg.nack() yourself, msg.ack() is called automatically when the handler returns, and msg.nack() is called if the handler throws an error.

import {define, singleton} from '@appolo/inject'
import {handler, IMessage} from "@appolo/bus";

@define()
@singleton()
export class SomeHandler {

    // no explicit ack/nack: the message is settled automatically
    @handler("test")
    handle(msg: IMessage<any>) {
        //do something
    }

    // explicit ack/nack inside the handler
    @handler("someName")
    handleSomeName(msg: IMessage<any>) {
        try {
            //do something
            msg.ack();
        } catch (e) {
            msg.nack();
        }
    }
}
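The ack-on-return, nack-on-error rule described above can be pictured with a small dispatcher. This is a minimal sketch and not the library's implementation: `AckableMessage`, `TrackedMessage` and `dispatch` are hypothetical names, and the handler is synchronous for brevity, whereas real handlers may be asynchronous.

```typescript
// Minimal stand-in for a bus message; the real IMessage has more members.
interface AckableMessage {
    ack(): void;
    nack(): void;
}

// Records the first ack/nack so later calls become no-ops.
class TrackedMessage implements AckableMessage {
    public settled: "ack" | "nack" | null = null;

    ack(): void {
        if (this.settled === null) this.settled = "ack";
    }

    nack(): void {
        if (this.settled === null) this.settled = "nack";
    }
}

// Hypothetical dispatcher: ack on normal return, nack on throw.
// If the handler already settled the message, the extra call changes nothing.
function dispatch(msg: TrackedMessage, handler: (m: AckableMessage) => void): void {
    try {
        handler(msg);
        msg.ack();
    } catch {
        msg.nack();
    }
}
```

The design point is that a handler only needs explicit ack or nack calls when it wants to settle the message before it returns, or to nack without throwing.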

Request

You can await a response and set an expire timeout. If the timeout is reached before a reply arrives, a timeout error is thrown.

import {define, singleton, inject} from '@appolo/inject'
import {BusProvider} from "@appolo/bus";

@define()
@singleton()
export class SomePublisher {

    @inject() busProvider: BusProvider

    public async getData(params: any): Promise<any> {
        // a timeout error is thrown if no reply arrives before expire is reached
        let data = await this.busProvider.request({
            routingKey: "test",
            params,
            expire: 5000
        })
        return data;
    }
}
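A common way to build an expiring request is to race the pending reply against a timer. The sketch below makes no claim about how @appolo/bus does this internally: `RequestTimeoutError`, `withExpire` and `getDataOrNull` are hypothetical names, and the error type the library actually throws may differ.

```typescript
// Hypothetical error type used only in this sketch.
class RequestTimeoutError extends Error {
    constructor(public readonly timeoutMs: number) {
        super(`request timed out after ${timeoutMs}ms`);
        this.name = "RequestTimeoutError";
    }
}

// Rejects with RequestTimeoutError if `pending` does not settle within `expireMs`.
function withExpire<T>(pending: Promise<T>, expireMs: number): Promise<T> {
    return new Promise<T>((resolve, reject) => {
        const timer = setTimeout(() => reject(new RequestTimeoutError(expireMs)), expireMs);
        pending.then(
            (value) => { clearTimeout(timer); resolve(value); },
            (err) => { clearTimeout(timer); reject(err); }
        );
    });
}

// Caller side: treat a timeout as "no data" and rethrow everything else.
async function getDataOrNull<T>(pending: Promise<T>, expireMs: number): Promise<T | null> {
    try {
        return await withExpire(pending, expireMs);
    } catch (e) {
        if (e instanceof RequestTimeoutError) {
            return null;
        }
        throw e;
    }
}
```

Whether a timeout should fall back to a default, be retried, or propagate to the caller depends on the request; the point is that callers of an expiring request need a branch for it.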

Reply

Define a reply handler to answer requests.

import {define, singleton} from '@appolo/inject'
import {reply, IMessage} from "@appolo/bus";

@define()
@singleton()
export class SomeHandler {

    // the returned value is sent back as the reply
    @reply("test")
    handle(msg: IMessage<any>) {
        return {userId: 1}
    }

    // or use the reply methods
    @reply("someName")
    handleSomeName(msg: IMessage<any>) {
        try {
            //get some data
            msg.replySuccess({userId: 1})
        } catch (e) {
            msg.replyError(e)
        }
    }
}

IMessage

Each handler and reply handler is called with a message object:

{
    // metadata specific to routing & delivery
    fields: {
        consumerTag: "", // identifies the consumer to rabbit
        deliveryTag: #, // identifies the message delivered for rabbit
        redelivered: true|false, // indicates if the message was previously nacked or returned to the queue
        exchange: "", // name of exchange the message was published to
        routingKey: "" // the routing key (if any) used when published
    },
    properties: {
        contentType: "application/json", // see serialization for how defaults are determined
        contentEncoding: "utf8", // rabbot's default
        headers: {}, // any user provided headers
        correlationId: "", // the correlation id if provided
        replyTo: "", // the reply queue would go here
        messageId: "", // message id if provided
        type: "", // the type of the message published
        appId: "" // not used by rabbot
    },
    content: { "type": "Buffer", "data": [ ... ] }, // raw buffer of message body
    body: , // this could be an object, string, etc - whatever was published
    type: "" // this also contains the type of the message published
}
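The listing above is descriptive rather than valid TypeScript. A typed approximation might look like the sketch below. The interface names `MessageFields`, `MessageProperties` and `RawMessage` are hypothetical and the library's own IMessage type may differ; the raw `content` buffer is left out so the sketch has no Node.js dependency.

```typescript
// Routing and delivery metadata, mirroring the "fields" part of the listing.
interface MessageFields {
    consumerTag: string;
    deliveryTag: number;
    redelivered: boolean;
    exchange: string;
    routingKey: string;
}

// Message properties, mirroring the "properties" part of the listing.
interface MessageProperties {
    contentType: string;
    contentEncoding: string;
    headers: Record<string, unknown>;
    correlationId: string;
    replyTo: string;
    messageId: string;
    type: string;
    appId: string;
}

// Hypothetical typed view of a message; T is the type of the published body.
interface RawMessage<T> {
    fields: MessageFields;
    properties: MessageProperties;
    body: T;
    type: string;
}

// Builds a short log line from the routing metadata of a message.
function describeMessage<T>(msg: RawMessage<T>): string {
    const retry = msg.fields.redelivered ? " (redelivered)" : "";
    return `${msg.fields.exchange}:${msg.fields.routingKey}#${msg.fields.deliveryTag}${retry}`;
}
```

A helper like `describeMessage` is handy in handlers that log every delivery, since the `redelivered` flag shows at a glance whether a message was nacked before.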

ack

message.ack()

Enqueues the message for acknowledgement.

nack

message.nack()

Enqueues the message for rejection. This will re-enqueue the message.

reject

message.reject()

Rejects the message without re-queueing it. Please use with caution and consider having a dead-letter-exchange assigned to the queue before using this feature.
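The difference between ack, nack and reject is easiest to see by tracking where a message ends up. The toy in-memory model below is only an illustration under simplifying assumptions: `ToyQueue` is a hypothetical class, there is a single consumer, and dead-lettering is reduced to a boolean flag.

```typescript
// Toy model of a queue; it only tracks where a settled message ends up.
class ToyQueue<T> {
    readonly ready: T[] = [];
    readonly deadLetters: T[] = [];

    constructor(private readonly hasDeadLetterExchange: boolean) {}

    enqueue(item: T): void {
        this.ready.push(item);
    }

    // Takes the next ready message and settles it according to `outcome`.
    consume(outcome: "ack" | "nack" | "reject"): T | undefined {
        const item = this.ready.shift();
        if (item === undefined) {
            return undefined;
        }
        if (outcome === "nack") {
            // nack: the message goes back to the queue for another delivery
            this.ready.unshift(item);
        } else if (outcome === "reject" && this.hasDeadLetterExchange) {
            // reject with a dead-letter exchange: the message is kept aside
            this.deadLetters.push(item);
        }
        // ack, or reject without a dead-letter exchange: the message is gone
        return item;
    }
}
```

In this model a rejected message survives only when a dead-letter exchange is configured, which is why the caution above recommends assigning one before using reject.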

reply

message.reply( data:any )

Acknowledges the message and sends the reply back to the requester.

replySuccess

message.replySuccess( data:T )

Replies to the message with the JSON object {success: true, data}.

replyError

message.replyError( e: RequestError<T> )

Replies to the message with the JSON object {success: false, message: e.message, data: e.data}.
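The two reply shapes described above form a small envelope that the requesting side can unwrap. The following sketch only models those JSON shapes: `ReplyEnvelope`, `toSuccess`, `toFailure`, `unwrap` and `ReplyFailedError` are hypothetical names, and whether the library unwraps replies for you is not covered here.

```typescript
// Envelope shapes as described above; the type names are hypothetical.
interface ReplySuccess<T> { success: true; data: T; }
interface ReplyFailure<E> { success: false; message: string; data?: E; }
type ReplyEnvelope<T, E = unknown> = ReplySuccess<T> | ReplyFailure<E>;

// Hypothetical error carrying extra data, standing in for RequestError<T>.
class ReplyFailedError<E> extends Error {
    constructor(message: string, public readonly data?: E) {
        super(message);
        this.name = "ReplyFailedError";
    }
}

// Replying side: wrap a result in the success shape.
function toSuccess<T>(data: T): ReplySuccess<T> {
    return { success: true, data };
}

// Replying side: wrap an error in the failure shape.
function toFailure<E>(e: { message: string; data?: E }): ReplyFailure<E> {
    return { success: false, message: e.message, data: e.data };
}

// Requesting side: return the payload, or throw when success is false.
function unwrap<T, E>(reply: ReplyEnvelope<T, E>): T {
    if (reply.success === true) {
        return reply.data;
    }
    throw new ReplyFailedError<E>(reply.message, reply.data);
}
```

Keeping `success` as a literal discriminant lets TypeScript narrow the envelope, so `data` has the payload type on the success branch and the error-data type on the failure branch.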

BusProvider

initialize

initialize()

Initializes busProvider and starts listening to events. Call it yourself when autoListen is disabled.

publish

publish(type: string, data: any, expire?: number): Promise<void>

Publishes an event.

  • type - event name
  • data - any data
  • expire - timeout until the message is expired in the queue

request

request<T>(type: string, data: any, expire?: number): Promise<T>

Requests data by event name and returns a promise that resolves with the response.

  • type - event name
  • data - any data
  • expire - timeout until the request is rejected

close

close<T>(): Promise<void>

Closes the connection and removes all handlers.

getQueueMessagesCount

getQueueMessagesCount(): Promise<number>

Returns the number of pending messages in the queue.
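One possible use of the pending count is to wait for the queue to drain, for example before calling close() during shutdown. The sketch below is an assumption about usage, not a documented pattern: `QueueCounter`, `sleep` and `waitUntilDrained` are hypothetical names, and the interface only mirrors the getQueueMessagesCount signature listed above so that a real provider or a test double can be passed in.

```typescript
// Structural stand-in for the documented method; any object with this shape works.
interface QueueCounter {
    getQueueMessagesCount(): Promise<number>;
}

// Resolves after the given number of milliseconds.
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Polls until the queue reports 0 pending messages.
// Returns false if the queue is still non-empty after maxAttempts polls.
async function waitUntilDrained(bus: QueueCounter, pollMs: number, maxAttempts: number): Promise<boolean> {
    for (let attempt = 0; attempt < maxAttempts; attempt++) {
        if ((await bus.getQueueMessagesCount()) === 0) {
            return true;
        }
        await sleep(pollMs);
    }
    return false;
}
```

Bounding the number of attempts keeps shutdown from hanging forever when publishers are still adding messages.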