Skip to content

Commit c20869d

Browse files
committed
activate: support new estuary.dev/cordon label
`estuary.dev/cordon` is a new runtime label which is passed-through from journal and shard splits when activating catalog changes into a data-plane. - A cordoned journal is marked as read-only, preventing further appends. - A cordoned task shard is disabled. In essence, the label "fences" future or raced task activations which might otherwise inadvertenly re-activate a cordoned spec.
1 parent 0f41c22 commit c20869d

File tree

5 files changed

+177
-103
lines changed

5 files changed

+177
-103
lines changed

crates/activate/src/lib.rs

Lines changed: 145 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -38,26 +38,76 @@ struct TaskTemplate<'a> {
3838
recovery: &'a JournalSpec,
3939
}
4040

41-
/// Activate a capture into a data-plane.
42-
pub async fn activate_capture(
43-
journal_client: &gazette::journal::Client,
44-
shard_client: &gazette::shard::Client,
45-
capture: &models::Capture,
46-
task_spec: Option<&flow::CaptureSpec>,
47-
ops_logs_template: Option<&broker::JournalSpec>,
48-
ops_stats_template: Option<&broker::JournalSpec>,
49-
initial_splits: usize,
50-
) -> anyhow::Result<()> {
51-
let task_template = if let Some(task_spec) = task_spec {
52-
let shard_template = task_spec
41+
// Map a CaptureSpec into its activation TaskTemplate.
42+
fn capture_template(task_spec: Option<&flow::CaptureSpec>) -> anyhow::Result<Option<TaskTemplate>> {
43+
let Some(task_spec) = task_spec else {
44+
return Ok(None);
45+
};
46+
47+
let shard_template = task_spec
48+
.shard_template
49+
.as_ref()
50+
.context("CaptureSpec missing shard_template")?;
51+
52+
let recovery_template = task_spec
53+
.recovery_log_template
54+
.as_ref()
55+
.context("CaptureSpec missing recovery_log_template")?;
56+
57+
Ok(Some(TaskTemplate {
58+
shard: shard_template,
59+
recovery: recovery_template,
60+
}))
61+
}
62+
63+
// Map a MaterializationSpec into its activation TaskTemplate.
64+
fn materialization_template(
65+
task_spec: Option<&flow::MaterializationSpec>,
66+
) -> anyhow::Result<Option<TaskTemplate>> {
67+
let Some(task_spec) = task_spec else {
68+
return Ok(None);
69+
};
70+
71+
let shard_template = task_spec
72+
.shard_template
73+
.as_ref()
74+
.context("MaterializationSpec missing shard_template")?;
75+
76+
let recovery_template = task_spec
77+
.recovery_log_template
78+
.as_ref()
79+
.context("MaterializationSpec missing recovery_log_template")?;
80+
81+
Ok(Some(TaskTemplate {
82+
shard: shard_template,
83+
recovery: recovery_template,
84+
}))
85+
}
86+
87+
// Map a CollectionSpeck into its activation partition template and,
88+
// if a derivation, its activation TaskTemplate.
89+
fn collection_template(
90+
task_spec: Option<&flow::CollectionSpec>,
91+
) -> anyhow::Result<(Option<&JournalSpec>, Option<TaskTemplate>)> {
92+
let Some(task_spec) = task_spec else {
93+
return Ok((None, None));
94+
};
95+
96+
let partition_template = task_spec
97+
.partition_template
98+
.as_ref()
99+
.context("CollectionSpec missing partition_template")?;
100+
101+
let task_template = if let Some(derivation) = &task_spec.derivation {
102+
let shard_template = derivation
53103
.shard_template
54104
.as_ref()
55-
.context("CaptureSpec missing shard_template")?;
105+
.context("CollectionSpec.Derivation missing shard_template")?;
56106

57-
let recovery_template = task_spec
107+
let recovery_template = derivation
58108
.recovery_log_template
59109
.as_ref()
60-
.context("CaptureSpec missing recovery_log_template")?;
110+
.context("CollectionSpec.Derivation missing recovery_log_template")?;
61111

62112
Some(TaskTemplate {
63113
shard: shard_template,
@@ -67,6 +117,21 @@ pub async fn activate_capture(
67117
None
68118
};
69119

120+
Ok((Some(partition_template), task_template))
121+
}
122+
123+
/// Activate a capture into a data-plane.
124+
pub async fn activate_capture(
125+
journal_client: &gazette::journal::Client,
126+
shard_client: &gazette::shard::Client,
127+
capture: &models::Capture,
128+
task_spec: Option<&flow::CaptureSpec>,
129+
ops_logs_template: Option<&broker::JournalSpec>,
130+
ops_stats_template: Option<&broker::JournalSpec>,
131+
initial_splits: usize,
132+
) -> anyhow::Result<()> {
133+
let task_template = capture_template(task_spec)?;
134+
70135
let changes = converge_task_changes(
71136
journal_client,
72137
shard_client,
@@ -92,35 +157,7 @@ pub async fn activate_collection(
92157
ops_stats_template: Option<&broker::JournalSpec>,
93158
initial_splits: usize,
94159
) -> anyhow::Result<()> {
95-
let (task_template, partition_template) = if let Some(task_spec) = task_spec {
96-
let partition_template = task_spec
97-
.partition_template
98-
.as_ref()
99-
.context("CollectionSpec missing partition_template")?;
100-
101-
let task_template = if let Some(derivation) = &task_spec.derivation {
102-
let shard_template = derivation
103-
.shard_template
104-
.as_ref()
105-
.context("CollectionSpec.Derivation missing shard_template")?;
106-
107-
let recovery_template = derivation
108-
.recovery_log_template
109-
.as_ref()
110-
.context("CollectionSpec.Derivation missing recovery_log_template")?;
111-
112-
Some(TaskTemplate {
113-
shard: shard_template,
114-
recovery: recovery_template,
115-
})
116-
} else {
117-
None
118-
};
119-
120-
(task_template, Some(partition_template))
121-
} else {
122-
(None, None)
123-
};
160+
let (partition_template, task_template) = collection_template(task_spec)?;
124161

125162
let (changes_1, changes_2) = futures::try_join!(
126163
converge_task_changes(
@@ -154,24 +191,7 @@ pub async fn activate_materialization(
154191
ops_stats_template: Option<&broker::JournalSpec>,
155192
initial_splits: usize,
156193
) -> anyhow::Result<()> {
157-
let task_template = if let Some(task_spec) = task_spec {
158-
let shard_template = task_spec
159-
.shard_template
160-
.as_ref()
161-
.context("MaterializationSpec missing shard_template")?;
162-
163-
let recovery_template = task_spec
164-
.recovery_log_template
165-
.as_ref()
166-
.context("MaterializationSpec missing recovery_log_template")?;
167-
168-
Some(TaskTemplate {
169-
shard: shard_template,
170-
recovery: recovery_template,
171-
})
172-
} else {
173-
None
174-
};
194+
let task_template = materialization_template(task_spec)?;
175195

176196
let changes = converge_task_changes(
177197
journal_client,
@@ -519,6 +539,16 @@ fn task_changes<'a>(
519539
..template.shard.clone()
520540
};
521541

542+
// Next resolve the shard's recovery-log JournalSpec.
543+
let recovery_name = format!("{}/{}", shard_spec.recovery_log_prefix, shard_spec.id);
544+
let recovery_split = recovery.remove(&recovery_name).unwrap_or_default();
545+
546+
let mut recovery_spec = JournalSpec {
547+
name: recovery_name,
548+
suspend: recovery_split.suspend, // Must be passed through.
549+
..template.recovery.clone()
550+
};
551+
522552
// Resolve the labels of the ShardSpec by merging labels managed the
523553
// control-plane versus the data-plane.
524554
let mut shard_labels = shard_spec.labels.take().unwrap_or_default();
@@ -536,21 +566,17 @@ fn task_changes<'a>(
536566
if label.name == labels::SPLIT_SOURCE {
537567
shard_spec.hot_standbys = 0
538568
}
569+
570+
// A cordoned task is disabled with its recovery log marked read-only.
571+
if label.name == labels::CORDON {
572+
shard_spec.disable = true;
573+
recovery_spec.flags = proto_gazette::broker::journal_spec::Flag::ORdonly as u32;
574+
}
539575
}
540576
shard_labels = labels::set_value(shard_labels, labels::LOGS_JOURNAL, ops_logs_name);
541577
shard_labels = labels::set_value(shard_labels, labels::STATS_JOURNAL, ops_stats_name);
542578
shard_spec.labels = Some(shard_labels);
543579

544-
// Next resolve the shard's recovery-log JournalSpec.
545-
let recovery_name = format!("{}/{}", shard_spec.recovery_log_prefix, shard_spec.id);
546-
let recovery_split = recovery.remove(&recovery_name).unwrap_or_default();
547-
548-
let recovery_spec = JournalSpec {
549-
name: recovery_name,
550-
suspend: recovery_split.suspend, // Must be passed through.
551-
..template.recovery.clone()
552-
};
553-
554580
changes.push(Change::Shard(consumer::apply_request::Change {
555581
expect_mod_revision: shard_revision,
556582
upsert: Some(shard_spec),
@@ -626,6 +652,11 @@ fn partition_changes(
626652
continue;
627653
}
628654
spec_labels = labels::add_value(spec_labels, &label.name, &label.value);
655+
656+
// A cordoned journal is marked as read-only to prevent further writes.
657+
if label.name == labels::CORDON {
658+
spec.flags = proto_gazette::broker::journal_spec::Flag::ORdonly as u32;
659+
}
629660
}
630661
spec.labels = Some(spec_labels);
631662

@@ -932,9 +963,9 @@ mod test {
932963
let mut all_recovery = Vec::new();
933964
let mut all_recovery_disabled = Vec::new();
934965

935-
let mut make_partition = |key_begin, key_end, doc: serde_json::Value| {
966+
let mut make_partition = |key_begin, key_end, doc: serde_json::Value, labels: LabelSet| {
936967
let labels = labels::partition::encode_field_range(
937-
labels::build_set([("extra", "1")]),
968+
labels::add_value(labels, "extra", "1"),
938969
key_begin,
939970
key_end,
940971
partition_fields,
@@ -958,9 +989,11 @@ mod test {
958989
});
959990
};
960991

961-
let mut make_task = |range_spec| {
962-
let labels =
963-
labels::shard::encode_range_spec(labels::build_set([("extra", "1")]), range_spec);
992+
let mut make_task = |range_spec, labels: LabelSet| {
993+
let labels = labels::shard::encode_range_spec(
994+
labels::add_value(labels, "extra", "1"),
995+
range_spec,
996+
);
964997
let shard_id = format!(
965998
"{}/{}",
966999
shard_template.id,
@@ -1008,32 +1041,48 @@ mod test {
10081041
0x10000000,
10091042
0x3fffffff,
10101043
json!({"a_bool": true, "a_str": "a-val"}),
1044+
LabelSet::default(),
10111045
);
10121046
make_partition(
10131047
0x40000000,
10141048
0x5fffffff,
10151049
json!({"a_bool": true, "a_str": "a-val"}),
1050+
LabelSet::default(),
1051+
);
1052+
make_partition(
1053+
0,
1054+
u32::MAX,
1055+
json!({"a_bool": false, "a_str": "other-val"}),
1056+
labels::build_set([(labels::CORDON, "true")]),
10161057
);
1017-
make_partition(0, u32::MAX, json!({"a_bool": false, "a_str": "other-val"}));
10181058

1019-
make_task(&flow::RangeSpec {
1020-
key_begin: 0x10000000,
1021-
key_end: 0x2fffffff,
1022-
r_clock_begin: 0x60000000,
1023-
r_clock_end: 0x9fffffff,
1024-
});
1025-
make_task(&flow::RangeSpec {
1026-
key_begin: 0x30000000,
1027-
key_end: 0x3fffffff,
1028-
r_clock_begin: 0x60000000,
1029-
r_clock_end: 0x7fffffff,
1030-
});
1031-
make_task(&flow::RangeSpec {
1032-
key_begin: 0x30000000,
1033-
key_end: 0x3fffffff,
1034-
r_clock_begin: 0x80000000,
1035-
r_clock_end: 0x9fffffff,
1036-
});
1059+
make_task(
1060+
&flow::RangeSpec {
1061+
key_begin: 0x10000000,
1062+
key_end: 0x2fffffff,
1063+
r_clock_begin: 0x60000000,
1064+
r_clock_end: 0x9fffffff,
1065+
},
1066+
LabelSet::default(),
1067+
);
1068+
make_task(
1069+
&flow::RangeSpec {
1070+
key_begin: 0x30000000,
1071+
key_end: 0x3fffffff,
1072+
r_clock_begin: 0x60000000,
1073+
r_clock_end: 0x7fffffff,
1074+
},
1075+
LabelSet::default(),
1076+
);
1077+
make_task(
1078+
&flow::RangeSpec {
1079+
key_begin: 0x30000000,
1080+
key_end: 0x3fffffff,
1081+
r_clock_begin: 0x80000000,
1082+
r_clock_end: 0x9fffffff,
1083+
},
1084+
labels::build_set([(labels::CORDON, "true")]),
1085+
);
10371086

10381087
// Case: test update of existing specs.
10391088
{

crates/activate/src/snapshots/activate__test__update-disabled.snap

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,10 @@ expression: "(partition_changes, task_changes)"
150150
"name": "estuary.dev/collection",
151151
"value": "example/collection"
152152
},
153+
{
154+
"name": "estuary.dev/cordon",
155+
"value": "true"
156+
},
153157
{
154158
"name": "estuary.dev/field/a_bool",
155159
"value": "%_false"
@@ -178,7 +182,7 @@ expression: "(partition_changes, task_changes)"
178182
"flushInterval": "900s",
179183
"pathPostfixTemplate": "utc_date={{.Spool.FirstAppendTime.Format \"2006-01-02\"}}/utc_hour={{.Spool.FirstAppendTime.Format \"15\"}}"
180184
},
181-
"flags": 4,
185+
"flags": 1,
182186
"maxAppendRate": "4194304",
183187
"suspend": {
184188
"level": "PARTIAL",
@@ -432,6 +436,10 @@ expression: "(partition_changes, task_changes)"
432436
"name": "estuary.dev/build",
433437
"value": "0101010101010101"
434438
},
439+
{
440+
"name": "estuary.dev/cordon",
441+
"value": "true"
442+
},
435443
{
436444
"name": "estuary.dev/key-begin",
437445
"value": "30000000"
@@ -513,7 +521,7 @@ expression: "(partition_changes, task_changes)"
513521
],
514522
"refreshInterval": "300s"
515523
},
516-
"flags": 4,
524+
"flags": 1,
517525
"maxAppendRate": "4194304",
518526
"suspend": {
519527
"level": "FULL",

0 commit comments

Comments
 (0)