JavaScript NSQ client WIP.
- actually written in js :p
- easier debugging via debug() instrumentation
- native json message support
- does not arbitrarily apply backoff on requeues
- disabling of auto-RDY support for manual control (high throughput etc)
- reconnection to dead nsqd nodes
- graceful close support
$ npm install nsq.js
The DEBUG environment variable can be used to enable traces within the module, for example all nsq debug() calls except fo the framer:
$ DEBUG=nsq*,-nsq:framer node test
nsq:reader connect nsqd 0.0.0.0:4150 events/ingestion [5] +0ms
nsq:connection connect: 0.0.0.0:4150 V2 +0ms
nsq:connection command: IDENTIFY null +2ms
nsq:connection command: SUB ["events","ingestion"] +1ms
nsq:connection command: RDY [5] +0ms
nsq:connection connect: undefined:4150 V2 +0ms
nsq:connection command: IDENTIFY null +1ms
nsq:connection command: PUB ["events"] +0ms
nsq:reconnect reset backoff +0ms
nsq:reconnect reset backoff +1ms
nsq:connection response OK +3ms
nsq:connection response OK +0ms
nsq:connection response OK +0ms
The NSQD documentation recommends applying backoff when requeueing implying that the consumer is faulty, IMO this is a weird default, and the opposite of what we need so it's not applied in this client.
var nsq = require('nsq.js');
// subscribe
var reader = nsq.reader({
nsqd: [':4150'],
maxInFlight: 1,
maxAttempts: 5,
topic: 'events',
channel: 'ingestion'
});
reader.on('error', function(err){
console.log(err.stack);
});
reader.on('message', function(msg){
var body = msg.body.toString();
console.log('%s attempts=%s', body, msg.attempts);
msg.requeue(2000);
});
reader.on('discard', function(msg){
var body = msg.body.toString();
console.log('giving up on %s', body);
msg.finish();
});
// publish
var writer = nsq.writer(':4150');
writer.on('ready', function() {
writer.publish('events', 'foo');
writer.publish('events', 'bar');
writer.publish('events', 'baz');
});
Create a reader:
id
connection identifier (seeclient_id
in the spec)topic
topic namechannel
channel namensqd
array of nsqd addressesnsqlookupd
array of nsqlookupd addressesmaxAttempts
max attempts before discarding [Infinity]maxConnectionAttempts
max reconnection attempts [Infinity]maxInFlight
max messages distributed across connections [10]msgTimeout
session-specific msg timeoutpollInterval
nsqlookupd poll interval[10000]ready
whenfalse
auto-RDY maintenance will be disabledtrace
trace function
Events:
message
(msg) incoming messagediscard
(msg) discarded messageerror response
(err) response from nsqerror
(err)
Close the reader's connection(s) and fire the optional [fn] when completed.
Create a writer. By default a connection attempt to 0.0.0.0:4150 will be made unless one of the following options are provided:
port
numberhost
namensqd
array of nsqd addressesnsqlookupd
array of nsqlookupd addresses
Events:
error response
(err) response from nsqerror
(err)
Publish the given message
to topic
where message
may be a string, buffer, or object. An array of messages
may be passed, in which case a MPUT is performed.
Close the writer's connection(s) and fire the optional [fn] when completed.
A single message.
Mark message as complete.
Re-queue the message immediately, or with the
given delay
in milliseconds, or a string such
as "5s", "10m" etc.
Reset the message's timeout, increasing the length of time before NSQD considers it timed out.
Return parsed JSON object.
The following jstrace probes are available:
connection:ready
ready count sentconnection:message
message receivedmessage:finish
finished a messagemessage:requeue
requeued a messagemessage:touch
touched a message
nsqd --lookupd-tcp-address=0.0.0.0:4160 &
nsqadmin --lookupd-http-address=0.0.0.0:4161 &
nsqlookupd &
make test
MIT