Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

0.7 #56

Closed
wants to merge 25 commits into from
Closed

0.7 #56

Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
77c1d15
0.6.17
SomeKittens Jan 29, 2016
081d306
feat(external): Added GustavRedis class for external use
SomeKittens Jan 16, 2016
38476ef
chore(tests): use mocked out redis client in tests
SomeKittens Jan 16, 2016
1b4a958
chore(lint): lint /external
SomeKittens Jan 16, 2016
900cc04
chore(defs): Add interface for external connectors
SomeKittens Jan 16, 2016
7223f58
feat(external): Added ability to easily communicate with external dat…
SomeKittens Jan 17, 2016
8fd2fc5
feat(kafka): Added external connector for Kafka
SomeKittens Jan 17, 2016
d4397a5
chore: Clean up Kafka coupler
SomeKittens Jan 17, 2016
43bb3fd
chore(ci): change c++ build env
SomeKittens Jan 17, 2016
00f38de
chore(ci): fix travis env config
SomeKittens Jan 17, 2016
306b766
feat(rabbit): added rabbit coupler
SomeKittens Jan 18, 2016
fd02888
chore: s/external/coupler/
SomeKittens Jan 18, 2016
5e5b5ec
fix(coupler): actually use the coupler requested
SomeKittens Jan 18, 2016
b1532c1
test(coupler): test default name
SomeKittens Jan 18, 2016
21480c0
test(coupler): test multiple couplers
SomeKittens Jan 18, 2016
f006771
docs(couplers): Added details about couplers to README
SomeKittens Jan 18, 2016
6128ac4
fix: smart quotes are terrible
SomeKittens Jan 18, 2016
85624b3
docs(couplers): describe details of couplers spec
SomeKittens Jan 26, 2016
5600e0e
docs(couplers): describe how things should work by default
SomeKittens Jan 26, 2016
932aac9
docs: fix up some readme quirks under couplers
SomeKittens Feb 1, 2016
17f2658
feat: added __done events to couplers
SomeKittens Feb 2, 2016
d0f99f2
feat: export redis & kafka couplers
SomeKittens Feb 2, 2016
e244078
refactor: coupler -> addCoupler
SomeKittens Feb 2, 2016
f7aa05c
chore: update tsd defs
SomeKittens Feb 3, 2016
cc62a73
docs: fix typo in Coupler details
SomeKittens Feb 3, 2016
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ node_js:
- "4.1"
- "4.0"
env:
- TSD_GITHUB_TOKEN=dc0dec2e633e1366658a40961cdcf389eaf3dfba
- TSD_GITHUB_TOKEN=dc0dec2e633e1366658a40961cdcf389eaf3dfba CC=clang CXX=clang++ npm_config_clang=1z
script:
- npm run lint
- npm test
Expand Down
22 changes: 22 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,25 @@ Further nodes can be chained onto it, using the following methods:
- `.sink` Attach a sink (terminal, returns the workflow itself)
- `merge(...nodes)` Merge several nodes into a single stream
- `.tap` Attach a sink but return the previous node for chaining


## Couplers

Gustav is designed to work with an external tool that manages *where* all of the processed items go, while Gustav workflows manage the actual processing. Redis, Kafka and RabbitMQ are currently supported as well as an in-memory processor that provides no guarantees and is best used for testing.

These are used as such:

```typescript

// Second string param names the coupler
// optional, and the coupler does come with a default
gustav.coupler(new GustavRedis(), 'myRedis');

// Since we've specified a Redis coupler, the following automagically knows to listen to the `demo-in` channel and push any events down the workflow
.from('demo-in')
// Vanilla transformer node
.transf('someTransfNode')
// Anything hitting this sink publishes a message to the `demo-out` channel
.to('demo-out')
.start();
```
21 changes: 21 additions & 0 deletions Workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ export interface IWorkflowChain {
transf(name: string | ITransfNode, config?: any, metaConfig?: IMetaConfig): IWorkflowChain;
sink(name: string | ISinkNode, config?: any, metaConfig?: IMetaConfig): Workflow;
merge(...nodes: IWorkflowChain[]): IWorkflowChain;
to (type: string, name: string): Workflow;
tap(name: string | ISinkNode, config?: any, metaConfig?: IMetaConfig): IWorkflowChain;
clone(): IWorkflowChain;
}
Expand Down Expand Up @@ -157,6 +158,17 @@ export class Workflow {
let prevNode = gustav.makeNode(<string>sourceName, this.ggraph, sourceConfig, metaConfig);
return new WorkflowChain(this, prevNode);
}

from (type: string, name: string): IWorkflowChain {
let prevNode;
try {
prevNode = gustav.makeNode(`__from-${type}`, this.ggraph, name, {external: name});
} catch (e) {
throw new Error(`Tried to define \`from\` node "${name}" with no external coupler defined`);
}

return new WorkflowChain(this, prevNode);
}
/**
* Creates a workflow from a JSON definition
* @param {INodeDef[]} config Array of node definitions to make the workflow from
Expand Down Expand Up @@ -284,6 +296,15 @@ class WorkflowChain {
this.addNodeToGraph(name, 'sink', config, metaConfig);
return new WorkflowChain(this.workflow, this.prevNode);
}
to (type: string, name: string): Workflow {
try {
this.addNodeToGraph(`__to-${type}`, 'sink', name, {external: name});
} catch (e) {
throw new Error(`Tried to define \`to\` node "${name}" with no external coupler defined`);
}

return this.workflow;
}
clone(): IWorkflowChain {
return new WorkflowChain(this.workflow, this.prevNode);
}
Expand Down
95 changes: 95 additions & 0 deletions couplers/GustavKafka.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
'use strict';

import {Observable, Subscription} from '@reactivex/rxjs';
import {ICoupler} from '../defs';

// No definitions for kafka, sad
let kafka = require('kafka-node');

export interface IConsumerConfig {
partition?: any;
offset?: any;
}

export class GustavKafka implements ICoupler {
defaultName: string;
constructor(public config?: any) {
this.config = this.config || {
connString: 'localhost:2181',
clientId: 'kafka-gustav-client'
};
this.defaultName = 'kafka';
}

getClient(): any {
return new kafka.Client(this.config.connString, this.config.clientId);
}

// two methods, one called on from, other on to
from(topic: string, offset?: number): Observable<any> {
let client = this.getClient();

let consumer = new kafka.Consumer(client, [{
topic
}]/* TODO */);

return new Observable(o => {
consumer.on('message', m => {
if (m.value === '__done') {
return o.complete();
}
o.next(m.value)
});
consumer.on('error', err => o.error(err));

return () => consumer.close(() => {});
});
}
to(topic: string, iO: Observable<any>): Subscription<any> {
let client = this.getClient();
let producer = new kafka.Producer(client);

let buffer = [];
let handleErr = err => {
if (err) { throw err; }
};

producer.on('ready', () => {
if (!buffer.length) { return; }
producer.send([{
topic: topic,
messages: buffer
}], handleErr);
});

return iO
// Things run faster overall with a little buffering
.bufferTime(50)
.subscribe(
msg => {
if (!msg.length) { return; }

for (let i = msg.length - 1; i >= 0; i--) {
msg[i] = typeof msg[i] === 'string' ? msg[i] : JSON.stringify(msg[i]);
}

if (!producer.ready) {
buffer = buffer.concat(msg);
return;
}

producer.send([{
topic: topic,
messages: msg
}], handleErr);
},
handleErr,
() => {
producer.send([{
topic: topic,
messages: ['__done']
}]);
}
);
}
}
57 changes: 57 additions & 0 deletions couplers/GustavMem.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
'use strict';

import {Observable, Subscription} from '@reactivex/rxjs';
import {ICoupler} from '../defs';

/**
* GustavMem is an in-memory messaging system. It provides NO GUARANTEES about anything.
* Use at your own risk!
*
* Mainly this is used for tests.
*/

export class GustavMem implements ICoupler {
channels: Object;
defaultName: string;
constructor() {
this.channels = {};
this.defaultName = 'mem';
};

from(channelName: string): Observable<any> {
this.initChannel(channelName);
return new Observable(o => {
this.channels[channelName].push(item => {
if (item === '__done') {
return o.complete();
}
o.next(item);
});
});
}
to(channelName: string, iO: Observable<any>): Subscription<any> {
this.initChannel(channelName);

return iO.subscribe(
item => this.channels[channelName].forEach(fn => fn(item)),
err => { throw err; },
() => this.channels[channelName].forEach(fn => fn('__done'))
);
}

// Testing
publish(channelName, item, cb?): void {
this.initChannel(channelName);
this.channels[channelName].forEach(fn => fn(item));
if (cb) { cb(); }
}
subscribe(channelName, fn): void {
this.initChannel(channelName);
this.channels[channelName].push(fn);
}

private initChannel (name: string): void {
if (this.channels[name]) { return; }
this.channels[name] = [];
}
}
80 changes: 80 additions & 0 deletions couplers/GustavRabbit.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
'use strict';

import {Observable, Subscription} from '@reactivex/rxjs';
import {ICoupler} from '../defs';
import {connect} from 'amqplib';
import * as uuid from 'node-uuid';

// Untested, use at own risk

// Uses fanout exchanges so we can connect many queues to them
export class GustavRabbit implements ICoupler {
defaultName: string;
constructor(public config?: any) {
this.config = this.config || {
connString: 'amqp://localhost'
};
this.defaultName = 'rabbitmq';
}

from(exchange: string): Observable<any> {
let queue = uuid.v4();
return new Observable(o => {
let conn;
connect(this.config.connString)
.then(c => conn = c && c.createChannel())
.then(ch => {
ch.assertExchange(exchange, 'fanout', {durable: true});
ch.assertQueue(queue, {durable: true});

ch.bindQueue(queue, exchange, '');
ch.consume(queue, msg => {
let msgStr = msg.content.toString();
ch.ack(msg);
if (msgStr === '__done') {
return o.complete();
}
o.next(msgStr);
}, {noAck: false});
})
.catch(err => o.error(err));

return () => conn && conn.close();
});
}

to(exchange: string, iO: Observable<any>): Subscription<any> {
let channel, conn, cachedItems = [];
let queue = uuid.v4();

connect(this.config.connString)
.then(c => conn = c && c.createChannel())
.then(c => {
channel = c;

channel.assertExchange(exchange, 'fanout');
cachedItems.forEach(item => {
channel.publish(exchange, '', new Buffer(item));
});
})
.catch(err => { throw err; });

return iO
.subscribe(
msg => {
if (!channel) {
return cachedItems.push(msg);
}

channel.publish(exchange, '', new Buffer(msg));
},
err => console.error(`rabbitSink err, queue: ${queue}`, err),
() => {
if (conn) {
channel.publish(exchange, '', new Buffer('__done'));
conn.close()
}
}
);
}
}
43 changes: 43 additions & 0 deletions couplers/GustavRedis.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
'use strict';

import {createClient, RedisClient} from 'redis';
import {Observable, Subscription} from '@reactivex/rxjs';
import {ICoupler} from '../defs';

// Provides helpers for getting data from & to Redis event channels
export class GustavRedis implements ICoupler {
defaultName: string;
constructor(public config?: any) {
this.defaultName = 'redis';
}

// supposedly private but needs to be overridden in tests
getClient(): RedisClient {
// Need to create a new client for every connection
return createClient(this.config);
}

// two methods, one called on from, other on to
from(channelName: string): Observable<any> {
let client = this.getClient();
return new Observable(o => {
client.on('message', (channel, message) => {
if (message === '__done') {
return o.complete();
}
o.next(message);
});
client.subscribe(channelName);

return () => client.unsubscribe(channelName);
});
}
to(channelName: string, iO: Observable<any>): Subscription<any> {
let client = this.getClient();
return iO.subscribe(
item => client.publish(channelName, item),
err => { throw err; },
() => client.publish(channelName, '__done')
);
}
}
Loading