diff --git a/.github/workflows/pull_request.yaml b/.github/workflows/pull_request.yaml index e14548a..35af45f 100644 --- a/.github/workflows/pull_request.yaml +++ b/.github/workflows/pull_request.yaml @@ -6,8 +6,29 @@ on: jobs: python: runs-on: ubuntu-latest + services: + postgres: + image: postgres + env: + POSTGRES_PASSWORD: postgres + POSTGRES_DB: mainnet + # Set health checks to wait until postgres has started + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + - 5432:5432 steps: - uses: actions/checkout@v3 + - name: Initialize database + run: | + for file in ${{ github.workspace }}/database/*.sql; do + psql -h localhost -U postgres -d mainnet -f "$file" + done + env: + PGPASSWORD: postgres - name: Setup Python 3.12 uses: actions/setup-python@v3 with: @@ -26,7 +47,8 @@ jobs: - name: Type Check (mypy) run: mypy src - name: Tests - run: pytest tests/e2e/test_blockchain_data.py tests/e2e/test_imbalances_script.py + run: pytest tests/unit/ tests/e2e/ env: NODE_URL: ${{ secrets.NODE_URL }} + SOLVER_SLIPPAGE_DB_URL: postgres:postgres@localhost:5432/mainnet CHAIN_SLEEP_TIME: 1 diff --git a/Dockerfile.test_db b/Dockerfile.test_db index 0682417..a143cef 100644 --- a/Dockerfile.test_db +++ b/Dockerfile.test_db @@ -1,4 +1,3 @@ FROM postgres ENV POSTGRES_PASSWORD=postgres ENV POSTGRES_DB=mainnet -COPY ./database/01_table_creation.sql /docker-entrypoint-initdb.d/ diff --git a/Makefile b/Makefile index 0880e0e..0f8fb8d 100644 --- a/Makefile +++ b/Makefile @@ -13,7 +13,7 @@ daemon: test_db: docker build -t $(DOCKER_IMAGE_NAME) -f Dockerfile.test_db . - docker run -d --name $(DOCKER_CONTAINER_NAME) -p $(DB_PORT):$(DB_PORT) $(DOCKER_IMAGE_NAME) + docker run -d --name $(DOCKER_CONTAINER_NAME) -p $(DB_PORT):$(DB_PORT) -v ${PWD}/database/00_legacy_tables.sql:/docker-entrypoint-initdb.d/00_legacy_tables.sql -v ${PWD}/database/01_table_creation.sql:/docker-entrypoint-initdb.d/01_table_creation.sql $(DOCKER_IMAGE_NAME) stop_test_db: docker stop $(DOCKER_CONTAINER_NAME) || true diff --git a/database/00_legacy_tables.sql b/database/00_legacy_tables.sql new file mode 100644 index 0000000..67e82b5 --- /dev/null +++ b/database/00_legacy_tables.sql @@ -0,0 +1,36 @@ +-- Database Schema for Token Imbalances and Slippage + +-- Table: raw_token_imbalances (for storing raw token imbalances) +CREATE TABLE raw_token_imbalances ( + auction_id BIGINT NOT NULL, + chain_name VARCHAR(50) NOT NULL, + block_number BIGINT NOT NULL, + tx_hash BYTEA NOT NULL, + token_address BYTEA NOT NULL, + imbalance NUMERIC(78,0), + PRIMARY KEY (tx_hash, token_address) +); + +-- Table: slippage_prices (for storing per unit token prices in ETH) +CREATE TABLE slippage_prices ( + chain_name VARCHAR(50) NOT NULL, + source VARCHAR(50) NOT NULL, + block_number BIGINT NOT NULL, + tx_hash BYTEA NOT NULL, + token_address BYTEA NOT NULL, + price NUMERIC(42,18), + PRIMARY KEY (tx_hash, token_address) +); + +-- Table: Stores fees (i.e. protocol fee, network fee on per token basis) +CREATE TABLE fees_new ( + chain_name VARCHAR(50) NOT NULL, + auction_id BIGINT NOT NULL, + block_number BIGINT NOT NULL, + tx_hash BYTEA NOT NULL, + order_uid BYTEA NOT NULL, + token_address BYTEA NOT NULL, + fee_amount NUMERIC(78,0) NOT NULL, + fee_type VARCHAR(50) NOT NULL, -- e.g. "protocol" or "network" + PRIMARY KEY (tx_hash, order_uid, token_address, fee_type) +); diff --git a/database/01_table_creation.sql b/database/01_table_creation.sql index b7c4255..6d43302 100644 --- a/database/01_table_creation.sql +++ b/database/01_table_creation.sql @@ -1,4 +1,4 @@ -CREATE TABLE transaction_timestamps ( +CREATE TABLE transaction_timestamp ( tx_hash bytea PRIMARY KEY, time timestamp NOT NULL ); @@ -15,7 +15,7 @@ CREATE TYPE PriceSource AS ENUM ('coingecko', 'moralis', 'dune', 'native'); CREATE TABLE prices ( token_address bytea NOT NULL, time timestamp NOT NULL, - price numeric(60, 18) NOT NULL, + price numeric(78, 18) NOT NULL, source PriceSource NOT NULL, PRIMARY KEY (token_address, time, source) diff --git a/requirements.in b/requirements.in index 4030533..c95bacb 100644 --- a/requirements.in +++ b/requirements.in @@ -1,11 +1,8 @@ dune-client moralis -pandas -pandas-stubs -psycopg2 +psycopg python-dotenv requests -types-psycopg2 types-requests SQLAlchemy web3 diff --git a/requirements.txt b/requirements.txt index c8f59cb..b844005 100644 --- a/requirements.txt +++ b/requirements.txt @@ -116,19 +116,11 @@ mypy-extensions==1.0.0 # typing-inspect ndjson==0.3.1 # via dune-client -numpy==1.26.4 - # via - # pandas - # pandas-stubs packaging==24.1 # via # black # marshmallow # pytest -pandas==2.2.1 - # via -r requirements.in -pandas-stubs==2.2.2.240909 - # via -r requirements.in parsimonious==0.10.0 # via eth-abi pathspec==0.12.1 @@ -139,7 +131,7 @@ platformdirs==4.3.6 # pylint pluggy==1.5.0 # via pytest -psycopg2==2.9.9 +psycopg==3.2.3 # via -r requirements.in pycryptodome==3.20.0 # via @@ -159,11 +151,8 @@ python-dateutil==2.9.0.post0 # via # dune-client # moralis - # pandas python-dotenv==1.0.0 # via -r requirements.in -pytz==2024.2 - # via pandas pyunormalize==16.0.0 # via web3 regex==2024.9.11 @@ -187,12 +176,8 @@ toolz==0.12.1 # via cytoolz types-deprecated==1.2.9.20240311 # via dune-client -types-psycopg2==2.9.21.20240819 - # via -r requirements.in types-python-dateutil==2.9.0.20240906 # via dune-client -types-pytz==2024.2.0.20240913 - # via pandas-stubs types-pyyaml==6.0.12.20240917 # via dune-client types-requests==2.32.0.20240914 @@ -207,6 +192,7 @@ typing-extensions==4.12.2 # eth-typing # moralis # mypy + # psycopg # pydantic # pydantic-core # sqlalchemy @@ -214,8 +200,6 @@ typing-extensions==4.12.2 # web3 typing-inspect==0.9.0 # via dataclasses-json -tzdata==2024.1 - # via pandas urllib3==2.2.3 # via # moralis diff --git a/src/helpers/blockchain_data.py b/src/helpers/blockchain_data.py index df6028d..072e9c4 100644 --- a/src/helpers/blockchain_data.py +++ b/src/helpers/blockchain_data.py @@ -1,5 +1,8 @@ from hexbytes import HexBytes from web3 import Web3 +from web3.types import HexStr + +from contracts.erc20_abi import erc20_abi from src.helpers.config import logger from src.constants import SETTLEMENT_CONTRACT_ADDRESS, INVALIDATED_ORDER_TOPIC @@ -71,3 +74,31 @@ def get_auction_id(self, tx_hash: str) -> int: # convert bytes to int auction_id = int.from_bytes(call_data_bytes[-8:], byteorder="big") return auction_id + + def get_transaction_timestamp(self, tx_hash: str) -> tuple[str, int]: + receipt = self.web3.eth.get_transaction_receipt(HexStr(tx_hash)) + block_number = receipt["blockNumber"] + block = self.web3.eth.get_block(block_number) + timestamp = block["timestamp"] + + return tx_hash, timestamp + + def get_transaction_tokens(self, tx_hash: str) -> list[tuple[str, str]]: + receipt = self.web3.eth.get_transaction_receipt(HexStr(tx_hash)) + + transfer_topic = self.web3.keccak(text="Transfer(address,address,uint256)") + + token_addresses: set[str] = set() + for log in receipt["logs"]: + if log["topics"] and log["topics"][0] == transfer_topic: + token_address = log["address"] + token_addresses.add(token_address) + + return [(tx_hash, token_address) for token_address in token_addresses] + + def get_token_decimals(self, token_address: str) -> int: + """Get number of decimals for a token.""" + contract = self.web3.eth.contract( + address=Web3.to_checksum_address(token_address), abi=erc20_abi + ) + return contract.functions.decimals().call() diff --git a/src/helpers/config.py b/src/helpers/config.py index 93ea485..5715635 100644 --- a/src/helpers/config.py +++ b/src/helpers/config.py @@ -51,7 +51,7 @@ def create_db_connection(db_type: str) -> Engine: if not db_url: raise ValueError(f"{db_type} database URL not found in environment variables.") - return create_engine(f"postgresql+psycopg2://{db_url}") + return create_engine(f"postgresql+psycopg://{db_url}") def check_db_connection(engine: Engine, db_type: str) -> Engine: diff --git a/src/helpers/database.py b/src/helpers/database.py index 23635e2..a3c4bd1 100644 --- a/src/helpers/database.py +++ b/src/helpers/database.py @@ -1,5 +1,10 @@ -from sqlalchemy import text +from datetime import datetime + +from hexbytes import HexBytes +import psycopg +from sqlalchemy import text, insert, Table, Column, Integer, LargeBinary, MetaData from sqlalchemy.engine import Engine + from src.helpers.config import check_db_connection, logger from src.helpers.helper_functions import read_sql_file from src.constants import NULL_ADDRESS_STRING @@ -124,3 +129,99 @@ def write_fees( "fee_recipient": final_recipient, }, ) + + def write_transaction_timestamp( + self, transaction_timestamp: tuple[str, int] + ) -> None: + """Writes the transaction timestamp to database.""" + query = ( + "INSERT INTO transaction_timestamp (tx_hash, time) " + "VALUES (:tx_hash, :time);" + ) + self.execute_and_commit( + query, + { + "tx_hash": bytes.fromhex(transaction_timestamp[0][2:]), + "time": datetime.fromtimestamp(transaction_timestamp[1]), + }, + ) + + def write_transaction_tokens( + self, transaction_tokens: list[tuple[str, str]] + ) -> None: + """Writes the transaction tokens to the database.""" + query = ( + "INSERT INTO transaction_tokens (tx_hash, token_address) " + "VALUES (:tx_hash, :token_address);" + ) + for tx_hash, token_address in transaction_tokens: + self.execute_and_commit( + query, + { + "tx_hash": bytes.fromhex(tx_hash[2:]), + "token_address": bytes.fromhex(token_address[2:]), + }, + ) + + def write_prices_new(self, prices: list[tuple[str, int, float, str]]) -> None: + """Write prices to database.""" + query = ( + "INSERT INTO prices (token_address, time, price, source) " + "VALUES (:token_address, :time, :price, :source);" + ) + for token_address, time, price, source in prices: + try: + self.execute_and_commit( + query, + { + "token_address": bytes.fromhex(token_address[2:]), + "time": datetime.fromtimestamp(time), + "price": price, + "source": source, + }, + ) + except psycopg.errors.NumericValueOutOfRange: + logger.warning( + f"Error while writing price data. token: {token_address}, " + f"time: {time}, price: {price}, source: {source}" + ) + + def get_latest_transaction(self) -> str | None: + """Get latest transaction hash. + If no transaction is found, return None.""" + query = "SELECT tx_hash FROM transaction_timestamp ORDER BY time DESC LIMIT 1;" + result = self.execute_query(query, {}).fetchone() + + if result is None: + return None + + latest_tx_hash = HexBytes(result[0]).to_0x_hex() + return latest_tx_hash + + def get_tokens_without_decimals(self) -> list[str]: + """Get tokens without decimals.""" + query = ( + "SELECT token_address FROM transaction_tokens " + "WHERE token_address not in (SELECT token_address FROM token_decimals);" + ) + result = self.execute_query(query, {}).fetchall() + return list({HexBytes(row[0]).to_0x_hex() for row in result}) + + def write_token_decimals(self, token_decimals: list[tuple[str, int]]) -> None: + self.engine = check_db_connection(self.engine, "solver_slippage") + + # Define the table without creating a model class + token_decimals_table = Table( + "token_decimals", MetaData(), autoload_with=self.engine + ) + + # Prepare the data + records = [ + {"token_address": bytes.fromhex(token_address[2:]), "decimals": decimals} + for token_address, decimals in token_decimals + ] + + # Execute the bulk insert + with self.engine.connect() as conn: + conn.execute(token_decimals_table.insert(), records) + conn.commit() diff --git a/src/imbalances_script.py b/src/imbalances_script.py index ec91f63..baf276a 100644 --- a/src/imbalances_script.py +++ b/src/imbalances_script.py @@ -24,7 +24,7 @@ from web3 import Web3 from web3.datastructures import AttributeDict -from web3.types import TxReceipt +from web3.types import HexStr, TxReceipt from src.helpers.config import CHAIN_RPC_ENDPOINTS, logger from src.constants import ( SETTLEMENT_CONTRACT_ADDRESS, diff --git a/src/price_providers/coingecko_pricing.py b/src/price_providers/coingecko_pricing.py index f296180..f2e0b48 100644 --- a/src/price_providers/coingecko_pricing.py +++ b/src/price_providers/coingecko_pricing.py @@ -1,8 +1,9 @@ import os import time + import requests -import json from web3 import Web3 + from src.price_providers.pricing_model import AbstractPriceProvider from src.helpers.config import logger, get_web3_instance from src.helpers.helper_functions import get_finalized_block_number, extract_params @@ -29,13 +30,16 @@ def __init__(self) -> None: @property def name(self) -> str: - return "Coingecko" + return "coingecko" - def fetch_coingecko_list(self) -> list[dict]: + def fetch_coingecko_list(self) -> list[dict] | None: """ Fetch and filter the list of tokens (currently filters only Ethereum) from the Coingecko API. """ + if not coingecko_api_key: + logger.warning("Coingecko API key is not set.") + return None url = ( f"https://pro-api.coingecko.com/api/v3/coins/" f"list?include_platform=true&status=active" @@ -47,7 +51,7 @@ def fetch_coingecko_list(self) -> list[dict]: headers["x-cg-pro-api-key"] = coingecko_api_key response = requests.get(url, headers=headers) - tokens_list = json.loads(response.text) + tokens_list = response.json() return [ {"id": item["id"], "platforms": {"ethereum": item["platforms"]["ethereum"]}} for item in tokens_list @@ -71,9 +75,12 @@ def get_token_id_by_address(self, token_address: str) -> str | None: self.last_reload_time = ( time.time() ) # update the last reload time to current time - for token in self.filtered_token_list: - if token["platforms"].get("ethereum") == token_address: - return token["id"] + if ( + self.filtered_token_list is not None + ): # TODO: handle missing keys more systematically + for token in self.filtered_token_list: + if token["platforms"].get("ethereum") == token_address: + return token["id"] return None def fetch_api_price( @@ -82,9 +89,6 @@ def fetch_api_price( """ Makes call to Coingecko API to fetch price, between a start and end timestamp. """ - if not coingecko_api_key: - logger.warning("Coingecko API key is not set.") - return None # price of token is returned in ETH url = ( f"https://pro-api.coingecko.com/api/v3/coins/{token_id}/market_chart/range" @@ -122,6 +126,9 @@ def get_price(self, price_params: dict) -> float | None: Function returns coingecko price for a token address, closest to and at least as large as the block timestamp for a given tx hash. """ + if not coingecko_api_key: + logger.warning("Coingecko API key is not set.") + return None token_address, block_number = extract_params(price_params, is_block=True) block_start_timestamp = self.web3.eth.get_block(block_number)["timestamp"] if self.price_not_retrievable(block_start_timestamp): diff --git a/src/price_providers/dune_pricing.py b/src/price_providers/dune_pricing.py index 5954ec0..39c36f6 100644 --- a/src/price_providers/dune_pricing.py +++ b/src/price_providers/dune_pricing.py @@ -23,7 +23,7 @@ def __init__(self) -> None: @property def name(self) -> str: - return "Dune" + return "dune" def initialize_dune_client(self) -> DuneClient | None: """ diff --git a/src/price_providers/endpoint_auction_pricing.py b/src/price_providers/endpoint_auction_pricing.py index f525bee..ff4012a 100644 --- a/src/price_providers/endpoint_auction_pricing.py +++ b/src/price_providers/endpoint_auction_pricing.py @@ -1,13 +1,16 @@ import requests + from src.price_providers.pricing_model import AbstractPriceProvider -from src.helpers.config import logger -from src.helpers.helper_functions import extract_params, get_token_decimals +from src.helpers.blockchain_data import BlockchainData +from src.helpers.config import get_web3_instance, logger +from src.helpers.helper_functions import extract_params class AuctionPriceProvider(AbstractPriceProvider): """Fetch auction prices.""" def __init__(self) -> None: + self.blockchain = BlockchainData(get_web3_instance()) self.endpoint_urls = { "prod": f"https://api.cow.fi/mainnet/api/v1/solver_competition/by_tx_hash/", "barn": f"https://barn.api.cow.fi/mainnet/api/v1/solver_competition/by_tx_hash/", @@ -15,7 +18,7 @@ def __init__(self) -> None: @property def name(self) -> str: - return "AuctionPrices" + return "native" def get_price(self, price_params: dict) -> float | None: """Function returns Auction price from endpoint for a token address.""" @@ -39,7 +42,7 @@ def get_price(self, price_params: dict) -> float | None: return None # calculation for converting auction price from endpoint to ETH equivalent per token unit price_in_eth = (float(price) / 10**18) * ( - 10 ** get_token_decimals(token_address) / 10**18 + 10 ** self.blockchain.get_token_decimals(token_address) / 10**18 ) return price_in_eth diff --git a/src/price_providers/moralis_pricing.py b/src/price_providers/moralis_pricing.py index 9cdd968..affc793 100644 --- a/src/price_providers/moralis_pricing.py +++ b/src/price_providers/moralis_pricing.py @@ -1,11 +1,14 @@ +import os + +from dotenv import load_dotenv from moralis import evm_api + from src.helpers.config import get_logger -import os, dotenv from src.price_providers.pricing_model import AbstractPriceProvider from src.helpers.helper_functions import extract_params -dotenv.load_dotenv() +load_dotenv() class MoralisPriceProvider(AbstractPriceProvider): @@ -18,7 +21,7 @@ def __init__(self) -> None: @property def name(self) -> str: - return "Moralis" + return "moralis" @staticmethod def wei_to_eth(price: str) -> float | None: @@ -34,6 +37,9 @@ def get_price(self, price_params: dict) -> float | None: Price returned is closest to and at least as large as block timestamp. """ try: + if os.getenv("MORALIS_API_KEY") is None: + self.logger.warning("Moralis API key is not set.") + return None token_address, block_number = extract_params(price_params, is_block=True) params = { "chain": "eth", diff --git a/src/price_providers/price_feed.py b/src/price_providers/price_feed.py index afed5ee..c8b96ec 100644 --- a/src/price_providers/price_feed.py +++ b/src/price_providers/price_feed.py @@ -1,13 +1,21 @@ +from web3.types import HexStr + from src.price_providers.coingecko_pricing import CoingeckoPriceProvider from src.price_providers.dune_pricing import DunePriceProvider from src.price_providers.moralis_pricing import MoralisPriceProvider from src.price_providers.endpoint_auction_pricing import AuctionPriceProvider from src.helpers.config import logger +NATIVE_TOKEN = HexStr("0xEeeeeEeeeEeEeeEeEeEeeEEEeeeeEeeeeeeeEEeE") + +# pylint: disable=logging-fstring-interpolation + class PriceFeed: """Class encapsulating the different price providers.""" + # pylint: disable=too-few-public-methods + def __init__(self, activate: bool): if activate: self.providers = [ @@ -18,13 +26,17 @@ def __init__(self, activate: bool): else: self.providers = [] - def get_price(self, price_params: dict) -> tuple[float, str] | None: + def get_price(self, price_params: dict) -> list[tuple[float, str]]: """Function iterates over list of price provider objects and attempts to get a price.""" + prices: list[tuple[float, str]] = [] for provider in self.providers: try: - price = provider.get_price(price_params) + if HexStr(price_params["token_address"]) == NATIVE_TOKEN: + price: float | None = 1.0 + else: + price = provider.get_price(price_params) if price is not None: - return price, provider.name + prices.append((price, provider.name)) except Exception as e: logger.error(f"Error getting price from provider {provider.name}: {e}") - return None + return prices diff --git a/src/token_decimals.py b/src/token_decimals.py new file mode 100644 index 0000000..70b9e29 --- /dev/null +++ b/src/token_decimals.py @@ -0,0 +1,31 @@ +from os import getenv + +from dotenv import load_dotenv +from sqlalchemy import create_engine +from web3 import Web3 + +from src.helpers.blockchain_data import BlockchainData +from src.helpers.database import Database + + +load_dotenv() + + +def update_token_decimals(database: Database, blockchain: BlockchainData) -> None: + token_addresses = database.get_tokens_without_decimals() + + token_decimals = [ + (token_address, blockchain.get_token_decimals(token_address)) + for token_address in token_addresses + ] + if token_decimals: + database.write_token_decimals(token_decimals) + + +if __name__ == "__main__": + engine = create_engine(f"postgresql+psycopg://{getenv('SOLVER_SLIPPAGE_DB_URL')}") + database = Database(engine, "mainnet") + web3 = Web3(Web3.HTTPProvider(getenv("NODE_URL"))) + blockchain_data = BlockchainData(web3) + + update_token_decimals(database, blockchain_data) diff --git a/src/transaction_processor.py b/src/transaction_processor.py index c695b92..af42ace 100644 --- a/src/transaction_processor.py +++ b/src/transaction_processor.py @@ -1,18 +1,25 @@ +import time + from hexbytes import HexBytes from web3 import Web3 + +from src.fees.compute_fees import compute_all_fees_of_batch from src.helpers.blockchain_data import BlockchainData +from src.helpers.config import CHAIN_SLEEP_TIME, logger from src.helpers.database import Database +from src.helpers.helper_functions import read_sql_file, set_params from src.imbalances_script import RawTokenImbalances from src.price_providers.price_feed import PriceFeed -from src.helpers.helper_functions import read_sql_file, set_params -from src.helpers.config import CHAIN_SLEEP_TIME, logger -from src.fees.compute_fees import compute_all_fees_of_batch -import time +from src.token_decimals import update_token_decimals + +# pylint: disable=logging-fstring-interpolation class TransactionProcessor: """Class processes transactions for the slippage project.""" + # pylint: disable=too-many-instance-attributes, too-many-arguments + def __init__( self, blockchain_data: BlockchainData, @@ -43,29 +50,37 @@ def get_start_block(self) -> int: If no entries are present, fallback to get_finalized_block_number(). """ try: - # Query for the maximum block number - query_max_block = read_sql_file("src/sql/select_max_block.sql") - result = self.db.execute_query( - query_max_block, {"chain_name": self.chain_name} - ) - row = result.fetchone() - max_block = row[0] if row is not None else None - blockchain_latest_block = self.blockchain_data.get_latest_block() - - # If no entries present, fallback to get_latest_block() - if max_block is None: - return blockchain_latest_block - - logger.info("Fetched max block number from database: %d", max_block) - if max_block > blockchain_latest_block - 7200: - return max_block + 1 + # 1) get latest block on chain + block_number_latest = self.blockchain_data.get_latest_block() + # 2) get last transaction from DB + latest_tx_hash = self.db.get_latest_transaction() + # 3) get block of that transaction + if latest_tx_hash: + block_number_db = int( + self.blockchain_data.web3.eth.get_transaction_receipt( + HexBytes(latest_tx_hash) + )["blockNumber"] + ) + if block_number_db < block_number_latest - 7200: + # TODO: Remove this rule before moving to production. + logger.warning( + "Only old transactions found in database, latest was on block" + f"{block_number_db}. Using recent block instead." + ) + start_block = block_number_latest + else: + start_block = block_number_db + 1 else: - # TODO: Remove this rule before moving to production. - return blockchain_latest_block + logger.warning( + "No transaction found in database. Using recent block instead." + ) + start_block = block_number_latest except Exception as e: - logger.error("Error fetching start block from database: %s", e) + logger.error(f"Error fetching start block: {e}") raise + return start_block + def process(self, start_block: int) -> None: """Main Daemon loop that finds imbalances for txs and prices.""" previous_block = start_block @@ -103,51 +118,82 @@ def process_single_transaction( """Function processes a single tx to find imbalances, fees, prices including writing to database.""" self.log_message = [] try: - # Compute Raw Token Imbalances - if self.process_imbalances: - token_imbalances = self.process_token_imbalances( - tx_hash, auction_id, block_number - ) + # compute raw token imbalances + token_imbalances = self.process_token_imbalances( + tx_hash, auction_id, block_number + ) - # Compute Fees - if self.process_fees: - ( - protocol_fees, - partner_fees, - network_fees, - ) = self.process_fees_for_transaction(tx_hash) - - # Compute Prices - if self.process_prices and self.process_imbalances: - prices = self.process_prices_for_tokens( - token_imbalances, block_number, tx_hash - ) + # get transaction timestamp + transaction_timestamp = self.blockchain_data.get_transaction_timestamp( + tx_hash + ) + # store transaction timestamp + self.db.write_transaction_timestamp(transaction_timestamp) + + # get transaction tokens + # transaction_tokens = self.blockchain_data.get_transaction_tokens(tx_hash) + # store transaction tokens + transaction_tokens = [] + for token_address, imbalance in token_imbalances.items(): + if imbalance != 0: + transaction_tokens.append((tx_hash, token_address)) + self.db.write_transaction_tokens(transaction_tokens) + + # update token decimals + update_token_decimals(self.db, self.blockchain_data) + + # get prices + prices_new = self.get_prices_for_tokens( + transaction_timestamp, transaction_tokens + ) + # store prices + self.db.write_prices_new(prices_new) + + # Compute Raw Token Imbalances + # if self.process_imbalances: + # token_imbalances = self.process_token_imbalances( + # tx_hash, auction_id, block_number + # ) + + # # Compute Fees + # if self.process_fees: + # ( + # protocol_fees, + # partner_fees, + # network_fees, + # ) = self.process_fees_for_transaction(tx_hash) + + # # Compute Prices + # if self.process_prices and self.process_imbalances: + # prices = self.process_prices_for_tokens( + # token_imbalances, block_number, tx_hash + # ) # Write to database iff no errors in either computations - if ( - (not self.process_imbalances) - and (not self.process_fees) - and (not self.process_prices) - ): - return + # if ( + # (not self.process_imbalances) + # and (not self.process_fees) + # and (not self.process_prices) + # ): + # return if self.process_imbalances and token_imbalances: self.handle_imbalances( token_imbalances, tx_hash, auction_id, block_number ) - if self.process_fees: - self.handle_fees( - protocol_fees, - partner_fees, - network_fees, - auction_id, - block_number, - tx_hash, - ) + # if self.process_fees: + # self.handle_fees( + # protocol_fees, + # partner_fees, + # network_fees, + # auction_id, + # block_number, + # tx_hash, + # ) - if self.process_prices and prices: - self.handle_prices(prices, tx_hash, block_number) + # if self.process_prices and prices: + # self.handle_prices(prices, tx_hash, block_number) logger.info("\n".join(self.log_message)) @@ -188,6 +234,39 @@ def process_fees_for_transaction( logger.error(f"Failed to process fees for transaction {tx_hash}: {e}") return {}, {}, {} + def get_prices_for_tokens( + self, + transaction_timestamp: tuple[str, int], + transaction_tokens: list[tuple[str, str]], + ) -> list[tuple[str, int, float, str]]: + """Fetch prices for all transferred tokens.""" + prices: list[tuple[str, int, float, str]] = [] + tx_hash = transaction_timestamp[0] + timestamp = transaction_timestamp[1] + token_addresses = [token_address for _, token_address in transaction_tokens] + block_number = self.blockchain_data.web3.eth.get_transaction_receipt( + HexBytes(tx_hash) + )["blockNumber"] + try: + for token_address in token_addresses: + price_data = self.price_providers.get_price( + set_params(token_address, block_number, tx_hash) + ) + if price_data: + prices += [ + (token_address, timestamp, price, source) + for price, source in price_data + ] + else: + logger.warning( + f"Failed to fetch price for token {token_address} and" + f"transaction {tx_hash}." + ) + except Exception as e: + logger.error(f"Failed to process prices for transaction {tx_hash}: {e}") + + return prices + def process_prices_for_tokens( self, token_imbalances: dict[str, int], @@ -202,7 +281,7 @@ def process_prices_for_tokens( set_params(token_address, block_number, tx_hash) ) if price_data: - price, source = price_data + price, source = price_data[0] prices[token_address] = (price, source) except Exception as e: logger.error(f"Failed to process prices for transaction {tx_hash}: {e}") diff --git a/tests/e2e/test_blockchain_data.py b/tests/e2e/test_blockchain_data.py index 0c21f64..370b33d 100644 --- a/tests/e2e/test_blockchain_data.py +++ b/tests/e2e/test_blockchain_data.py @@ -15,3 +15,32 @@ def tests_get_tx_hashes_blocks(): "0xb75e03b63d4f06c56549effd503e1e37f3ccfc3c00e6985a5aacc9b0534d7c5c", 20892118, ) + + +def test_get_transaction_timestamp(): + web3 = Web3(Web3.HTTPProvider(getenv("NODE_URL"))) + blockchain = BlockchainData(web3) + tx_hash = "0xb75e03b63d4f06c56549effd503e1e37f3ccfc3c00e6985a5aacc9b0534d7c5c" + + transaction_timestamp = blockchain.get_transaction_timestamp(tx_hash) + + assert transaction_timestamp == (tx_hash, 1728044411) + + +def test_get_transaction_tokens(): + web3 = Web3(Web3.HTTPProvider(getenv("NODE_URL"))) + blockchain = BlockchainData(web3) + tx_hash = "0xb75e03b63d4f06c56549effd503e1e37f3ccfc3c00e6985a5aacc9b0534d7c5c" + + transaction_tokens = blockchain.get_transaction_tokens(tx_hash) + + assert all(h == tx_hash for h, _ in transaction_tokens) + assert set(token_address for _, token_address in transaction_tokens) == { + "0x7Fc66500c84A76Ad7e9c93437bFc5Ac33E2DDaE9", + "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48", + "0x812Ba41e071C7b7fA4EBcFB62dF5F45f6fA853Ee", + "0xC02aaA39b223FE8D0A0e5C4F27eAD9083C756Cc2", + "0xdAC17F958D2ee523a2206206994597C13D831ec7", + "0xEE2a03Aa6Dacf51C18679C516ad5283d8E7C2637", + "0x72e4f9F808C49A2a61dE9C5896298920Dc4EEEa9", + } diff --git a/tests/e2e/test_transaction_processor.py b/tests/e2e/test_transaction_processor.py new file mode 100644 index 0000000..8c602ef --- /dev/null +++ b/tests/e2e/test_transaction_processor.py @@ -0,0 +1,31 @@ +from os import getenv, environ +from unittest.mock import Mock + +from src.helpers.config import initialize_connections +from src.transaction_processor import TransactionProcessor +from src.helpers.database import Database +from src.helpers.blockchain_data import BlockchainData + + +def tests_process_single_transaction(): + chain_name = "mainnet" + web3, db_engine = initialize_connections() + blockchain = BlockchainData(web3) + db = Database(db_engine, chain_name) + + process_imbalances = True + process_fees = False + process_prices = True + + processor = TransactionProcessor( + blockchain, db, chain_name, process_imbalances, process_fees, process_prices + ) + + # delete data + + # process hash + processor.process_single_transaction( + "0x68e7183363be7460642e78ab09a2898c8aeac6657c2434f7b318f54590c46299", + 9481594, + 20935017, + ) diff --git a/tests/basic_test.py b/tests/legacy/basic_test.py similarity index 100% rename from tests/basic_test.py rename to tests/legacy/basic_test.py diff --git a/tests/compare_imbalances.py b/tests/legacy/compare_imbalances.py similarity index 100% rename from tests/compare_imbalances.py rename to tests/legacy/compare_imbalances.py diff --git a/tests/e2e/test_fees.py b/tests/legacy/test_fees.py similarity index 100% rename from tests/e2e/test_fees.py rename to tests/legacy/test_fees.py diff --git a/tests/unit/__init__.py b/tests/unit/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/unit/test_database.py b/tests/unit/test_database.py new file mode 100644 index 0000000..18d1302 --- /dev/null +++ b/tests/unit/test_database.py @@ -0,0 +1,147 @@ +from datetime import datetime + +from hexbytes import HexBytes +from sqlalchemy import create_engine, text + +from src.helpers.database import Database + + +def tests_write_transaction_timestamp(): + engine = create_engine( + "postgresql+psycopg://postgres:postgres@localhost:5432/mainnet" + ) + db = Database(engine, "mainnet") + # truncate table + with engine.connect() as conn: + conn.execute(text("TRUNCATE transaction_timestamp")) + conn.commit() + # write data + db.write_transaction_timestamp( + ( + "0xb75e03b63d4f06c56549effd503e1e37f3ccfc3c00e6985a5aacc9b0534d7c5c", + 1728044411, + ) + ) + # read data + with engine.connect() as conn: + res = conn.execute( + text("SELECT tx_hash, time FROM transaction_timestamp") + ).one() + assert ( + "0x" + bytes(res[0]).hex() + == "0xb75e03b63d4f06c56549effd503e1e37f3ccfc3c00e6985a5aacc9b0534d7c5c" + ) + assert res[1].timestamp() == 1728044411 + + +def tests_write_transaction_tokens(): + # import has to happen after patching environment variable + + engine = create_engine( + f"postgresql+psycopg://postgres:postgres@localhost:5432/mainnet" + ) + db = Database(engine, "mainnet") + transaction_tokens = [ + ( + "0xb75e03b63d4f06c56549effd503e1e37f3ccfc3c00e6985a5aacc9b0534d7c5c", + "0x7Fc66500c84A76Ad7e9c93437bFc5Ac33E2DDaE9", + ), + ( + "0xb75e03b63d4f06c56549effd503e1e37f3ccfc3c00e6985a5aacc9b0534d7c5c", + "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48", + ), + ] + # truncate table + with engine.connect() as conn: + conn.execute(text("TRUNCATE transaction_tokens")) + conn.commit() + # write data + db.write_transaction_tokens(transaction_tokens) + # read data + with engine.connect() as conn: + res = conn.execute( + text("SELECT tx_hash, token_address FROM transaction_tokens") + ).all() + for i, (tx_hash, token_address) in enumerate(transaction_tokens): + assert HexBytes(res[i][0]) == HexBytes(tx_hash) + assert HexBytes(res[i][1]) == HexBytes(token_address) + + +def tests_write_prices(): + engine = create_engine( + f"postgresql+psycopg://postgres:postgres@localhost:5432/mainnet" + ) + db = Database(engine, "mainnet") + token_prices = [ + ( + "0xA0B86991C6218B36C1D19D4A2E9EB0CE3606EB48", + int(datetime.fromisoformat("2024-10-10 16:48:47.000000").timestamp()), + 0.000420454193230350, + "coingecko", + ), + ( + "0x68BBED6A47194EFF1CF514B50EA91895597FC91E", + int(datetime.fromisoformat("2024-10-10 16:49:47.000000").timestamp()), + 0.000000050569218629, + "moralis", + ), + ] + # truncate table + with engine.connect() as conn: + conn.execute(text("TRUNCATE prices")) + conn.commit() + # write data + db.write_prices_new(token_prices) + # read data + with engine.connect() as conn: + res = conn.execute( + text("SELECT token_address, time, price, source FROM prices") + ).all() + for i, (token_address, time, price, source) in enumerate(token_prices): + assert HexBytes(res[i][0]) == HexBytes(token_address) + assert res[i][1].timestamp() == time + assert float(res[i][2]) == price + assert res[i][3] == source + + +def test_get_latest_transaction(): + # import has to happen after patching environment variable + from src.helpers.database import Database + + engine = create_engine( + f"postgresql+psycopg://postgres:postgres@localhost:5432/mainnet" + ) + db = Database(engine, "mainnet") + # truncate table + with engine.connect() as conn: + conn.execute(text("TRUNCATE transaction_timestamp")) + conn.commit() + # check that empty table returns None + assert db.get_latest_transaction() is None + # write data + db.write_transaction_timestamp( + ( + "0x99F10B2DE2B04DFC729B6C46FC5510C44424C213106ED77C80691FA0DD08F3CF", + 1728459935, + ) + ) + db.write_transaction_timestamp( + ( + "0xDFBB14E8F0E47FFC105A16043B2ECF536B323AC3B3B1D319A2D635E392E75BB9", + 1728459995, # latest time stamp + ) + ) + db.write_transaction_timestamp( + ( + "0xF153C9EF2D54C656182B9BD0484B4C1C1A317781656EAF615FA0A92D7C3AFDF7", + 1728459959, + ) + ) + # read data + tx_hash = db.get_latest_transaction() + assert ( + tx_hash + == HexBytes( + "0xDFBB14E8F0E47FFC105A16043B2ECF536B323AC3B3B1D319A2D635E392E75BB9" + ).to_0x_hex() + )