0.1.0
Extends EventEmitter
(Object?
= {}
)
Name | Description |
---|---|
options.url string?
|
URL of the form
amqp[s]://[user:password@]hostname[:port][/vhost]
|
options.retryTimes number
(default 5 )
|
The number of attempts to make before giving up. |
options.poolMaxChannels Object
(default 16 )
|
Maximum number of pooled channel |
options.poolMaxConfirmChannels Object
(default 16 )
|
Maximum number of pooled confirm channel |
options.configuration Object?
|
(function (message: AmqpMessage, cb))
stream.Transform
:
const sub = amqp.subscriber('rpc_queue');
const pub = amqp.publisher({ contentType: 'text/plain' });
const transformer = AmqpClient.transformer((message, callback) => {
const n = Number(message.content);
callback(null, fibonacci(n).toString());
});
sub.pipe(transformer).pipe(pub);
publisher(queue, options) publisher(exchange, routingKey, options)
(string?)
(string?)
(Object?)
Options fields are divided into those that have some meaning to RabbitMQ and those that
will be ignored by RabbitMQ but passed on to consumers. Options may be omitted altogether,
in which case defaults as noted will apply.
Name | Description |
---|---|
options.confirmation string
(default false )
|
If true, the confirm channel will be acquired from the pool
Used by RabbitMQ and sent on to consumers: |
options.expiration number?
|
If supplied, the message will be discarded from a queue once it's been there longer than the given number of milliseconds. |
options.expirationMs number?
|
Alias for expiration |
options.userId string?
|
If supplied, RabbitMQ will compare it to the username supplied when opening the connection, and reject messages for which it does not match. |
options.CC (string | Array<string>)?
|
Messages will be routed to these routing keys in addition to that given as the routingKey parameter. A string will be implicitly treated as an array containing just that string. This will override any value given for CC in the headers parameter. |
options.BCC (string | Array<string>)?
|
Like CC, except that the value will not be sent in the message headers to consumers. |
options.priority number?
|
A priority for the message; ignored by versions of RabbitMQ older than 3.5.0, or if the queue is not a priority queue. |
options.persistent boolean
(default false )
|
If truthy, the message will survive broker restarts provided it's in a queue that also survives restarts. |
options.mandatory boolean
(default false )
|
If true, the message will be returned if it is not routed to a queue
(i.e., if there are no bindings that match its routing key).
Not used by RabbitMQ and not sent to consumers: |
options.immediate boolean?
|
In the specification, this instructs the server to return the message if it is not able to be sent immediately to a consumer. No longer implemented in RabbitMQ, and if true, will provoke a channel error, so it's best to leave it out. |
options.contentType string?
|
A MIME type for the message content. |
options.contentEncoding string?
|
A MIME encoding for the message content. |
options.headers Object?
|
Application specific headers to be carried along with the message content. The value as sent may be augmented by extension-specific fields if they are given in the parameters, for example, 'CC', since these are encoded as message headers; the supplied value won't be mutated. |
options.correlationId string?
|
Usually used to match replies to requests, or similar. |
options.replyTo (AmqpStreamSubscriber | string)?
|
Often used to name a queue to which the receiving application must send replies, in an RPC scenario (many libraries assume this pattern). |
options.messageId string?
|
Arbitrary application-specific identifier for the message. |
options.timestamp number?
|
A timestamp for the message. |
options.type string?
|
An arbitrary application-specific type for the message. |
options.appId string?
|
An arbitrary identifier for the originating application. |
stream.Writable
:
// without options and callback
publish('foo', 'bar', 'message')
// without options
publish('foo', 'bar', 'message', (err) => {})
// without callback
publish('foo', 'bar', 'message', {expirationMs: 15})
// with options and callback
publish('foo', 'bar', 'message', {expirationMs: 15}, (err) => {})
subscriber(queue, options) Readable, stream subscribe on messages from queue subscriber(exchange, routingKey, options) Readable, create exclusive queue and bind it to the exchange If incoming message has replyTo property
NB RabbitMQ v3.3.0 changes the meaning of prefetch (basic.qos) to apply per-consumer, rather than per-channel. It will apply to consumers started after the method is called. See rabbitmq-prefetch.
Use the global
flag to get the per-channel behaviour. To keep life interesting, using the global
flag
with an RabbitMQ older than v3.3.0 will bring down the whole connection.
(string?)
(string)
(Object?)
Name | Description |
---|---|
options.consumerTag string?
|
|
options.noLocal boolean
(default false )
|
|
options.noAck boolean
(default false )
|
|
options.exclusive boolean
(default false )
|
|
options.priority number?
|
|
options.arguments object?
|
|
options.prefetch string?
|
Set the prefetch
count
for this channel. The
count
given is the maximum number of messages
sent over the channel that can be awaiting acknowledgement; once there are
count
messages outstanding,
the server will not send more messages on this channel until one or more have been acknowledged.
A falsey value for
count
indicates no such limit.
|
options.global string
(default false )
|
Use the
global
flag to get the per-channel prefetch behaviour. To keep life interesting,
using the
global
flag with an RabbitMQ older than v3.3.0 will bring down the whole connection.
|
stream.Readable
:
Assert a queue into existence. This operation is idempotent given identical arguments; however, it will bork the channel if the queue already exists but has different properties (values supplied in the arguments field may or may not count for borking purposes).
(string)
Queue name, if you supply an empty string or other falsey value (including null and undefined),
the server will create a random name for you.
(Object?)
Name | Description |
---|---|
options.exclusive boolean
(default false )
|
If true, scopes the queue to the connection |
options.durable boolean
(default true )
|
If true, the queue will survive broker restarts, modulo the effects of exclusive and autoDelete |
options.autoDelete boolean
(default false )
|
If true, the queue will be deleted when the number of consumers drops to zero. |
options.arguments Object?
|
Additional arguments, usually parameters for some kind of broker-specific extension e.g., high availability, TTL. |
options.messageTtl number?
|
Expires messages arriving in the queue after n milliseconds |
options.expires number?
|
The queue will be destroyed after n milliseconds of disuse |
options.deadLetterExchange string?
|
An exchange to which messages discarded from the queue will be resent. A message is discarded when it expires or is rejected or nacked, or the queue limit is reached. |
options.maxLength number?
|
Sets a maximum number of messages the queue will hold. Old messages will be discarded (dead-lettered if that's set) to make way for new messages. |
options.maxPriority number?
|
Makes the queue a priority queue. |
(function (err, {queue, messageCount, consumerCount})?
= noopThrowIfError
)
Check whether a queue exists. This will bork the channel if the named queue doesn't exist; if it does exist, you go through to the next round!
(any)
(any)
AmqpClient
:
Delete the queue named. Naming a queue that doesn't exist will result in the server closing the channel, to teach you a lesson (except in RabbitMQ version 3.2.0 and after1).
Note the obverse semantics of the options: if both are true, the queue will be deleted only if it has no consumers and no messages. You should leave out the options altogether if you want to delete the queue unconditionally.
The server reply contains a single field, messageCount, with the number of messages deleted or dead-lettered along with the queue.
(string)
(Object?)
Name | Description |
---|---|
options.ifUnused boolean
(default false )
|
If true and the queue has consumers, it will not be deleted and the channel will be closed. |
options.ifEmpty boolean
(default false )
|
If true and the queue contains messages, the queue will not be deleted and the channel will be closed. |
(function (err, {messageCount})?
= noopThrowIfError
)
this
:
Remove all undelivered messages from the queue named. Note that this won't remove messages that have been delivered but not yet acknowledged; they will remain, and may be requeued under some circumstances (e.g., if the channel to which they were delivered closes without acknowledging them).
The server reply contains a single field, messageCount, containing the number of messages purged from the queue.
(any)
(function (err, {messageCount})?
= noopThrowIfError
)
this
:
Assert a routing path from an exchange to a queue: the exchange named by source
will relay
messages to the queue
named, according to the type of the exchange and the pattern
given.
Pattern is a list of words, delimited by dots. The words can be anything, but usually they specify
some features connected to the message.
There are two important special cases for binding keys:
* (star)
can substitute for exactly one word,# (hash)
can substitute for zero or more words.A few valid routing key examples: "stock.usd.nyse"
, "quick.orange.rabbit"
, "*.*.rabbit"
.
Remove a routing path between the queue named and the exchange named as source with the pattern and arguments given.
Assert an exchange into existence. As with queues, if the exchange exists already and has properties different to those supplied, the channel will 'splode; fields in the arguments object may or may not be 'splodey, depending on the type of exchange. Unlike queues, you must supply a name, and it can't be the empty string. You must also supply an exchange type, which determines how messages will be routed through the exchange.
"direct"
| "fanout"
| "topic"
| "headers"
), options: Object?, cb: function)(string)
Exchange name
(("direct"
| "fanout"
| "topic"
| "headers"
)
= 'direct'
)
Exchange type
(Object?)
Name | Description |
---|---|
options.durable boolean
(default true )
|
If true, the exchange will survive broker restarts. |
options.internal boolean
(default false )
|
If true, messages cannot be published directly to the exchange (i.e., it can only be the target of bindings, or possibly create messages ex-nihilo). |
options.autoDelete boolean
(default false )
|
If true, the exchange will be destroyed once the number of bindings for which it is the source drop to zero. |
options.alternateExchange string?
|
An exchange to send messages to if this exchange can't route them to any queues. |
options.arguments Object?
|
Any additional arguments that may be needed by an exchange type. |
(function
= noopThrowIfError
)
Delete exchange, if the exchange does not exist, a channel error is raised (RabbitMQ version 3.2.0 and after will not raise an error1).
Bind an exchange to another exchange. The exchange named by destination will receive messages from the exchange named by source, according to the type of the source and the pattern given. For example, a direct exchange will relay messages that have a routing key equal to the pattern.
NB Exchange to exchange binding is a RabbitMQ extension.
Remove a binding from an exchange to another exchange. A binding with the exact source exchange, destination exchange, routing key pattern, and extension args will be removed. If no such binding exists, it's – you guessed it – a channel error, except in RabbitMQ >= version 3.2.0, for which it succeeds trivially 1.
Ask a queue for a message, as an RPC. This will be resolved with either false, if there is no message to be had (the queue has no messages ready), or a message.
Acknowledge all outstanding messages on the channel. This is a "safe" operation, in that it won't result in an error even if there are no such messages.
(any)
Reject all messages outstanding on this channel. If requeue is truthy, or omitted, the server will try to re-enqueue the messages.
(any)
Requeue unacknowledged messages on this channel. The server will reply (with an empty object) once all messages are requeued.
(function (err)?
= noopThrowIfError
)
AMQP message fields
Type: Object
(string)
: Identifies the consumer for which the message is destined
(number)
: A serial number for the message
(boolean)
: If true indicates that this message has been delivered before and been handed back to the server
(e.g., by a nack or recover operation).
(string)
: Name of exchange the message was published to
(string)
: The routing key (if any) used when published
AMQP message properties
Type: Object
(string)
: A MIME type for the message content.
(string)
: A MIME encoding for the message content.
(Object)
: Any user provided headers
(string)
(string)
(string)
(string)
(string)
(string)
(string)
(string)
(string)
(string)
(string)
AMQP message
Type: Object
(AmqpMessageFields)
(AmqpMessageProperties)
(Buffer)
(function)
(function)