Coordinator Cleanup (#481)
* Move the logic for evaluating whether a cosign should occur to its own file

Cleans it up and makes it more robust.

* Have expected_next_batch return an error instead of retrying

While an error-free implementation was convenient to offer, it potentially
caused very long-lived lock acquisitions in handle_processor_message.
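
A minimal self-contained sketch of the new pattern (the stub signature, u8 network ID, String error, and handle_message name are assumptions; only expected_next_batch and the return-false convention come from the diff below):

```rust
// Sketch: the fallible fetch surfaces its error instead of retrying
// internally, so the caller decides how long any locks stay held.
async fn expected_next_batch(network: u8) -> Result<u32, String> {
  // ... query the Serai node; a connection failure is returned, not retried ...
  Err(format!("Serai node unreachable for network {network}"))
}

async fn handle_message(network: u8) -> bool {
  // let-else keeps the happy path flat: on error, return false so the message
  // is redelivered later, rather than spinning in a retry loop while holding
  // handle_processor_message's locks.
  let Ok(next) = expected_next_batch(network).await else { return false };
  // ... publish batches up to `next` ...
  let _ = next;
  true
}
```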

* Unify and clean DkgConfirmer and DkgRemoval

Does so via adding a new file for the common code, SigningProtocol.

Modifies from_cache to return the preprocess with the machine, as there's no
reason not to. Also removes an unused Result around the type (the old and new
shapes are sketched at the end of this item).

Clarifies the security around deterministic nonces, removing them for
saved-to-disk cached preprocesses. The cached preprocesses are encrypted as the
DB is not a proper secret store.

Moves arguments always present in the protocol from function arguments into the
struct itself.

Removes the horribly ugly code in DkgRemoval, fixing multiple issues which
would have caused it to fail on use.
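
For the from_cache change, a trimmed-down illustration (these stand-in traits are assumptions; the real trait is modular-frost's SignMachine, whose updated impls appear in the diffs below):

```rust
// Illustrative only: the old shape wrapped the machine in a Result nothing
// actually failed with, while the new shape returns the preprocess alongside
// the machine, since it's always recoverable from the cache.
trait OldSignMachine: Sized {
  type Preprocess;
  fn from_cache(cached: Vec<u8>) -> Result<Self, ()>;
}

trait NewSignMachine: Sized {
  type Preprocess;
  fn from_cache(cached: Vec<u8>) -> (Self, Self::Preprocess);
}
```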

* Set SeraiBlockNumber in cosign.rs as it's used by the cosigning protocol

* Remove unnecessary Clone from lambdas in coordinator

* Remove the EventDb from Tributary scanner

We used per-Transaction DB TXNs so that, on error, we wouldn't have to rescan
the entire block, only the rest of it. We prevented re-handling transactions by
tracking which ones we'd already scanned.

This was over-engineered and not worth it; a sketch of the simpler per-block
TXN follows.
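
A sketch of the simpler flow, with stand-in types (Txn, the key scheme, and handle_block's shape are assumptions; only the one-TXN-per-block design is from this commit):

```rust
// One DB transaction spans the whole block. If handling fails part-way, the
// TXN is dropped, nothing commits, and the whole block is rescanned later —
// no EventDb needed to track which transactions were already handled.
struct Txn(Vec<(Vec<u8>, Vec<u8>)>);
impl Txn {
  fn put(&mut self, key: Vec<u8>, value: Vec<u8>) {
    self.0.push((key, value));
  }
  fn commit(self) { /* atomically flush every buffered write */ }
}

fn handle_block(block: &[Vec<u8>]) {
  let mut txn = Txn(vec![]);
  for (i, tx) in block.iter().enumerate() {
    // An error here would return before commit, discarding partial progress
    txn.put(u32::try_from(i).unwrap().to_le_bytes().to_vec(), tx.clone());
  }
  txn.commit();
}
```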

* Implement borsh for HasEvents, removing the manual encoding

* Merge DkgConfirmer and DkgRemoval into signing_protocol.rs

Fixes a bug in DkgConfirmer which would cause it to improperly handle indexes
if any validator had multiple key shares.

* Strictly type DataSpecification's Label

* Correct threshold_i_map_to_keys_and_musig_i_map

It didn't include the participant's own index, so all subsequent indices were
offset.
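
A hypothetical reconstruction of the bug (simplified; per its name, the real helper also returns the mapped keys):

```rust
// Maps threshold indices to 1-indexed MuSig indices by sorted position. The
// bug: the caller's own index was never inserted, so every participant
// sorting after it came out offset by one.
fn musig_i_map(mut threshold_is: Vec<u16>, our_i: u16) -> Vec<(u16, u16)> {
  threshold_is.push(our_i); // the step which was missing
  threshold_is.sort_unstable();
  threshold_is.dedup();
  threshold_is
    .into_iter()
    .enumerate()
    .map(|(pos, threshold_i)| (threshold_i, u16::try_from(pos + 1).unwrap()))
    .collect()
}
```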

* Create TributaryBlockHandler

This struct contains all the variables previously passed to handle_block and
stops them from being passed around again and again.

This also ensures fatal_slash is only called while handling a block, as
required since it expects to operate under perfect consensus.
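
A hedged sketch of the struct's shape (every field here is an assumption; only the name TributaryBlockHandler and the fatal_slash constraint are from the commit):

```rust
struct TributarySpec; // stub for the sketch

// Everything handle_block previously took as arguments lives on the struct,
// and fatal_slash is a method on it — so slashing can only happen while a
// block is being handled, under the perfect consensus it assumes.
struct TributaryBlockHandler<'a, Txn> {
  txn: &'a mut Txn,
  spec: &'a TributarySpec,
  block_hash: [u8; 32],
}

impl<'a, Txn> TributaryBlockHandler<'a, Txn> {
  fn fatal_slash(&mut self, validator: [u8; 32], reason: &str) {
    // record the slash within this block's TXN
    let _ = (validator, reason);
  }

  fn handle_block(self) {
    // iterate the block's transactions, calling self.fatal_slash as needed
  }
}
```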

* Inline accumulate, store confirmation nonces with shares

Inlining accumulate makes sense given the amount of data it needed to be
passed.

Storing confirmation nonces with shares ensures that both are available or
neither is. Previously, one could be present while the other wasn't (requiring
a runtime assert to ensure we didn't bungle it somehow).
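
A sketch in the repository's own create_db! style (the entry name, key, and value types are assumptions):

```rust
create_db!(
  TributaryDb {
    // One entry stores both, so reading the shares guarantees the
    // confirmation nonces exist too — no runtime assert for the
    // one-without-the-other case.
    ConfirmationNoncesAndShares: (genesis: [u8; 32], attempt: u32) -> (Vec<u8>, Vec<u8>),
  }
);
```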

* Create helper functions for handling DkgRemoval/SubstrateSign/Sign Tributary TXs

* Move Label into SignData

All of our transactions which use SignData end up with the same usage pattern
for Label, justifying this.

Removes 3 transaction variants, explicitly de-duplicating their handlers.
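
The shape this lands on, as visible at the call sites in the main.rs diff below (Signed is stubbed here, and the generic over the plan ID is an assumption):

```rust
pub struct Signed; // stub: the actual signature/nonce data

pub enum Label {
  Preprocess,
  Share,
}

pub struct SignData<Id> {
  pub plan: Id,
  pub attempt: u32,
  // Previously encoded by near-duplicate transaction variants
  // (SignPreprocess/SignShare, SubstratePreprocess/SubstrateShare)
  pub label: Label,
  pub data: Vec<Vec<u8>>,
  pub signed: Signed,
}
```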

* Remove CurrentlyCompletingKeyPair for the non-contextual DkgKeyPair

* Remove the manual read/write for TributarySpec for borsh

This struct gains no optimization from the manual impl. Using borsh reduces
our scope.
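
A sketch of the replacement (TributarySpec's exact fields are assumptions; the serialize/deserialize_reader call sites appear in the db.rs diff below):

```rust
use borsh::{BorshSerialize, BorshDeserialize};

// The derives replace the hand-written read/write; the manual impl provided
// no encoding optimization borsh lacks, and dropping it reduces our scope.
#[derive(BorshSerialize, BorshDeserialize)]
pub struct TributarySpec {
  serai_block: [u8; 32],
  start_time: u64,
  // ... remaining fields, each borsh-encodable ...
}
```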

* Use temporary variables to further minimize LoC in tributary handler

* Remove usage of tuples for non-trivial Tributary transactions

* Remove serde from dkg

serde could be used to deserialize internally inconsistent objects, which
could lead to panics or faults.

The BorshDeserialize derives have been replaced with a manual implementation
which won't produce inconsistent objects.
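
A hypothetical illustration of the pattern with a 1-indexed participant ID (the dkg crate's real types differ; deserialize_reader is borsh 1's actual trait method):

```rust
use std::io::{self, Read};
use borsh::BorshDeserialize;

// A derive would happily construct Participant(0) — internally inconsistent
// for a 1-indexed ID — whereas the manual impl refuses to.
pub struct Participant(u16);

impl BorshDeserialize for Participant {
  fn deserialize_reader<R: Read>(reader: &mut R) -> io::Result<Self> {
    let i = u16::deserialize_reader(reader)?;
    if i == 0 {
      return Err(io::Error::new(io::ErrorKind::InvalidData, "0 isn't a valid Participant"));
    }
    Ok(Participant(i))
  }
}
```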

* Abstract Future generics using new trait definitions in coordinator
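
One common way to express such an abstraction, sketched (the trait name and exact bounds are assumptions, not the commit's definitions):

```rust
use core::future::Future;

// Name the repeated `impl Future<Output = ()> + Send + 'static` bound once,
// then take `impl BlockFuture` wherever it recurs.
pub trait BlockFuture: Future<Output = ()> + Send + 'static {}
impl<F: Future<Output = ()> + Send + 'static> BlockFuture for F {}
```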

* Move publish_signed_transaction to tributary/mod.rs to reduce the size of main.rs

* Split coordinator/src/tributary/mod.rs into spec.rs and transaction.rs
kayabaNerve authored Dec 11, 2023
1 parent 6caf45e commit 11fdb6d
Showing 34 changed files with 2,487 additions and 2,948 deletions.
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion coins/bitcoin/src/wallet/send.rs
@@ -358,7 +358,7 @@ impl SignMachine<Transaction> for TransactionSignMachine {
     _: (),
     _: ThresholdKeys<Secp256k1>,
     _: CachedPreprocess,
-  ) -> Result<Self, FrostError> {
+  ) -> (Self, Self::Preprocess) {
     unimplemented!(
       "Bitcoin transactions don't support caching their preprocesses due to {}",
       "being already bound to a specific transaction"
2 changes: 1 addition & 1 deletion coins/monero/src/wallet/send/multisig.rs
@@ -226,7 +226,7 @@ impl SignMachine<Transaction> for TransactionSignMachine {
     );
   }
 
-  fn from_cache(_: (), _: ThresholdKeys<Ed25519>, _: CachedPreprocess) -> Result<Self, FrostError> {
+  fn from_cache(_: (), _: ThresholdKeys<Ed25519>, _: CachedPreprocess) -> (Self, Self::Preprocess) {
     unimplemented!(
       "Monero transactions don't support caching their preprocesses due to {}",
       "being already bound to a specific transaction"
1 change: 1 addition & 0 deletions common/db/src/create_db.rs
@@ -45,6 +45,7 @@ macro_rules! create_db {
         pub struct $field_name;
         impl $field_name {
           pub fn key($($arg: $arg_type),*) -> Vec<u8> {
+            use scale::Encode;
             $crate::serai_db_key(
               stringify!($db_name).as_bytes(),
               stringify!($field_name).as_bytes(),
3 changes: 1 addition & 2 deletions coordinator/Cargo.toml
@@ -18,7 +18,6 @@ async-trait = { version = "0.1", default-features = false }
 
 zeroize = { version = "^1.5", default-features = false, features = ["std"] }
 rand_core = { version = "0.6", default-features = false, features = ["std"] }
-rand_chacha = { version = "0.3", default-features = false, features = ["std"] }
 
 blake2 = { version = "0.10", default-features = false, features = ["std"] }
 
@@ -38,7 +37,7 @@ message-queue = { package = "serai-message-queue", path = "../message-queue" }
 tributary = { package = "tributary-chain", path = "./tributary" }
 
 sp-application-crypto = { git = "https://github.com/serai-dex/substrate", default-features = false, features = ["std"] }
-serai-client = { path = "../substrate/client", default-features = false, features = ["serai"] }
+serai-client = { path = "../substrate/client", default-features = false, features = ["serai", "borsh"] }
 
 hex = { version = "0.4", default-features = false, features = ["std"] }
 borsh = { version = "1", default-features = false, features = ["std", "derive", "de_strict_order"] }
23 changes: 4 additions & 19 deletions coordinator/src/db.rs
@@ -4,6 +4,7 @@ use blake2::{
 };
 
 use scale::Encode;
+use borsh::{BorshSerialize, BorshDeserialize};
 use serai_client::{
   primitives::NetworkId,
   validator_sets::primitives::{Session, ValidatorSet},
@@ -20,7 +21,6 @@ create_db!(
     HandledMessageDb: (network: NetworkId) -> u64,
     ActiveTributaryDb: () -> Vec<u8>,
     RetiredTributaryDb: (set: ValidatorSet) -> (),
-    SignedTransactionDb: (order: &[u8], nonce: u32) -> Vec<u8>,
     FirstPreprocessDb: (
       network: NetworkId,
       id_type: RecognizedIdType,
@@ -43,7 +43,7 @@ impl ActiveTributaryDb {
 
     let mut tributaries = vec![];
     while !bytes_ref.is_empty() {
-      tributaries.push(TributarySpec::read(&mut bytes_ref).unwrap());
+      tributaries.push(TributarySpec::deserialize_reader(&mut bytes_ref).unwrap());
     }
 
     (bytes, tributaries)
@@ -57,7 +57,7 @@ }
       }
     }
 
-    spec.write(&mut existing_bytes).unwrap();
+    spec.serialize(&mut existing_bytes).unwrap();
     ActiveTributaryDb::set(txn, &existing_bytes);
   }
 
@@ -72,28 +72,13 @@
 
     let mut bytes = vec![];
     for active in active {
-      active.write(&mut bytes).unwrap();
+      active.serialize(&mut bytes).unwrap();
     }
     Self::set(txn, &bytes);
     RetiredTributaryDb::set(txn, set, &());
   }
 }
 
-impl SignedTransactionDb {
-  pub fn take_signed_transaction(
-    txn: &mut impl DbTxn,
-    order: &[u8],
-    nonce: u32,
-  ) -> Option<Transaction> {
-    let res = SignedTransactionDb::get(txn, order, nonce)
-      .map(|bytes| Transaction::read(&mut bytes.as_slice()).unwrap());
-    if res.is_some() {
-      Self::del(txn, order, nonce);
-    }
-    res
-  }
-}
-
 impl FirstPreprocessDb {
   pub fn save_first_preprocess(
     txn: &mut impl DbTxn,
108 changes: 43 additions & 65 deletions coordinator/src/main.rs
@@ -31,12 +31,12 @@ use tokio::{
   time::sleep,
 };
 
-use ::tributary::{
-  ProvidedError, TransactionKind, TransactionError, TransactionTrait, Block, Tributary,
-};
+use ::tributary::{ProvidedError, TransactionKind, TransactionTrait, Block, Tributary};
 
 mod tributary;
-use crate::tributary::{TributarySpec, SignData, Transaction, scanner::RecognizedIdType, PlanIds};
+use crate::tributary::{
+  TributarySpec, Label, SignData, Transaction, scanner::RecognizedIdType, PlanIds,
+};
 
 mod db;
 use db::*;
@@ -126,48 +126,6 @@ async fn add_tributary<D: Db, Pro: Processors, P: P2p>(
     .unwrap();
 }
 
-async fn publish_signed_transaction<D: Db, P: P2p>(
-  txn: &mut D::Transaction<'_>,
-  tributary: &Tributary<D, Transaction, P>,
-  tx: Transaction,
-) {
-  log::debug!("publishing transaction {}", hex::encode(tx.hash()));
-
-  let (order, signer) = if let TransactionKind::Signed(order, signed) = tx.kind() {
-    let signer = signed.signer;
-
-    // Safe as we should deterministically create transactions, meaning if this is already on-disk,
-    // it's what we're saving now
-    SignedTransactionDb::set(txn, &order, signed.nonce, &tx.serialize());
-
-    (order, signer)
-  } else {
-    panic!("non-signed transaction passed to publish_signed_transaction");
-  };
-
-  // If we're trying to publish 5, when the last transaction published was 3, this will delay
-  // publication until the point in time we publish 4
-  while let Some(tx) = SignedTransactionDb::take_signed_transaction(
-    txn,
-    &order,
-    tributary
-      .next_nonce(&signer, &order)
-      .await
-      .expect("we don't have a nonce, meaning we aren't a participant on this tributary"),
-  ) {
-    // We need to return a proper error here to enable that, due to a race condition around
-    // multiple publications
-    match tributary.add_transaction(tx.clone()).await {
-      Ok(_) => {}
-      // Some asynchonicity if InvalidNonce, assumed safe to deterministic nonces
-      Err(TransactionError::InvalidNonce) => {
-        log::warn!("publishing TX {tx:?} returned InvalidNonce. was it already added?")
-      }
-      Err(e) => panic!("created an invalid transaction: {e:?}"),
-    }
-  }
-}
-
 // TODO: Find a better pattern for this
 static HANDOVER_VERIFY_QUEUE_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
 
@@ -317,7 +275,9 @@ async fn handle_processor_message<D: Db, P: P2p>(
           BatchDb::set(&mut txn, batch.batch.network, batch.batch.id, &batch.clone());
 
           // Get the next-to-execute batch ID
-          let mut next = substrate::get_expected_next_batch(serai, network).await;
+          let Ok(mut next) = substrate::expected_next_batch(serai, network).await else {
+            return false;
+          };
 
           // Since we have a new batch, publish all batches yet to be published to Serai
           // This handles the edge-case where batch n+1 is signed before batch n is
@@ -329,7 +289,10 @@
 
           while let Some(batch) = batches.pop_front() {
             // If this Batch should no longer be published, continue
-            if substrate::get_expected_next_batch(serai, network).await > batch.batch.id {
+            let Ok(expected_next_batch) = substrate::expected_next_batch(serai, network).await else {
+              return false;
+            };
+            if expected_next_batch > batch.batch.id {
               continue;
             }
 
@@ -398,7 +361,11 @@ async fn handle_processor_message<D: Db, P: P2p>(
   let txs = match msg.msg.clone() {
     ProcessorMessage::KeyGen(inner_msg) => match inner_msg {
       key_gen::ProcessorMessage::Commitments { id, commitments } => {
-        vec![Transaction::DkgCommitments(id.attempt, commitments, Transaction::empty_signed())]
+        vec![Transaction::DkgCommitments {
+          attempt: id.attempt,
+          commitments,
+          signed: Transaction::empty_signed(),
+        }]
       }
       key_gen::ProcessorMessage::InvalidCommitments { id: _, faulty } => {
         // This doesn't need the ID since it's a Provided transaction which everyone will provide
@@ -411,7 +378,7 @@ }
       }
       key_gen::ProcessorMessage::Shares { id, mut shares } => {
        // Create a MuSig-based machine to inform Substrate of this key generation
-        let nonces = crate::tributary::dkg_confirmation_nonces(key, spec, id.attempt);
+        let nonces = crate::tributary::dkg_confirmation_nonces(key, spec, &mut txn, id.attempt);
 
        let our_i = spec
          .i(pub_key)
@@ -449,7 +416,7 @@ async fn handle_processor_message<D: Db, P: P2p>(
        // As for the safety of calling error_generating_key_pair, the processor is presumed
        // to only send InvalidShare or GeneratedKeyPair for a given attempt
        let mut txs = if let Some(faulty) =
-          crate::tributary::error_generating_key_pair::<_>(&txn, key, spec, id.attempt)
+          crate::tributary::error_generating_key_pair(&mut txn, key, spec, id.attempt)
        {
          vec![Transaction::RemoveParticipant(faulty)]
        } else {
@@ -480,7 +447,11 @@ async fn handle_processor_message<D: Db, P: P2p>(
 
        match share {
          Ok(share) => {
-            vec![Transaction::DkgConfirmed(id.attempt, share, Transaction::empty_signed())]
+            vec![Transaction::DkgConfirmed {
+              attempt: id.attempt,
+              confirmation_share: share,
+              signed: Transaction::empty_signed(),
+            }]
          }
          Err(p) => {
            vec![Transaction::RemoveParticipant(p)]
@@ -511,18 +482,20 @@ async fn handle_processor_message<D: Db, P: P2p>(
 
          vec![]
        } else {
-          vec![Transaction::SignPreprocess(SignData {
+          vec![Transaction::Sign(SignData {
            plan: id.id,
            attempt: id.attempt,
+            label: Label::Preprocess,
            data: preprocesses,
            signed: Transaction::empty_signed(),
          })]
        }
      }
      sign::ProcessorMessage::Share { id, shares } => {
-        vec![Transaction::SignShare(SignData {
+        vec![Transaction::Sign(SignData {
          plan: id.id,
          attempt: id.attempt,
+          label: Label::Share,
          data: shares,
          signed: Transaction::empty_signed(),
        })]
@@ -555,9 +528,10 @@ async fn handle_processor_message<D: Db, P: P2p>(
        vec![]
      }
      coordinator::ProcessorMessage::CosignPreprocess { id, preprocesses } => {
-        vec![Transaction::SubstratePreprocess(SignData {
+        vec![Transaction::SubstrateSign(SignData {
          plan: id.id,
          attempt: id.attempt,
+          label: Label::Preprocess,
          data: preprocesses.into_iter().map(Into::into).collect(),
          signed: Transaction::empty_signed(),
        })]
@@ -586,13 +560,13 @@ async fn handle_processor_message<D: Db, P: P2p>(
          preprocesses.into_iter().map(Into::into).collect(),
        );
 
-        let intended = Transaction::Batch(
-          block.0,
-          match id.id {
+        let intended = Transaction::Batch {
+          block: block.0,
+          batch: match id.id {
            SubstrateSignableId::Batch(id) => id,
            _ => panic!("BatchPreprocess did not contain Batch ID"),
          },
-        );
+        };
 
        // If this is the new key's first Batch, only create this TX once we verify all
        // all prior published `Batch`s
@@ -649,18 +623,20 @@ async fn handle_processor_message<D: Db, P: P2p>(
          res
        }
      } else {
-        vec![Transaction::SubstratePreprocess(SignData {
+        vec![Transaction::SubstrateSign(SignData {
          plan: id.id,
          attempt: id.attempt,
+          label: Label::Preprocess,
          data: preprocesses.into_iter().map(Into::into).collect(),
          signed: Transaction::empty_signed(),
        })]
      }
    }
    coordinator::ProcessorMessage::SubstrateShare { id, shares } => {
-      vec![Transaction::SubstrateShare(SignData {
+      vec![Transaction::SubstrateSign(SignData {
        plan: id.id,
        attempt: id.attempt,
+        label: Label::Share,
        data: shares.into_iter().map(|share| share.to_vec()).collect(),
        signed: Transaction::empty_signed(),
      })]
@@ -706,7 +682,7 @@ async fn handle_processor_message<D: Db, P: P2p>(
      }
      TransactionKind::Signed(_, _) => {
        tx.sign(&mut OsRng, genesis, key);
-        publish_signed_transaction(&mut txn, tributary, tx).await;
+        tributary::publish_signed_transaction(&mut txn, tributary, tx).await;
      }
    }
  }
@@ -1079,16 +1055,18 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
      };
 
      let mut tx = match id_type {
-        RecognizedIdType::Batch => Transaction::SubstratePreprocess(SignData {
+        RecognizedIdType::Batch => Transaction::SubstrateSign(SignData {
          data: get_preprocess(&raw_db, id_type, &id).await,
          plan: SubstrateSignableId::Batch(id.as_slice().try_into().unwrap()),
+          label: Label::Preprocess,
          attempt: 0,
          signed: Transaction::empty_signed(),
        }),
 
-        RecognizedIdType::Plan => Transaction::SignPreprocess(SignData {
+        RecognizedIdType::Plan => Transaction::Sign(SignData {
          data: get_preprocess(&raw_db, id_type, &id).await,
          plan: id.try_into().unwrap(),
+          label: Label::Preprocess,
          attempt: 0,
          signed: Transaction::empty_signed(),
        }),
@@ -1119,7 +1097,7 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
      // TODO: Should this not take a txn accordingly? It's best practice to take a txn, yet
      // taking a txn fails to declare its achieved independence
      let mut txn = raw_db.txn();
-      publish_signed_transaction(&mut txn, tributary, tx).await;
+      tributary::publish_signed_transaction(&mut txn, tributary, tx).await;
      txn.commit();
      break;
    }