Commit d5710e7

Merge pull request #1 from BCWResearch/main
Updates from maintenance

2 parents: 7bcdb11 + 92668b7

61 files changed: +10550 -449 lines

.github/workflows/extractor-build.yaml

+34

```yaml
name: extractor-build
on:
  workflow_call:

jobs:
  build:
    runs-on: ubuntu-22.04
    env:
      GH_TOKEN: ${{ secrets.RELEASE_TOKEN }}
    steps:
      - name: Install required tools
        run: |
          sudo apt-get update
          sudo apt-get install -y git unzip protobuf-compiler build-essential curl

      - uses: actions/checkout@v4
        with:
          submodules: recursive

      - uses: arduino/setup-protoc@v1
      - uses: actions-rs/toolchain@v1
        with:
          toolchain: stable

      - name: Build
        run: |
          export RUSTFLAGS="-C target-cpu=x86-64"
          cargo build --release --features "SOLANA,RABBITMQ_CLASSIC,SINGLE_PUBLISHER"

      - name: Upload Release Asset
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
        run: |
          gh release upload ${{ github.event.release.tag_name }} target/release/blockchain_etl_indexer --clobber
```

.github/workflows/inserter-build.yaml

+49

```yaml
name: inserter-build
on:
  workflow_call:

env:
  GOMOD: ${{ github.workspace }}/storage-write/deprecated/go.mod
  GOMODULE_DIR: ${{ github.workspace }}/storage-write/deprecated
  GO_VERSION: '1.20.x'
  PROTOC_VERSION: '23.2'

jobs:
  build:
    runs-on: ubuntu-22.04
    steps:
      - uses: actions/checkout@v4
      - name: Setup Go
        uses: actions/setup-go@v5
        with:
          go-version: ${{ env.GO_VERSION }}

      - name: Install dependencies
        run: |
          sudo apt-get update && sudo apt-get install -y g++
          go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
          go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest
          cd ${{ env.GOMODULE_DIR }}
          go mod tidy
          go mod download

      - name: Download and install protoc
        run: |
          curl -OL https://github.com/protocolbuffers/protobuf/releases/download/v${{ env.PROTOC_VERSION }}/protoc-${{ env.PROTOC_VERSION }}-linux-x86_64.zip
          unzip protoc-${{ env.PROTOC_VERSION }}-linux-x86_64.zip -d $HOME/.local

      - name: Build
        env:
          CGO_ENABLED: 0
          GOOS: linux
        run: |
          cd ${{ env.GOMODULE_DIR }}
          go generate
          go build -o ${{ github.workspace }}/target/release/blockchain_etl_inserter main.go

      - name: Upload Release Asset
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
        run: |
          gh release upload ${{ github.event.release.tag_name }} target/release/blockchain_etl_inserter --clobber
```

.github/workflows/pipeline.yaml

+37

```yaml
name: main-build
on:
  release:
    types: [created]

jobs:
  determine-job:
    runs-on: ubuntu-latest
    outputs:
      run-extractor-job: ${{ steps.check-ref.outputs.run_extractor }}
      run-inserter-job: ${{ steps.check-ref.outputs.run_inserter }}
    steps:
      - name: Check tag pattern
        id: check-ref
        run: |
          if [[ "${GITHUB_REF##*/}" == extractor* ]]; then
            echo "::set-output name=run_extractor::true"
          else
            echo "::set-output name=run_extractor::false"
          fi
          if [[ "${GITHUB_REF##*/}" == inserter* ]]; then
            echo "::set-output name=run_inserter::true"
          else
            echo "::set-output name=run_inserter::false"
          fi

  trigger-extractor-build:
    needs: determine-job
    if: needs.determine-job.outputs.run-extractor-job == 'true'
    uses: ./.github/workflows/extractor-build.yaml

  trigger-inserter-build:
    needs: determine-job
    if: needs.determine-job.outputs.run-inserter-job == 'true'
    uses: ./.github/workflows/inserter-build.yaml
```

Cargo.toml

-1

```diff
@@ -68,7 +68,6 @@ prost-build = { version = "0.12.1" }
 lto = true

 [features]
-default = ["SOLANA", "SEPARATE_PUBLISHERS"]
 # Solana features
 SOLANA_BIGTABLE = ["SOLANA", "dep:solana-storage-bigtable"]
 SOLANA = [
```

src/README.md

+161

# ETL-Core Documentation

This document explains what ETL-Core is and how it works.

## ETL Infrastructure Architecture

### Architecture Framework

The `etl-core` repository serves as the primary engine for ETL actions, operating at the network level and service level, and accepts custom configurations. Developers can set up custom configurations within `etl-core`. Once the network and export service are selected, users can use `etl-core` to export the desired blockchain data.

Currently, the Solana blockchain is supported in [etl-solana-config](https://github.com/BCWResearch/etl-solana-config).

### Macro Infrastructure

An RPC node is expected to serve requests. Blocks are continually requested from the node, and, if necessary, other data such as accounts may be requested as well. Upon response, the data is converted into the Protocol Buffers format and sent to a streaming queue such as Google Cloud Pub/Sub or RabbitMQ. You will need a transformer and loader that listen for the messages, transform them to match the table schema, and insert them into BigQuery.

## Response Deserialization

To deserialize JSON responses from the blockchain node, we expect the blockchain configuration to specify the structure of the response in a Rust `struct` and annotate it with the `Deserialize` macro from the `serde` library. This macro generates deserialization code for the developer, which eases development but, more importantly, allows us to deserialize the response with the `simd-json` library.

The `simd-json` library uses CPU vector extensions for accelerated JSON deserialization. Currently, the library supports x86 and ARM vector extensions, but it falls back to standard deserialization on systems without SIMD support.
* Since x86's AVX2 is 256-bit while ARM's NEON is 128-bit, *you can expect the best performance on x86*.
* This library is only used when compiled in the `release` profile, because its error messages are less descriptive. For development, it is recommended that you compile in debug mode (the default profile), which uses the `serde` deserializer and thus provides more descriptive errors.
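
As an illustration only (not the repository's actual types), a response struct and a release/debug parsing split in this spirit might look like the following; the `BlockHeightResponse` shape and the use of `debug_assertions` to approximate the profile switch are assumptions:

```rust
use serde::Deserialize;

// Hypothetical response shape for illustration; real response structs live in the
// blockchain configuration (e.g. etl-solana-config).
#[derive(Debug, Deserialize)]
struct BlockHeightResponse {
    jsonrpc: String,
    id: u64,
    result: u64,
}

fn parse_block_height(raw: &mut [u8]) -> Result<BlockHeightResponse, Box<dyn std::error::Error>> {
    // Release-style builds use the SIMD-accelerated parser; debug builds fall back to
    // serde_json for more descriptive error messages.
    #[cfg(not(debug_assertions))]
    {
        Ok(simd_json::from_slice(raw)?)
    }
    #[cfg(debug_assertions)]
    {
        Ok(serde_json::from_slice(raw)?)
    }
}

fn main() {
    let mut raw = br#"{"jsonrpc":"2.0","id":1,"result":250000000}"#.to_vec();
    let resp = parse_block_height(&mut raw).expect("failed to parse response");
    println!("current block height: {}", resp.result);
}
```

Note that `simd-json` deserializes from a mutable byte slice because it parses in place, which is why the raw response is taken as `&mut [u8]` here.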
20+
21+
## Environmental Variables
22+
### Synopsis
23+
24+
You can define enviornmental variables in a `.env` file. Examples are illustrated in `.env.example.`
25+
26+
### Variables
27+
- `ENDPOINT`
28+
**Required**. Specifies the address to use for json RPC requests.
29+
30+
- `FALLBACK_ENDPOINT`
31+
**Required**. Specifies the address to use for json RPC requests, when the primary endpoint is failing. This value can be the same `ENDPOINT`.
32+
33+
- `NUM_EXTRACTOR_THREADS`
34+
**Required**. Specifies the number of concurrent threads to run an extract job.
35+
36+
- `ENABLE_METRICS`
37+
**Required**. This variable determines whether to launch a metrics server to collect metrics for Prometheus.
38+
39+
- `METRICS_ADDRESS`
40+
Optional. Required only if `ENABLE_METRICS` is true. Specifies the address of the metrics server.
41+
42+
- `METRICS_PORT`
43+
Optional. Required only if `ENABLE_METRICS` is true. Specifies the port of the metrics server.
44+
45+
- `RABBITMQ_ADDRESS`
46+
Optional. Required only if _STREAM_EXPORTER_ is set to `RABBITMQ_STREAM`. Specifies the address of RabbitMQ.
47+
48+
- `RABBITMQ_PORT`
49+
Optional. Required only if _STREAM_EXPORTER_ is set to `RABBITMQ_STREAM`. Specifies the port of RabbitMQ.
50+
51+
- `BIGTABLE_CRED`
52+
Optional. Specifies the file path of the credential file required to access GCP Bigtable.
53+
54+
- `GCP_CREDENTIALS_JSON_PATH`
55+
Optional. Required only if _STREAM_EXPORTER_ is set to `GOOGLE_PUBSUB`. Specifies the file path of the credential file required to access Google Pubsub.
56+
57+
- `GOOGLE_PUBSUB_TOPIC`
58+
Optional. Required only if _STREAM_EXPORTER_ is set to `GOOGLE_PUBSUB`. Specifies the Google Pubsub topic to be used during exporting. It is assumed that the PubSub Topic is already created.
59+
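
These variables are read at startup. A minimal sketch of how the required ones might be loaded using only the standard library (the real loader, struct, and field names in `etl-core` may differ, and a dotenv-style crate may be used to read the `.env` file first):

```rust
use std::env;

// Illustrative subset of the configuration; names are assumptions, not the crate's API.
struct Config {
    endpoint: String,
    fallback_endpoint: String,
    num_extractor_threads: usize,
    enable_metrics: bool,
}

fn load_config() -> Config {
    // Required variables: fail fast with a clear message if one is missing or malformed.
    let endpoint = env::var("ENDPOINT").expect("ENDPOINT must be set");
    let fallback_endpoint = env::var("FALLBACK_ENDPOINT").expect("FALLBACK_ENDPOINT must be set");
    let num_extractor_threads = env::var("NUM_EXTRACTOR_THREADS")
        .expect("NUM_EXTRACTOR_THREADS must be set")
        .parse()
        .expect("NUM_EXTRACTOR_THREADS must be an integer");
    let enable_metrics = env::var("ENABLE_METRICS")
        .expect("ENABLE_METRICS must be set")
        .parse()
        .unwrap_or(false);

    Config { endpoint, fallback_endpoint, num_extractor_threads, enable_metrics }
}

fn main() {
    let cfg = load_config();
    println!("extractor threads: {}", cfg.num_extractor_threads);
}
```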

## Data Extraction

All RPC requests are retried with backoff upon failure, and failures are logged at the `warning` level.

Blocks are requested from the node by the `call_getBlock()` function.

The `call_getBlockHeight()` function requests the current block height.

The `call_getMultipleAccounts()` function requests account data for a list of pubkeys. These pubkeys come from the created accounts and token mints in the block data.

The blockchain configuration is expected to define the HTTP requests that these functions make in a `<BLOCKCHAIN_CONFIG>/types/request_types.rs` file. These requests should be specified using `struct`s called `BlockHeightRequest` and `BlockRequest`, and should implement `serde::Serialize`. It is recommended that you annotate each struct with `#[derive(serde::Serialize)]` to simplify this process and generate the code.
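
For illustration, a pair of request structs in that spirit might look like the sketch below. The field names follow common JSON-RPC conventions but are assumptions, not the repository's actual definitions:

```rust
use serde::Serialize;

// Hypothetical request bodies; the real definitions live in
// <BLOCKCHAIN_CONFIG>/types/request_types.rs.
#[derive(Serialize)]
struct BlockHeightRequest {
    jsonrpc: String,
    id: u64,
    method: String, // e.g. "getBlockHeight"
}

#[derive(Serialize)]
struct BlockRequest {
    jsonrpc: String,
    id: u64,
    method: String,              // e.g. "getBlock"
    params: (u64, BlockConfig),  // slot number plus request configuration
}

#[derive(Serialize)]
struct BlockConfig {
    encoding: String,                      // e.g. "json"
    max_supported_transaction_version: u8, // kept in snake_case here for simplicity
}

fn main() {
    let height_req = BlockHeightRequest {
        jsonrpc: "2.0".to_string(),
        id: 1,
        method: "getBlockHeight".to_string(),
    };
    let block_req = BlockRequest {
        jsonrpc: "2.0".to_string(),
        id: 2,
        method: "getBlock".to_string(),
        params: (250_000_000, BlockConfig {
            encoding: "json".to_string(),
            max_supported_transaction_version: 0,
        }),
    };
    // serde_json is used here only to show the serialized request bodies.
    println!("{}", serde_json::to_string(&height_req).unwrap());
    println!("{}", serde_json::to_string(&block_req).unwrap());
}
```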

### Concurrency

The master thread continually sends slot values to a concurrent queue for worker threads to index.

Long-lived threads are created at the start of runtime by the master thread and continually pull tasks (slot values) from the concurrent queue. Each thread requests the block data for its slot from the node, deserializes the response, and transmits the data to a stream queue.
* For communication with the stream queue (which supports concurrent producers), each thread serializes its data using the Protocol Buffers interface and transmits the information.
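
A minimal sketch of this master/worker pattern using a bounded channel from the standard library (the repository may use a different queue type; the thread count and names here are illustrative):

```rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

fn main() {
    // Bounded channel acts as the concurrent task queue of slot values.
    let (tx, rx) = mpsc::sync_channel::<u64>(1024);
    let rx = Arc::new(Mutex::new(rx));

    // Long-lived worker threads created once at startup.
    let workers: Vec<_> = (0..4)
        .map(|_| {
            let rx = Arc::clone(&rx);
            thread::spawn(move || loop {
                // Pull the next slot; exit when the master hangs up.
                let slot = match rx.lock().unwrap().recv() {
                    Ok(slot) => slot,
                    Err(_) => break,
                };
                // In the real extractor this would call the node, deserialize the
                // response, and publish Protocol Buffers bytes to the stream queue.
                println!("indexing slot {slot}");
            })
        })
        .collect();

    // Master thread: continually enqueue slots to index.
    for slot in 0..10_000u64 {
        tx.send(slot).unwrap();
    }
    drop(tx); // closing the sender signals the workers to stop

    for w in workers {
        w.join().unwrap();
    }
}
```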

## Features

### Synopsis

You can either define default features in the `Cargo.toml` file inside the `etl-core` repository or specify them as part of a command:

`cargo build --features ARGS...`
`cargo run --features ARGS...`

The `--features` option is required to build or run the ETL project.

### Arguments

Currently, the following blockchains are supported:
- `SOLANA`

A message queue is required to be specified:
- `RABBITMQ_CLASSIC` - a classic RabbitMQ queue
- `RABBITMQ_STREAM` - RabbitMQ with the Stream Queue plugin
- `GOOGLE_PUBSUB` - Google Cloud Pub/Sub

### Examples

1. Build the local project and its dependencies for the _SOLANA_ blockchain:
```
cargo build --release --features SOLANA,RABBITMQ_STREAM
```

2. Run the local project and its dependencies for the _SOLANA_ blockchain and the _RABBITMQ_STREAM_ exporter:
```
cargo run --features SOLANA,RABBITMQ_STREAM
```

## Limitations

- Only a limited amount of `Token-2022 Program` information is extracted.
- The `SOLANA_BIGTABLE` feature can only request 1,000 confirmed slots at a time.

## Project Progress

### Deployment Method
| Method              | Development Status |
| ------------------- | ------------------ |
| Dockerfile          | In Development     |
| Helm Chart          | In Development     |

### Export Method
| Method              | Development Status |
| ------------------- | ------------------ |
| CSV                 | Completed          |
| Google Pub/Sub      | Completed          |
| RabbitMQ            | Completed          |

### Extraction Source
| Source              | Development Status |
| ------------------- | ------------------ |
| Bigtable            | Completed          |
| JSON RPC            | Completed          |

### Metrics Collection
| Metric              | Development Status |
| ------------------- | ------------------ |
| Block Request Count | In Development     |
| Failed Block Count  | Not Started        |

### Tables
| Table            | Development Status |
| ---------------- | ------------------ |
| Accounts         | Completed          |
| Blocks           | Completed          |
| Instructions     | Completed          |
| Tokens           | Completed          |
| Token Transfers  | Completed          |
| Transactions     | Completed          |

## Protocol Buffers

We use Protocol Buffers to serialize our data for transmission to a pub/sub system like RabbitMQ or Google Cloud Pub/Sub.

Some blockchains provide their own protobuf interfaces, so when possible, we will attempt to use those.

### Codegen

To generate Rust code from our protobuf interface, we use the `PROST` library. This is a popular library for Rust, and it is used by the Solana blockchain for their official "storage" protobuf. We perform this codegen at compile time, using a custom Rust build script, `build_proto.rs`. This script uses the `include!` macro to import the protobuf build script from the blockchain-specific configuration. It is expected that each blockchain config will define its own protobuf build script.
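
For context, the kind of `prost-build` invocation such a build script wraps typically looks like the sketch below; the proto paths and file names are assumptions, not the repository's actual layout:

```rust
// Build script sketch (e.g. build.rs / build_proto.rs); compiles .proto files with prost-build.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Re-run codegen when the protobuf definitions change.
    println!("cargo:rerun-if-changed=proto/");

    // prost-build invokes protoc and writes generated Rust modules into OUT_DIR,
    // where they can be pulled in with include!(concat!(env!("OUT_DIR"), "/...")).
    prost_build::compile_protos(&["proto/records.proto"], &["proto/"])?;
    Ok(())
}
```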

src/features.rs

+7-13

```diff
@@ -1,8 +1,8 @@
 /// This module consists of feature checks during compilation that will raise compiler errors if the feature
-/// selection is invalid.This module will raise a compiler error for commonly known feature selection
-/// contradictions (like using `RABBITMQ_QUEUE` and `RABBITMQ_STREAM` and when a key feature selection is
+/// selection is invalid.This module will raise a compiler error for commonly known feature selection
+/// contradictions (like using `RABBITMQ_QUEUE` and `RABBITMQ_STREAM` and when a key feature selection is
 /// missing (i.e. no block chain feature selected like `SOLANA`).
-///
+///
 /// Feature contradiction / requirements should be added to this module as they are created.

 #[cfg(all(feature = "RPC", feature = "REST"))]
@@ -19,21 +19,15 @@ compile_error!("Features `GOOGLE_PUBSUB` and `RABBITMQ_CLASSIC` are mutually exc
 #[cfg(all(feature = "RABBITMQ_CLASSIC", feature = "RABBITMQ_STREAM"))]
 compile_error!("Features `RABBITMQ_CLASSIC` and `RABBITMQ_STREAM` are mutually exclusive. Please select only one.");

-#[cfg(not(any(
-    feature = "GOOGLE_PUBSUB",
-    feature = "RABBITMQ_STREAM",
-    feature = "RABBITMQ_CLASSIC",
-    feature = "JSONL",
-    feature = "JSON"
-)))]
-compile_error!("Either `JSONL`, `JSON`, `GOOGLE_PUBSUB`, `RABBITMQ_STREAM`, or `RABBITMQ_CLASSIC` must be enabled.");
+#[cfg(not(any(feature = "GOOGLE_PUBSUB", feature = "RABBITMQ_STREAM", feature = "RABBITMQ_CLASSIC")))]
+compile_error!("Either `GOOGLE_PUBSUB`, `RABBITMQ_STREAM`, or `RABBITMQ_CLASSIC` must be enabled.");

 // Makes sure we either have one or multiple publishers

-#[cfg(all(feature = "SINGLE_PUBLISHER", feature = "SEPARATE_PUBLISHERS"))]
+#[cfg(all(feature = "SINGLE_PUBLISHER", feature="SEPARATE_PUBLISHERS"))]
 compile_error!("Features `SINGLE_PUBLISHER` and `SEPARATE_PUBLISHERS` are mutually exclusive. Please select only one.");

-#[cfg(not(any(feature = "SINGLE_PUBLISHER", feature = "SEPARATE_PUBLISHERS")))]
+#[cfg(not(any(feature="SINGLE_PUBLISHER", feature="SEPARATE_PUBLISHERS")))]
 compile_error!("Either `SINGLE_PUBLISHER` or `SEPARATE_PUBLISHERS` must be enabled");

 // for now, solana is the only supported blockchain.
```

src/lib.rs

+2

```diff
@@ -1,3 +1,5 @@
+#![doc = include_str!("README.md")]
+
 mod constants;
 pub mod metrics;
 pub mod output;
```

src/main.rs

+1-3

```diff
@@ -105,6 +105,7 @@ fn read_block_list_csv(file_path: &Path) -> Box<dyn Iterator<Item = u64>> {
         first_line
             .split(',')
             .all(|field| field.parse::<u64>().is_err())
+            && first_line.trim().parse::<u64>().is_err()
     };

     // create the csv reader with the apparent header setting
@@ -353,7 +354,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
         .await
         .unwrap();

-        #[cfg(not(any(feature = "JSON", feature = "JSONL")))]
         publisher.disconnect().await;
     }
 }
@@ -397,8 +397,6 @@ async fn main() -> Result<(), Box<dyn Error>> {
         )
         .await
         .unwrap();
-
-        #[cfg(not(any(feature = "JSON", feature = "JSONL")))]
         publisher.disconnect().await;
     }
 }
```

src/output/README.md

+1-1

```diff
@@ -1,3 +1,3 @@
 # Output Publishers

-Here we define structs to represent an output publisher (like Google Cloud Pub/Sub, RabbitMQ, JSON, etc). Each one is represented by the same struct, however depending on which feature is activated, we implement the necessary methods to run the publisher.
+Here we define structs to represent an output publisher (like BigQuery, RabbitMQ, etc). Each one is represented by the same struct; however, depending on which feature is activated, we implement the necessary methods to run the publisher.
```

src/output/google_pubsub.rs

+4-8

```diff
@@ -14,8 +14,6 @@ use google_cloud_pubsub::{
     publisher::Publisher,
 };

-use prost::Message;
-
 use super::publish::{StreamPublisherConnection, StreamPublisherConnectionClient};

 /// Establishes the connection to the Google Cloud Pub/Sub extracting the credentials
@@ -160,14 +158,12 @@ async fn publish_batch_with_backoff(publisher: &Publisher, messages: Vec<PubsubM

 impl StreamPublisherConnection {
     /// Sends the message to the client
-    pub async fn publish<T: Message>(&self, msg: T) {
-        self.client.publish(msg.encode_to_vec()).await;
+    pub async fn publish(&self, msg: Vec<u8>) {
+        self.client.publish(msg).await;
     }
     /// Sends the messages to the client
-    pub async fn publish_batch<T: Message>(&self, msgs: Vec<T>) {
-        self.client
-            .publish_batch(msgs.iter().map(|msg| msg.encode_to_vec()).collect())
-            .await;
+    pub async fn publish_batch(&self, msgs: Vec<Vec<u8>>) {
+        self.client.publish_batch(msgs).await;
     }

     /// Sends the message to the client
```
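
With this change, `publish` and `publish_batch` take already-encoded bytes rather than prost messages, so call sites are expected to encode first. A rough, self-contained sketch of that encoding step; the `ExampleRecord` message type is made up for illustration and does not exist in the repository:

```rust
use prost::Message;

// Minimal example message; real types come from the generated protobuf code.
#[derive(Clone, PartialEq, Message)]
struct ExampleRecord {
    #[prost(uint64, tag = "1")]
    slot: u64,
    #[prost(string, tag = "2")]
    blockhash: String,
}

fn main() {
    let record = ExampleRecord { slot: 42, blockhash: "abc".to_string() };

    // Single message: this Vec<u8> is what `publish` now expects.
    let payload: Vec<u8> = record.encode_to_vec();

    // Batch: a Vec<Vec<u8>> for `publish_batch`.
    let payloads: Vec<Vec<u8>> = vec![record.clone(), record]
        .iter()
        .map(|m| m.encode_to_vec())
        .collect();

    println!("single: {} bytes, batch: {} messages", payload.len(), payloads.len());
}
```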
