Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/val 1404 eip 7251 head watcher alerts for new el requests #58

Draft
wants to merge 19 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ KEYS_API_URI=URL_TO_KEYS_API
LIDO_LOCATOR_ADDRESS=ETHEREUM_ADDRESS
EXECUTION_CLIENT_URI=URL_TO_EL_API

# For option when KEYS_SOURCE is 'keys_file'
# For option when KEYS_SOURCE is 'file'
# CONSENSUS_CLIENT_URI: URL_TO_CL_API
# KEYS_SOURCE: keys_file
# KEYS_SOURCE: file
# KEYS_FILE_PATH: path/to/keys.yml
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,6 @@ dmypy.json

# IDE
.idea/

# Docker
.volumes
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ Currently it supports:
> All exits will be handled as unexpected for specified keys

1. Fill `docker/validators/keys.yml` with your values
2. Set `KEYS_SOURCE=keys_file` in `.env`
2. Set `KEYS_SOURCE=file` in `.env`

> If you want to use another path, specify it in `KEYS_FILE_PATH` env variable

Expand All @@ -42,12 +42,12 @@ Currently it supports:
* **Required:** false
* **Default:** false
---
`KEYS_SOURCE` - Keys source. If `keys_api` - application will fetch keys from Keys API, if `keys_file` - application will fetch keys from `KEYS_FILE_PATH`
`KEYS_SOURCE` - Keys source. If `keys_api` - application will fetch keys from Keys API, if `file` - application will fetch keys from `KEYS_FILE_PATH`
* **Required:** false
* **Default:** keys_api
---
`KEYS_FILE_PATH` - Path to file with keys
* **Required:** if `KEYS_SOURCE` is `keys_file`
* **Required:** if `KEYS_SOURCE` is `file`
* **Default:** ./docker/validators/keys.yml
---
`CONSENSUS_CLIENT_URI` - Ethereum consensus layer comma separated API urls
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ services:
- 9090

alertmanager:
image: prom/alertmanager:latest
image: prom/alertmanager:v0.25.0
container_name: alertmanager
restart: unless-stopped
networks:
Expand Down
42 changes: 42 additions & 0 deletions src/handlers/consolidation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import logging

from unsync import unsync

from src.alerts.common import CommonAlert
from src.handlers.handler import WatcherHandler
from src.handlers.helpers import beaconchain
from src.metrics.prometheus.duration_meter import duration_meter
from src.providers.consensus.typings import FullBlockInfo

logger = logging.getLogger()


class ConsolidationHandler(WatcherHandler):
@unsync
@duration_meter()
def handle(self, watcher, head: FullBlockInfo):
if not head.message.body.execution_requests or not head.message.body.execution_requests.consolidations:
logger.info({"msg": f"No consolidation requests in block [{head.message.slot}]"})
return

slot = head.message.slot
for consolidation in head.message.body.execution_requests.consolidations:
alert, summary = None, ""
if consolidation.source_address in watcher.suspicious_addresses:
alert = CommonAlert(name="HeadWatcherConsolidationSuspiciousSourceAddress", severity="critical")
dputko marked this conversation as resolved.
Show resolved Hide resolved
summary = "🔗🤗 Highly suspicious source address"
dputko marked this conversation as resolved.
Show resolved Hide resolved
elif consolidation.source_pubkey in watcher.user_keys:
alert = CommonAlert(name="HeadWatcherConsolidationSourceLido", severity="moderate")
AlexanderLukin marked this conversation as resolved.
Show resolved Hide resolved
summary = "🔗🤗 Attempt to consolidate Lido validator"
dputko marked this conversation as resolved.
Show resolved Hide resolved
elif consolidation.target_pubkey in watcher.user_keys:
alert = CommonAlert(name="HeadWatcherConsolidationPossibleDonation", severity="moderate")
dputko marked this conversation as resolved.
Show resolved Hide resolved
summary = "🔗🤗 Someone wants to donate to Lido"
dputko marked this conversation as resolved.
Show resolved Hide resolved
# in the future we should check the type of validator WC:
# if it is 0x02 and source_address == WCs of source validator - It's donation!
Comment on lines +34 to +35
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably implement a new alert class for these types of events (donations). It might handle two cases: when someone changes 0x01 validator withdrawal credentials to Lido withdrawal credentials, and when someone changes 0x02 validator withdrawal credentials to Lido withdrawal credentials. And we may have two new types of alerts for these events. Let's discuss it in more detail.


if alert:
description = (f"EL consolidation request source_address='{consolidation.source_address}', "
f"source_pubkey={consolidation.source_pubkey}, "
f"target_pubkey='{consolidation.target_pubkey}'\n"
Comment on lines +38 to +40
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to make alert description more informative. So, if the source_pubkey is a pubkey of Lido validator, print also the operator name to which this validator belongs and wrap this info to beaconcha.in link (if it's not too complex to get the validator_index, or maybe validator pubkey is enough to build a meaningful beaconcha.in link?)

The same logic for the target_pubkey.

Maybe it's better to have separate description templates for each type of alert, but maybe not. I'm not sure about it. Both decisions are probably OK.

f"Slot: {beaconchain(slot)}")
self.send_alert(watcher, alert.build_body(summary, description))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks suboptimal to send separate alerts for each consolidation. If we have N consolidations of the same type in the head block, the N separate alerts of the same type will be sent. I think it would be better to group alerts of the same type, list all necessary info in the alert description, and send only one alert for each type. Something like it's done in the exit.py. Feel free to disagree if you see any disadvantages in this idea.

37 changes: 37 additions & 0 deletions src/handlers/el_triggered_exit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import logging

from unsync import unsync

from src.alerts.common import CommonAlert
from src.handlers.handler import WatcherHandler
from src.handlers.helpers import beaconchain
from src.metrics.prometheus.duration_meter import duration_meter
from src.providers.consensus.typings import FullBlockInfo

logger = logging.getLogger()


class ElTriggeredExitHandler(WatcherHandler):
@unsync
@duration_meter()
def handle(self, watcher, head: FullBlockInfo):
if not head.message.body.execution_requests or not head.message.body.execution_requests.withdrawals:
logger.debug({"msg": f"No withdrawals requests in block [{head.message.slot}]"})
return

slot = head.message.slot
for withdrawal in head.message.body.execution_requests.withdrawals:
alert, summary = None, ""
if withdrawal.source_address in watcher.suspicious_addresses:
alert = CommonAlert(name="HeadWatcherElWithdrawalVault", severity="critical")
dputko marked this conversation as resolved.
Show resolved Hide resolved
summary = "🔗‍🏃🚪Highly suspicious source address"
dputko marked this conversation as resolved.
Show resolved Hide resolved
elif withdrawal.validator_pubkey in watcher.user_keys:
alert = CommonAlert(name="HeadWatcherElWithdrawalUnexpected", severity="moderate")
dputko marked this conversation as resolved.
Show resolved Hide resolved
summary = "🔗‍🏃🚪Unexpected EL withdrawal request found"
dputko marked this conversation as resolved.
Show resolved Hide resolved

if alert:
description = (f"EL withdrawals request source_address='{withdrawal.source_address}', "
f"validator_pubkey={withdrawal.validator_pubkey}, "
f"amount='{withdrawal.amount}'\n"
f"Slot: {beaconchain(slot)}")
self.send_alert(watcher, alert.build_body(summary, description))
Comment on lines +33 to +37
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same thoughts as for the Consolidation alerts. Seems better to print the opeartor name and beaconcha.in links where possible and group alerts of the same type.

8 changes: 3 additions & 5 deletions src/handlers/fork.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,9 @@

from src.alerts.common import CommonAlert
from src.handlers.handler import WatcherHandler
from src.handlers.helpers import beaconchain
from src.metrics.prometheus.duration_meter import duration_meter
from src.providers.consensus.typings import BlockHeaderResponseData, ChainReorgEvent
from src.variables import NETWORK_NAME

BEACONCHAIN_URL_TEMPLATE = "[{0}](https://{1}.beaconcha.in/slot/{0})"


class ForkHandler(WatcherHandler):
Expand Down Expand Up @@ -43,7 +41,7 @@ def _send_reorg_alert(self, watcher, chain_reorg: ChainReorgEvent):
alert = CommonAlert(name="UnhandledChainReorg", severity="info")
links = "\n".join(
[
BEACONCHAIN_URL_TEMPLATE.format(s, NETWORK_NAME)
beaconchain(s)
for s in range(int(chain_reorg.slot) - int(chain_reorg.depth), int(chain_reorg.slot) + 1)
]
)
Expand All @@ -59,5 +57,5 @@ def _send_unhandled_head_alert(self, watcher, head: BlockHeaderResponseData):
if diff > 0:
additional_msg = f"\nAnd {diff} slot(s) before it"
parent_root = head.header.message.parent_root
description = f"Please, check unhandled slot: {BEACONCHAIN_URL_TEMPLATE.format(parent_root, NETWORK_NAME)}{additional_msg}"
description = f"Please, check unhandled slot: {beaconchain(parent_root)}{additional_msg}"
self.send_alert(watcher, alert.build_body(summary, description))
7 changes: 7 additions & 0 deletions src/handlers/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from src.variables import NETWORK_NAME

BEACONCHAIN_URL_TEMPLATE = "[{0}](https://{1}.beaconcha.in/slot/{0})"


def beaconchain(slot) -> str:
return BEACONCHAIN_URL_TEMPLATE.format(slot, NETWORK_NAME)
4 changes: 4 additions & 0 deletions src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
from web3.middleware import simple_cache_middleware

from src import variables
from src.handlers.el_triggered_exit import ElTriggeredExitHandler
from src.handlers.exit import ExitsHandler
from src.handlers.fork import ForkHandler
from src.handlers.slashing import SlashingHandler
from src.handlers.consolidation import ConsolidationHandler
from src.keys_source.base_source import SourceType
from src.keys_source.file_source import FileSource
from src.keys_source.keys_api_source import KeysApiSource
Expand Down Expand Up @@ -60,6 +62,8 @@ def main():
ForkHandler(),
ExitsHandler(),
# FinalityHandler(), ???
ConsolidationHandler(),
ElTriggeredExitHandler()
]
Watcher(handlers, keys_source, web3).run()

Expand Down
27 changes: 27 additions & 0 deletions src/providers/consensus/typings.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,39 @@ class BlockVoluntaryExit(Nested, FromResponse):
signature: str


@dataclass
class ConsolidationRequest(FromResponse):
source_address: str
source_pubkey: str
target_pubkey: str

@dataclass
class WithdrawalRequest(FromResponse):
source_address: str
validator_pubkey: str
amount: str

@dataclass
class DepositRequest(FromResponse):
pubkey: str
withdrawal_credentials: str
amount: str
signature: str
index: int

@dataclass
class ExecutionRequests(Nested, FromResponse):
deposits: list[DepositRequest]
withdrawals: list[WithdrawalRequest]
consolidations: list[ConsolidationRequest]

@dataclass
class BlockBody(Nested, FromResponse):
execution_payload: BlockExecutionPayload
voluntary_exits: list[BlockVoluntaryExit]
proposer_slashings: list
attester_slashings: list
execution_requests: Optional[ExecutionRequests] = None


@dataclass
Expand Down
13 changes: 12 additions & 1 deletion src/utils/dataclass.py
dputko marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
import functools
from dataclasses import dataclass, fields, is_dataclass
from types import GenericAlias
from typing import Callable, Self, Sequence, TypeVar
from typing import Callable, Self, Sequence, TypeVar, get_origin, Union, get_args


class DecodeToDataclassException(Exception):
pass


def try_extract_underlying_type_from_optional(field):
args = get_args(field)
types = [x for x in args if x != type(None)]
if get_origin(field) is Union and type(None) in args and len(types) == 1:
return types[0]
return None


@dataclass
class Nested:
"""
Expand All @@ -31,6 +39,9 @@ def __post_init__(self):
elif is_dataclass(field.type) and not is_dataclass(getattr(self, field.name)):
factory = self.__get_dataclass_factory(field.type)
setattr(self, field.name, factory(**getattr(self, field.name)))
elif getattr(self, field.name) and (underlying := try_extract_underlying_type_from_optional(field.type)):
factory = self.__get_dataclass_factory(underlying)
setattr(self, field.name, factory(**getattr(self, field.name)))

@staticmethod
def __get_dataclass_factory(field_type):
Expand Down
2 changes: 2 additions & 0 deletions src/variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@

LIDO_LOCATOR_ADDRESS = os.getenv('LIDO_LOCATOR_ADDRESS', '')

SUSPICIOUS_ADDRESSES = os.getenv('SUSPICIOUS_ADDRESSES', '').split(',')

dputko marked this conversation as resolved.
Show resolved Hide resolved
# - Metrics -
PROMETHEUS_PORT = int(os.getenv('PROMETHEUS_PORT', 9000))
PROMETHEUS_PREFIX = os.getenv("PROMETHEUS_PREFIX", "ethereum_head_watcher")
Expand Down
8 changes: 7 additions & 1 deletion src/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ def __init__(self, handlers: list[WatcherHandler], keys_source: BaseSource, web3
self.indexed_validators_keys: dict[str, str] = {}
self.chain_reorgs: dict[str, ChainReorgEvent] = {}
self.handled_headers: list[BlockHeaderResponseData] = []
self.suspicious_addresses = variables.SUSPICIOUS_ADDRESSES
if not self.suspicious_addresses and self.execution:
self.suspicious_addresses = {
self.execution.lido_contracts.lido_locator.functions.withdrawalVault().call()
}
AlexanderLukin marked this conversation as resolved.
Show resolved Hide resolved

def run(self, slots_range: Optional[str] = SLOTS_RANGE):
def _run(slot_to_handle='head'):
Expand Down Expand Up @@ -161,11 +166,12 @@ def force_use_fallback_callback(result) -> bool:
return False

slot = slot or 'head'

current_head = self.consensus.get_block_header(
slot, force_use_fallback_callback if slot == 'head' else lambda _: False
)
if len(self.handled_headers) > 0 and int(current_head.header.message.slot) == int(
self.handled_headers[-1].header.message.slot
self.handled_headers[-1].header.message.slot
):
return None
current_block = self.consensus.get_block_details(current_head.root)
Expand Down
Empty file added tests/eip7251/__init__.py
AlexanderLukin marked this conversation as resolved.
Show resolved Hide resolved
Empty file.
41 changes: 41 additions & 0 deletions tests/eip7251/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import pytest

from src.keys_source.base_source import NamedKey
from tests.eip7251.helpers import gen_random_address
from tests.eip7251.stubs import TestValidator, WatcherStub


@pytest.fixture
def user_keys() -> dict[str, NamedKey]:
return {}


@pytest.fixture
def validator():
return TestValidator.random()


@pytest.fixture
def lido_validator(user_keys):
dputko marked this conversation as resolved.
Show resolved Hide resolved
random_validator = TestValidator.random()
user_keys[random_validator.pubkey] = NamedKey(
key=random_validator.pubkey,
operatorName='Dimon operator',
dputko marked this conversation as resolved.
Show resolved Hide resolved
operatorIndex='1',
moduleIndex='1'
)
return random_validator


@pytest.fixture
def watcher(user_keys) -> WatcherStub:
return WatcherStub(
user_keys=user_keys
)


@pytest.fixture
def lido_withdrawal_vault(watcher: WatcherStub) -> str:
address = gen_random_address()
watcher.suspicious_addresses.add(address)
return address
56 changes: 56 additions & 0 deletions tests/eip7251/helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from secrets import token_hex

from src.providers.consensus.typings import FullBlockInfo, BlockHeader, BlockHeaderMessage, BlockMessage, BlockBody, \
BlockExecutionPayload, ExecutionRequests, WithdrawalRequest, ConsolidationRequest
from src.typings import StateRoot, BlockRoot


def gen_random_pubkey():
return random_hex(48)

def gen_random_address():
return random_hex(20)


def random_hex(length: int) -> str:
return '0x' + token_hex(length)


def create_sample_block(withdrawals: list[WithdrawalRequest] = None,
consolidations: list[ConsolidationRequest] = None) -> FullBlockInfo:
block = FullBlockInfo(
root=BlockRoot('0xa69fd326c1e4a84ac56a9f1e440cdb451fce8c4535e4fabd8447cda15506a8d5'),
canonical=True,
header=BlockHeader(
message=BlockHeaderMessage(
slot='33',
proposer_index='25',
parent_root=BlockRoot('0x924057843cd2718a918a1e354c0eb111b15f471319195ed9eeb45e7bf2dae3a7'),
state_root=StateRoot('0xcc026c107005b9442a26d763409886968cde30a1fbd605e2d9a1c813ddce9062'),
body_root='0x6f01de44a85b4cbe85d1d452de1979630217ce42e2326388152f39bb9d0a3dce'),
signature='0x99dde0eb3eaaec71e26e7a614f7eb99c37d7a143edbd55c0b2648dc9f2e754a4e26c3f1320592c2603567cc089a68d5d12f65ec1d9940837dd0d59b05356a0bbc1c3ad51a9546ece8c1233b7398ae3cf1df27c61591bf548b065b68d69bb9450'
),
message=BlockMessage(
slot='33',
proposer_index='25',
parent_root='0x924057843cd2718a918a1e354c0eb111b15f471319195ed9eeb45e7bf2dae3a7',
state_root=StateRoot('0xcc026c107005b9442a26d763409886968cde30a1fbd605e2d9a1c813ddce9062'),
body=BlockBody(
execution_payload=BlockExecutionPayload(block_number='31'),
voluntary_exits=[],
proposer_slashings=[],
attester_slashings=[],
)
),
signature='0x99dde0eb3eaaec71e26e7a614f7eb99c37d7a143edbd55c0b2648dc9f2e754a4e26c3f1320592c2603567cc089a68d5d12f65ec1d9940837dd0d59b05356a0bbc1c3ad51a9546ece8c1233b7398ae3cf1df27c61591bf548b065b68d69bb9450'
)
if not withdrawals and not consolidations:
return block

execution_requests = ExecutionRequests(
deposits=[],
withdrawals=withdrawals or [],
consolidations=consolidations or []
)
block.message.body.execution_requests = execution_requests
return block
Loading
Loading