Skip to content

Commit 975220d

Browse files
committed
crates/activate & go/shuffle: support for journal suspension
Activation is refactored to preserve and pass-through JournalSpec::Suspend fields when applying control-plane activations. The prior dekaf-specific activation logic is moved to `agent`, and simplified to use no initial shard splits for dekaf tasks. Activation is also updated to avoid creating data-plane resources for tasks which are created into a disabled state. The shuffled read machinery is updated to be aware of suspended journals: to drain reads when they become suspended, and to not read them again until they re-activate.
1 parent 5068a69 commit 975220d

14 files changed

+1266
-403
lines changed

crates/activate/src/lib.rs

Lines changed: 504 additions & 328 deletions
Large diffs are not rendered by default.
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
source: crates/activate/src/lib.rs
3+
expression: "(partition_changes, task_changes)"
4+
---
5+
[
6+
[],
7+
[]
8+
]
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
source: crates/activate/src/lib.rs
3+
expression: "(partition_changes, task_changes)"
4+
---
5+
[
6+
[],
7+
[]
8+
]

crates/activate/src/snapshots/activate__test__ops_collection_partition.snap

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
---
22
source: crates/activate/src/lib.rs
3-
expression: "json!([\"list_req\", list_req, \"spec\", spec])"
3+
expression: "json!([\"list_req\", list_req, \"spec\", spec, \"create\",\nops_journal_changes(Some(spec.clone()), Vec::new()), \"update-exists\",\nops_journal_changes(Some(spec.clone()), vec![exists]),])"
44
---
55
[
66
"list_req",
@@ -68,5 +68,64 @@ expression: "json!([\"list_req\", list_req, \"spec\", spec])"
6868
"maxAppendRate": "4194304",
6969
"name": "ops/tasks/BASE_NAME/logs/2020202020202020/kind=capture/name=the%2Ftask%2Fname/pivot=00",
7070
"replication": 3
71-
}
71+
},
72+
"create",
73+
{
74+
"Journal": {
75+
"upsert": {
76+
"flags": 4,
77+
"fragment": {
78+
"compressionCodec": "GZIP",
79+
"flushInterval": "86400s",
80+
"length": "536870912",
81+
"pathPostfixTemplate": "utc_date={{.Spool.FirstAppendTime.Format \"2006-01-02\"}}/utc_hour={{.Spool.FirstAppendTime.Format \"15\"}}",
82+
"refreshInterval": "300s",
83+
"stores": [
84+
"gs://example-bucket/"
85+
]
86+
},
87+
"labels": {
88+
"labels": [
89+
{
90+
"name": "app.gazette.dev/managed-by",
91+
"value": "estuary.dev/flow"
92+
},
93+
{
94+
"name": "content-type",
95+
"value": "application/x-ndjson"
96+
},
97+
{
98+
"name": "estuary.dev/build",
99+
"value": "0101010101010101"
100+
},
101+
{
102+
"name": "estuary.dev/collection",
103+
"value": "ops/tasks/BASE_NAME/logs"
104+
},
105+
{
106+
"name": "estuary.dev/field/kind",
107+
"value": "capture"
108+
},
109+
{
110+
"name": "estuary.dev/field/name",
111+
"value": "the%2Ftask%2Fname"
112+
},
113+
{
114+
"name": "estuary.dev/key-begin",
115+
"value": "00000000"
116+
},
117+
{
118+
"name": "estuary.dev/key-end",
119+
"value": "ffffffff"
120+
}
121+
]
122+
},
123+
"maxAppendRate": "4194304",
124+
"name": "ops/tasks/BASE_NAME/logs/2020202020202020/kind=capture/name=the%2Ftask%2Fname/pivot=00",
125+
"replication": 3
126+
}
127+
}
128+
},
129+
"update-exists",
130+
null
72131
]

crates/activate/src/snapshots/activate__test__partition_splits.snap

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,12 @@
11
---
22
source: crates/activate/src/lib.rs
3-
expression: "json!([\"splits\", splits, \"partition_changes\", partition_changes])"
3+
expression: "json!([\"splits\", (lhs, rhs), \"partition_changes\", partition_changes])"
44
---
55
[
66
"splits",
77
[
8-
[
9-
"example/collection/2020202020202020/a_bool=%_true/a_str=a-val/pivot=10000000",
10-
{
8+
{
9+
"labels": {
1110
"labels": [
1211
{
1312
"name": "estuary.dev/field/a_bool",
@@ -31,11 +30,15 @@ expression: "json!([\"splits\", splits, \"partition_changes\", partition_changes
3130
}
3231
]
3332
},
34-
111
35-
],
36-
[
37-
"example/collection/2020202020202020/a_bool=%_true/a_str=a-val/pivot=28000000",
38-
{
33+
"mod_revision": 111,
34+
"name": "example/collection/2020202020202020/a_bool=%_true/a_str=a-val/pivot=10000000",
35+
"suspend": {
36+
"level": "PARTIAL",
37+
"offset": "112233"
38+
}
39+
},
40+
{
41+
"labels": {
3942
"labels": [
4043
{
4144
"name": "estuary.dev/field/a_bool",
@@ -59,8 +62,10 @@ expression: "json!([\"splits\", splits, \"partition_changes\", partition_changes
5962
}
6063
]
6164
},
62-
0
63-
]
65+
"mod_revision": 0,
66+
"name": "example/collection/2020202020202020/a_bool=%_true/a_str=a-val/pivot=28000000",
67+
"suspend": null
68+
}
6469
],
6570
"partition_changes",
6671
[
@@ -117,7 +122,11 @@ expression: "json!([\"splits\", splits, \"partition_changes\", partition_changes
117122
},
118123
"maxAppendRate": "4194304",
119124
"name": "example/collection/2020202020202020/a_bool=%_true/a_str=a-val/pivot=10000000",
120-
"replication": 3
125+
"replication": 3,
126+
"suspend": {
127+
"level": "PARTIAL",
128+
"offset": "112233"
129+
}
121130
}
122131
}
123132
},

crates/activate/src/snapshots/activate__test__shard_splits.snap

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
---
22
source: crates/activate/src/lib.rs
3-
expression: "json!([\"key_splits\", &key_splits, \"clock_splits\", clock_splits, \"key_changes\",\n key_changes, \"clock_changes\", clock_changes,])"
3+
expression: "json!([\"key_splits\", (&key_lhs, &key_rhs), \"clock_splits\",\n(&clock_lhs, &clock_rhs), \"key_changes\", key_changes, \"clock_changes\",\nclock_changes,])"
44
---
55
[
66
"key_splits",
77
[
8-
[
9-
"derivation/example/derivation/2020202020202020/10000000-60000000",
10-
{
8+
{
9+
"id": "derivation/example/derivation/2020202020202020/10000000-60000000",
10+
"labels": {
1111
"labels": [
1212
{
1313
"name": "estuary.dev/key-begin",
@@ -35,11 +35,11 @@ expression: "json!([\"key_splits\", &key_splits, \"clock_splits\", clock_splits,
3535
}
3636
]
3737
},
38-
111
39-
],
40-
[
41-
"derivation/example/derivation/2020202020202020/20000000-60000000",
42-
{
38+
"mod_revision": 111
39+
},
40+
{
41+
"id": "derivation/example/derivation/2020202020202020/20000000-60000000",
42+
"labels": {
4343
"labels": [
4444
{
4545
"name": "estuary.dev/key-begin",
@@ -67,14 +67,14 @@ expression: "json!([\"key_splits\", &key_splits, \"clock_splits\", clock_splits,
6767
}
6868
]
6969
},
70-
0
71-
]
70+
"mod_revision": 0
71+
}
7272
],
7373
"clock_splits",
7474
[
75-
[
76-
"derivation/example/derivation/2020202020202020/10000000-60000000",
77-
{
75+
{
76+
"id": "derivation/example/derivation/2020202020202020/10000000-60000000",
77+
"labels": {
7878
"labels": [
7979
{
8080
"name": "estuary.dev/key-begin",
@@ -102,11 +102,11 @@ expression: "json!([\"key_splits\", &key_splits, \"clock_splits\", clock_splits,
102102
}
103103
]
104104
},
105-
111
106-
],
107-
[
108-
"derivation/example/derivation/2020202020202020/10000000-80000000",
109-
{
105+
"mod_revision": 111
106+
},
107+
{
108+
"id": "derivation/example/derivation/2020202020202020/10000000-80000000",
109+
"labels": {
110110
"labels": [
111111
{
112112
"name": "estuary.dev/key-begin",
@@ -134,8 +134,8 @@ expression: "json!([\"key_splits\", &key_splits, \"clock_splits\", clock_splits,
134134
}
135135
]
136136
},
137-
0
138-
]
137+
"mod_revision": 0
138+
}
139139
],
140140
"key_changes",
141141
[
@@ -246,7 +246,10 @@ expression: "json!([\"key_splits\", &key_splits, \"clock_splits\", clock_splits,
246246
},
247247
"maxAppendRate": "4194304",
248248
"name": "recovery/derivation/example/derivation/2020202020202020/10000000-60000000",
249-
"replication": 3
249+
"replication": 3,
250+
"suspend": {
251+
"offset": "445566"
252+
}
250253
}
251254
}
252255
},
@@ -468,7 +471,10 @@ expression: "json!([\"key_splits\", &key_splits, \"clock_splits\", clock_splits,
468471
},
469472
"maxAppendRate": "4194304",
470473
"name": "recovery/derivation/example/derivation/2020202020202020/10000000-60000000",
471-
"replication": 3
474+
"replication": 3,
475+
"suspend": {
476+
"offset": "445566"
477+
}
472478
}
473479
}
474480
},

0 commit comments

Comments
 (0)