new AmqpClient(url: string?, options: Object?)

Extends EventEmitter

Parameters
url (string?) URL of the form amqp[s]://[user:password@]hostname[:port][/vhost]
options (Object? = {})
Name Description
options.url string? URL of the form amqp[s]://[user:password@]hostname[:port][/vhost]
options.retryTimes number (default 5) The number of attempts to make before giving up.
options.poolMaxChannels number (default 16) Maximum number of pooled channels
options.poolMaxConfirmChannels number (default 16) Maximum number of pooled confirm channels
options.configuration Object?
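
To make the URL form above concrete, here is a minimal sketch that splits such a URL into its parts with Node's built-in URL class. parseAmqpUrl is a hypothetical helper, not part of AmqpClient, and the fallback to the vhost "/" is an assumption; 5672 and 5671 are the conventional AMQP and AMQPS ports.

```javascript
// Sketch only: decomposes amqp[s]://[user:password@]hostname[:port][/vhost].
// `parseAmqpUrl` is a hypothetical helper, not part of AmqpClient.
function parseAmqpUrl(url) {
    const u = new URL(url);
    if (u.protocol !== 'amqp:' && u.protocol !== 'amqps:') {
        throw new Error(`Unsupported protocol: ${u.protocol}`);
    }
    const secure = u.protocol === 'amqps:';
    return {
        secure,
        user: decodeURIComponent(u.username) || undefined,
        password: decodeURIComponent(u.password) || undefined,
        hostname: u.hostname,
        // Conventional default ports: 5672 for amqp, 5671 for amqps.
        port: u.port ? Number(u.port) : (secure ? 5671 : 5672),
        // Assumption: a missing vhost segment means the default vhost "/".
        vhost: u.pathname.length > 1 ? decodeURIComponent(u.pathname.slice(1)) : '/',
    };
}

const full = parseAmqpUrl('amqps://guest:secret@broker.example:5671/staging');
const bare = parseAmqpUrl('amqp://localhost');
```

Here full carries every optional part, while bare falls back to the default port and vhost.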
Static Members
transformer(transformer)

transformer(transformer: function (message: AmqpMessage, cb)): stream.Transform
Parameters
transformer (function (message: AmqpMessage, cb))
Returns
stream.Transform:
Example
const sub = amqp.subscriber('rpc_queue');
const pub = amqp.publisher({ contentType: 'text/plain' });
const transformer = AmqpClient.transformer((message, callback) => {
    const n = Number(message.content);
    callback(null, fibonacci(n).toString());
});
sub.pipe(transformer).pipe(pub);
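
For readers who want to see the shape of such a transformer without a broker, the following is a rough sketch built directly on Node's stream.Transform. makeTransformer is a hypothetical stand-in, and it is only an assumption that AmqpClient.transformer behaves similarly; the fibonacci helper is likewise illustrative.

```javascript
const { Transform } = require('stream');

// Hypothetical stand-in for AmqpClient.transformer: wraps a
// (message, callback) handler in an object-mode Transform stream.
function makeTransformer(handler) {
    return new Transform({
        objectMode: true,
        transform(message, _encoding, done) {
            // Forward the handler's error, or push its result downstream.
            handler(message, (err, result) => done(err, result));
        },
    });
}

// Iterative Fibonacci, used only to give the handler something to compute.
function fibonacci(n) {
    let [a, b] = [0, 1];
    for (let i = 0; i < n; i++) {
        [a, b] = [b, a + b];
    }
    return a;
}

const fib = makeTransformer((message, callback) => {
    const n = Number(message.content);
    callback(null, fibonacci(n).toString());
});

// Feed one message-like object through the stream and read the reply back.
fib.write({ content: Buffer.from('10') });
const reply = fib.read();
```

Because the handler calls back synchronously, the reply is available to read() straight after write(); an asynchronous handler would need a 'data' listener or a pipe instead.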
Instance Members
publisher(exchange?, routingKey?, options?)

publisher(queue, options)
publisher(exchange, routingKey, options)

publisher(exchange: string?, routingKey: string?, options: Object?): stream.Writable
Parameters
exchange (string?)
routingKey (string?)
options (Object?) Options fields are divided into those that have some meaning to RabbitMQ and those that will be ignored by RabbitMQ but passed on to consumers. Options may be omitted altogether, in which case defaults as noted will apply.
Name Description
options.confirmation boolean (default false) If true, a confirm channel will be acquired from the pool.

Used by RabbitMQ and sent on to consumers:

options.expiration number? If supplied, the message will be discarded from a queue once it's been there longer than the given number of milliseconds.
options.expirationMs number? Alias for expiration
options.userId string? If supplied, RabbitMQ will compare it to the username supplied when opening the connection, and reject messages for which it does not match.
options.CC (string | Array<string>)? Messages will be routed to these routing keys in addition to that given as the routingKey parameter. A string will be implicitly treated as an array containing just that string. This will override any value given for CC in the headers parameter.
options.BCC (string | Array<string>)? Like CC, except that the value will not be sent in the message headers to consumers.
options.priority number? A priority for the message; ignored by versions of RabbitMQ older than 3.5.0, or if the queue is not a priority queue.
options.persistent boolean (default false) If truthy, the message will survive broker restarts provided it's in a queue that also survives restarts.
options.mandatory boolean (default false) If true, the message will be returned if it is not routed to a queue (i.e., if there are no bindings that match its routing key).

Used by RabbitMQ but not sent on to consumers:

options.immediate boolean? In the specification, this instructs the server to return the message if it is not able to be sent immediately to a consumer. No longer implemented in RabbitMQ, and if true, will provoke a channel error, so it's best to leave it out.

Ignored by RabbitMQ but passed on to consumers:

options.contentType string? A MIME type for the message content.
options.contentEncoding string? A MIME encoding for the message content.
options.headers Object? Application specific headers to be carried along with the message content. The value as sent may be augmented by extension-specific fields if they are given in the parameters, for example, 'CC', since these are encoded as message headers; the supplied value won't be mutated.
options.correlationId string? Usually used to match replies to requests, or similar.
options.replyTo (AmqpStreamSubscriber | string)? Often used to name a queue to which the receiving application must send replies, in an RPC scenario (many libraries assume this pattern).
options.messageId string? Arbitrary application-specific identifier for the message.
options.timestamp number? A timestamp for the message.
options.type string? An arbitrary application-specific type for the message.
options.appId string? An arbitrary identifier for the originating application.
Returns
stream.Writable:
Example
// without options and callback
publish('foo', 'bar', 'message')
// without options
publish('foo', 'bar', 'message', (err) => {})
// without callback
publish('foo', 'bar', 'message', {expirationMs: 15})
// with options and callback
publish('foo', 'bar', 'message', {expirationMs: 15}, (err) => {})
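
The interaction between the CC, BCC and headers options described above can be sketched as follows. buildHeadersSketch is a hypothetical helper written for illustration, not the library's code; it only demonstrates the documented rules that a string is treated as a one-element array, that options.CC overrides headers.CC, and that the supplied headers object is not mutated.

```javascript
// Hypothetical helper, not the library's implementation.
function buildHeadersSketch(options = {}) {
    const toArray = (value) => (Array.isArray(value) ? value : [value]);
    // Work on a shallow copy so the caller's headers object stays untouched.
    const headers = { ...(options.headers || {}) };
    if (options.CC !== undefined) headers.CC = toArray(options.CC);
    if (options.BCC !== undefined) headers.BCC = toArray(options.BCC);
    return headers;
}

const supplied = { CC: ['old.key'], 'x-trace': 'abc' };
const sent = buildHeadersSketch({ headers: supplied, CC: 'audit.key' });
```

After this runs, sent.CC holds only 'audit.key' while supplied.CC still holds 'old.key'.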
subscriber(exchange?, routingKey, options?)

subscriber(queue, options): returns a Readable stream that subscribes to messages from the queue.
subscriber(exchange, routingKey, options): returns a Readable stream; creates an exclusive queue and binds it to the exchange.
If incoming message has replyTo property

NB RabbitMQ v3.3.0 changes the meaning of prefetch (basic.qos) to apply per-consumer, rather than per-channel. It will apply to consumers started after the method is called. See rabbitmq-prefetch.

Use the global flag to get the per-channel behaviour. To keep life interesting, using the global flag with a RabbitMQ older than v3.3.0 will bring down the whole connection.

subscriber(exchange: string?, routingKey: string, options: Object?): stream.Readable
Parameters
exchange (string?)
routingKey (string)
options (Object?)
Name Description
options.consumerTag string?
options.noLocal boolean (default false)
options.noAck boolean (default false)
options.exclusive boolean (default false)
options.priority number?
options.arguments object?
options.prefetch number? Set the prefetch count for this channel. The count given is the maximum number of messages sent over the channel that can be awaiting acknowledgement; once there are count messages outstanding, the server will not send more messages on this channel until one or more have been acknowledged. A falsy value for count indicates no such limit.
options.global boolean (default false) Use the global flag to get the per-channel prefetch behaviour. To keep life interesting, using the global flag with a RabbitMQ older than v3.3.0 will bring down the whole connection.
Returns
stream.Readable:
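
The prefetch behaviour described for options.prefetch can be illustrated with a small in-memory model. This is a toy, not a broker or a channel: PrefetchWindow and its method names are made up for the example, and it only mimics the rule that no more than count messages may be awaiting acknowledgement at once.

```javascript
// Toy model of a prefetch window; every name here is hypothetical.
class PrefetchWindow {
    constructor(prefetch) {
        this.prefetch = prefetch; // a falsy value means "no limit"
        this.ready = [];          // messages waiting to be delivered
        this.unacked = new Set(); // delivered but not yet acknowledged
        this.delivered = [];      // delivery log, in order
    }

    enqueue(message) {
        this.ready.push(message);
        this.deliver();
    }

    ack(message) {
        // Acknowledging frees a slot, so try to deliver again.
        if (this.unacked.delete(message)) this.deliver();
    }

    deliver() {
        while (this.ready.length > 0 &&
               (!this.prefetch || this.unacked.size < this.prefetch)) {
            const message = this.ready.shift();
            this.unacked.add(message);
            this.delivered.push(message);
        }
    }
}

const win = new PrefetchWindow(2);
['m1', 'm2', 'm3'].forEach((m) => win.enqueue(m));
const beforeAck = [...win.delivered]; // m3 is held back by the window
win.ack('m1');
const afterAck = [...win.delivered];  // the ack frees a slot for m3
```

With a prefetch of 2, the third message is only delivered once one of the first two has been acknowledged.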
close(cb)

Close AMQP connection

close(cb: function)
Parameters
cb (function = noopThrowIfError)
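
Many methods below default their callback to noopThrowIfError. Its source is not shown here, so the following is only a guess, inferred from the name, at what such a default would do: stay silent on success and throw on failure. closeSketch is a hypothetical callback-style operation used purely to exercise it.

```javascript
// Assumption: inferred from the name only; the library's real default
// callback may differ.
function noopThrowIfError(err) {
    if (err) throw err;
}

// Hypothetical operation that reports its outcome through a callback.
function closeSketch(shouldFail, cb = noopThrowIfError) {
    cb(shouldFail ? new Error('close failed') : null);
}

closeSketch(false); // success with the default callback: nothing happens

let seen = null;
closeSketch(true, (err) => { seen = err; }); // an explicit callback receives the error
```

If the default really works this way, omitting the callback turns any failure into a thrown exception, so passing a callback is the safer choice wherever errors should be handled.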
assertQueue(queue, options?, cb = noopThrowIfError)

Assert a queue into existence. This operation is idempotent given identical arguments; however, it will bork the channel if the queue already exists but has different properties (values supplied in the arguments field may or may not count for borking purposes).

assertQueue(queue: string, options: Object?, cb: function (err, {queue, messageCount, consumerCount})?)
Parameters
queue (string) Queue name. If you supply an empty string or other falsy value (including null and undefined), the server will create a random name for you.
options (Object?)
Name Description
options.exclusive boolean (default false) If true, scopes the queue to the connection
options.durable boolean (default true) If true, the queue will survive broker restarts, modulo the effects of exclusive and autoDelete
options.autoDelete boolean (default false) If true, the queue will be deleted when the number of consumers drops to zero.
options.arguments Object? Additional arguments, usually parameters for some kind of broker-specific extension e.g., high availability, TTL.
options.messageTtl number? Expires messages arriving in the queue after n milliseconds
options.expires number? The queue will be destroyed after n milliseconds of disuse
options.deadLetterExchange string? An exchange to which messages discarded from the queue will be resent. A message is discarded when it expires or is rejected or nacked, or the queue limit is reached.
options.maxLength number? Sets a maximum number of messages the queue will hold. Old messages will be discarded (dead-lettered if that's set) to make way for new messages.
options.maxPriority number? Makes the queue a priority queue.
cb (function (err, {queue, messageCount, consumerCount})? = noopThrowIfError)
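
The "idempotent given identical arguments" rule can be pictured with a toy registry. Nothing below talks to a broker: ToyQueueRegistry is hypothetical, it compares only the three boolean options using the defaults listed above, and the error text is illustrative.

```javascript
// Toy registry, not a broker: identical arguments are fine, while
// different arguments for an existing queue are an error.
const DEFAULTS = { exclusive: false, durable: true, autoDelete: false };

class ToyQueueRegistry {
    constructor() {
        this.queues = new Map(); // queue name -> properties it was created with
    }

    assertQueue(name, options = {}) {
        const wanted = { ...DEFAULTS, ...options };
        const existing = this.queues.get(name);
        if (!existing) {
            this.queues.set(name, wanted);
            return { queue: name, created: true };
        }
        for (const key of Object.keys(DEFAULTS)) {
            if (existing[key] !== wanted[key]) {
                throw new Error(`PRECONDITION_FAILED: '${key}' differs for queue '${name}'`);
            }
        }
        return { queue: name, created: false };
    }
}

const registry = new ToyQueueRegistry();
const first = registry.assertQueue('tasks', { durable: true });
const second = registry.assertQueue('tasks'); // same properties via defaults
```

Asserting 'tasks' a third time with { durable: false } would throw in this model, which mirrors the channel error described above.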
checkQueue(queue, cb)

Check whether a queue exists. This will bork the channel if the named queue doesn't exist; if it does exist, you go through to the next round!

checkQueue(queue: any, cb: any): AmqpClient
Parameters
queue (any)
cb (any)
Returns
AmqpClient:
deleteQueue(queue, options?, cb = noopThrowIfError)

Delete the queue named. Naming a queue that doesn't exist will result in the server closing the channel, to teach you a lesson (except in RabbitMQ version 3.2.0 and after).

Note the obverse semantics of the options: if both are true, the queue will be deleted only if it has no consumers and no messages. You should leave out the options altogether if you want to delete the queue unconditionally.

The server reply contains a single field, messageCount, with the number of messages deleted or dead-lettered along with the queue.

deleteQueue(queue: string, options: Object?, cb: function (err, {messageCount})?): this
Parameters
queue (string)
options (Object?)
Name Description
options.ifUnused boolean (default false) If true and the queue has consumers, it will not be deleted and the channel will be closed.
options.ifEmpty boolean (default false) If true and the queue contains messages, the queue will not be deleted and the channel will be closed.
cb (function (err, {messageCount})? = noopThrowIfError)
Returns
this:
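
The way ifUnused and ifEmpty combine can be written down as a small decision function. wouldDelete and the queueState snapshot are hypothetical and exist only to spell out the rule; the real decision is made by the server.

```javascript
// Toy decision function; `queueState` is a hypothetical snapshot of a queue.
function wouldDelete(queueState, options = {}) {
    // ifUnused blocks deletion while consumers are attached.
    if (options.ifUnused && queueState.consumerCount > 0) return false;
    // ifEmpty blocks deletion while messages remain.
    if (options.ifEmpty && queueState.messageCount > 0) return false;
    return true;
}

const busy = { consumerCount: 1, messageCount: 3 };
const idle = { consumerCount: 0, messageCount: 0 };

const unconditional = wouldDelete(busy);                              // true
const guarded = wouldDelete(busy, { ifUnused: true, ifEmpty: true }); // false
const guardedIdle = wouldDelete(idle, { ifUnused: true, ifEmpty: true }); // true
```

Each option can only veto a deletion, so leaving both out is the unconditional case.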
purgeQueue(queue, cb = noopThrowIfError)

Remove all undelivered messages from the queue named. Note that this won't remove messages that have been delivered but not yet acknowledged; they will remain, and may be requeued under some circumstances (e.g., if the channel to which they were delivered closes without acknowledging them).

The server reply contains a single field, messageCount, containing the number of messages purged from the queue.

purgeQueue(queue: any, cb: function (err, {messageCount})?): this
Parameters
queue (any)
cb (function (err, {messageCount})? = noopThrowIfError)
Returns
this:
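
The difference between undelivered and unacknowledged messages is easier to see in a toy model. The queue object and both functions below are hypothetical; placing requeued messages at the front of the queue is an assumption made for the sketch.

```javascript
// Toy queue model, not a broker. `ready` holds undelivered messages and
// `unacked` holds messages delivered to a consumer but not yet acknowledged.
function purgeSketch(queue) {
    const messageCount = queue.ready.length;
    queue.ready = [];        // only undelivered messages are dropped
    return { messageCount }; // unacked deliveries are left alone
}

// Assumption: requeued messages go back to the front of the queue.
function channelClosedSketch(queue) {
    queue.ready = [...queue.unacked, ...queue.ready];
    queue.unacked = [];
}

const queue = { ready: ['m3', 'm4'], unacked: ['m1', 'm2'] };
const purgeReply = purgeSketch(queue); // { messageCount: 2 }
channelClosedSketch(queue);            // m1 and m2 become ready again
```

So a purge can report two messages removed and the queue can still end up non-empty once the consumer's channel closes.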
bindQueue(queue, source, pattern, argt?, cb = noopThrowIfError)

Assert a routing path from an exchange to a queue: the exchange named by source will relay messages to the queue named, according to the type of the exchange and the pattern given. Pattern is a list of words, delimited by dots. The words can be anything, but usually they specify some features connected to the message.

There are two important special cases for binding keys:

  • * (star) can substitute for exactly one word,
  • # (hash) can substitute for zero or more words.

A few valid routing key examples: "stock.usd.nyse", "quick.orange.rabbit". A binding pattern such as "*.*.rabbit" matches the second of these.
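
The two wildcards can be tried out without a broker using a small matcher. topicMatches is a hypothetical helper written for illustration; it follows the rules stated above for * and #, and RabbitMQ's own implementation is different.

```javascript
// Sketch of topic-style pattern matching; not RabbitMQ's implementation.
function topicMatches(pattern, routingKey) {
    const p = pattern.split('.');
    const k = routingKey.split('.');

    // match(i, j): does p[i..] match k[j..]?
    const match = (i, j) => {
        if (i === p.length) return j === k.length;
        if (p[i] === '#') {
            // '#' may absorb zero words, or one word and then try again.
            return match(i + 1, j) || (j < k.length && match(i, j + 1));
        }
        if (j === k.length) return false;
        // '*' stands for exactly one word; anything else must be equal.
        return (p[i] === '*' || p[i] === k[j]) && match(i + 1, j + 1);
    };

    return match(0, 0);
}

const starHit = topicMatches('*.*.rabbit', 'quick.orange.rabbit'); // true
const starMiss = topicMatches('*.*.rabbit', 'rabbit');             // false
const hashZero = topicMatches('lazy.#', 'lazy');                   // true
const hashMany = topicMatches('lazy.#', 'lazy.brown.fox');         // true
```

Note how "lazy.#" accepts the bare key "lazy", whereas "*.*.rabbit" insists on exactly two words before "rabbit".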

bindQueue(queue: string, source: string, pattern: string, argt: Object?, cb: function?)
Parameters
queue (string) The queue name
source (string) The exchange name
pattern (string) The routing key pattern
argt (Object?) An object containing extra arguments that may be required for the particular exchange type.
cb (function? = noopThrowIfError)
unbindQueue(queue, source, pattern, argt?, cb = noopThrowIfError)

Remove a routing path between the queue named and the exchange named as source with the pattern and arguments given.

unbindQueue(queue: string, source: string, pattern: string, argt: Object?, cb: function?)
Parameters
queue (string) The queue name
source (string) The exchange name
pattern (string) The routing key pattern
argt (Object?) An object containing extra arguments that may be required for the particular exchange type.
cb (function? = noopThrowIfError)
assertExchange(exchange, type, options?, cb)

Assert an exchange into existence. As with queues, if the exchange exists already and has properties different to those supplied, the channel will 'splode; fields in the arguments object may or may not be 'splodey, depending on the type of exchange. Unlike queues, you must supply a name, and it can't be the empty string. You must also supply an exchange type, which determines how messages will be routed through the exchange.

assertExchange(exchange: string, type: ("direct" | "fanout" | "topic" | "headers"), options: Object?, cb: function)
Parameters
exchange (string) Exchange name
type (("direct" | "fanout" | "topic" | "headers") = 'direct') Exchange type
options (Object?)
Name Description
options.durable boolean (default true) If true, the exchange will survive broker restarts.
options.internal boolean (default false) If true, messages cannot be published directly to the exchange (i.e., it can only be the target of bindings, or possibly create messages ex-nihilo).
options.autoDelete boolean (default false) If true, the exchange will be destroyed once the number of bindings for which it is the source drops to zero.
options.alternateExchange string? An exchange to send messages to if this exchange can't route them to any queues.
options.arguments Object? Any additional arguments that may be needed by an exchange type.
cb (function = noopThrowIfError)
checkExchange(exchange, cb)

Check that an exchange exists. If it doesn't exist, the channel will be closed with an error. If it does exist, happy days.

checkExchange(exchange: string, cb: function (err))
Parameters
exchange (string)
cb (function (err) = noopThrowIfError)
deleteExchange(name, options, cb)

Delete an exchange. If the exchange does not exist, a channel error is raised (RabbitMQ version 3.2.0 and after will not raise an error).

deleteExchange(name: any, options: any, cb: any)
Parameters
name (any)
options (any)
Name Description
options.ifUnused boolean?
cb (any)
bindExchange(destination, source, pattern, args, cb)

Bind an exchange to another exchange. The exchange named by destination will receive messages from the exchange named by source, according to the type of the source and the pattern given. For example, a direct exchange will relay messages that have a routing key equal to the pattern.

NB Exchange to exchange binding is a RabbitMQ extension.

bindExchange(destination: string, source: string, pattern: string, args: any, cb: any)
Parameters
destination (string) The destination exchange
source (string) The source exchange
pattern (string) The routing key pattern
args (any)
cb (any)
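
How a message travels across an exchange-to-exchange binding can be traced with a toy router. ToyBroker is hypothetical and deliberately simple: every exchange in it behaves like a direct exchange, so a binding matches only when its pattern equals the routing key.

```javascript
// Toy in-memory router, not a broker. All exchanges act as direct exchanges.
class ToyBroker {
    constructor() {
        this.bindings = [];      // { source, pattern, queue } or { source, pattern, exchange }
        this.queues = new Map(); // queue name -> delivered contents
    }

    bindQueue(queue, source, pattern) {
        if (!this.queues.has(queue)) this.queues.set(queue, []);
        this.bindings.push({ source, pattern, queue });
    }

    bindExchange(destination, source, pattern) {
        this.bindings.push({ source, pattern, exchange: destination });
    }

    publish(exchange, routingKey, content, visited = new Set()) {
        if (visited.has(exchange)) return; // guard against binding cycles
        visited.add(exchange);
        for (const b of this.bindings) {
            if (b.source !== exchange || b.pattern !== routingKey) continue;
            if (b.queue) this.queues.get(b.queue).push(content);
            else this.publish(b.exchange, routingKey, content, visited);
        }
    }
}

const broker = new ToyBroker();
broker.bindExchange('audit', 'orders', 'created');
broker.bindQueue('audit-log', 'audit', 'created');
broker.publish('orders', 'created', 'order-1'); // relayed orders -> audit -> audit-log
broker.publish('orders', 'deleted', 'order-2'); // no matching binding, dropped
const auditLog = broker.queues.get('audit-log');
```

Only the message whose routing key equals the binding pattern reaches the queue behind the destination exchange.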
unbindExchange(destination, source, pattern, args, cb)

Remove a binding from an exchange to another exchange. A binding with the exact source exchange, destination exchange, routing key pattern, and extension args will be removed. If no such binding exists, it's, you guessed it, a channel error, except in RabbitMQ >= version 3.2.0, for which it succeeds trivially.

unbindExchange(destination: string, source: string, pattern: string, args: any, cb: any)
Parameters
destination (string) The destination exchange
source (string) The source exchange
pattern (string) The routing key pattern
args (any)
cb (any)
get(queue, options?, cb)

Ask a queue for a message, as an RPC. The callback receives either false, if there is no message to be had (the queue has no messages ready), or a message.

get(queue: string, options: Object?, cb: any)
Parameters
queue (string)
options (Object?)
cb (any = noopThrowIfError)
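
Since the result is either false or a message, a handler has to tell the two apart. The sketch below uses a hypothetical in-memory stand-in, getSketch, and assumes a Node-style cb(err, result) signature; neither is confirmed by the reference above.

```javascript
// Hypothetical in-memory stand-in for get(); assumes cb(err, result).
function getSketch(messages, cb) {
    cb(null, messages.length > 0 ? messages.shift() : false);
}

const results = [];
const pending = [{ content: Buffer.from('hello') }];

const onResult = (err, message) => {
    if (err) throw err;
    // false means the queue had nothing ready; anything else is a message.
    results.push(message === false ? 'empty' : message.content.toString());
};

getSketch(pending, onResult); // one message is waiting
getSketch(pending, onResult); // the queue is now empty
```

Comparing against false explicitly keeps the empty case separate from a message that merely has empty content.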
ackAll(cb)

Acknowledge all outstanding messages on the channel. This is a "safe" operation, in that it won't result in an error even if there are no such messages.

ackAll(cb: any)
Parameters
cb (any)
nackAll(cb)

Reject all messages outstanding on this channel. If requeue is truthy, or omitted, the server will try to re-enqueue the messages.

nackAll(cb: any)
Parameters
cb (any)
recover(cb = noopThrowIfError)

Requeue unacknowledged messages on this channel. The server will reply (with an empty object) once all messages are requeued.

recover(cb: function (err)?)
Parameters
cb (function (err)? = noopThrowIfError)

AmqpMessageFields

lib/typedefs.js

AMQP message fields

AmqpMessageFields

Type: Object

Properties
consumerTag (string) : Identifies the consumer for which the message is destined
deliveryTag (number) : A serial number for the message
redelivered (boolean) : If true indicates that this message has been delivered before and been handed back to the server (e.g., by a nack or recover operation).
exchange (string) : Name of exchange the message was published to
routingKey (string) : The routing key (if any) used when published

AmqpMessageProperties

lib/typedefs.js

AMQP message properties

AmqpMessageProperties

Type: Object

Properties
contentType (string) : A MIME type for the message content.
contentEncoding (string) : A MIME encoding for the message content.
headers (Object) : Any user provided headers
deliveryMode (string)
priority (string)
correlationId (string)
replyTo (string)
expiration (string)
messageId (string)
timestamp (string)
type (string)
userId (string)
appId (string)
clusterId (string)

AmqpMessage

lib/typedefs.js

AMQP message

AmqpMessage

Type: Object

Properties
properties (AmqpMessageProperties)
content (Buffer)
ack (function)
nack (function)