Skip to content

Commit

Permalink
Implement Helper async aggregation.
Browse files Browse the repository at this point in the history
Implementation strategy: two new report aggregation states,
HelperInitProcessing & HelperContinueProcessing, are introduced. These
states represent a report aggregation which has received an
initialization/continuation message from the Leader, but has not yet
processed that message (and, in the case of continuation, the existing
state) into the next HelperInit/HelperContinue state.

The aggregation initialization/continuation endpoints now always
construct these new Processing states as part of initial request
handling. In the synchronous aggregation case, the Processing states are
processed into HelperInit/HelperContinue states before being written
back to durable storage. In the asynchronous aggregation case, the
Processing states are written back to storage immediately, and are later
processed by the aggregation job driver into the next
HelperInit/HelperContinue state.

An upshot of this is that a Helper deployment performing asynchronous
aggregation MUST now run an aggregation job driver.

The implementation is complete as far as the protocol is concerned.
However, there is no logic yet to implement setting the Retry-After
header, which is spec-optional but likely important to a production
deployment. To facilitate testing, a new aggregation job driver
parameter is introduced which controls how quickly the Leader should
poll for outstanding asynchronous aggregaiton jobs if the Retry-After
header is not set.
  • Loading branch information
branlwyd committed Jan 31, 2025
1 parent 706911f commit 6caeacf
Show file tree
Hide file tree
Showing 56 changed files with 7,044 additions and 3,041 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions aggregator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,13 @@ test-util = [
"janus_aggregator_core/test-util",
"janus_core/test-util",
"janus_messages/test-util",
"dep:assert_matches",
"dep:testcontainers",
"dep:trillium-testing",
]

[dependencies]
anyhow.workspace = true
assert_matches = { workspace = true, optional = true }
assert_matches = { workspace = true }
async-trait = { workspace = true }
aws-lc-rs = { workspace = true }
backoff = { workspace = true, features = ["tokio"] }
Expand Down Expand Up @@ -70,6 +69,7 @@ postgres-protocol = { workspace = true }
postgres-types = { workspace = true, features = ["derive", "array-impls"] }
prio.workspace = true
prometheus = { workspace = true, optional = true }
querystring = { workspace = true }
rand = { workspace = true, features = ["min_const_gen"] }
rayon.workspace = true
regex = { workspace = true }
Expand Down
Loading

0 comments on commit 6caeacf

Please sign in to comment.