Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Etherscan contract verification #330

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
6 changes: 3 additions & 3 deletions boa/contracts/vyper/vyper_contract.py
Original file line number Diff line number Diff line change
Expand Up @@ -573,11 +573,11 @@ def __init__(
self.env.register_contract(self._address, self)

def _run_init(self, *args, value=0, override_address=None, gas=None):
encoded_args = b""
self.ctor_calldata = b""
if self._ctor:
encoded_args = self._ctor.prepare_calldata(*args)
self.ctor_calldata = self._ctor.prepare_calldata(*args)

initcode = self.compiler_data.bytecode + encoded_args
initcode = self.compiler_data.bytecode + self.ctor_calldata
with self._anchor_source_map(self._deployment_source_map):
address, computation = self.env.deploy(
bytecode=initcode,
Expand Down
115 changes: 113 additions & 2 deletions boa/explorer.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,23 @@
import re
import time
from dataclasses import dataclass
from datetime import timedelta
from typing import Optional

import boa
from boa.rpc import json
from boa.util.abi import Address
from boa.verifiers import ContractVerifier, VerificationResult, _wait_until

try:
from requests_cache import CachedSession

def filter_fn(response):
return response.ok and _is_success_response(response.json())

SESSION = CachedSession(
"~/.cache/titanoboa/explorer_cache",
filter_fn=lambda response: _is_success_response(response.json()),
filter_fn=filter_fn,
allowable_codes=[200],
cache_control=True,
expire_after=3600 * 6,
Expand All @@ -22,15 +30,118 @@
SESSION = Session()

DEFAULT_ETHERSCAN_URI = "https://api.etherscan.io/api"
VERSION_RE = re.compile(r"v(\d+\.\d+\.\d+)(\+commit.*)?")


@dataclass
class Etherscan:
class Etherscan(ContractVerifier[str]):
uri: Optional[str] = DEFAULT_ETHERSCAN_URI
api_key: Optional[str] = None
num_retries: int = 10
backoff_ms: int | float = 400.0
backoff_factor: float = 1.1 # 1.1**10 ~= 2.59
timeout = timedelta(minutes=2)

def verify(
self,
address: Address,
contract_name: str,
solc_json: dict,
constructor_calldata: bytes,
license_type: str = "1",
wait: bool = False,
) -> Optional["VerificationResult[str]"]:
"""
Verify the Vyper contract on Etherscan.
:param address: The address of the contract.
:param contract_name: The name of the contract.
:param solc_json: The solc_json output of the Vyper compiler.
:param constructor_calldata: The calldata for the contract constructor.
:param license_type: The license to use for the contract. Defaults to "none".
:param wait: Whether to return a VerificationResult immediately
or wait for verification to complete. Defaults to False
"""
api_key = self.api_key or ""
output_selection = solc_json["settings"]["outputSelection"]
contract_file = next(k for k, v in output_selection.items() if "*" in v)
compiler_version = solc_json["compiler_version"]
version_match = re.match(VERSION_RE, compiler_version)
if not version_match:
raise ValueError(f"Failed to extract Vyper version from {compiler_version}")

data = {
"module": "contract",
"action": "verifysourcecode",
"apikey": api_key,
"chainId": boa.env.get_chain_id(),
DanielSchiavini marked this conversation as resolved.
Show resolved Hide resolved
"codeformat": "vyper-json",
"sourceCode": json.dumps(solc_json),
"constructorArguments": constructor_calldata.hex(),
"contractaddress": address,
"contractname": f"{contract_file}:{contract_name}",
"compilerversion": f"vyper:{version_match.group(1)}",
"licenseType": license_type,
"optimizationUsed": "1",
}

def verification_created():
# we need to retry until the contract is found by Etherscan
response = SESSION.post(self.uri, data=data)
response.raise_for_status()
response_json = response.json()
if response_json.get("status") == "1":
return response_json["result"]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a string?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes definitely, that's the identifier

if (
response_json.get("message") == "NOTOK"
and "Unable to locate ContractCode" not in response_json["result"]
):
raise ValueError(f"Failed to verify: {response_json['result']}")
print(
f'Verification could not be created yet: {response_json["result"]}. Retrying...'
)
return None

identifier = _wait_until(
DanielSchiavini marked this conversation as resolved.
Show resolved Hide resolved
verification_created, timedelta(minutes=2), timedelta(seconds=5), 1.1
)
print(f"Verification started with identifier {identifier}")
if not wait:
return VerificationResult(identifier, self)

self.wait_for_verification(identifier)
return None

def wait_for_verification(self, identifier: str) -> None:
"""
Waits for the contract to be verified on Etherscan.
:param identifier: The identifier of the contract.
"""
_wait_until(
lambda: self.is_verified(identifier),
self.timeout,
self.backoff,
self.backoff_factor,
)
print("Contract verified!")

@property
def backoff(self):
return timedelta(milliseconds=self.backoff_ms)

def is_verified(self, identifier: str) -> bool:
api_key = self.api_key or ""
url = f"{self.uri}?module=contract&action=checkverifystatus"
url += f"&guid={identifier}&apikey={api_key}"

response = SESSION.get(url)
response.raise_for_status()
response_json = response.json()
if (
response_json.get("message") == "NOTOK"
and "Pending in queue" not in response_json["result"]
):
raise ValueError(f"Failed to verify: {response_json['result']}")
return response_json.get("status") == "1"

def __post_init__(self):
if self.uri is None:
Expand Down
91 changes: 64 additions & 27 deletions boa/verifiers.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,38 @@
from dataclasses import dataclass
from datetime import datetime, timedelta
from http import HTTPStatus
from typing import Optional
from typing import Callable, Generic, Optional, TypeVar

import requests

from boa.util.abi import Address
from boa.util.open_ctx import Open

DEFAULT_BLOCKSCOUT_URI = "https://eth.blockscout.com"
T = TypeVar("T")


class ContractVerifier(Generic[T]):
def verify(
self,
address: Address,
contract_name: str,
solc_json: dict,
constructor_calldata: bytes,
license_type: str = "1",
wait: bool = False,
) -> Optional["VerificationResult[T]"]:
raise NotImplementedError

def wait_for_verification(self, identifier: T) -> None:
raise NotImplementedError

def is_verified(self, identifier: T) -> bool:
raise NotImplementedError


@dataclass
class Blockscout:
class Blockscout(ContractVerifier[Address]):
"""
Allows users to verify contracts on Blockscout.
This is independent of Vyper contracts, and can be used to verify any smart contract.
Expand All @@ -37,14 +57,16 @@ def verify(
address: Address,
contract_name: str,
solc_json: dict,
license_type: str = None,
constructor_calldata: bytes,
license_type: str = "1",
wait: bool = False,
) -> Optional["VerificationResult"]:
) -> Optional["VerificationResult[Address]"]:
"""
Verify the Vyper contract on Blockscout.
:param address: The address of the contract.
:param contract_name: The name of the contract.
:param solc_json: The solc_json output of the Vyper compiler.
:param constructor_calldata: The calldata for the constructor.
:param license_type: The license to use for the contract. Defaults to "none".
:param wait: Whether to return a VerificationResult immediately
or wait for verification to complete. Defaults to False
Expand Down Expand Up @@ -83,18 +105,15 @@ def wait_for_verification(self, address: Address) -> None:
Waits for the contract to be verified on Blockscout.
:param address: The address of the contract.
"""
timeout = datetime.now() + self.timeout
wait_time = self.backoff
while datetime.now() < timeout:
if self.is_verified(address):
msg = "Contract verified!"
msg += f" {self.uri}/address/{address}?tab=contract_code"
print(msg)
return
time.sleep(wait_time.total_seconds())
wait_time *= self.backoff_factor

raise TimeoutError("Timeout waiting for verification to complete")
_wait_until(
lambda: self.is_verified(address),
self.timeout,
self.backoff,
self.backoff_factor,
)
msg = "Contract verified!"
msg += f" {self.uri}/address/{address}?tab=contract_code"
print(msg)

def is_verified(self, address: Address) -> bool:
api_key = self.api_key or ""
Expand All @@ -107,19 +126,19 @@ def is_verified(self, address: Address) -> bool:
return response.json().get("is_verified", False)


_verifier = Blockscout()
_verifier: ContractVerifier = Blockscout()


@dataclass
class VerificationResult:
address: Address
verifier: Blockscout
class VerificationResult(Generic[T]):
identifier: T
verifier: ContractVerifier

def wait_for_verification(self):
self.verifier.wait_for_verification(self.address)
self.verifier.wait_for_verification(self.identifier)

def is_verified(self):
return self.verifier.is_verified(self.address)
return self.verifier.is_verified(self.identifier)


def _set_verifier(verifier):
Expand All @@ -133,7 +152,7 @@ def get_verifier():


# TODO: maybe allow like `set_verifier("blockscout", *args, **kwargs)`
def set_verifier(verifier):
def set_verifier(verifier: ContractVerifier):
return Open(get_verifier, _set_verifier, verifier)


Expand All @@ -147,14 +166,14 @@ def get_verification_bundle(contract_like):

# should we also add a `verify_deployment` function?
def verify(
contract, verifier=None, license_type: str = None, wait=False
) -> VerificationResult:
contract, verifier: ContractVerifier = None, wait=False, **kwargs
) -> VerificationResult | None:
"""
Verifies the contract on a block explorer.
:param contract: The contract to verify.
:param verifier: The block explorer verifier to use.
Defaults to get_verifier().
:param license_type: Optional license to use for the contract.
:param wait: Whether to wait for verification to complete.
"""
if verifier is None:
verifier = get_verifier()
Expand All @@ -166,6 +185,24 @@ def verify(
address=contract.address,
solc_json=bundle,
contract_name=contract.contract_name,
license_type=license_type,
constructor_calldata=contract.ctor_calldata,
wait=wait,
**kwargs,
)


def _wait_until(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the indirection here is extremely confusing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not just make it part of the base contract verifier?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While it is in the verifiers file, it is definitely not specific for contract verification.
Since it doesn't use anything from self I didn't see the point of keeping it in a class

predicate: Callable[[], T],
wait_for: timedelta,
backoff: timedelta,
backoff_factor: float,
) -> T:
timeout = datetime.now() + wait_for
wait_time = backoff
while datetime.now() < timeout:
if result := predicate():
return result
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why return the result?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so it could be used with more than just bools (but yeah I know you don't like truthiness)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the value is actually used to wait until the contract is created:

identifier = _wait_until(

time.sleep(wait_time.total_seconds())
wait_time *= backoff_factor

raise TimeoutError("Timeout waiting for verification to complete")
3 changes: 2 additions & 1 deletion tests/integration/fork/test_abi_contract.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ def test_call_trace_abi_and_vyper(crvusd):
@external
def foo(x: IERC20):
extcall x.transfer(self, 100)
"""
""",
name="VyperContract",
)
boa.env.set_balance(boa.env.eoa, 1000)
with boa.reverts():
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/network/anvil/test_network_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ def test_deployment_db_overriden_contract_name():
contract_name = "test_deployment"

# contract is written to deployments db
contract = boa.loads(code, arg, contract_name=contract_name)
contract = boa.loads(code, arg, name=contract_name)

# test get_deployments()
deployment = next(db.get_deployments())
Expand Down
8 changes: 8 additions & 0 deletions tests/integration/network/sepolia/module_lib.vy
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# pragma version ~=0.4.0

@view
def throw():
raise "Error with message"

def throw_dev_reason():
raise # dev: some dev reason
Loading
Loading