Skip to content

Commit 820b193

Browse files
committed
ops-catalog: thread through collection generation id on inferred schemas
This builds on previous work, which added the id of the publication that initially created a collection as a suffix of the journal template. Originally, this was intended to allow correct handling of cloud storage so that a collection (or task) can be deleted and re-created with the same name. The journal template suffix will be different, so that data from the previous generation is kept separate from data for the new generation. But a problem remains with inferred schemas, since the inferred schemas table only stores the collection name and not the journal template suffix. This begins formalizing that journal template suffix as a `generation_id`, and updates the runtime and ops catalog to thread through the generation id as part of inferred schema updates. It is materialized as part of the `inferred_schemas` table. This commit stops short of actually changing how inferred schema resolution works. Notably, the primary key of the `inferred_schemas` table is unchanged, meaning that multiple different inferred schemas still cannot coexist for the same `collection_name`. Instead, the `collection_generation_id` is simply materialized alongside the schema, and we can figure out later how we want that to factor into inferred schema resolutions as part of publications.
1 parent 6979dcf commit 820b193

File tree

15 files changed

+47
-9
lines changed

15 files changed

+47
-9
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/runtime/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ proto-grpc = { path = "../proto-grpc", features = [
3535
"runtime_server",
3636
] }
3737
simd-doc = { path = "../simd-doc" }
38+
tables = { path = "../tables" }
3839
tuple = { path = "../tuple" }
3940
unseal = { path = "../unseal" }
4041

crates/runtime/src/capture/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ pub struct Task {
3030
struct Binding {
3131
// Target collection.
3232
collection_name: String,
33+
// Generation id of the collection, which must be output as part of updating inferred schemas.
34+
collection_generation_id: models::Id,
3335
// JSON pointer at which document UUIDs are added.
3436
document_uuid_ptr: doc::Pointer,
3537
// Key components which are extracted from written documents.

crates/runtime/src/capture/protocol.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -332,11 +332,17 @@ pub async fn recv_client_start_commit(
332332
// produce structured logs of all inferred schemas that have changed
333333
// in this transaction.
334334
for binding in txn.updated_inferences.iter() {
335-
let serialized = doc::shape::schema::to_schema(shapes[*binding].clone());
335+
let mut serialized = doc::shape::schema::to_schema(shapes[*binding].clone());
336+
let gen_id = task.bindings[*binding].collection_generation_id.to_string();
337+
serialized.schema.extensions.insert(
338+
"x-collection-generation-id".to_string(),
339+
serde_json::Value::String(gen_id),
340+
);
336341

337342
tracing::info!(
338343
schema = ?ops::DebugJson(serialized),
339344
collection_name = %task.bindings[*binding].collection_name,
345+
collection_generation_id = %task.bindings[*binding].collection_generation_id,
340346
binding = binding,
341347
"inferred schema updated"
342348
);

crates/runtime/src/capture/task.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,12 @@ impl Binding {
132132
state_key: _,
133133
} = spec;
134134

135+
let collection_spec = collection.as_ref().context("missing collection")?;
136+
// Determine the generation id of the collection, which we'll need when updating inferred schemas.
137+
// For legacy collections that don't have a generation id, we'll use the zero id.
138+
let collection_generation_id = tables::utils::get_collection_generation_id(collection_spec)
139+
.unwrap_or(models::Id::zero());
140+
135141
let flow::CollectionSpec {
136142
ack_template_json: _,
137143
derivation: _,
@@ -143,7 +149,7 @@ impl Binding {
143149
read_schema_json: _,
144150
uuid_ptr,
145151
write_schema_json,
146-
} = collection.as_ref().context("missing collection")?;
152+
} = collection_spec;
147153

148154
let document_uuid_ptr = doc::Pointer::from(uuid_ptr);
149155
let key_extractors = extractors::for_key(&key, &projections, &ser_policy)?;
@@ -152,6 +158,7 @@ impl Binding {
152158

153159
Ok(Self {
154160
collection_name: name.clone(),
161+
collection_generation_id,
155162
document_uuid_ptr,
156163
key_extractors,
157164
partition_extractors,

crates/runtime/src/derive/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ impl<T: Stream<Item = anyhow::Result<Response>> + Send + 'static> ResponseStream
1919
pub struct Task {
2020
// Target collection.
2121
collection_name: String,
22+
/// The generation id of the derived collection, which gets output as part of inferred schema updates.
23+
collection_generation_id: models::Id,
2224
// JSON pointer at which document UUIDs are added.
2325
document_uuid_ptr: doc::Pointer,
2426
// Key components which are extracted from written documents.

crates/runtime/src/derive/protocol.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,7 @@ pub async fn recv_connector_started_commit(
365365
tracing::info!(
366366
schema = ?::ops::DebugJson(serialized),
367367
collection_name = %task.collection_name,
368+
collection_generation_id = %task.collection_generation_id,
368369
"inferred schema updated"
369370
);
370371
}

crates/runtime/src/derive/task.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ impl Task {
1414

1515
let response::Opened {} = opened.opened.as_ref().context("expected Opened")?;
1616

17+
let collection_spec = collection.context("missing collection")?;
18+
let collection_generation_id =
19+
tables::utils::get_collection_generation_id(&collection_spec)
20+
.unwrap_or(models::Id::zero());
1721
let flow::CollectionSpec {
1822
ack_template_json: _,
1923
derivation,
@@ -25,7 +29,7 @@ impl Task {
2529
read_schema_json: _,
2630
uuid_ptr,
2731
write_schema_json,
28-
} = collection.context("missing collection")?;
32+
} = collection_spec;
2933

3034
let flow::collection_spec::Derivation {
3135
config_json: _,
@@ -66,6 +70,7 @@ impl Task {
6670

6771
Ok(Self {
6872
collection_name,
73+
collection_generation_id,
6974
document_uuid_ptr,
7075
key_extractors,
7176
partition_extractors,

crates/tables/src/utils.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,20 @@
11
use models::{SourceCapture, SourceCaptureSchemaMode};
2+
use proto_flow::flow;
23
use serde::{Deserialize, Serialize};
34
use serde_json::Value;
45

6+
/// Returns the generation id for a given collection spec, if specified. The id is parsed from
7+
/// the text after the final `/` of the partition template name. `None` is returned when the
8+
/// template is absent, its name has no `/`, or that text is not a valid hex id (legacy specs).
9+
pub fn get_collection_generation_id(c: &flow::CollectionSpec) -> Option<models::Id> {
10+
let partition_prefix = &c.partition_template.as_ref()?.name;
11+
let (_, last) = partition_prefix.rsplit_once('/')?;
12+
13+
// If this is a legacy collection spec, then the `last` path component will be a string that
14+
// cannot be parsed as an id, so `.ok()` yields `None`. NOTE(review): presumably no legacy name ends in a hex-id-like component — confirm.
15+
models::Id::from_hex(last).ok()
16+
}
17+
518
#[derive(Serialize, Deserialize)]
619
pub struct ResourceSpecPointers {
720
pub x_collection_name: doc::Pointer,

ops-catalog/data-plane-template.bundle.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
"schema": {"$defs":{"__flowInline1":{"$id":"file:///Users/phil/projects/flow/ops-catalog/logs.schema.yaml","$schema":"https://json-schema.org/draft-07/schema","description":"Logs related to the processing of a Flow capture, derivation, or materialization","properties":{"fields":{"additionalProperties":true,"description":"Map of keys and values that are associated with this log entry.","type":"object"},"level":{"enum":["error","warn","info","debug","trace"]},"message":{"type":"string"},"shard":{"$ref":"shard.schema.yaml"},"ts":{"description":"Timestamp corresponding to the start of the transaction","format":"date-time","type":"string"}},"required":["shard","ts","level"],"title":"Flow task logs","type":"object"},"__flowInline2":{"$id":"file:///Users/phil/projects/flow/ops-catalog/shard.schema.yaml","$schema":"https://json-schema.org/draft-07/schema","description":"Identifies a specific shard of a task, which may be the source of a log message or metrics","properties":{"build":{"description":"The id of the build that this shard was running at the time the log was written","pattern":"[0-9a-f]{16}","type":"string"},"keyBegin":{"description":"The inclusive beginning of the shard's assigned key range","pattern":"[0-9a-f]{8}","type":"string"},"kind":{"description":"The type of the catalog task","enum":["capture","derivation","materialization"]},"name":{"description":"The name of the catalog task (without the task type prefix)","type":"string"},"rClockBegin":{"description":"The inclusive beginning of the shard's assigned rClock range","pattern":"[0-9a-f]{8}","type":"string"}},"required":["kind","name","keyBegin","rClockBegin"],"title":"Flow shard id","type":"object"}},"$id":"file:///Users/phil/projects/flow/ops-catalog/events.schema.yaml","$ref":"logs.schema.yaml","$schema":"https://json-schema.org/draft-07/schema","description":"Events are special logs that are intended to be consumed by the control 
plane","properties":{"fields":{"additionalProperties":true,"properties":{"error":{"description":"If the event represents an error, this field contains the error message.\n","type":"string"},"eventTarget":{"description":"The target of the event is a catalog name that the event pertains to.\n","type":"string"},"eventType":{"description":"Identifies this log message as an event of the given type. Events\nare special logs that are meant to be observed by the Flow control plane.\n","type":"string"}},"required":["eventType","eventTarget"]},"shard":{"description":"The source of the event, which may differ from the eventTarget"}},"required":["fields"],"title":"Flow events"},
4747
"key": [
4848
"/fields/eventTarget",
49-
"/eventType"
49+
"/fields/eventType"
5050
],
5151
"projections": {
5252
"event_type": {
@@ -105,7 +105,7 @@
105105
"/shard/name"
106106
]
107107
},
108-
"lambda": "select\n $fields->>'collection_name' as collection_name,\n $fields->'schema' as schema\nwhere $message = 'inferred schema updated';\n"
108+
"lambda": "select\n $fields->>'collection_name' as collection_name,\n coalesce($fields->>'collection_generation_id', '0000000000000000') as collection_generation_id,\n $fields->'schema' as schema\nwhere $message = 'inferred schema updated';\n"
109109
}
110110
],
111111
"shards": {

0 commit comments

Comments
 (0)