From 77c1d15e44a2966826974b45f260bd52bf86ddee Mon Sep 17 00:00:00 2001 From: SomeKittens Date: Thu, 28 Jan 2016 18:01:39 -0800 Subject: [PATCH 01/25] 0.6.17 --- package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package.json b/package.json index 41fe4cd..d82b663 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "gustav", - "version": "0.6.16", + "version": "0.6.17", "description": "Framework for building realtime processing flows in Node.js", "main": "dist/index.js", "scripts": { From 081d3064a39d0af81659e7713980175b436b4aa0 Mon Sep 17 00:00:00 2001 From: SomeKittens Date: Sat, 16 Jan 2016 13:22:15 -0800 Subject: [PATCH 02/25] feat(external): Added GustavRedis class for external use --- external/GustavRedis.ts | 31 +++++++++++++++ package.json | 1 + test/GRedis.ts | 87 +++++++++++++++++++++++++++++++++++++++++ tsconfig.json | 4 ++ tsd.json | 5 ++- 5 files changed, 127 insertions(+), 1 deletion(-) create mode 100644 external/GustavRedis.ts create mode 100644 test/GRedis.ts diff --git a/external/GustavRedis.ts b/external/GustavRedis.ts new file mode 100644 index 0000000..0c08a1f --- /dev/null +++ b/external/GustavRedis.ts @@ -0,0 +1,31 @@ +'use strict'; + +import {gustav} from '../index'; +import {createClient} from 'redis'; +import {Observable, Subscription} from '@reactivex/rxjs'; + +// Attach to a Gustav instance + +// Provides helpers for getting data from & to Redis event channels +export class GustavRedis { + constructor(public config?) {} + + // two methods, one called on from, other on to + from(channelName: string): Observable { + let client = createClient(this.config); + return new Observable(o => { + client.on('message', (channel, message) => { + o.next(message); + }); + client.subscribe(channelName); + + return () => client.unsubscribe(channelName); + }); + } + to(channelName: string, iO: Observable): Subscription { + let client = createClient(this.config); + return iO.subscribe( + item => client.publish(channelName, item) + ); + } +} \ No newline at end of file diff --git a/package.json b/package.json index d82b663..9dd6ed0 100644 --- a/package.json +++ b/package.json @@ -25,6 +25,7 @@ "dependencies": { "@reactivex/rxjs": "5.0.0-beta.0", "node-uuid": "1.4.7", + "redis": "^2.4.2", "tail": "~0.4.0" }, "devDependencies": { diff --git a/test/GRedis.ts b/test/GRedis.ts new file mode 100644 index 0000000..f9d2f56 --- /dev/null +++ b/test/GRedis.ts @@ -0,0 +1,87 @@ +'use strict'; + +import {GustavRedis} from '../external/GustavRedis'; +import {createClient} from 'redis'; +import {Observable} from '@reactivex/rxjs'; + +import {expect} from 'chai'; + +describe('GustavRedis', () => { + let client, gr; + beforeEach(() => { + client = createClient(); + gr = new GustavRedis(); + }); + + it('constructs without errors', () => { + let a = new GustavRedis(); + expect(a).to.be.ok; + }); + + it('listens to a redis channel', (done) => { + let recieved = 0; + let channel = 'test-0'; + + let redisObservable = gr.from(channel); + + redisObservable.subscribe(item => { + recieved++; + expect(item, 'Recieved proper message').to.equal('hello'); + expect(recieved, 'Correct number of runs').to.equal(1); + done(); + }, err => { throw err; }); + + // TODO: Why is GR's subscribe happening after this publish + // when run sync? + setTimeout(() => { + client.publish(channel, 'hello', (err) => { + if (err) { throw err; } + }); + }, 15); + }); + + it('publishes to a redis channel', (done) => { + let channel = 'test-1'; + + let obs = new Observable(o => { + setTimeout(() => o.next('hello'), 15); + }); + + gr.to(channel, obs); + + client.on('message', (channelIn, message) => { + expect(channelIn).to.equal(channel); + expect(message).to.equal('hello'); + done(); + }); + client.subscribe(channel); + }); + + // Might not be needed now that we don't share clients + it.skip('listens to only one redis channel', (done) => { + let recieved = 0; + let channel = 'test-1'; + let otherChannel = 'test-other'; + + let redisObservable = gr.from(channel); + let redObs2 = gr.from(otherChannel); + + redisObservable.subscribe(item => { + throw 'Unexpected subscribe call'; + }, err => { throw err; }); + + redObs2.subscribe(item => { + recieved++; + }, err => { throw err; }); + + // TODO: Why is GR's subscribe happening after this publish + // when run sync? + setTimeout(() => { + client.publish(otherChannel, 'hello', (err) => { + if (err) { throw err; } + expect(recieved, 'Correct number of runs').to.equal(1); + done(); + }); + }, 15); + }); +}); diff --git a/tsconfig.json b/tsconfig.json index f77aa2f..3506822 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -11,6 +11,7 @@ "helpers.ts", "Workflow.ts", "defs.ts", + "test/common.ts", "test/startstop.ts", "test/json.ts", @@ -19,6 +20,9 @@ "test/register.ts", "test/misc.ts", "test/fullGraph.ts", + "test/GRedis.ts", + + "external/GustavRedis.ts", "Symbol.d.ts", "Promise.d.ts", diff --git a/tsd.json b/tsd.json index 7213cd2..68e1b29 100644 --- a/tsd.json +++ b/tsd.json @@ -6,7 +6,7 @@ "bundle": "typings/tsd.d.ts", "installed": { "node/node.d.ts": { - "commit": "efd40e67ff323f7147651bdbef03c03ead7b1675" + "commit": "70bf7e2bfeb0d5b1b651ef3219bcc65c8eec117e" }, "mocha/mocha.d.ts": { "commit": "efd40e67ff323f7147651bdbef03c03ead7b1675" @@ -25,6 +25,9 @@ }, "node-uuid/node-uuid-base.d.ts": { "commit": "efd40e67ff323f7147651bdbef03c03ead7b1675" + }, + "redis/redis.d.ts": { + "commit": "70bf7e2bfeb0d5b1b651ef3219bcc65c8eec117e" } } } From 38476ef9f905b4bd49a955154166052a8a685310 Mon Sep 17 00:00:00 2001 From: SomeKittens Date: Sat, 16 Jan 2016 14:00:25 -0800 Subject: [PATCH 03/25] chore(tests): use mocked out redis client in tests --- external/GustavRedis.ts | 12 +++++++++--- test/GRedis.ts | 26 ++++++++++++++++++++++++-- 2 files changed, 33 insertions(+), 5 deletions(-) diff --git a/external/GustavRedis.ts b/external/GustavRedis.ts index 0c08a1f..4abef25 100644 --- a/external/GustavRedis.ts +++ b/external/GustavRedis.ts @@ -1,7 +1,7 @@ 'use strict'; import {gustav} from '../index'; -import {createClient} from 'redis'; +import {createClient, RedisClient} from 'redis'; import {Observable, Subscription} from '@reactivex/rxjs'; // Attach to a Gustav instance @@ -10,9 +10,15 @@ import {Observable, Subscription} from '@reactivex/rxjs'; export class GustavRedis { constructor(public config?) {} + // supposedly private but needs to be overridden in tests + getClient(): RedisClient { + // Need to create a new client for every connection + return createClient(this.config); + } + // two methods, one called on from, other on to from(channelName: string): Observable { - let client = createClient(this.config); + let client = this.getClient(); return new Observable(o => { client.on('message', (channel, message) => { o.next(message); @@ -23,7 +29,7 @@ export class GustavRedis { }); } to(channelName: string, iO: Observable): Subscription { - let client = createClient(this.config); + let client = this.getClient(); return iO.subscribe( item => client.publish(channelName, item) ); diff --git a/test/GRedis.ts b/test/GRedis.ts index f9d2f56..bd41fec 100644 --- a/test/GRedis.ts +++ b/test/GRedis.ts @@ -1,16 +1,38 @@ 'use strict'; import {GustavRedis} from '../external/GustavRedis'; -import {createClient} from 'redis'; import {Observable} from '@reactivex/rxjs'; import {expect} from 'chai'; +class MockClient { + listeners: any[]; + channels: any[]; + constructor() { + this.channels = []; + this.listeners = []; + } + publish(channel: string, item: string, fn?): void { + if (this.channels.indexOf(channel) > -1) { + this.listeners.forEach(listener => listener(channel, item)); + } + if (fn) { fn(null, channel); } + } + on(message: string, fn): void { + this.listeners.push(fn); + } + subscribe(channel: string, fn?): void { + this.channels.push(channel); + if (fn) { fn(); } + } +} + describe('GustavRedis', () => { let client, gr; beforeEach(() => { - client = createClient(); + client = new MockClient(); gr = new GustavRedis(); + gr.getClient = () => client; }); it('constructs without errors', () => { From 1b4a958c786e2b2d1a122ba2680b94f805233520 Mon Sep 17 00:00:00 2001 From: SomeKittens Date: Sat, 16 Jan 2016 14:09:28 -0800 Subject: [PATCH 04/25] chore(lint): lint /external --- external/GustavRedis.ts | 3 +-- package.json | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/external/GustavRedis.ts b/external/GustavRedis.ts index 4abef25..9ba25c6 100644 --- a/external/GustavRedis.ts +++ b/external/GustavRedis.ts @@ -1,6 +1,5 @@ 'use strict'; -import {gustav} from '../index'; import {createClient, RedisClient} from 'redis'; import {Observable, Subscription} from '@reactivex/rxjs'; @@ -34,4 +33,4 @@ export class GustavRedis { item => client.publish(channelName, item) ); } -} \ No newline at end of file +} diff --git a/package.json b/package.json index 9dd6ed0..ca2e49f 100644 --- a/package.json +++ b/package.json @@ -6,7 +6,7 @@ "scripts": { "build": "tsd install && tsc", "build:watch": "tsc --watch", - "lint": "tslint *.ts && tslint test/*.ts", + "lint": "tslint *.ts && tslint external/*.ts && tslint test/*.ts", "prepublish": "npm run build", "snyk": "snyk test", "test": "mocha dist/test", From 900cc042264d9aa3534acfc0c3aad4dc6daf6a02 Mon Sep 17 00:00:00 2001 From: SomeKittens Date: Sat, 16 Jan 2016 14:25:18 -0800 Subject: [PATCH 05/25] chore(defs): Add interface for external connectors --- defs.ts | 7 +++++++ external/GustavRedis.ts | 5 +++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/defs.ts b/defs.ts index e7b906e..bcecc16 100644 --- a/defs.ts +++ b/defs.ts @@ -27,3 +27,10 @@ export interface ISinkNode { (iO: Observable): Subscription; (config: Object, iO: Observable): Subscription; } + +export interface IExternalClient { + config: any; + getClient?(): any; + from(name: string): Observable; + to(name: string, iO: Observable): Subscription; +} diff --git a/external/GustavRedis.ts b/external/GustavRedis.ts index 9ba25c6..f5e5f05 100644 --- a/external/GustavRedis.ts +++ b/external/GustavRedis.ts @@ -2,12 +2,13 @@ import {createClient, RedisClient} from 'redis'; import {Observable, Subscription} from '@reactivex/rxjs'; +import {IExternalClient} from '../defs'; // Attach to a Gustav instance // Provides helpers for getting data from & to Redis event channels -export class GustavRedis { - constructor(public config?) {} +export class GustavRedis implements IExternalClient { + constructor(public config?: any) {} // supposedly private but needs to be overridden in tests getClient(): RedisClient { From 7223f58ebdc0468b0c9b896b61254c9bae931333 Mon Sep 17 00:00:00 2001 From: SomeKittens Date: Sat, 16 Jan 2016 16:08:15 -0800 Subject: [PATCH 06/25] feat(external): Added ability to easily communicate with external data sources Through gustav.external, you can now set an external data service for Gustav to communicate with There are two currently supported: in memory and Redis. closes #44, closes #39 --- Workflow.ts | 21 +++++++++++++++++ defs.ts | 2 +- external/GustavMem.ts | 55 +++++++++++++++++++++++++++++++++++++++++++ index.ts | 27 ++++++++++++++------- test/GMem.ts | 50 +++++++++++++++++++++++++++++++++++++++ test/common.ts | 6 +++-- test/external.ts | 53 +++++++++++++++++++++++++++++++++++++++++ test/fullGraph.ts | 7 +++--- test/json.ts | 5 +++- test/meta.ts | 7 +++--- test/misc.ts | 46 +++++++++++++++++++----------------- test/register.ts | 6 +++-- test/startstop.ts | 6 +++-- test/testNodes.ts | 4 ---- tsconfig.json | 7 ++++-- 15 files changed, 253 insertions(+), 49 deletions(-) create mode 100644 external/GustavMem.ts create mode 100644 test/GMem.ts create mode 100644 test/external.ts diff --git a/Workflow.ts b/Workflow.ts index f1e3d75..c12c4b7 100644 --- a/Workflow.ts +++ b/Workflow.ts @@ -16,6 +16,7 @@ export interface IWorkflowChain { transf(name: string | ITransfNode, config?: any, metaConfig?: IMetaConfig): IWorkflowChain; sink(name: string | ISinkNode, config?: any, metaConfig?: IMetaConfig): Workflow; merge(...nodes: IWorkflowChain[]): IWorkflowChain; + to (name: string): Workflow; tap(name: string | ISinkNode, config?: any, metaConfig?: IMetaConfig): IWorkflowChain; clone(): IWorkflowChain; } @@ -157,6 +158,17 @@ export class Workflow { let prevNode = gustav.makeNode(sourceName, this.ggraph, sourceConfig, metaConfig); return new WorkflowChain(this, prevNode); } + + from (name: string): IWorkflowChain { + let prevNode; + try { + prevNode = gustav.makeNode('__from', this.ggraph, name, {external: name}); + } catch (e) { + throw new Error(`Tried to define \`from\` node "${name}" with no external interface defined`); + } + + return new WorkflowChain(this, prevNode); + } /** * Creates a workflow from a JSON definition * @param {INodeDef[]} config Array of node definitions to make the workflow from @@ -284,6 +296,15 @@ class WorkflowChain { this.addNodeToGraph(name, 'sink', config, metaConfig); return new WorkflowChain(this.workflow, this.prevNode); } + to (name: string): Workflow { + try { + this.addNodeToGraph('__to', 'sink', name, {external: name}); + } catch (e) { + throw new Error(`Tried to define \`to\` node "${name}" with no external interface defined`); + } + + return this.workflow; + } clone(): IWorkflowChain { return new WorkflowChain(this.workflow, this.prevNode); } diff --git a/defs.ts b/defs.ts index bcecc16..5f9bee5 100644 --- a/defs.ts +++ b/defs.ts @@ -29,7 +29,7 @@ export interface ISinkNode { } export interface IExternalClient { - config: any; + config?: any; getClient?(): any; from(name: string): Observable; to(name: string, iO: Observable): Subscription; diff --git a/external/GustavMem.ts b/external/GustavMem.ts new file mode 100644 index 0000000..3c20462 --- /dev/null +++ b/external/GustavMem.ts @@ -0,0 +1,55 @@ +'use strict'; + +import {Observable, Subscription} from '@reactivex/rxjs'; +import {IExternalClient} from '../defs'; + +/** + * GustavMem is an in-memory messaging system. It provides NO GUARANTEES about anything. + * Use at your own risk! + * + * Mainly this is used for tests. + */ + +export class GustavMem implements IExternalClient { + channels: Object; + constructor() { + this.channels = {}; + }; + + from(channelName: string): Observable { + this.initChannel(channelName); + return new Observable(o => { + this.channels[channelName].push(item => { + if (item === '__done') { + return o.complete(); + } + o.next(item); + }); + }); + } + to(channelName: string, iO: Observable): Subscription { + this.initChannel(channelName); + + return iO.subscribe( + item => this.channels[channelName].forEach(fn => fn(item)), + err => { throw err; }, + () => this.channels[channelName].forEach(fn => fn('__done')) + ); + } + + // Testing + publish(channelName, item, cb?): void { + this.initChannel(channelName); + this.channels[channelName].forEach(fn => fn(item)); + if (cb) { cb(); } + } + subscribe(channelName, fn): void { + this.initChannel(channelName); + this.channels[channelName].push(fn); + } + + private initChannel (name: string): void { + if (this.channels[name]) { return; } + this.channels[name] = []; + } +} diff --git a/index.ts b/index.ts index a4a7811..af2be3e 100644 --- a/index.ts +++ b/index.ts @@ -2,7 +2,7 @@ import {GustavGraph} from './GustavGraph'; import {Workflow} from './Workflow'; -import {IMetaConfig} from './defs'; +import {IMetaConfig, IExternalClient} from './defs'; export interface INodeFactory { (...config: any[]): symbol; @@ -25,6 +25,14 @@ let workflows = {}; let register; let anonWfId = 0; +// Meta nodes +// TODO: move into its own file +let registerMetaNodes = (gustav) => { + gustav.transformer('__gmergeNode', (nodes, iO) => { + return iO.do(() => {}); + }); +}; + export let gustav = { makeNode: (nodeName: string, graph: GustavGraph, config: any, metaConfig?: IMetaConfig): symbol => { let node = registeredNodes.filter((regNode) => regNode.name === nodeName)[0]; @@ -64,6 +72,8 @@ export let gustav = { reset: (): void => { anonWfId = 0; workflows = {}; + registeredNodes = []; + registerMetaNodes(gustav); }, getNodeTypes: (): INodeCollection => { return registeredNodes.reduce((obj, node) => { @@ -127,9 +137,13 @@ export let gustav = { return graph; }, - source(name: string, factory: Function): Function { return register('source', name, factory); }, - transformer(name: string, factory: Function): Function { return register('transformer', name, factory); }, - sink(name: string, factory: Function): Function { return register('sink', name, factory); }, + external: (externalConnector: IExternalClient): void => { + gustav.source('__from', (name) => externalConnector.from(name)); + gustav.sink('__to', (name, iO) => externalConnector.to(name, iO)); + }, + source: (name: string, factory: Function): Function => { return register('source', name, factory); }, + transformer: (name: string, factory: Function): Function => { return register('transformer', name, factory); }, + sink: (name: string, factory: Function): Function => { return register('sink', name, factory); } }; // TODO: new type of registration that's just a singleton @@ -158,7 +172,4 @@ register = (type: string, name: string, factory): INodeFactory => { return gustav.makeNode.bind(null, name); }; -// Meta nodes -gustav.transformer('__gmergeNode', (nodes, iO) => { - return iO.do(() => {}); -}); +registerMetaNodes(gustav); diff --git a/test/GMem.ts b/test/GMem.ts new file mode 100644 index 0000000..164899a --- /dev/null +++ b/test/GMem.ts @@ -0,0 +1,50 @@ +'use strict'; + +import {GustavMem} from '../external/GustavMem'; +import {Observable} from '@reactivex/rxjs'; + +import {expect} from 'chai'; + +describe('GustavMem', () => { + let gm; + beforeEach(() => { + gm = new GustavMem(); + }); + + it('constructs without errors', () => { + let a = new GustavMem(); + expect(a).to.be.ok; + }); + + it('listens to a channel', (done) => { + let recieved = 0; + let channel = 'test-0'; + + let myObservable = gm.from(channel); + + myObservable.subscribe(item => { + recieved++; + expect(item, 'Recieved proper message').to.equal('hello'); + expect(recieved, 'Correct number of runs').to.equal(1); + done(); + }, err => { throw err; }); + + gm.publish(channel, 'hello'); + }); + + it('publishes to a channel', (done) => { + let channel = 'test-1'; + + let obs = new Observable(o => { + setTimeout(() => o.next('hello'), 15); + }); + + gm.subscribe(channel, (message) => { + expect(message).to.equal('hello'); + done(); + }); + + gm.to(channel, obs); + + }); +}); diff --git a/test/common.ts b/test/common.ts index c320d9f..f01a1a7 100644 --- a/test/common.ts +++ b/test/common.ts @@ -5,10 +5,12 @@ import {Workflow} from '../Workflow'; import {expect} from 'chai'; import {addCommonNodes} from './testNodes'; -addCommonNodes(gustav); - // Couple of common workflows describe('Common workflows', () => { + beforeEach(() => { + gustav.reset(); + addCommonNodes(gustav); + }); let wfFactories = []; wfFactories.push((done): Workflow => { diff --git a/test/external.ts b/test/external.ts new file mode 100644 index 0000000..a9029a0 --- /dev/null +++ b/test/external.ts @@ -0,0 +1,53 @@ +'use strict'; + +import {gustav} from '../index'; +import {Workflow} from '../Workflow'; +import {GustavMem} from '../external/GustavMem'; +import {addCommonNodes} from './testNodes'; + + +describe('gustav.external', () => { + let gm; + beforeEach(() => { + gustav.reset(); + addCommonNodes(gustav); + gm = new GustavMem(); + }); + let wfFactories = []; + wfFactories.push((done): Workflow => { + return gustav.createWorkflow('ex-0') + .source('intSource') + .to('bill'); + }); + + wfFactories.push((done): Workflow => { + return gustav.createWorkflow('ex-1') + .from('bill') + .transf('timesTwo') + .sink('fromIntTransformer', done); + }); + + it('allows for multiple workflows', (done) => { + gustav.external(gm); + + wfFactories[1](done).start(); + wfFactories[0]().start(); + }); + + it('allows for forking workflows', done => { + gustav.external(gm); + let inProgress = 3; + + let partDone = () => { + inProgress--; + if (!inProgress) { + done(); + } + }; + + wfFactories[1](partDone).start(); + wfFactories[1](partDone).start(); + wfFactories[1](partDone).start(); + wfFactories[0]().start(); + }); +}); diff --git a/test/fullGraph.ts b/test/fullGraph.ts index 67b6fd3..6d5dc5a 100644 --- a/test/fullGraph.ts +++ b/test/fullGraph.ts @@ -4,11 +4,12 @@ import {gustav} from '../index'; import {addCommonNodes} from './testNodes'; import {expect} from 'chai'; -addCommonNodes(gustav); - describe('FullGraph of multiple workflows', () => { - it('should create a full graph, including external nodes', () => { + beforeEach(() => { gustav.reset(); + addCommonNodes(gustav); + }); + it('should create a full graph, including external nodes', () => { gustav.createWorkflow('wf1') .source('intSource', undefined, { external: 'bill' diff --git a/test/json.ts b/test/json.ts index 76ee914..98ffa13 100644 --- a/test/json.ts +++ b/test/json.ts @@ -5,8 +5,11 @@ import {INodeDef} from '../defs'; import {expect} from 'chai'; import {addCommonNodes} from './testNodes'; -addCommonNodes(gustav); describe(`Workflow's .fromJSON()`, () => { + beforeEach(() => { + gustav.reset(); + addCommonNodes(gustav); + }); let noop = (): void => {}; let simpleWf = (done): INodeDef[] => [{ id: 1, diff --git a/test/meta.ts b/test/meta.ts index 850de4d..bc4e742 100644 --- a/test/meta.ts +++ b/test/meta.ts @@ -4,12 +4,13 @@ import {gustav} from '../index'; import {addCommonNodes} from './testNodes'; import {expect} from 'chai'; -addCommonNodes(gustav); - // TODO: not stupid way of doing this // i.e. no try/catch describe('Node metadata', () => { - + beforeEach(() => { + gustav.reset(); + addCommonNodes(gustav); + }); it('should be able to attach a metadata watcher', (done) => { try { let i = 0; diff --git a/test/misc.ts b/test/misc.ts index ae83364..c2bb34c 100644 --- a/test/misc.ts +++ b/test/misc.ts @@ -4,31 +4,35 @@ import {gustav} from '../index'; import {expect} from 'chai'; import {addCommonNodes} from './testNodes'; -addCommonNodes(gustav); - -describe('Adding a custom id to a node', () => { - it('should allow the user to add a custom id', () => { - let wf = gustav.createWorkflow('gid test') - .source('intSource', null, {gid: 'hello'}) - .sink('fromIntSource', () => {}, {gid: 'bye'}); - - - // Simple way to get the nodes themselves - expect(wf.ggraph.sinkEdges[0].to.toString()).to.equal('Symbol(intSource-hello)'); - expect(wf.ggraph.sinkEdges[0].from.toString()).to.equal('Symbol(fromIntSource-bye)'); +describe('misc tests', () => { + beforeEach(() => { + gustav.reset(); + addCommonNodes(gustav); }); -}); + describe('Adding a custom id to a node', () => { + it('should allow the user to add a custom id', () => { + let wf = gustav.createWorkflow('gid test') + .source('intSource', null, {gid: 'hello'}) + .sink('fromIntSource', () => {}, {gid: 'bye'}); -describe('Workflows have name even when one is not passed in', () => { - before(() => gustav.reset()); - it('assigns a name to a workflow', () => { - let wf = gustav.createWorkflow(); - expect(wf.name).to.equal('Unnamed Workflow 0'); + // Simple way to get the nodes themselves + expect(wf.ggraph.sinkEdges[0].to.toString()).to.equal('Symbol(intSource-hello)'); + expect(wf.ggraph.sinkEdges[0].from.toString()).to.equal('Symbol(fromIntSource-bye)'); + }); }); - it('increments the number in the name', () => { - let wf = gustav.createWorkflow(); - expect(wf.name).to.equal('Unnamed Workflow 1'); + describe('Workflows have name even when one is not passed in', () => { + it('assigns a name to a workflow', () => { + let wf = gustav.createWorkflow(); + expect(wf.name).to.equal('Unnamed Workflow 0'); + }); + + it('increments the number in the name', () => { + let wf0 = gustav.createWorkflow(); + let wf1 = gustav.createWorkflow(); + expect(wf0.name).to.equal('Unnamed Workflow 0'); + expect(wf1.name).to.equal('Unnamed Workflow 1'); + }); }); }); diff --git a/test/register.ts b/test/register.ts index 7b3e5f9..9ad4253 100644 --- a/test/register.ts +++ b/test/register.ts @@ -4,9 +4,11 @@ import {gustav} from '../index'; import {expect} from 'chai'; import {addCommonNodes} from './testNodes'; -addCommonNodes(gustav); - describe('registration errors', () => { + beforeEach(() => { + gustav.reset(); + addCommonNodes(gustav); + }); it('should throw an error when we forget a name', () => { let thrower = () => gustav.source('', () => {}); diff --git a/test/startstop.ts b/test/startstop.ts index 902ced9..94374f7 100644 --- a/test/startstop.ts +++ b/test/startstop.ts @@ -3,11 +3,13 @@ import {gustav} from '../index'; import {addCommonNodes} from './testNodes'; -addCommonNodes(gustav); - // TODO: not stupid way of doing this // i.e. no try/catch describe('Workflow start/stop', () => { + beforeEach(() => { + gustav.reset(); + addCommonNodes(gustav); + }); it('should be able to start & stop a workflow', (done) => { try { let firstRun = true; diff --git a/test/testNodes.ts b/test/testNodes.ts index f040a5a..70deaf0 100644 --- a/test/testNodes.ts +++ b/test/testNodes.ts @@ -7,11 +7,7 @@ import {expect} from 'chai'; /** * Nodes common to all test workflows */ -let registered = false; export let addCommonNodes = gustav => { - // Avoids double registration problems - if (registered) { return; } - registered = true; // Used by several nodes let words = ['hello', 'world', 'gustav', 'is', 'neat']; diff --git a/tsconfig.json b/tsconfig.json index 3506822..bac6d4a 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -12,6 +12,9 @@ "Workflow.ts", "defs.ts", + "external/GustavRedis.ts", + "external/GustavMem.ts", + "test/common.ts", "test/startstop.ts", "test/json.ts", @@ -20,9 +23,9 @@ "test/register.ts", "test/misc.ts", "test/fullGraph.ts", + "test/external.ts", "test/GRedis.ts", - - "external/GustavRedis.ts", + "test/GMem.ts", "Symbol.d.ts", "Promise.d.ts", From 8fd2fc5ba60070fba25a67437357aec7e9509f1a Mon Sep 17 00:00:00 2001 From: SomeKittens Date: Sat, 16 Jan 2016 20:01:59 -0800 Subject: [PATCH 07/25] feat(kafka): Added external connector for Kafka --- external/GustavKafka.ts | 83 +++++++++++++++++++++++++++++++++++++++++ package.json | 2 + test/GKafka.ts | 70 ++++++++++++++++++++++++++++++++++ tsconfig.json | 2 + 4 files changed, 157 insertions(+) create mode 100644 external/GustavKafka.ts create mode 100644 test/GKafka.ts diff --git a/external/GustavKafka.ts b/external/GustavKafka.ts new file mode 100644 index 0000000..0b9acce --- /dev/null +++ b/external/GustavKafka.ts @@ -0,0 +1,83 @@ +'use strict'; + +import {Observable, Subscription} from '@reactivex/rxjs'; +import {IExternalClient} from '../defs'; + +// No definitions for kafka, sad +let kafka = require('kafka-node'); + +export interface IConsumerConfig { + partition?: any; + offset?: any; +} + +export class GustavKafka implements IExternalClient { + constructor(public config?: any) { + this.config = this.config || { + connString: 'localhost:2181', + clientId: 'kafka-gustav-client' + }; + } + + getClient(): any { + return new kafka.Client(this.config.connString, this.config.clientId); + } + + // two methods, one called on from, other on to + from(topic: string, offset?: number): Observable { + let client = this.getClient(); + + let consumer = new kafka.Consumer(client, [{ + topic + }]/* TODO */); + + return new Observable(o => { + consumer.on('message', m => o.next(m.value)); + consumer.on('error', err => o.error(err)); + + return () => consumer.close(() => {}); + }); + } + to(topic: string, iO: Observable): Subscription { + let client = this.getClient(); + let producer = new kafka.Producer(client); + + let buffer = []; + let handleErr = err => { + if (err) { throw err; } + }; + + producer.on('ready', () => { + if (!buffer.length) { return; } + producer.send([{ + topic: topic, + messages: buffer + }], handleErr); + }); + + return iO + .bufferTime(50) + .subscribe( + msg => { + if (!msg.length) { return; } + + for (let i = msg.length - 1; i >= 0; i--) { + msg[i] = typeof msg[i] === 'string' ? msg[i] : JSON.stringify(msg[i]); + } + + if (!producer.ready) { + console.log('buffering'); + buffer = buffer.concat(msg); + return; + } + + producer.send([{ + topic: topic, + messages: msg + }], handleErr); + }, + handleErr, + () => {/* noop */}// client.close(console.log.bind(console, 'done', config.topic)) + ); + } +} diff --git a/package.json b/package.json index ca2e49f..572e45d 100644 --- a/package.json +++ b/package.json @@ -10,6 +10,7 @@ "prepublish": "npm run build", "snyk": "snyk test", "test": "mocha dist/test", + "test:int": "INTE=true npm run test -s", "test:watch": "mocha --watch dist/test" }, "author": "Randall Koutnik", @@ -24,6 +25,7 @@ }, "dependencies": { "@reactivex/rxjs": "5.0.0-beta.0", + "kafka-node": "^0.3.1", "node-uuid": "1.4.7", "redis": "^2.4.2", "tail": "~0.4.0" diff --git a/test/GKafka.ts b/test/GKafka.ts new file mode 100644 index 0000000..74c9efa --- /dev/null +++ b/test/GKafka.ts @@ -0,0 +1,70 @@ +'use strict'; + +import {GustavKafka} from '../external/GustavKafka'; +import {Observable} from '@reactivex/rxjs'; +import {expect} from 'chai'; + +let kafka = require('kafka-node'); + +describe('GustavKafka', () => { + // Hacky way of doing integration testing + let kit = process.env.INTE ? it.bind(it) : it.skip.bind(it); + + let handleErr = err => { if (err) { throw err; }}; + + let client, gr; + beforeEach(() => { + client = new kafka.Client('localhost:2181'); + gr = new GustavKafka(); + }); + + kit('constructs without errors', () => { + let a = new GustavKafka(); + expect(a).to.be.ok; + }); + + kit('listens to a kafka topic', (done) => { + let recieved = 0; + let topic = 'gustavTest-listen'; + + let producer = new kafka.Producer(client); + + producer.on('ready', () => { + producer.send([{ + topic, + messages: ['hello'] + }], handleErr); + }); + + producer.on('err', handleErr); + + let kafObservable = gr.from(topic); + + kafObservable.subscribe(item => { + recieved++; + expect(item, 'Recieved proper message').to.equal('hello'); + expect(recieved, 'Correct number of runs').to.equal(1); + done(); + }, + handleErr); + }); + + kit('publishes to a kafka topic', (done) => { + let topic = 'gustavTest-publish'; + + let consumer = new kafka.Consumer(client, [{ + topic + }]); + + let obs = new Observable(o => { + setTimeout(() => o.next('hello'), 15); + }); + + gr.to(topic, obs); + + consumer.on('message', (message) => { + expect(message.value).to.equal('hello'); + done(); + }); + }); +}); diff --git a/tsconfig.json b/tsconfig.json index bac6d4a..f135fcb 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -13,6 +13,7 @@ "defs.ts", "external/GustavRedis.ts", + "external/GustavKafka.ts", "external/GustavMem.ts", "test/common.ts", @@ -25,6 +26,7 @@ "test/fullGraph.ts", "test/external.ts", "test/GRedis.ts", + "test/GKafka.ts", "test/GMem.ts", "Symbol.d.ts", From d4397a5e8a47433e5ac34b3dec99b5abc7fe9858 Mon Sep 17 00:00:00 2001 From: SomeKittens Date: Sat, 16 Jan 2016 20:05:35 -0800 Subject: [PATCH 08/25] chore: Clean up Kafka coupler --- external/GustavKafka.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/GustavKafka.ts b/external/GustavKafka.ts index 0b9acce..b72ed7f 100644 --- a/external/GustavKafka.ts +++ b/external/GustavKafka.ts @@ -56,6 +56,7 @@ export class GustavKafka implements IExternalClient { }); return iO + // Things run faster overall with a little buffering .bufferTime(50) .subscribe( msg => { @@ -66,7 +67,6 @@ export class GustavKafka implements IExternalClient { } if (!producer.ready) { - console.log('buffering'); buffer = buffer.concat(msg); return; } From 43bb3fd3e297193af3b8a9e30922d86f652e72d4 Mon Sep 17 00:00:00 2001 From: SomeKittens Date: Sun, 17 Jan 2016 10:27:17 -0800 Subject: [PATCH 09/25] chore(ci): change c++ build env --- .travis.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.travis.yml b/.travis.yml index 7c6aa92..fc86ae3 100644 --- a/.travis.yml +++ b/.travis.yml @@ -10,6 +10,7 @@ node_js: - "4.0" env: - TSD_GITHUB_TOKEN=dc0dec2e633e1366658a40961cdcf389eaf3dfba + - CC=clang CXX=clang++ npm_config_clang=1z script: - npm run lint - npm test From 00f38de16225469edc76ad2355f07f038aa05ef5 Mon Sep 17 00:00:00 2001 From: SomeKittens Date: Sun, 17 Jan 2016 10:30:08 -0800 Subject: [PATCH 10/25] chore(ci): fix travis env config --- .travis.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index fc86ae3..ea71102 100644 --- a/.travis.yml +++ b/.travis.yml @@ -9,8 +9,7 @@ node_js: - "4.1" - "4.0" env: - - TSD_GITHUB_TOKEN=dc0dec2e633e1366658a40961cdcf389eaf3dfba - - CC=clang CXX=clang++ npm_config_clang=1z + - TSD_GITHUB_TOKEN=dc0dec2e633e1366658a40961cdcf389eaf3dfba CC=clang CXX=clang++ npm_config_clang=1z script: - npm run lint - npm test From 306b7669f05c6fd9c0bfa2bb1b141662cbe3905c Mon Sep 17 00:00:00 2001 From: SomeKittens Date: Mon, 18 Jan 2016 13:39:29 -0800 Subject: [PATCH 11/25] feat(rabbit): added rabbit coupler --- external/GustavRabbit.ts | 69 +++++++++++++++++++++++++++++++++++++ package.json | 1 + test/GRabbit.ts | 73 ++++++++++++++++++++++++++++++++++++++++ tsconfig.json | 2 ++ tsd.json | 6 ++++ 5 files changed, 151 insertions(+) create mode 100644 external/GustavRabbit.ts create mode 100644 test/GRabbit.ts diff --git a/external/GustavRabbit.ts b/external/GustavRabbit.ts new file mode 100644 index 0000000..d046604 --- /dev/null +++ b/external/GustavRabbit.ts @@ -0,0 +1,69 @@ +'use strict'; + +import {Observable, Subscription} from '@reactivex/rxjs'; +import {IExternalClient} from '../defs'; +import {connect} from 'amqplib'; +import * as uuid from 'node-uuid'; + +// Untested, use at own risk + +// Uses fanout exchanges so we can connect many queues to them +export class GustavRabbit implements IExternalClient { + constructor(public config?: any) { + this.config = this.config || { + connString: 'amqp://localhost' + }; + } + + from(exchange: string): Observable { + let queue = uuid.v4(); + return new Observable(o => { + let conn; + connect(this.config.connString) + .then(c => conn = c && c.createChannel()) + .then(ch => { + ch.assertExchange(exchange, 'fanout', {durable: true}); + ch.assertQueue(queue, {durable: true}); + + ch.bindQueue(queue, exchange, ''); + ch.consume(queue, msg => { + o.next(msg.content.toString()); + ch.ack(msg); + }, {noAck: false}); + }) + .catch(err => o.error(err)); + + return () => conn && conn.close(); + }); + } + + to(exchange: string, iO: Observable): Subscription { + let channel, conn, cachedItems = []; + let queue = uuid.v4(); + + connect(this.config.connString) + .then(c => conn = c && c.createChannel()) + .then(c => { + channel = c; + + channel.assertExchange(exchange, 'fanout'); + cachedItems.forEach(item => { + channel.publish(exchange, '', new Buffer(item)); + }); + }) + .catch(err => { throw err; }); + + return iO + .subscribe( + msg => { + if (!channel) { + return cachedItems.push(msg); + } + + channel.publish(exchange, '', new Buffer(msg)); + }, + err => console.error(`rabbitSink err, queue: ${queue}`, err), + () => conn && conn.close() + ); + } +} diff --git a/package.json b/package.json index 572e45d..340a2ea 100644 --- a/package.json +++ b/package.json @@ -25,6 +25,7 @@ }, "dependencies": { "@reactivex/rxjs": "5.0.0-beta.0", + "amqplib": "^0.4.0", "kafka-node": "^0.3.1", "node-uuid": "1.4.7", "redis": "^2.4.2", diff --git a/test/GRabbit.ts b/test/GRabbit.ts new file mode 100644 index 0000000..bdd666a --- /dev/null +++ b/test/GRabbit.ts @@ -0,0 +1,73 @@ +'use strict'; + +import {GustavRabbit} from '../external/GustavRabbit'; +import {Observable} from '@reactivex/rxjs'; +import {expect} from 'chai'; +import {connect} from 'amqplib'; +import * as uuid from 'node-uuid'; + +describe('GustavRabbit', () => { + // Hacky way of doing integration testing + let kit = process.env.INTE ? it.bind(it) : it.skip.bind(it); + + let handleErr = err => { if (err) { throw err; }}; + + let client, gr, connProm; + beforeEach(() => { + connProm = connect('amqp://localhost') + .then(c => client = c && c.createChannel()); + gr = new GustavRabbit(); + }); + + kit('constructs without errors', () => { + let a = new GustavRabbit(); + expect(a).to.be.ok; + }); + + kit('listens to a rabbitmq exchange', (done) => { + let recieved = 0; + let exchange = 'gustavTest-listen'; + + let rabbitObservable = gr.from(exchange); + + rabbitObservable.subscribe(item => { + recieved++; + expect(item, 'Recieved proper message').to.equal('hello'); + expect(recieved, 'Correct number of runs').to.equal(1); + done(); + }, + handleErr); + + connProm.then(ch => { + ch.assertExchange(exchange, 'fanout'); + setTimeout(() => { + ch.publish(exchange, '', new Buffer('hello')); + }, 15); + }).catch(handleErr); + }); + + kit('publishes to a rabbitmq exchange', (done) => { + let exchange = 'gustavTest-publish'; + let queue = uuid.v4(); + + connProm + .then(ch => { + ch.assertExchange(exchange, 'fanout', {durable: true}); + ch.assertQueue(queue, {durable: true}); + + ch.bindQueue(queue, exchange, ''); + ch.consume(queue, msg => { + expect(msg.content.toString()).to.equal('hello'); + done(); + ch.ack(msg); + }, {noAck: false}); + }) + .catch(handleErr); + + let obs = new Observable(o => { + setTimeout(() => o.next('hello'), 15); + }); + + gr.to(exchange, obs); + }); +}); diff --git a/tsconfig.json b/tsconfig.json index f135fcb..5b44d2d 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -14,6 +14,7 @@ "external/GustavRedis.ts", "external/GustavKafka.ts", + "external/GustavRabbit.ts", "external/GustavMem.ts", "test/common.ts", @@ -27,6 +28,7 @@ "test/external.ts", "test/GRedis.ts", "test/GKafka.ts", + "test/GRabbit.ts", "test/GMem.ts", "Symbol.d.ts", diff --git a/tsd.json b/tsd.json index 68e1b29..d8123e9 100644 --- a/tsd.json +++ b/tsd.json @@ -28,6 +28,12 @@ }, "redis/redis.d.ts": { "commit": "70bf7e2bfeb0d5b1b651ef3219bcc65c8eec117e" + }, + "amqplib/amqplib.d.ts": { + "commit": "70bf7e2bfeb0d5b1b651ef3219bcc65c8eec117e" + }, + "when/when.d.ts": { + "commit": "70bf7e2bfeb0d5b1b651ef3219bcc65c8eec117e" } } } From fd028882c29df63a17598c987b5c14d1ead67c9d Mon Sep 17 00:00:00 2001 From: SomeKittens Date: Mon, 18 Jan 2016 14:26:34 -0800 Subject: [PATCH 12/25] chore: s/external/coupler/ --- Workflow.ts | 10 +++++----- {external => couplers}/GustavKafka.ts | 6 ++++-- {external => couplers}/GustavMem.ts | 6 ++++-- {external => couplers}/GustavRabbit.ts | 6 ++++-- {external => couplers}/GustavRedis.ts | 9 ++++++--- defs.ts | 3 ++- index.ts | 11 +++++++---- test/GKafka.ts | 2 +- test/GMem.ts | 2 +- test/GRabbit.ts | 2 +- test/GRedis.ts | 2 +- test/{external.ts => couplers.ts} | 12 ++++++------ tsconfig.json | 10 +++++----- 13 files changed, 47 insertions(+), 34 deletions(-) rename {external => couplers}/GustavKafka.ts (93%) rename {external => couplers}/GustavMem.ts (90%) rename {external => couplers}/GustavRabbit.ts (92%) rename {external => couplers}/GustavRedis.ts (84%) rename test/{external.ts => couplers.ts} (82%) diff --git a/Workflow.ts b/Workflow.ts index c12c4b7..918113a 100644 --- a/Workflow.ts +++ b/Workflow.ts @@ -16,7 +16,7 @@ export interface IWorkflowChain { transf(name: string | ITransfNode, config?: any, metaConfig?: IMetaConfig): IWorkflowChain; sink(name: string | ISinkNode, config?: any, metaConfig?: IMetaConfig): Workflow; merge(...nodes: IWorkflowChain[]): IWorkflowChain; - to (name: string): Workflow; + to (type: string, name: string): Workflow; tap(name: string | ISinkNode, config?: any, metaConfig?: IMetaConfig): IWorkflowChain; clone(): IWorkflowChain; } @@ -159,12 +159,12 @@ export class Workflow { return new WorkflowChain(this, prevNode); } - from (name: string): IWorkflowChain { + from (type: string, name: string): IWorkflowChain { let prevNode; try { prevNode = gustav.makeNode('__from', this.ggraph, name, {external: name}); } catch (e) { - throw new Error(`Tried to define \`from\` node "${name}" with no external interface defined`); + throw new Error(`Tried to define \`from\` node "${name}" with no external coupler defined`); } return new WorkflowChain(this, prevNode); @@ -296,11 +296,11 @@ class WorkflowChain { this.addNodeToGraph(name, 'sink', config, metaConfig); return new WorkflowChain(this.workflow, this.prevNode); } - to (name: string): Workflow { + to (type: string, name: string): Workflow { try { this.addNodeToGraph('__to', 'sink', name, {external: name}); } catch (e) { - throw new Error(`Tried to define \`to\` node "${name}" with no external interface defined`); + throw new Error(`Tried to define \`to\` node "${name}" with no external coupler defined`); } return this.workflow; diff --git a/external/GustavKafka.ts b/couplers/GustavKafka.ts similarity index 93% rename from external/GustavKafka.ts rename to couplers/GustavKafka.ts index b72ed7f..4612919 100644 --- a/external/GustavKafka.ts +++ b/couplers/GustavKafka.ts @@ -1,7 +1,7 @@ 'use strict'; import {Observable, Subscription} from '@reactivex/rxjs'; -import {IExternalClient} from '../defs'; +import {ICoupler} from '../defs'; // No definitions for kafka, sad let kafka = require('kafka-node'); @@ -11,12 +11,14 @@ export interface IConsumerConfig { offset?: any; } -export class GustavKafka implements IExternalClient { +export class GustavKafka implements ICoupler { + defaultName: string; constructor(public config?: any) { this.config = this.config || { connString: 'localhost:2181', clientId: 'kafka-gustav-client' }; + this.defaultName = 'kafka'; } getClient(): any { diff --git a/external/GustavMem.ts b/couplers/GustavMem.ts similarity index 90% rename from external/GustavMem.ts rename to couplers/GustavMem.ts index 3c20462..f7b2c6c 100644 --- a/external/GustavMem.ts +++ b/couplers/GustavMem.ts @@ -1,7 +1,7 @@ 'use strict'; import {Observable, Subscription} from '@reactivex/rxjs'; -import {IExternalClient} from '../defs'; +import {ICoupler} from '../defs'; /** * GustavMem is an in-memory messaging system. It provides NO GUARANTEES about anything. @@ -10,10 +10,12 @@ import {IExternalClient} from '../defs'; * Mainly this is used for tests. */ -export class GustavMem implements IExternalClient { +export class GustavMem implements ICoupler { channels: Object; + defaultName: string; constructor() { this.channels = {}; + this.defaultName = 'mem'; }; from(channelName: string): Observable { diff --git a/external/GustavRabbit.ts b/couplers/GustavRabbit.ts similarity index 92% rename from external/GustavRabbit.ts rename to couplers/GustavRabbit.ts index d046604..fe8b256 100644 --- a/external/GustavRabbit.ts +++ b/couplers/GustavRabbit.ts @@ -1,18 +1,20 @@ 'use strict'; import {Observable, Subscription} from '@reactivex/rxjs'; -import {IExternalClient} from '../defs'; +import {ICoupler} from '../defs'; import {connect} from 'amqplib'; import * as uuid from 'node-uuid'; // Untested, use at own risk // Uses fanout exchanges so we can connect many queues to them -export class GustavRabbit implements IExternalClient { +export class GustavRabbit implements ICoupler { + defaultName: string; constructor(public config?: any) { this.config = this.config || { connString: 'amqp://localhost' }; + this.defaultName = 'rabbitmq'; } from(exchange: string): Observable { diff --git a/external/GustavRedis.ts b/couplers/GustavRedis.ts similarity index 84% rename from external/GustavRedis.ts rename to couplers/GustavRedis.ts index f5e5f05..1c65f77 100644 --- a/external/GustavRedis.ts +++ b/couplers/GustavRedis.ts @@ -2,13 +2,16 @@ import {createClient, RedisClient} from 'redis'; import {Observable, Subscription} from '@reactivex/rxjs'; -import {IExternalClient} from '../defs'; +import {ICoupler} from '../defs'; // Attach to a Gustav instance // Provides helpers for getting data from & to Redis event channels -export class GustavRedis implements IExternalClient { - constructor(public config?: any) {} +export class GustavRedis implements ICoupler { + defaultName: string; + constructor(public config?: any) { + this.defaultName = 'redis'; + } // supposedly private but needs to be overridden in tests getClient(): RedisClient { diff --git a/defs.ts b/defs.ts index 5f9bee5..63acd9b 100644 --- a/defs.ts +++ b/defs.ts @@ -28,8 +28,9 @@ export interface ISinkNode { (config: Object, iO: Observable): Subscription; } -export interface IExternalClient { +export interface ICoupler { config?: any; + defaultName: string; getClient?(): any; from(name: string): Observable; to(name: string, iO: Observable): Subscription; diff --git a/index.ts b/index.ts index af2be3e..1310e4f 100644 --- a/index.ts +++ b/index.ts @@ -2,7 +2,7 @@ import {GustavGraph} from './GustavGraph'; import {Workflow} from './Workflow'; -import {IMetaConfig, IExternalClient} from './defs'; +import {IMetaConfig, ICoupler} from './defs'; export interface INodeFactory { (...config: any[]): symbol; @@ -137,9 +137,12 @@ export let gustav = { return graph; }, - external: (externalConnector: IExternalClient): void => { - gustav.source('__from', (name) => externalConnector.from(name)); - gustav.sink('__to', (name, iO) => externalConnector.to(name, iO)); + coupler: (externalCoupler: ICoupler, couplerName?: string): void => { + if (!couplerName) { + couplerName = externalCoupler.defaultName; + } + gustav.source('__from', (name) => externalCoupler.from(name)); + gustav.sink('__to', (name, iO) => externalCoupler.to(name, iO)); }, source: (name: string, factory: Function): Function => { return register('source', name, factory); }, transformer: (name: string, factory: Function): Function => { return register('transformer', name, factory); }, diff --git a/test/GKafka.ts b/test/GKafka.ts index 74c9efa..4d21611 100644 --- a/test/GKafka.ts +++ b/test/GKafka.ts @@ -1,6 +1,6 @@ 'use strict'; -import {GustavKafka} from '../external/GustavKafka'; +import {GustavKafka} from '../couplers/GustavKafka'; import {Observable} from '@reactivex/rxjs'; import {expect} from 'chai'; diff --git a/test/GMem.ts b/test/GMem.ts index 164899a..31b454f 100644 --- a/test/GMem.ts +++ b/test/GMem.ts @@ -1,6 +1,6 @@ 'use strict'; -import {GustavMem} from '../external/GustavMem'; +import {GustavMem} from '../couplers/GustavMem'; import {Observable} from '@reactivex/rxjs'; import {expect} from 'chai'; diff --git a/test/GRabbit.ts b/test/GRabbit.ts index bdd666a..ab1563a 100644 --- a/test/GRabbit.ts +++ b/test/GRabbit.ts @@ -1,6 +1,6 @@ 'use strict'; -import {GustavRabbit} from '../external/GustavRabbit'; +import {GustavRabbit} from '../couplers/GustavRabbit'; import {Observable} from '@reactivex/rxjs'; import {expect} from 'chai'; import {connect} from 'amqplib'; diff --git a/test/GRedis.ts b/test/GRedis.ts index bd41fec..da80061 100644 --- a/test/GRedis.ts +++ b/test/GRedis.ts @@ -1,6 +1,6 @@ 'use strict'; -import {GustavRedis} from '../external/GustavRedis'; +import {GustavRedis} from '../couplers/GustavRedis'; import {Observable} from '@reactivex/rxjs'; import {expect} from 'chai'; diff --git a/test/external.ts b/test/couplers.ts similarity index 82% rename from test/external.ts rename to test/couplers.ts index a9029a0..9a3cae5 100644 --- a/test/external.ts +++ b/test/couplers.ts @@ -2,11 +2,11 @@ import {gustav} from '../index'; import {Workflow} from '../Workflow'; -import {GustavMem} from '../external/GustavMem'; +import {GustavMem} from '../couplers/GustavMem'; import {addCommonNodes} from './testNodes'; -describe('gustav.external', () => { +describe('gustav.coupler', () => { let gm; beforeEach(() => { gustav.reset(); @@ -17,25 +17,25 @@ describe('gustav.external', () => { wfFactories.push((done): Workflow => { return gustav.createWorkflow('ex-0') .source('intSource') - .to('bill'); + .to('redis', 'bill'); }); wfFactories.push((done): Workflow => { return gustav.createWorkflow('ex-1') - .from('bill') + .from('redis', 'bill') .transf('timesTwo') .sink('fromIntTransformer', done); }); it('allows for multiple workflows', (done) => { - gustav.external(gm); + gustav.coupler(gm, 'redis'); wfFactories[1](done).start(); wfFactories[0]().start(); }); it('allows for forking workflows', done => { - gustav.external(gm); + gustav.coupler(gm, 'redis'); let inProgress = 3; let partDone = () => { diff --git a/tsconfig.json b/tsconfig.json index 5b44d2d..69cb11a 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -12,10 +12,10 @@ "Workflow.ts", "defs.ts", - "external/GustavRedis.ts", - "external/GustavKafka.ts", - "external/GustavRabbit.ts", - "external/GustavMem.ts", + "couplers/GustavRedis.ts", + "couplers/GustavKafka.ts", + "couplers/GustavRabbit.ts", + "couplers/GustavMem.ts", "test/common.ts", "test/startstop.ts", @@ -25,7 +25,7 @@ "test/register.ts", "test/misc.ts", "test/fullGraph.ts", - "test/external.ts", + "test/couplers.ts", "test/GRedis.ts", "test/GKafka.ts", "test/GRabbit.ts", From 5e5b5ecd6bef96d39cd5c7cf41368a842f7f0c60 Mon Sep 17 00:00:00 2001 From: SomeKittens Date: Mon, 18 Jan 2016 14:40:14 -0800 Subject: [PATCH 13/25] fix(coupler): actually use the coupler requested --- Workflow.ts | 4 ++-- index.ts | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Workflow.ts b/Workflow.ts index 918113a..8056ce6 100644 --- a/Workflow.ts +++ b/Workflow.ts @@ -162,7 +162,7 @@ export class Workflow { from (type: string, name: string): IWorkflowChain { let prevNode; try { - prevNode = gustav.makeNode('__from', this.ggraph, name, {external: name}); + prevNode = gustav.makeNode(`__from-${type}`, this.ggraph, name, {external: name}); } catch (e) { throw new Error(`Tried to define \`from\` node "${name}" with no external coupler defined`); } @@ -298,7 +298,7 @@ class WorkflowChain { } to (type: string, name: string): Workflow { try { - this.addNodeToGraph('__to', 'sink', name, {external: name}); + this.addNodeToGraph(`__to-${type}`, 'sink', name, {external: name}); } catch (e) { throw new Error(`Tried to define \`to\` node "${name}" with no external coupler defined`); } diff --git a/index.ts b/index.ts index 1310e4f..deeda3b 100644 --- a/index.ts +++ b/index.ts @@ -141,8 +141,8 @@ export let gustav = { if (!couplerName) { couplerName = externalCoupler.defaultName; } - gustav.source('__from', (name) => externalCoupler.from(name)); - gustav.sink('__to', (name, iO) => externalCoupler.to(name, iO)); + gustav.source(`__from-${couplerName}`, (name) => externalCoupler.from(name)); + gustav.sink(`__to-${couplerName}`, (name, iO) => externalCoupler.to(name, iO)); }, source: (name: string, factory: Function): Function => { return register('source', name, factory); }, transformer: (name: string, factory: Function): Function => { return register('transformer', name, factory); }, From b1532c12f573ba3bbb7d90bb5ada93cd2a117ec6 Mon Sep 17 00:00:00 2001 From: SomeKittens Date: Mon, 18 Jan 2016 14:59:18 -0800 Subject: [PATCH 14/25] test(coupler): test default name --- test/couplers.ts | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/test/couplers.ts b/test/couplers.ts index 9a3cae5..c063888 100644 --- a/test/couplers.ts +++ b/test/couplers.ts @@ -17,25 +17,32 @@ describe('gustav.coupler', () => { wfFactories.push((done): Workflow => { return gustav.createWorkflow('ex-0') .source('intSource') - .to('redis', 'bill'); + .to('mem', 'bill'); }); wfFactories.push((done): Workflow => { return gustav.createWorkflow('ex-1') - .from('redis', 'bill') + .from('mem', 'bill') .transf('timesTwo') .sink('fromIntTransformer', done); }); it('allows for multiple workflows', (done) => { - gustav.coupler(gm, 'redis'); + gustav.coupler(gm, 'mem'); + + wfFactories[1](done).start(); + wfFactories[0]().start(); + }); + + it('accepts a default name', (done) => { + gustav.coupler(gm); wfFactories[1](done).start(); wfFactories[0]().start(); }); it('allows for forking workflows', done => { - gustav.coupler(gm, 'redis'); + gustav.coupler(gm, 'mem'); let inProgress = 3; let partDone = () => { From 21480c0dfcbbfd51d216e7b9dd1adab4e87d168d Mon Sep 17 00:00:00 2001 From: SomeKittens Date: Mon, 18 Jan 2016 15:03:44 -0800 Subject: [PATCH 15/25] test(coupler): test multiple couplers --- test/couplers.ts | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/test/couplers.ts b/test/couplers.ts index c063888..339754d 100644 --- a/test/couplers.ts +++ b/test/couplers.ts @@ -27,6 +27,19 @@ describe('gustav.coupler', () => { .sink('fromIntTransformer', done); }); + wfFactories.push((): Workflow => { + return gustav.createWorkflow('ex-2') + .from('mem', 'bill') + .transf('timesTwo') + .to('mem2', 'bob'); + }); + + wfFactories.push((done): Workflow => { + return gustav.createWorkflow('ex-3') + .from('mem2', 'bob') + .sink('fromIntTransformer', done); + }); + it('allows for multiple workflows', (done) => { gustav.coupler(gm, 'mem'); @@ -41,6 +54,15 @@ describe('gustav.coupler', () => { wfFactories[0]().start(); }); + it('allows for multiple couplers', done => { + gustav.coupler(gm); + gustav.coupler(new GustavMem(), 'mem2'); + + wfFactories[3](done).start(); + wfFactories[2]().start(); + wfFactories[0]().start(); + }); + it('allows for forking workflows', done => { gustav.coupler(gm, 'mem'); let inProgress = 3; From f00677164f1f6f3332dfc6b4238418fed4c108e3 Mon Sep 17 00:00:00 2001 From: SomeKittens Date: Mon, 18 Jan 2016 15:17:26 -0800 Subject: [PATCH 16/25] docs(couplers): Added details about couplers to README --- README.md | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/README.md b/README.md index aef7c90..61b43ed 100644 --- a/README.md +++ b/README.md @@ -153,3 +153,25 @@ Further nodes can be chained onto it, using the following methods: - `.sink` Attach a sink (terminal, returns the workflow itself) - `merge(...nodes)` Merge several nodes into a single stream - `.tap` Attach a sink but return the previous node for chaining + + +## Couplers + +Gustav is designed to work with an external tool that manages *where* all of the processed items go, while Gustav workflows manage the actual processing. Redis, Kafka and RabbitMQ are currently supported as well as an in-memory processor that provides no guarantees and is best used for testing. + +These are used as such: + +```typescript + +// Second string param names the coupler +// optional, and the coupler does come with a default +gustav.coupler(new GustavRedis(), ‘myRedis’); + +// SInce we've specified a Redis coupler, the following automagically knows to listen to the `demo-in` channel and push any events down the workflow + .from('demo-in') + // Vanilla transformer node + .transf('someTransfNode') + // Anything hitting this sink publishes a message to the `demo-out` channel + .to('demo-out') + .start(); +``` \ No newline at end of file From 6128ac49e774254ff3766e3f23ac86e0b6c6b63a Mon Sep 17 00:00:00 2001 From: SomeKittens Date: Mon, 18 Jan 2016 15:53:40 -0800 Subject: [PATCH 17/25] fix: smart quotes are terrible --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 61b43ed..6015a6f 100644 --- a/README.md +++ b/README.md @@ -165,7 +165,7 @@ These are used as such: // Second string param names the coupler // optional, and the coupler does come with a default -gustav.coupler(new GustavRedis(), ‘myRedis’); +gustav.coupler(new GustavRedis(), 'myRedis'); // SInce we've specified a Redis coupler, the following automagically knows to listen to the `demo-in` channel and push any events down the workflow .from('demo-in') From 85624b3db5b879816351f5b4b665ca8a8649c476 Mon Sep 17 00:00:00 2001 From: SomeKittens Date: Tue, 26 Jan 2016 13:14:31 -0800 Subject: [PATCH 18/25] docs(couplers): describe details of couplers spec --- couplers/README.md | 59 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) create mode 100644 couplers/README.md diff --git a/couplers/README.md b/couplers/README.md new file mode 100644 index 0000000..4e44145 --- /dev/null +++ b/couplers/README.md @@ -0,0 +1,59 @@ +# Couplers + +*This document is intended to describe the spec for building a Gustav coupler. If you're interested in integrating couplers into your project, see [the main readme.](https://github.com/SomeKittens/gustav/tree/0.7#couplers)* + +## Interface + +First off, couplers should implement the ICoupler interface: + +```typescript + +export interface ICoupler { + config?: any; + defaultName: string; + getClient?(): any; + from(name: string): Observable; + to(name: string, iO: Observable): Subscription; +} +``` + +### `config?: any` + +The coupler may expose an optional config property of any sort. + +### `defaultName` + +If the user does not pass in a name, this will be used instead. Example: + +```typescript +let dc = new DemoCoupler(); +console.log(dc.defaultName); // 'demo' + +gustav.coupler(new DemoCoupler()); + +gustav.createWorkflow('hello') + .in('demo', 'hello-in') + // ...etc +``` + +This is used behind the scenes in gustav itself when creating the internal-only source/sink nodes. + +### `getClient?(): any` + +This is used to override the client in unit testing. Unsure if it'll stay in, we'll see as more couplers get added to the project. This is used internally with tools that require a new client for every connection (or connection type). + +### `from(name: string): Observable` + +Given a string, this should connect to the service of choice and open a channel to `name`. Any events on that channel should be emitted through the returned Observable sequence. + +#### Note: + +If the string `__done` is passed as an event, then the created observable should be closed (calling `.complete`). + +### `to(name: string, iO: Observable): Subscription` + +Given a string & Observable, subscribe to the Observable and push any events to the channel on your tool of choice. + +#### Note: + +Matching the above note, if the subscribed observable is completed, pass the string `__done` to the channel. \ No newline at end of file From 5600e0edb1ba69b9ef43be5c0a3311e8c969eb86 Mon Sep 17 00:00:00 2001 From: SomeKittens Date: Tue, 26 Jan 2016 13:24:25 -0800 Subject: [PATCH 19/25] docs(couplers): describe how things should work by default --- couplers/README.md | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/couplers/README.md b/couplers/README.md index 4e44145..a15daa8 100644 --- a/couplers/README.md +++ b/couplers/README.md @@ -56,4 +56,28 @@ Given a string & Observable, subscribe to the Observable and push any events to #### Note: -Matching the above note, if the subscribed observable is completed, pass the string `__done` to the channel. \ No newline at end of file +Matching the above note, if the subscribed observable is completed, pass the string `__done` to the channel. + +## Connection configuration + +## Defaults + +The coupler should use smart defaults for connecting, such that the constructor can be called with no arguments and everything still works. When possible, default to a passwordless default install of the tool on `localhost`. + +## Config + +When the user does pass in details, it should be in a single object with sane property names. In other words, instead of: + +`let g = new GustavDemo(ip, port, dbName, userName, lugnuts);` + +Use: + +```typescript +let g = new GustavDemo({ + ip, + port, + dbName, + userName, + lugNuts +}); +``` \ No newline at end of file From 932aac947ec9b69ed429e4d173e4dc8904e1b872 Mon Sep 17 00:00:00 2001 From: SomeKittens Date: Mon, 1 Feb 2016 11:46:38 -0800 Subject: [PATCH 20/25] docs: fix up some readme quirks under couplers --- couplers/README.md | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/couplers/README.md b/couplers/README.md index a15daa8..2b39a3f 100644 --- a/couplers/README.md +++ b/couplers/README.md @@ -19,11 +19,19 @@ export interface ICoupler { ### `config?: any` -The coupler may expose an optional config property of any sort. +The coupler may expose an optional config property of any sort. For instance, a config for [GustavRedis](https://github.com/SomeKittens/gustav/blob/master/couplers/GustavRedis.ts) might look like: + +```typescript +{ + host: 1.2.3.4, + port: 1337, + connect_timeout: 9001 +} +``` ### `defaultName` -If the user does not pass in a name, this will be used instead. Example: +If the implementing code does not pass in a name, this will be used instead. Example: ```typescript let dc = new DemoCoupler(); @@ -66,7 +74,7 @@ The coupler should use smart defaults for connecting, such that the constructor ## Config -When the user does pass in details, it should be in a single object with sane property names. In other words, instead of: +When the implementing code does pass in details, it should be in a single object with sane property names. In other words, instead of: `let g = new GustavDemo(ip, port, dbName, userName, lugnuts);` From 17f26587d85cf5691fac05df6ac6ca26259da1ed Mon Sep 17 00:00:00 2001 From: SomeKittens Date: Mon, 1 Feb 2016 16:54:43 -0800 Subject: [PATCH 21/25] feat: added __done events to couplers --- couplers/GustavKafka.ts | 14 ++++++++++++-- couplers/GustavRabbit.ts | 13 +++++++++++-- couplers/GustavRedis.ts | 9 ++++++--- 3 files changed, 29 insertions(+), 7 deletions(-) diff --git a/couplers/GustavKafka.ts b/couplers/GustavKafka.ts index 4612919..997aabd 100644 --- a/couplers/GustavKafka.ts +++ b/couplers/GustavKafka.ts @@ -34,7 +34,12 @@ export class GustavKafka implements ICoupler { }]/* TODO */); return new Observable(o => { - consumer.on('message', m => o.next(m.value)); + consumer.on('message', m => { + if (m.value === '__done') { + return o.complete(); + } + o.next(m.value) + }); consumer.on('error', err => o.error(err)); return () => consumer.close(() => {}); @@ -79,7 +84,12 @@ export class GustavKafka implements ICoupler { }], handleErr); }, handleErr, - () => {/* noop */}// client.close(console.log.bind(console, 'done', config.topic)) + () => { + producer.send([{ + topic: topic, + messages: ['__done'] + }]); + } ); } } diff --git a/couplers/GustavRabbit.ts b/couplers/GustavRabbit.ts index fe8b256..b77462b 100644 --- a/couplers/GustavRabbit.ts +++ b/couplers/GustavRabbit.ts @@ -29,8 +29,12 @@ export class GustavRabbit implements ICoupler { ch.bindQueue(queue, exchange, ''); ch.consume(queue, msg => { - o.next(msg.content.toString()); + let msgStr = msg.content.toString(); ch.ack(msg); + if (msgStr === '__done') { + return o.complete(); + } + o.next(msgStr); }, {noAck: false}); }) .catch(err => o.error(err)); @@ -65,7 +69,12 @@ export class GustavRabbit implements ICoupler { channel.publish(exchange, '', new Buffer(msg)); }, err => console.error(`rabbitSink err, queue: ${queue}`, err), - () => conn && conn.close() + () => { + if (conn) { + channel.publish(exchange, '', new Buffer('__done')); + conn.close() + } + } ); } } diff --git a/couplers/GustavRedis.ts b/couplers/GustavRedis.ts index 1c65f77..e4a3699 100644 --- a/couplers/GustavRedis.ts +++ b/couplers/GustavRedis.ts @@ -4,8 +4,6 @@ import {createClient, RedisClient} from 'redis'; import {Observable, Subscription} from '@reactivex/rxjs'; import {ICoupler} from '../defs'; -// Attach to a Gustav instance - // Provides helpers for getting data from & to Redis event channels export class GustavRedis implements ICoupler { defaultName: string; @@ -24,6 +22,9 @@ export class GustavRedis implements ICoupler { let client = this.getClient(); return new Observable(o => { client.on('message', (channel, message) => { + if (message === '__done') { + return o.complete(); + } o.next(message); }); client.subscribe(channelName); @@ -34,7 +35,9 @@ export class GustavRedis implements ICoupler { to(channelName: string, iO: Observable): Subscription { let client = this.getClient(); return iO.subscribe( - item => client.publish(channelName, item) + item => client.publish(channelName, item), + err => { throw err; }, + () => client.publish(channelName, '__done') ); } } From d0f99f26156d5ba7efd69ff316719a64a3bdf9c6 Mon Sep 17 00:00:00 2001 From: SomeKittens Date: Mon, 1 Feb 2016 21:19:59 -0800 Subject: [PATCH 22/25] feat: export redis & kafka couplers --- index.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/index.ts b/index.ts index deeda3b..ea10a5a 100644 --- a/index.ts +++ b/index.ts @@ -4,6 +4,12 @@ import {GustavGraph} from './GustavGraph'; import {Workflow} from './Workflow'; import {IMetaConfig, ICoupler} from './defs'; +import * as GR from './couplers/GustavRedis'; +import * as GK from './couplers/GustavKafka'; + +export let GustavRedis = GR.GustavRedis; +export let GustavKafka = GK.GustavKafka; + export interface INodeFactory { (...config: any[]): symbol; } From e244078114de0cf60c374271c86c24e0d948d749 Mon Sep 17 00:00:00 2001 From: SomeKittens Date: Mon, 1 Feb 2016 21:20:16 -0800 Subject: [PATCH 23/25] refactor: coupler -> addCoupler --- index.ts | 2 +- test/couplers.ts | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/index.ts b/index.ts index ea10a5a..675763e 100644 --- a/index.ts +++ b/index.ts @@ -143,7 +143,7 @@ export let gustav = { return graph; }, - coupler: (externalCoupler: ICoupler, couplerName?: string): void => { + addCoupler: (externalCoupler: ICoupler, couplerName?: string): void => { if (!couplerName) { couplerName = externalCoupler.defaultName; } diff --git a/test/couplers.ts b/test/couplers.ts index 339754d..a4adb27 100644 --- a/test/couplers.ts +++ b/test/couplers.ts @@ -6,7 +6,7 @@ import {GustavMem} from '../couplers/GustavMem'; import {addCommonNodes} from './testNodes'; -describe('gustav.coupler', () => { +describe('gustav.addCoupler', () => { let gm; beforeEach(() => { gustav.reset(); @@ -41,22 +41,22 @@ describe('gustav.coupler', () => { }); it('allows for multiple workflows', (done) => { - gustav.coupler(gm, 'mem'); + gustav.addCoupler(gm, 'mem'); wfFactories[1](done).start(); wfFactories[0]().start(); }); it('accepts a default name', (done) => { - gustav.coupler(gm); + gustav.addCoupler(gm); wfFactories[1](done).start(); wfFactories[0]().start(); }); it('allows for multiple couplers', done => { - gustav.coupler(gm); - gustav.coupler(new GustavMem(), 'mem2'); + gustav.addCoupler(gm); + gustav.addCoupler(new GustavMem(), 'mem2'); wfFactories[3](done).start(); wfFactories[2]().start(); @@ -64,7 +64,7 @@ describe('gustav.coupler', () => { }); it('allows for forking workflows', done => { - gustav.coupler(gm, 'mem'); + gustav.addCoupler(gm, 'mem'); let inProgress = 3; let partDone = () => { From f7aa05c170837fab14e854859d0dd7c4f0d3074d Mon Sep 17 00:00:00 2001 From: SomeKittens Date: Wed, 3 Feb 2016 14:15:03 -0800 Subject: [PATCH 24/25] chore: update tsd defs --- tsd.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tsd.json b/tsd.json index d8123e9..667f31d 100644 --- a/tsd.json +++ b/tsd.json @@ -6,7 +6,7 @@ "bundle": "typings/tsd.d.ts", "installed": { "node/node.d.ts": { - "commit": "70bf7e2bfeb0d5b1b651ef3219bcc65c8eec117e" + "commit": "717a5fdb079f8dd7c19f1b22f7f656dd990f0ccf" }, "mocha/mocha.d.ts": { "commit": "efd40e67ff323f7147651bdbef03c03ead7b1675" @@ -27,7 +27,7 @@ "commit": "efd40e67ff323f7147651bdbef03c03ead7b1675" }, "redis/redis.d.ts": { - "commit": "70bf7e2bfeb0d5b1b651ef3219bcc65c8eec117e" + "commit": "717a5fdb079f8dd7c19f1b22f7f656dd990f0ccf" }, "amqplib/amqplib.d.ts": { "commit": "70bf7e2bfeb0d5b1b651ef3219bcc65c8eec117e" From cc62a731fc03b0041e92022310a10ce318b99d23 Mon Sep 17 00:00:00 2001 From: SomeKittens Date: Wed, 3 Feb 2016 14:15:57 -0800 Subject: [PATCH 25/25] docs: fix typo in Coupler details --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 6015a6f..a033d8d 100644 --- a/README.md +++ b/README.md @@ -167,7 +167,7 @@ These are used as such: // optional, and the coupler does come with a default gustav.coupler(new GustavRedis(), 'myRedis'); -// SInce we've specified a Redis coupler, the following automagically knows to listen to the `demo-in` channel and push any events down the workflow +// Since we've specified a Redis coupler, the following automagically knows to listen to the `demo-in` channel and push any events down the workflow .from('demo-in') // Vanilla transformer node .transf('someTransfNode')