Skip to content

Commit 8e60b80

Browse files
committed
adapter: in sequencer(sink), factor out retire, make sure we always call it
1 parent 62077b0 commit 8e60b80

File tree

1 file changed

+19
-18
lines changed

1 file changed

+19
-18
lines changed

src/adapter/src/coord/sequencer/inner.rs

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1092,25 +1092,26 @@ impl Coordinator {
10921092

10931093
let result = self.catalog_transact(Some(ctx.session()), ops).await;
10941094

1095-
match result {
1096-
Ok(()) => {
1097-
ctx.retire(Ok(ExecuteResponse::CreatedSink));
1098-
}
1099-
Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1100-
kind:
1101-
mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1102-
})) if if_not_exists => {
1103-
ctx.session()
1104-
.add_notice(AdapterNotice::ObjectAlreadyExists {
1105-
name: name.item,
1106-
ty: "sink",
1107-
});
1108-
ctx.retire(Ok(ExecuteResponse::CreatedSink));
1109-
}
1110-
Err(e) => {
1111-
ctx.retire(Err(e));
1095+
// Make sure we can't early-return and always retire the context.
1096+
let infallible = |result| -> Result<_, _> {
1097+
match result {
1098+
Ok(()) => Ok(ExecuteResponse::CreatedSink),
1099+
Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
1100+
kind:
1101+
mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
1102+
})) if if_not_exists => {
1103+
ctx.session()
1104+
.add_notice(AdapterNotice::ObjectAlreadyExists {
1105+
name: name.item,
1106+
ty: "sink",
1107+
});
1108+
Ok(ExecuteResponse::CreatedSink)
1109+
}
1110+
Err(e) => Err(e),
11121111
}
1113-
}
1112+
};
1113+
let result = infallible(result);
1114+
ctx.retire(result);
11141115
}
11151116

11161117
/// Validates that a view definition does not contain any expressions that may lead to

0 commit comments

Comments
 (0)