Skip to content

Commit

Permalink
Merge pull request #476 from PeggyJV/bolten/orch-cosmos-refactor-alte…
Browse files Browse the repository at this point in the history
…rnative

Prevent cosmos messaging getting stuck
  • Loading branch information
EricBolten authored Nov 11, 2022
2 parents a5c7b97 + ee6cd29 commit 4a107a8
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 22 deletions.
2 changes: 1 addition & 1 deletion integration_tests/setup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func (s *IntegrationTestSuite) initGenesis() {
s.Require().NoError(cdc.UnmarshalJSON(appGenState[gravitytypes.ModuleName], &gravityGenState))
gravityGenState.Params.GravityId = "gravitytest"
gravityGenState.Params.BridgeEthereumAddress = gravityContract.String()
gravityGenState.Params.SignedBatchesWindow = 15
gravityGenState.Params.SignedBatchesWindow = 35

bz, err = cdc.MarshalJSON(&gravityGenState)
s.Require().NoError(err)
Expand Down
21 changes: 15 additions & 6 deletions integration_tests/validator_out.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import (
// Start the chain with validators
func (s *IntegrationTestSuite) TestValidatorOut() {
s.Run("Bring up chain, and test the valset update", func() {
s.dockerPool.RemoveContainerByName("orchestrator3")

s.T().Logf("approving Gravity to spend ERC 20")
err := s.approveERC20()
s.Require().NoError(err, "error approving spending balance for the gravity contract")
Expand Down Expand Up @@ -83,6 +81,9 @@ func (s *IntegrationTestSuite) TestValidatorOut() {
sdk.Coin{Denom: gravityDenom, Amount: sdk.NewInt(1)},
)

s.dockerPool.RemoveContainerByName("orchestrator3")
s.dockerPool.RemoveContainerByName("orchestrator2")

// Send NewMsgSendToEthereum Message
s.Require().Eventuallyf(func() bool {
val := s.chain.validators[1]
Expand Down Expand Up @@ -149,6 +150,7 @@ func (s *IntegrationTestSuite) TestValidatorOut() {

// Check jail status of validators
s.Require().Eventuallyf(func() bool {
observed_jailing := true
orchKey := s.chain.validators[3]
keyring, err := orchKey.keyring()
s.Require().NoError(err)
Expand All @@ -161,14 +163,20 @@ func (s *IntegrationTestSuite) TestValidatorOut() {
s.T().Logf("error: %s", err)
return false
}
s.Require().True(valThree.GetValidator().IsJailed())
if !valThree.GetValidator().IsJailed() {
observed_jailing = false
s.T().Logf("validator 3 not jailed yet")
}

valTwo, err := newQ.Validator(context.Background(), &stakingtypes.QueryValidatorRequest{ValidatorAddr: sdk.ValAddress(s.chain.validators[2].keyInfo.GetAddress()).String()})
if err != nil {
s.T().Logf("error: %s", err)
return false
}
s.Require().False(valTwo.GetValidator().IsJailed())
if !valTwo.GetValidator().IsJailed() {
observed_jailing = false
s.T().Logf("validator 2 not jailed yet")
}

valOne, err := newQ.Validator(context.Background(), &stakingtypes.QueryValidatorRequest{ValidatorAddr: sdk.ValAddress(s.chain.validators[1].keyInfo.GetAddress()).String()})
if err != nil {
Expand All @@ -183,7 +191,8 @@ func (s *IntegrationTestSuite) TestValidatorOut() {
return false
}
s.Require().False(valZero.GetValidator().IsJailed())
return true
}, 5*time.Minute, 1*time.Minute, "can't find slashing info")

return observed_jailing
}, 10*time.Minute, 1*time.Minute, "can't confirm jailing status")
})
}
2 changes: 1 addition & 1 deletion module/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ func NewGravityApp(
stakingtypes.NewMultiStakingHooks(
app.distrKeeper.Hooks(),
app.slashingKeeper.Hooks(),
app.gravityKeeper.Hooks(),
//app.gravityKeeper.Hooks(), TODO(bolten): this hook is broken, do not set it, to be fixed
),
)

Expand Down
4 changes: 2 additions & 2 deletions orchestrator/cosmos_gravity/src/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ pub async fn batch_tx_confirmation_messages(
ethereum_signer: format_eth_address(ethereum_address),
signature: signature.into(),
};
let msg = proto::MsgSubmitEthereumEvent {
let msg = proto::MsgSubmitEthereumTxConfirmation {
signer: cosmos_address.to_string(),
event: confirmation.to_any(),
confirmation: confirmation.to_any(),
};
let msg = Msg::new("/gravity.v1.MsgSubmitEthereumTxConfirmation", msg);
msgs.push(msg);
Expand Down
49 changes: 37 additions & 12 deletions orchestrator/cosmos_gravity/src/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ pub async fn send_main_loop(
) {
while let Some(messages) = rx.recv().await {
for msg_chunk in messages.chunks(msg_batch_size) {
let batch = msg_chunk.to_vec();
match send_messages(
contact,
cosmos_key,
Expand All @@ -184,21 +185,45 @@ pub async fn send_main_loop(
)
.await
{
Ok(res) => trace!("okay: {:?}", res),
Ok(res) => debug!("message batch sent: {:?}", res),
Err(err) => {
let msg_types = msg_chunk
.iter()
.map(|msg| prost_types::Any::from(msg.clone()).type_url)
.collect::<HashSet<String>>();

error!(
"Error during gRPC call to Cosmos containing {} messages of types {:?}: {:?}",
msg_chunk.len(),
msg_types,
err
);
log_send_error(&batch, err);

// multiple messages in a single Cosmos transaction will be rejected
// atomically if that transaction cannot be delivered, so retry each
// element separately
info!("Trying each message in batch individually");
for msg in batch {
let msg_vec = vec![msg];
match send_messages(
contact,
cosmos_key,
gas_price.to_owned(),
msg_vec.clone(),
gas_adjustment,
)
.await
{
Ok(res) => debug!("message sent: {:?}", res),
Err(err) => log_send_error(&msg_vec, err),
}
}
}
}
}
}
}

fn log_send_error(messages: &Vec<Msg>, err: GravityError) {
let msg_types = messages
.iter()
.map(|msg| prost_types::Any::from(msg.clone()).type_url)
.collect::<HashSet<String>>();

error!(
"Error during gRPC call to Cosmos containing {} messages of types {:?}: {:?}",
messages.len(),
msg_types,
err
);
}

0 comments on commit 4a107a8

Please sign in to comment.