Skip to content

Commit fabc1aa

Browse files
Add EIP-1559 fields (cli, airflow, dataflow) (#96)
* Add fields introduced by EIP-1559

  This commit is similar to the EIP-1559 pull request for ethereum-etl: blockchain-etl/ethereum-etl#256

  Fields added:
  - base_fee_per_gas (block) - base fee per gas in protocol, which can move up or down each block according to a formula which is a function of gas used in parent block and gas target (block gas limit divided by elasticity multiplier) of parent block.
  - max_fee_per_gas (tx) - total fee which covers both the priority fee and the block's network fee per gas
  - max_priority_fee_per_gas (tx) - maximum fee per gas tx senders are willing to give to miners to incentivize them to include their transaction
  - transaction_type (tx) - an envelope for future transaction types
  - effective_gas_price (receipt) - a replacement for the gas_price field
* update 'miner' field in web3_response
* update existing test data for EIP-1559 fields
* add cli tests for EIP-1559 block
* Airflow changes for EIP-1559
* Dataflow changes for EIP-1559
* update balances calculation to match ethereum-etl
* bump package version
* fix effective_gas_price -> receipt_effective_gas_price
* delete unused folder `dags/resources/stages/load/`
* fix missing `pytz` dependency error
* add `block_timestamp` to raw transactions schema
* Fix/add ds suffix

  Based on nansen-ai/evmchain-etl/pull/55
  - Add ds postfix for load and enrich tasks
  - Add expiration for temp raw tables
  - Remove unused enrichment sql
  - Support load_all_partitions for export files missing EIP-1559 fields

Co-authored-by: Jerry <[email protected]>
1 parent b5481fa commit fabc1aa

File tree

79 files changed

+862
-620
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

79 files changed

+862
-620
lines changed

airflow/dags/polygonetl_airflow/build_load_dag.py

+31-4
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ def add_load_tasks(task, file_format, allow_quoted_newlines=False):
106106
dag=dag
107107
)
108108

109-
def load_task():
109+
def load_task(ds, **kwargs):
110110
client = bigquery.Client()
111111
job_config = bigquery.LoadJobConfig()
112112
schema_path = os.path.join(dags_folder, 'resources/stages/raw/schemas/{task}.json'.format(task=task))
@@ -119,13 +119,35 @@ def load_task():
119119
job_config.ignore_unknown_values = True
120120

121121
export_location_uri = 'gs://{bucket}/export'.format(bucket=output_bucket)
122-
uri = '{export_location_uri}/{task}/*.{file_format}'.format(
123-
export_location_uri=export_location_uri, task=task, file_format=file_format)
124-
table_ref = client.dataset(dataset_name_raw).table(task)
122+
if load_all_partitions:
123+
# Support export files that are missing EIP-1559 fields (exported before EIP-1559 upgrade)
124+
job_config.allow_jagged_rows = True
125+
126+
uri = "{export_location_uri}/{task}/*.{file_format}".format(
127+
export_location_uri=export_location_uri,
128+
task=task,
129+
file_format=file_format,
130+
)
131+
table_ref = client.dataset(dataset_name_raw).table(task)
132+
else:
133+
uri = "{export_location_uri}/{task}/block_date={ds}/*.{file_format}".format(
134+
export_location_uri=export_location_uri,
135+
task=task,
136+
ds=ds,
137+
file_format=file_format,
138+
)
139+
table_name = f'{task}_{ds.replace("-", "_")}'
140+
table_ref = client.dataset(dataset_name_raw).table(table_name)
141+
125142
load_job = client.load_table_from_uri(uri, table_ref, job_config=job_config)
126143
submit_bigquery_job(load_job, job_config)
127144
assert load_job.state == 'DONE'
128145

146+
if not load_all_partitions:
147+
table = client.get_table(table_ref)
148+
table.expires = datetime.now() + timedelta(days=3)
149+
client.update_table(table, ["expires"])
150+
129151
load_operator = PythonOperator(
130152
task_id='load_{task}'.format(task=task),
131153
python_callable=load_task,
@@ -142,6 +164,11 @@ def enrich_task(ds, **kwargs):
142164
template_context['ds'] = ds
143165
template_context['params'] = environment
144166

167+
if load_all_partitions or always_load_all_partitions:
168+
template_context["params"]["ds_postfix"] = ""
169+
else:
170+
template_context["params"]["ds_postfix"] = "_" + ds.replace("-", "_")
171+
145172
client = bigquery.Client()
146173

147174
# Need to use a temporary table because bq query sets field modes to NULLABLE and descriptions to null

airflow/dags/resources/stages/enrich/schemas/blocks.json

+6-1
Original file line numberDiff line numberDiff line change
@@ -92,5 +92,10 @@
9292
"name": "transaction_count",
9393
"type": "INT64",
9494
"description": "The number of transactions in the block"
95+
},
96+
{
97+
"name": "base_fee_per_gas",
98+
"type": "INT64",
99+
"description": "Protocol base fee per gas, which can move up or down"
95100
}
96-
]
101+
]

airflow/dags/resources/stages/enrich/schemas/transactions.json

+21-1
Original file line numberDiff line numberDiff line change
@@ -90,5 +90,25 @@
9090
"type": "STRING",
9191
"mode": "REQUIRED",
9292
"description": "Hash of the block where this transaction was in"
93+
},
94+
{
95+
"name": "max_fee_per_gas",
96+
"type": "INT64",
97+
"description": "Total fee that covers both base and priority fees"
98+
},
99+
{
100+
"name": "max_priority_fee_per_gas",
101+
"type": "INT64",
102+
"description": "Fee given to miners to incentivize them to include the transaction"
103+
},
104+
{
105+
"name": "transaction_type",
106+
"type": "INT64",
107+
"description": "Transaction type. One of 0 (Legacy), 1 (EIP-2930 access list), 2 (EIP-1559)"
108+
},
109+
{
110+
"name": "receipt_effective_gas_price",
111+
"type": "INT64",
112+
"description": "The actual value per gas deducted from the sender's account. Replacement of gas_price after EIP-1559"
93113
}
94-
]
114+
]

airflow/dags/resources/stages/enrich/sqls/amended_tokens.sql

-30
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
with double_entry_book as (
22
-- debits
33
select to_address as address, CAST(value AS FLOAT64) as value
4-
from `{{params.destination_dataset_project_id}}.{{params.dataset_name}}.traces`
4+
from `{{params.destination_dataset_project_id}}.{{params.dataset_name}}.traces{{params.ds_postfix}}`
55
where true
66
and date(block_timestamp) <= '{{ds}}'
77
and to_address is not null
@@ -10,27 +10,31 @@ with double_entry_book as (
1010
union all
1111
-- credits
1212
select from_address as address, -CAST(value AS FLOAT64) as value
13-
from `{{params.destination_dataset_project_id}}.{{params.dataset_name}}.traces`
13+
from `{{params.destination_dataset_project_id}}.{{params.dataset_name}}.traces{{params.ds_postfix}}`
1414
where true
1515
and date(block_timestamp) <= '{{ds}}'
1616
and from_address is not null
1717
and status = 1
1818
and (call_type not in ('delegatecall', 'callcode', 'staticcall') or call_type is null)
1919
union all
2020
-- transaction fees debits
21-
select miner as address, sum(cast(receipt_gas_used as numeric) * cast(gas_price as numeric)) as value
22-
from `{{params.destination_dataset_project_id}}.{{params.dataset_name}}.transactions` as transactions
23-
join `{{params.destination_dataset_project_id}}.{{params.dataset_name}}.blocks` as blocks on blocks.number = transactions.block_number
21+
select
22+
miner as address,
23+
sum(cast(receipt_gas_used as numeric) * cast((receipt_effective_gas_price - coalesce(base_fee_per_gas, 0)) as numeric)) as value
24+
from `{{params.destination_dataset_project_id}}.{{params.dataset_name}}.transactions{{params.ds_postfix}}` as transactions
25+
join `{{params.destination_dataset_project_id}}.{{params.dataset_name}}.blocks{{params.ds_postfix}}` as blocks on blocks.number = transactions.block_number
2426
where true
2527
and date(transactions.block_timestamp) <= '{{ds}}'
26-
group by blocks.miner
28+
group by blocks.number, blocks.miner
2729
union all
2830
-- transaction fees credits
29-
select from_address as address, -(cast(receipt_gas_used as numeric) * cast(gas_price as numeric)) as value
30-
from `{{params.destination_dataset_project_id}}.{{params.dataset_name}}.transactions`
31+
select
32+
from_address as address,
33+
-(cast(receipt_gas_used as numeric) * cast(receipt_effective_gas_price as numeric)) as value
34+
from `{{params.destination_dataset_project_id}}.{{params.dataset_name}}.transactions{{params.ds_postfix}}`
3135
where true
3236
and date(block_timestamp) <= '{{ds}}'
3337
)
3438
select address, sum(value) as eth_balance
3539
from double_entry_book
36-
group by address
40+
group by address

airflow/dags/resources/stages/enrich/sqls/blocks.sql

+3-2
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,9 @@ SELECT
1616
blocks.extra_data,
1717
blocks.gas_limit,
1818
blocks.gas_used,
19-
blocks.transaction_count
20-
FROM {{params.dataset_name_raw}}.blocks AS blocks
19+
blocks.transaction_count,
20+
blocks.base_fee_per_gas
21+
FROM {{params.dataset_name_raw}}.blocks{{params.ds_postfix}} AS blocks
2122
where true
2223
{% if not params.load_all_partitions %}
2324
and date(timestamp_seconds(blocks.timestamp)) = '{{ds}}'

airflow/dags/resources/stages/enrich/sqls/contracts.sql

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ SELECT
77
TIMESTAMP_SECONDS(blocks.timestamp) AS block_timestamp,
88
blocks.number AS block_number,
99
blocks.hash AS block_hash
10-
FROM {{params.dataset_name_raw}}.contracts AS contracts
11-
JOIN {{params.dataset_name_raw}}.blocks AS blocks ON contracts.block_number = blocks.number
10+
FROM {{params.dataset_name_raw}}.contracts{{params.ds_postfix}} AS contracts
11+
JOIN {{params.dataset_name_raw}}.blocks{{params.ds_postfix}} AS blocks ON contracts.block_number = blocks.number
1212
where true
1313
{% if not params.load_all_partitions %}
1414
and date(timestamp_seconds(blocks.timestamp)) = '{{ds}}'

airflow/dags/resources/stages/enrich/sqls/logs.sql

+3-3
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ SELECT
88
TIMESTAMP_SECONDS(blocks.timestamp) AS block_timestamp,
99
blocks.number AS block_number,
1010
blocks.hash AS block_hash
11-
FROM {{params.dataset_name_raw}}.blocks AS blocks
12-
JOIN {{params.dataset_name_raw}}.logs AS logs ON blocks.number = logs.block_number
11+
FROM {{params.dataset_name_raw}}.blocks{{params.ds_postfix}} AS blocks
12+
JOIN {{params.dataset_name_raw}}.logs{{params.ds_postfix}} AS logs ON blocks.number = logs.block_number
1313
where true
1414
{% if not params.load_all_partitions %}
1515
and date(timestamp_seconds(blocks.timestamp)) = '{{ds}}'
16-
{% endif %}
16+
{% endif %}

airflow/dags/resources/stages/enrich/sqls/merge/merge_blocks.sql

+4-2
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ insert (
2020
extra_data,
2121
gas_limit,
2222
gas_used,
23-
transaction_count
23+
transaction_count,
24+
base_fee_per_gas
2425
) values (
2526
timestamp,
2627
number,
@@ -39,7 +40,8 @@ insert (
3940
extra_data,
4041
gas_limit,
4142
gas_used,
42-
transaction_count
43+
transaction_count,
44+
base_fee_per_gas
4345
)
4446
when not matched by source and date(timestamp) = '{{ds}}' then
4547
delete

airflow/dags/resources/stages/enrich/sqls/merge/merge_transactions.sql

+10-2
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,11 @@ insert (
1919
receipt_status,
2020
block_timestamp,
2121
block_number,
22-
block_hash
22+
block_hash,
23+
max_fee_per_gas,
24+
max_priority_fee_per_gas,
25+
transaction_type,
26+
receipt_effective_gas_price
2327
) values (
2428
`hash`,
2529
nonce,
@@ -37,7 +41,11 @@ insert (
3741
receipt_status,
3842
block_timestamp,
3943
block_number,
40-
block_hash
44+
block_hash,
45+
max_fee_per_gas,
46+
max_priority_fee_per_gas,
47+
transaction_type,
48+
receipt_effective_gas_price
4149
)
4250
when not matched by source and date(block_timestamp) = '{{ds}}' then
4351
delete

airflow/dags/resources/stages/enrich/sqls/token_transfers.sql

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,8 @@ SELECT
88
TIMESTAMP_SECONDS(blocks.timestamp) AS block_timestamp,
99
blocks.number AS block_number,
1010
blocks.hash AS block_hash
11-
FROM {{params.dataset_name_raw}}.blocks AS blocks
12-
JOIN {{params.dataset_name_raw}}.token_transfers AS token_transfers ON blocks.number = token_transfers.block_number
11+
FROM {{params.dataset_name_raw}}.blocks{{params.ds_postfix}} AS blocks
12+
JOIN {{params.dataset_name_raw}}.token_transfers{{params.ds_postfix}} AS token_transfers ON blocks.number = token_transfers.block_number
1313
where true
1414
{% if not params.load_all_partitions %}
1515
and date(timestamp_seconds(blocks.timestamp)) = '{{ds}}'

airflow/dags/resources/stages/enrich/sqls/tokens.sql

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ SELECT
77
TIMESTAMP_SECONDS(blocks.timestamp) AS block_timestamp,
88
blocks.number AS block_number,
99
blocks.hash AS block_hash
10-
FROM {{params.dataset_name_raw}}.blocks AS blocks
11-
JOIN {{params.dataset_name_raw}}.tokens AS tokens ON blocks.number = tokens.block_number
10+
FROM {{params.dataset_name_raw}}.blocks{{params.ds_postfix}} AS blocks
11+
JOIN {{params.dataset_name_raw}}.tokens{{params.ds_postfix}} AS tokens ON blocks.number = tokens.block_number
1212
where true
1313
{% if not params.load_all_partitions %}
1414
and date(timestamp_seconds(blocks.timestamp)) = '{{ds}}'

airflow/dags/resources/stages/enrich/sqls/traces.sql

+3-3
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,9 @@ SELECT
1919
TIMESTAMP_SECONDS(blocks.timestamp) AS block_timestamp,
2020
blocks.number AS block_number,
2121
blocks.hash AS block_hash
22-
FROM {{params.dataset_name_raw}}.blocks AS blocks
23-
JOIN {{params.dataset_name_raw}}.traces AS traces ON blocks.number = traces.block_number
24-
JOIN {{params.dataset_name_raw}}.transactions AS transactions
22+
FROM {{params.dataset_name_raw}}.blocks{{params.ds_postfix}} AS blocks
23+
JOIN {{params.dataset_name_raw}}.traces{{params.ds_postfix}} AS traces ON blocks.number = traces.block_number
24+
JOIN {{params.dataset_name_raw}}.transactions{{params.ds_postfix}} AS transactions
2525
ON traces.transaction_index = transactions.transaction_index
2626
and traces.block_number = transactions.block_number
2727
where true

airflow/dags/resources/stages/enrich/sqls/transactions.sql

+8-4
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,14 @@ SELECT
1515
receipts.status AS receipt_status,
1616
TIMESTAMP_SECONDS(blocks.timestamp) AS block_timestamp,
1717
blocks.number AS block_number,
18-
blocks.hash AS block_hash
19-
FROM {{params.dataset_name_raw}}.blocks AS blocks
20-
JOIN {{params.dataset_name_raw}}.transactions AS transactions ON blocks.number = transactions.block_number
21-
JOIN {{params.dataset_name_raw}}.receipts AS receipts ON transactions.hash = receipts.transaction_hash
18+
blocks.hash AS block_hash,
19+
transactions.max_fee_per_gas,
20+
transactions.max_priority_fee_per_gas,
21+
transactions.transaction_type,
22+
receipts.effective_gas_price as receipt_effective_gas_price
23+
FROM {{params.dataset_name_raw}}.blocks{{params.ds_postfix}} AS blocks
24+
JOIN {{params.dataset_name_raw}}.transactions{{params.ds_postfix}} AS transactions ON blocks.number = transactions.block_number
25+
JOIN {{params.dataset_name_raw}}.receipts{{params.ds_postfix}} AS receipts ON transactions.hash = receipts.transaction_hash
2226
where true
2327
{% if not params.load_all_partitions %}
2428
and date(timestamp_seconds(blocks.timestamp)) = '{{ds}}'

0 commit comments

Comments
 (0)