Skip to content

Commit

Permalink
Decentralize execution relayer (#1268)
Browse files Browse the repository at this point in the history
* Decentralize execution relay

* Decentralize execution relayer

* Fix launch config
  • Loading branch information
yrong authored Aug 19, 2024
1 parent 6cbf2a8 commit 41b9cfc
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 59 deletions.
32 changes: 29 additions & 3 deletions relayer/relays/execution/config.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,37 @@
package execution

import (
"errors"
"fmt"

"github.com/snowfork/snowbridge/relayer/config"
beaconconf "github.com/snowfork/snowbridge/relayer/relays/beacon/config"
)

type Config struct {
Source SourceConfig `mapstructure:"source"`
Sink SinkConfig `mapstructure:"sink"`
InstantVerification bool `mapstructure:"instantVerification"`
Source SourceConfig `mapstructure:"source"`
Sink SinkConfig `mapstructure:"sink"`
InstantVerification bool `mapstructure:"instantVerification"`
Schedule ScheduleConfig `mapstructure:"schedule"`
}

type ScheduleConfig struct {
// ID of current relayer, starting from 0
ID uint64 `mapstructure:"id"`
// Number of total count of all relayers
TotalRelayerCount uint64 `mapstructure:"totalRelayerCount"`
// Sleep interval(in seconds) to check if message(nonce) has already been relayed
SleepInterval uint64 `mapstructure:"sleepInterval"`
}

func (r ScheduleConfig) Validate() error {
if r.TotalRelayerCount < 1 {
return errors.New("Number of relayer is not set")
}
if r.ID >= r.TotalRelayerCount {
return errors.New("ID of the Number of relayer is not set")
}
return nil
}

type SourceConfig struct {
Expand Down Expand Up @@ -44,5 +66,9 @@ func (c Config) Validate() error {
if c.Source.Contracts.Gateway == "" {
return fmt.Errorf("source setting [gateway] is not set")
}
err = c.Schedule.Validate()
if err != nil {
return fmt.Errorf("schedule config: %w", err)
}
return nil
}
133 changes: 87 additions & 46 deletions relayer/relays/execution/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Relay struct {
gatewayContract *contracts.Gateway
beaconHeader *header.Header
writer *parachain.ParachainWriter
headerCache *ethereum.HeaderCache
}

func NewRelay(
Expand Down Expand Up @@ -77,6 +78,7 @@ func (r *Relay) Start(ctx context.Context, eg *errgroup.Group) error {
if err != nil {
return err
}
r.headerCache = headerCache

address := common.HexToAddress(r.config.Source.Contracts.Gateway)
contract, err := contracts.NewGateway(address, ethconn.Client())
Expand All @@ -101,11 +103,13 @@ func (r *Relay) Start(ctx context.Context, eg *errgroup.Group) error {
)
r.beaconHeader = &beaconHeader

log.Info("Current relay's ID:", r.config.Schedule.ID)

for {
select {
case <-ctx.Done():
return nil
case <-time.After(6 * time.Second):
case <-time.After(60 * time.Second):
log.WithFields(log.Fields{
"channelId": r.config.Source.ChannelID,
}).Info("Polling Nonces")
Expand Down Expand Up @@ -142,53 +146,10 @@ func (r *Relay) Start(ctx context.Context, eg *errgroup.Group) error {
}

for _, ev := range events {
inboundMsg, err := r.makeInboundMessage(ctx, headerCache, ev)
if err != nil {
return fmt.Errorf("make outgoing message: %w", err)
}
logger := log.WithFields(log.Fields{
"paraNonce": paraNonce,
"ethNonce": ethNonce,
"msgNonce": ev.Nonce,
"address": ev.Raw.Address.Hex(),
"blockHash": ev.Raw.BlockHash.Hex(),
"blockNumber": ev.Raw.BlockNumber,
"txHash": ev.Raw.TxHash.Hex(),
"txIndex": ev.Raw.TxIndex,
"channelID": types.H256(ev.ChannelID).Hex(),
})

if ev.Nonce <= paraNonce {
logger.Warn("inbound message outdated, just skipped")
continue
}
nextBlockNumber := new(big.Int).SetUint64(ev.Raw.BlockNumber + 1)

blockHeader, err := ethconn.Client().HeaderByNumber(ctx, nextBlockNumber)
if err != nil {
return fmt.Errorf("get block header: %w", err)
}

// ParentBeaconRoot in https://eips.ethereum.org/EIPS/eip-4788 from Deneb onward
proof, err := beaconHeader.FetchExecutionProof(*blockHeader.ParentBeaconRoot, r.config.InstantVerification)
if errors.Is(err, header.ErrBeaconHeaderNotFinalized) {
logger.Warn("beacon header not finalized, just skipped")
continue
}
err = r.waitAndSend(ctx, ev)
if err != nil {
return fmt.Errorf("fetch execution header proof: %w", err)
}

err = r.writeToParachain(ctx, proof, inboundMsg)
if err != nil {
return fmt.Errorf("write to parachain: %w", err)
}

paraNonce, _ = r.fetchLatestParachainNonce()
if paraNonce != ev.Nonce {
return fmt.Errorf("inbound message fail to execute")
return fmt.Errorf("submit message: %w", err)
}
logger.Info("inbound message executed successfully")
}
}
}
Expand Down Expand Up @@ -388,3 +349,83 @@ func (r *Relay) makeInboundMessage(

return msg, nil
}

func (r *Relay) waitAndSend(ctx context.Context, ev *contracts.GatewayOutboundMessageAccepted) (err error) {
var paraNonce uint64
ethNonce := ev.Nonce
waitingPeriod := (ethNonce + r.config.Schedule.TotalRelayerCount - r.config.Schedule.ID) % r.config.Schedule.TotalRelayerCount

var cnt uint64
for {
paraNonce, err = r.fetchLatestParachainNonce()
if err != nil {
return fmt.Errorf("fetch latest parachain nonce: %w", err)
}
if ethNonce <= paraNonce {
log.Info(fmt.Sprintf("nonce %d picked up by another relayer, just skip", paraNonce))
return nil
}
if cnt == waitingPeriod {
break
}
time.Sleep(time.Duration(r.config.Schedule.SleepInterval) * time.Second)
cnt++
}
err = r.doSubmit(ctx, ev)
if err != nil {
return fmt.Errorf("submit inbound message: %w", err)
}

return nil
}

func (r *Relay) doSubmit(ctx context.Context, ev *contracts.GatewayOutboundMessageAccepted) error {
inboundMsg, err := r.makeInboundMessage(ctx, r.headerCache, ev)
if err != nil {
return fmt.Errorf("make outgoing message: %w", err)
}

logger := log.WithFields(log.Fields{
"ethNonce": ev.Nonce,
"msgNonce": ev.Nonce,
"address": ev.Raw.Address.Hex(),
"blockHash": ev.Raw.BlockHash.Hex(),
"blockNumber": ev.Raw.BlockNumber,
"txHash": ev.Raw.TxHash.Hex(),
"txIndex": ev.Raw.TxIndex,
"channelID": types.H256(ev.ChannelID).Hex(),
})

nextBlockNumber := new(big.Int).SetUint64(ev.Raw.BlockNumber + 1)

blockHeader, err := r.ethconn.Client().HeaderByNumber(ctx, nextBlockNumber)
if err != nil {
return fmt.Errorf("get block header: %w", err)
}

// ParentBeaconRoot in https://eips.ethereum.org/EIPS/eip-4788 from Deneb onward
proof, err := r.beaconHeader.FetchExecutionProof(*blockHeader.ParentBeaconRoot, r.config.InstantVerification)
if errors.Is(err, header.ErrBeaconHeaderNotFinalized) {
logger.Warn("beacon header not finalized, just skipped")
return nil
}
if err != nil {
return fmt.Errorf("fetch execution header proof: %w", err)
}

err = r.writeToParachain(ctx, proof, inboundMsg)
if err != nil {
return fmt.Errorf("write to parachain: %w", err)
}

paraNonce, err := r.fetchLatestParachainNonce()
if err != nil {
return fmt.Errorf("fetch latest parachain nonce: %w", err)
}
if paraNonce != ev.Nonce {
return fmt.Errorf("inbound message fail to execute")
}
logger.Info("inbound message executed successfully")

return nil
}
2 changes: 1 addition & 1 deletion web/packages/test/config/beacon-relay.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,6 @@
"endpoint": "ws://127.0.0.1:11144",
"maxWatchedExtrinsics": 8
},
"updateSlotInterval": 316
"updateSlotInterval": 30
}
}
7 changes: 6 additions & 1 deletion web/packages/test/config/execution-relay.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,10 @@
"maxWatchedExtrinsics": 8
}
},
"instantVerification": true
"instantVerification": false,
"schedule": {
"id": null,
"totalRelayerCount": 3,
"sleepInterval": 20
}
}
2 changes: 1 addition & 1 deletion web/packages/test/config/launch-config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ cumulus_based = true
## Penpal
[[parachains]]
id = 2000
chain = "penpal-rococo-2000"
chain = "penpal-westend-2000"
cumulus_based = true

[[parachains.collators]]
Expand Down
67 changes: 60 additions & 7 deletions web/packages/test/scripts/start-relayer.sh
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ config_relayer() {
' \
config/beacon-relay.json >$output_dir/beacon-relay.json

# Configure execution relay for assethub
# Configure execution relay for assethub-0
jq \
--arg eth_endpoint_ws $eth_endpoint_ws \
--arg k1 "$(address_for GatewayProxy)" \
Expand All @@ -156,8 +156,35 @@ config_relayer() {
.source.ethereum.endpoint = $eth_endpoint_ws
| .source.contracts.Gateway = $k1
| .source."channel-id" = $channelID
| .schedule.id = 0
' \
config/execution-relay.json >$output_dir/execution-relay-asset-hub.json
config/execution-relay.json >$output_dir/execution-relay-asset-hub-0.json

# Configure execution relay for assethub-1
jq \
--arg eth_endpoint_ws $eth_endpoint_ws \
--arg k1 "$(address_for GatewayProxy)" \
--arg channelID $ASSET_HUB_CHANNEL_ID \
'
.source.ethereum.endpoint = $eth_endpoint_ws
| .source.contracts.Gateway = $k1
| .source."channel-id" = $channelID
| .schedule.id = 1
' \
config/execution-relay.json >$output_dir/execution-relay-asset-hub-1.json

# Configure execution relay for assethub-2
jq \
--arg eth_endpoint_ws $eth_endpoint_ws \
--arg k1 "$(address_for GatewayProxy)" \
--arg channelID $ASSET_HUB_CHANNEL_ID \
'
.source.ethereum.endpoint = $eth_endpoint_ws
| .source.contracts.Gateway = $k1
| .source."channel-id" = $channelID
| .schedule.id = 2
' \
config/execution-relay.json >$output_dir/execution-relay-asset-hub-2.json

# Configure execution relay for penpal
jq \
Expand Down Expand Up @@ -278,15 +305,41 @@ start_relayer() {
done
) &

# Launch execution relay for assethub
# Launch execution relay for assethub-0
(
: >$output_dir/execution-relay-asset-hub.log
: >$output_dir/execution-relay-asset-hub-0.log
while :; do
echo "Starting execution relay (asset-hub) at $(date)"
echo "Starting execution relay (asset-hub-0) at $(date)"
"${relay_bin}" run execution \
--config $output_dir/execution-relay-asset-hub.json \
--config $output_dir/execution-relay-asset-hub-0.json \
--substrate.private-key "//ExecutionRelayAssetHub" \
>>"$output_dir"/execution-relay-asset-hub.log 2>&1 || true
>>"$output_dir"/execution-relay-asset-hub-0.log 2>&1 || true
sleep 20
done
) &

# Launch execution relay for assethub-1
(
: >$output_dir/execution-relay-asset-hub-1.log
while :; do
echo "Starting execution relay (asset-hub-1) at $(date)"
"${relay_bin}" run execution \
--config $output_dir/execution-relay-asset-hub-1.json \
--substrate.private-key "//Alice" \
>>"$output_dir"/execution-relay-asset-hub-1.log 2>&1 || true
sleep 20
done
) &

# Launch execution relay for assethub-2
(
: >$output_dir/execution-relay-asset-hub-2.log
while :; do
echo "Starting execution relay (asset-hub-2) at $(date)"
"${relay_bin}" run execution \
--config $output_dir/execution-relay-asset-hub-2.json \
--substrate.private-key "//Bob" \
>>"$output_dir"/execution-relay-asset-hub-2.log 2>&1 || true
sleep 20
done
) &
Expand Down

0 comments on commit 41b9cfc

Please sign in to comment.