Commit a90fbdd

examples: fix derive-patterns examples
1 parent 27c5253 commit a90fbdd

13 files changed: +164 -210 lines changed

examples/derive-patterns/README.md

Lines changed: 33 additions & 41 deletions
Original file line number | Diff line number | Diff line change
@@ -2,17 +2,18 @@
22

33
## Where to accumulate?
44

5-
When building a derived collection, the central question is where
6-
accumulation will happen: within derivation registers, or within a
7-
materialized database? Both approaches can produce equivalent results,
8-
but they do it in very different ways.
5+
When building a derived collection, the central question is where accumulation
6+
will happen: within derivation state, or within an external database that you
7+
materialize into? Both approaches can produce equivalent results, but they do it
8+
in very different ways.
99

10-
### Accumulate in the Database
10+
### Accumulate in the external database
1111

12-
To accumulate in the database, you'll define a collection having a reducible
13-
schema with a derivation that uses only "publish" lambdas and no registers.
14-
The Flow runtime uses the provided annotations to reduce new documents into
15-
the collection, and ultimately keep the materialized table up to date.
12+
To accumulate in the external database, you'll define a collection having a
13+
reducible schema with a stateless derivation. The derivation can be written
14+
in either SQL or TypeScript, but for these examples we use TypeScript. The
15+
Flow runtime uses the provided annotations to reduce new documents into the
16+
collection, and ultimately keep the materialized table up to date.
1617

1718
A key insight is that the database is the _only_ stateful system in this
1819
scenario, and that Flow is making use of reductions in two places:
@@ -45,42 +46,43 @@ When materializing into a pub/sub topic, there _is_ no store to hold final value
4546
and Flow will publish delta states: each a partial update of the (unknown)
4647
final value.
4748

48-
### Accumulate in Registers
49+
### Accumulate in derivation state
4950

50-
Accumulating in registers involves a derivation that defines a reducible
51-
register schema, and uses "update" lambdas.
52-
Registers are arbitrary documents that can be shared and updated by the various
53-
transformations of a derivation. The Flow runtime allocates, manages, and scales
54-
durable storage for registers; you don't have to.
51+
Accumulating in derivation state involves a `sqlite` derivation having one
52+
or more tables, which are created by `migrations`. These tables can be shared
53+
and updated by the various transforms of the derivation. The Flow runtime
54+
transactionally persists modifications to these tables.
5555

56-
When using registers, the typical pattern is to use reduction annotations
57-
within updates of the register, and to then publish last-write-wins "snapshots"
58-
of the fully reduced value.
56+
When using a stateful derivation, the typical pattern is to use `INSERT ... ON
57+
CONFLICT ...` to accumulate state in your tables, and then `SELECT` from those
58+
tables to emit the documents.
5959

6060
Returning to our summing example:
6161

62-
| Time | Register | Lambdas | Derived Document |
63-
| ---- | -------- | ----------------------------------- | ---------------- |
64-
| T0 | **0** | update(2, 1, 2), publish(register) | **5** |
65-
| T1 | **5** | update(-2, 1), publish(register) | **4** |
66-
| T2 | **4** | update(3, -2, 1), publish(register) | **6** |
67-
| T3 | **6** | update() |
62+
| Time | sum table | Lambdas | Derived Document |
63+
| ---- | --------- | -------------------------------- | ---------------- |
64+
| T0 | **0** | update(2, 1, 2), select sum ... | **5** |
65+
| T1 | **5** | update(-2, 1), select sum ... | **4** |
66+
| T2 | **4** | update(3, -2, 1), select sum ... | **6** |
67+
| T3 | **6** | update() | |
6868

69-
Register derivations are a great solution for materializations into non-
69+
Stateful derivations are a great solution for materializations into non-
7070
transactional stores, because the documents they produce can be applied
7171
multiple times without breaking correctness.
7272

7373
They're also well suited for materializations that publish into pub/sub,
7474
as they can produce stand-alone updates of a fully-reduced value.
7575

76-
### Example: Summing in DB vs Register
76+
Additionally, stateful derivations are the best way to perform inner joins and time-windowed joins.
77+
78+
### Example: Summing in a stateless vs a stateful derivation
7779

7880
See [summer.flow.yaml](summer.flow.yaml) for a simple example
79-
of summing counts in the database, vs in registers.
81+
of summing counts using both approaches.
8082

8183
## Types of Joins
8284

83-
### Outer Join accumulated in Database
85+
### Outer Join using a stateless derivation
8486

8587
Example of an outer join, which is reduced within a target database table.
8688
This join is "fully reactive": it updates with either source collection,
@@ -89,11 +91,11 @@ and reflects the complete accumulation of their documents on both sides.
8991
The literal documents written to the collection are combined delta states,
9092
reflecting changes on one or both sides of the join. These delta states
9193
are then fully reduced into the database table, and no other storage _but_
92-
the table is required by this example.
94+
the table being materialized into is required.
9395

9496
See [join-outer.flow.yaml](join-outer.flow.yaml).
9597

96-
### Inner Join accumulated in Registers
98+
### Inner Join using a stateful derivation
9799

98100
Example of an inner join, which is reduced within the derivation's registers.
99101
This join is also "fully reactive", updating with either source collection,
@@ -108,7 +110,7 @@ join are matched.
108110

109111
See [join-inner.flow.yaml](join-inner.flow.yaml).
110112

111-
### One-sided Join accumulated in Registers
113+
### One-sided join using a stateful derivation
112114

113115
Example of a one-sided join, which publishes a current LHS joined
114116
with an accumulated RHS.
@@ -118,13 +120,3 @@ paired with a reduced snapshot of the RHS accumulator at that time.
118120

119121
See [join-one-sided.flow.yaml](join-one-sided.flow.yaml).
120122

121-
### Comparing Registers
122-
123-
Suppose we want to take action based on how a register is changing.
124-
125-
For example, suppose we want to detect "zero crossings" of a running sum,
126-
and then filter the source collection to those documents which caused the
127-
sum to cross from positive to negative (or vice versa).
128-
129-
We can use the `previous` register value to do so.
130-
See [zero-crossing.flow.yaml](zero-crossing.flow.yaml).
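
The summing example in the README can be sketched in plain TypeScript to contrast the two approaches. This is only an in-memory model under stated assumptions, not the Flow runtime: `toDeltas`, `reduceIntoStore`, and `StatefulSummer` are hypothetical names, and the store's reduction is assumed to be a simple sum.

```typescript
// Hypothetical in-memory model; none of these names come from Flow itself.

// Stateless approach: each source integer is passed through as a delta document.
function toDeltas(ints: number[]): number[] {
  return [...ints];
}

// The external store is the only stateful system: it reduces deltas by summing.
function reduceIntoStore(stored: number, deltas: number[]): number {
  return deltas.reduce((acc, delta) => acc + delta, stored);
}

// Stateful approach: the derivation keeps the running sum and emits snapshots.
class StatefulSummer {
  private sum = 0;

  update(ints: number[]): number | undefined {
    if (ints.length === 0) return undefined; // nothing read, nothing emitted (row T3)
    for (const n of ints) this.sum += n;
    return this.sum; // a fully reduced, last-write-wins snapshot
  }
}

// Batches T0..T3 from the table.
const batches: number[][] = [[2, 1, 2], [-2, 1], [3, -2, 1], []];

let stored = 0;
const storeValues: number[] = [];
for (const batch of batches) {
  stored = reduceIntoStore(stored, toDeltas(batch));
  storeValues.push(stored);
}

const summer = new StatefulSummer();
const snapshots = batches.map((batch) => summer.update(batch));
```

Both paths arrive at 5, 4, and 6 for T0 through T2. The difference is where the sum lives: in the stateless path the emitted documents are deltas that are only meaningful once the store reduces them, while in the stateful path each emitted document is a complete value that can be applied more than once.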

examples/derive-patterns/flow.yaml

Lines changed: 0 additions & 1 deletion
@@ -4,4 +4,3 @@ import:
44
- join-one-sided.flow.yaml
55
- join-outer.flow.yaml
66
- summer.flow.yaml
7-
- zero-crossing.flow.yaml

examples/derive-patterns/join-inner.flow.ts

Lines changed: 0 additions & 24 deletions
This file was deleted.

examples/derive-patterns/join-inner.flow.yaml

Lines changed: 41 additions & 17 deletions
@@ -9,26 +9,50 @@ collections:
99
required: [Key]
1010
key: [/Key]
1111

12-
derivation:
13-
register:
14-
schema: schema.yaml#Join
15-
initial: {}
16-
17-
transform:
18-
fromInts:
19-
source: { name: patterns/ints }
12+
derive:
13+
using:
14+
sqlite:
15+
migrations:
16+
- |
17+
create table join_state (
18+
key text not null primary key,
19+
-- Stores the left hand side of the join
20+
lhs integer,
21+
-- Stores the right hand side of the join, using a JSON array
22+
-- since this is a one-to-many join
23+
rhs json
24+
);
25+
transforms:
26+
- name: fromInts
27+
source: patterns/ints
2028
shuffle: { key: [/Key] }
21-
update: { lambda: typescript }
22-
publish: { lambda: typescript }
29+
lambda: |
30+
insert into join_state (key, lhs) values ($Key, $Int)
31+
on conflict (key) do update set lhs = coalesce(lhs, 0) + $Int;
32+
-- now emit the joined result
33+
SELECT JSON_OBJECT(
34+
'Key', $Key,
35+
'LHS', lhs,
36+
'RHS', JSON(rhs)
37+
)
38+
FROM join_state
39+
WHERE key = $Key AND lhs IS NOT NULL AND rhs IS NOT NULL;
2340
24-
fromStrings:
25-
source: { name: patterns/strings }
41+
- name: fromStrings
42+
source: patterns/strings
2643
shuffle: { key: [/Key] }
27-
update: { lambda: typescript }
28-
publish: { lambda: typescript }
29-
typescript:
30-
module: join-inner.flow.ts
31-
44+
lambda: |
45+
INSERT INTO join_state (key, rhs) VALUES ($Key, JSON_ARRAY($String))
46+
ON CONFLICT (key) DO UPDATE SET rhs = JSON_INSERT(COALESCE(rhs, '[]'), '$[#]', $String);
47+
-- now emit the joined result
48+
SELECT JSON_OBJECT(
49+
'Key', $Key,
50+
'LHS', lhs,
51+
'RHS', JSON(rhs)
52+
)
53+
FROM join_state
54+
WHERE key = $Key AND lhs IS NOT NULL AND rhs IS NOT NULL;
55+
3256
tests:
3357
patterns/test/inner-join:
3458
- ingest:
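
The behavior of the two SQL lambdas in this inner join can be modeled with a small TypeScript sketch. It is an illustration only: a `Map` stands in for the `join_state` table, the helper names are hypothetical, and it assumes a missing LHS counts as 0 when an integer arrives after a string for the same key.

```typescript
// Hypothetical in-memory stand-in for the join_state table.
interface JoinRow { lhs: number | null; rhs: string[] | null }
interface Joined { Key: string; LHS: number; RHS: string[] }

const joinState = new Map<string, JoinRow>();

function rowFor(key: string): JoinRow {
  let row = joinState.get(key);
  if (row === undefined) {
    row = { lhs: null, rhs: null };
    joinState.set(key, row);
  }
  return row;
}

// Emit only when both sides are present, like `lhs IS NOT NULL AND rhs IS NOT NULL`.
function emitInner(key: string): Joined[] {
  const row = rowFor(key);
  if (row.lhs === null || row.rhs === null) return [];
  return [{ Key: key, LHS: row.lhs, RHS: [...row.rhs] }];
}

function fromInts(doc: { Key: string; Int: number }): Joined[] {
  const row = rowFor(doc.Key);
  row.lhs = (row.lhs ?? 0) + doc.Int; // assumption: missing LHS is treated as 0
  return emitInner(doc.Key);
}

function fromStrings(doc: { Key: string; String: string }): Joined[] {
  const row = rowFor(doc.Key);
  row.rhs = [...(row.rhs ?? []), doc.String]; // append, like JSON_INSERT at '$[#]'
  return emitInner(doc.Key);
}

const first = fromStrings({ Key: "a", String: "hello" }); // only RHS known: no output
const second = fromInts({ Key: "a", Int: 3 });            // both sides known: emits
const third = fromInts({ Key: "a", Int: 2 });             // LHS accumulates
```

Nothing is emitted until both sides of a key have been seen, after which every read from either source emits the current joined snapshot.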

examples/derive-patterns/join-one-sided.flow.ts

Lines changed: 0 additions & 11 deletions
This file was deleted.

examples/derive-patterns/join-one-sided.flow.yaml

Lines changed: 36 additions & 0 deletions
@@ -9,6 +9,42 @@ collections:
99
required: [Key]
1010
key: [/Key]
1111

12+
derive:
13+
using:
14+
sqlite:
15+
migrations:
16+
- |
17+
create table join_state (
18+
key text not null primary key,
19+
-- Stores the left hand side of the join
20+
lhs integer,
21+
-- Stores the right hand side of the join, using a JSON array
22+
-- since this is a one-to-many join
23+
rhs json
24+
);
25+
transforms:
26+
- name: fromInts
27+
source: patterns/ints
28+
shuffle: { key: [/Key] }
29+
lambda: |
30+
insert into join_state (key, lhs) values ($Key, $Int)
31+
on conflict (key) do update set lhs = $Int;
32+
-- now emit the joined result
33+
SELECT JSON_OBJECT(
34+
'Key', $Key,
35+
'LHS', lhs,
36+
'RHS', JSON(COALESCE(rhs, '[]'))
37+
)
38+
FROM join_state
39+
WHERE key = $Key;
40+
41+
- name: fromStrings
42+
source: patterns/strings
43+
shuffle: { key: [/Key] }
44+
lambda: |
45+
INSERT INTO join_state (key, rhs) VALUES ($Key, JSON_ARRAY($String))
46+
ON CONFLICT (key) DO UPDATE SET rhs = JSON_INSERT(COALESCE(rhs, '[]'), '$[#]', $String);
47+
1248
derivation:
1349
register:
1450
schema: schema.yaml#Join
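
For comparison with the inner join, the one-sided join added in this file might be modeled as follows. This is a sketch with hypothetical helper names and a `Map` in place of the `join_state` table: only reads from the integer source emit documents, and the LHS is overwritten rather than summed.

```typescript
// Hypothetical in-memory stand-in for the join_state table.
interface OneSidedRow { lhs: number | null; rhs: string[] }
interface OneSidedOut { Key: string; LHS: number; RHS: string[] }

const oneSidedState = new Map<string, OneSidedRow>();

function loadRow(key: string): OneSidedRow {
  let row = oneSidedState.get(key);
  if (row === undefined) {
    row = { lhs: null, rhs: [] };
    oneSidedState.set(key, row);
  }
  return row;
}

// LHS reads replace the stored value and emit it with the RHS accumulated so far.
function readInt(doc: { Key: string; Int: number }): OneSidedOut[] {
  const row = loadRow(doc.Key);
  row.lhs = doc.Int; // last write wins, like `do update set lhs = $Int`
  return [{ Key: doc.Key, LHS: doc.Int, RHS: [...row.rhs] }];
}

// RHS reads only accumulate state; they never emit a document.
function readString(doc: { Key: string; String: string }): OneSidedOut[] {
  loadRow(doc.Key).rhs.push(doc.String);
  return [];
}

const beforeRhs = readInt({ Key: "a", Int: 1 });
const silent = readString({ Key: "a", String: "x" });
const afterRhs = readInt({ Key: "a", Int: 7 });
```

An LHS document that arrives before any RHS is still emitted, paired with an empty array, which mirrors the `COALESCE(rhs, '[]')` in the `fromInts` lambda.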
Lines changed: 6 additions & 6 deletions
@@ -1,11 +1,11 @@
1-
import { IDerivation, Document, Register, FromIntsSource, FromStringsSource } from 'flow/patterns/outer-join';
1+
import { IDerivation, Document, SourceFromInts, SourceFromStrings } from 'flow/patterns/outer-join.ts';
22

33
// Implementation for derivation examples/derive-patterns/join-outer.flow.yaml#/collections/patterns~1outer-join/derivation.
4-
export class Derivation implements IDerivation {
5-
fromIntsPublish(source: FromIntsSource, _register: Register, _previous: Register): Document[] {
6-
return [{ Key: source.Key, LHS: source.Int }];
4+
export class Derivation extends IDerivation {
5+
fromInts(read: {doc: SourceFromInts}): Document[] {
6+
return [{ Key: read.doc.Key, LHS: read.doc.Int }];
77
}
8-
fromStringsPublish(source: FromStringsSource, _register: Register, _previous: Register): Document[] {
9-
return [{ Key: source.Key, RHS: [source.String] }];
8+
fromStrings(read: {doc: SourceFromStrings}): Document[] {
9+
return [{ Key: read.doc.Key, RHS: [read.doc.String] }];
1010
}
1111
}
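
The stateless derivation above emits one-sided delta documents and leaves the joining to reduction in the materialized table. A rough sketch of that reduction follows. The merge rule here is an assumption, since `schema.yaml#Join` is not shown in this diff: `LHS` is summed and `RHS` is appended. `reduceDelta` and `materialize` are hypothetical names.

```typescript
// Shape of the documents returned by fromInts and fromStrings above.
interface Delta { Key: string; LHS?: number; RHS?: string[] }

// Assumed reduction: sum LHS, append RHS. The real annotations live in schema.yaml.
function reduceDelta(prev: Delta | undefined, next: Delta): Delta {
  const merged: Delta = { Key: next.Key };
  if (prev?.LHS !== undefined || next.LHS !== undefined) {
    merged.LHS = (prev?.LHS ?? 0) + (next.LHS ?? 0);
  }
  if (prev?.RHS !== undefined || next.RHS !== undefined) {
    merged.RHS = [...(prev?.RHS ?? []), ...(next.RHS ?? [])];
  }
  return merged;
}

// The table keyed on /Key is the only state; each delta is reduced into its row.
function materialize(deltas: Delta[]): Map<string, Delta> {
  const table = new Map<string, Delta>();
  for (const delta of deltas) {
    table.set(delta.Key, reduceDelta(table.get(delta.Key), delta));
  }
  return table;
}

const table = materialize([
  { Key: "a", LHS: 1 },
  { Key: "a", RHS: ["x"] },
  { Key: "b", RHS: ["y"] },
  { Key: "a", LHS: 2 },
]);
```

Key `b` ends up with only an `RHS`, which is what makes this an outer join: rows appear as soon as either side has data.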

examples/derive-patterns/join-outer.flow.yaml

Lines changed: 6 additions & 8 deletions
@@ -8,19 +8,17 @@ collections:
88
required: [Key]
99
key: [/Key]
1010

11-
derivation:
12-
transform:
13-
fromInts:
11+
derive:
12+
using:
13+
typescript: { module: join-outer.flow.ts }
14+
transforms:
15+
- name: fromInts
1416
source: { name: patterns/ints }
1517
shuffle: { key: [/Key] }
16-
publish: { lambda: typescript }
1718

18-
fromStrings:
19+
- name: fromStrings
1920
source: { name: patterns/strings }
2021
shuffle: { key: [/Key] }
21-
publish: { lambda: typescript }
22-
typescript:
23-
module: join-outer.flow.ts
2422

2523
tests:
2624
patterns/test/outer-join:
Lines changed: 4 additions & 4 deletions
@@ -1,8 +1,8 @@
1-
import { IDerivation, Document, Register, FromIntsSource } from 'flow/patterns/sums-db';
1+
import { IDerivation, Document, SourceFromInts } from 'flow/patterns/sums-reductions.ts';
22

33
// Implementation for derivation examples/derive-patterns/summer.flow.yaml#/collections/patterns~1sums-db/derivation.
4-
export class Derivation implements IDerivation {
5-
fromIntsPublish(source: FromIntsSource, _register: Register, _previous: Register): Document[] {
6-
return [{ Key: source.Key, Sum: source.Int }];
4+
export class Derivation extends IDerivation {
5+
fromInts(read: { doc: SourceFromInts }): Document[] {
6+
return [{ Key: read.doc.Key, Sum: read.doc.Int }];
77
}
88
}
