Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Persist blocks and state transitions in the database #45

Merged
merged 3 commits into from
Sep 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ jobs:
docker pull ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:indexer
docker run -d --name postgres -e POSTGRES_PASSWORD=indexer -e POSTGRES_USER=indexer -e POSTGRES_DB=indexer -p 5432:5432 postgres
sleep 10
docker run --rm -e DATABASE_URL=postgres://indexer:[email protected]:5432/indexer ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:indexer refinery migrate -e DATABASE_URL -p /app/migrations -t 1
docker run --rm --env-file api.env ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:indexer refinery migrate -e DATABASE_URL -p /app/migrations -t 1
docker run -d -p 3005:3005 --restart always --env-file api.env --name platform-explorer-api ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:api
sleep 3
docker run -d --name platform-explorer-indexer ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:indexer cargo run
docker run -d --env-file api.env --name platform-explorer-indexer ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:indexer cargo run
4 changes: 4 additions & 0 deletions packages/indexer/.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
POSTGRES_HOST=127.0.0.1
POSTGRES_DB=indexer
POSTGRES_USER=indexer
POSTGRES_PASS=indexer
21 changes: 21 additions & 0 deletions packages/indexer/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions packages/indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ mockall = { version="0.11.3", optional=true}
base64 = "0.21.2"
deadpool-postgres = "0.10.5"
refinery = { version = "0.8.10", features = ["postgres"] }
sha256 = "1.4.0"
dotenv = "0.15.0"

[features]
default = ["fixtures-and-mocks"]
Expand Down
18 changes: 17 additions & 1 deletion packages/indexer/migrations/V1__initial.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,21 @@
CREATE TABLE data_contracts (
id SERIAL PRIMARY KEY,
identifier varchar(255),
CONSTRAINT production UNIQUE(identifier)
CONSTRAINT identifier_unique UNIQUE(identifier)
);

CREATE TABLE blocks (
id SERIAL PRIMARY KEY,
hash varchar(255) NOT NULL,
block_height int NOT NULL,
CONSTRAINT block_hash UNIQUE(hash)
);

CREATE TABLE state_transitions (
id SERIAL PRIMARY KEY,
hash varchar(255) NOT NULL,
data TEXT NOT NULL,
type int NOT NULL,
block_id int NOT NULL references blocks(id),
CONSTRAINT state_transition_hash UNIQUE(hash)
);
101 changes: 47 additions & 54 deletions packages/indexer/src/indexer/mod.rs
Original file line number Diff line number Diff line change
@@ -1,86 +1,79 @@
use std::any::Any;
use std::cell::Cell;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use dpp::state_transition::StateTransition;
use crate::decoder::decoder::StateTransitionDecoder;
use base64::{Engine as _, engine::{general_purpose}};
use crate::models::{PlatformExplorerSearchResponse, PlatformExplorerStatusResponse};
use futures::stream;
use tokio::{task, time};
use tokio::time::{Instant, Interval};
use crate::models::{PlatformExplorerSearchResponse, PlatformExplorerStatusResponse, TDBlock, TDBlockHeader};
use crate::processor::psql::PSQLProcessor;
use crate::processor::STProcessorLike;

pub struct Indexer {
decoder: StateTransitionDecoder,
processor: PSQLProcessor,
last_block_height: Cell<i32>,
}

/**
Indexer is responsible for indexing platform chain data.
Indexer is responsible for indexing platform chain data.
It sync up with the network and sends incoming state transitions events to the lower level
**/
impl Indexer {
pub fn new() -> Indexer {
let decoder = StateTransitionDecoder::new();
let processor = PSQLProcessor::new();

return Indexer { decoder, processor };
return Indexer { processor, last_block_height: Cell::new(1) };
}

pub async fn start(&self) {
self.init().await;
println!("Indexer loop started");

println!("Indexer started");
let mut interval = time::interval(Duration::from_millis(3000));

loop {
interval.tick().await;

let current_block_height:i32 = self.last_block_height.get();
let last_block_height:i32 = self.fetch_last_block().await;

let diff = last_block_height.clone() - current_block_height.clone();

if diff > 0 {
for block_height in current_block_height..last_block_height+1 {
self.index_block(block_height).await;
}
}
}
}

async fn init(&self) {
let resp = reqwest::get("https://platform-explorer-api.rd.dash.org/status")
async fn index_block(&self, block_height: i32) {
let url = format!("https://platform-explorer-api.rd.dash.org/search?query={}", &block_height);

let resp = reqwest::get(url)
.await
.unwrap()
.json::<PlatformExplorerStatusResponse>()
.json::<PlatformExplorerSearchResponse>()
.await
.unwrap();

let txs = resp.block.block.data.txs;
let hash = resp.block.block_id.hash;

let blocks_count = resp.blocks_count.parse::<i32>().unwrap();
println!("Latest platform block: {}", &blocks_count);
let block = TDBlock{txs: txs.clone(), header: TDBlockHeader{hash: hash.clone(), block_height: block_height.clone(), tx_count: txs.len() as i32 }};

for _i in 1..blocks_count {
let url = format!("https://platform-explorer-api.rd.dash.org/search?query={}", _i);
self.processor.handle_block(block).await;

let resp = reqwest::get(url)
.await
.unwrap()
.json::<PlatformExplorerSearchResponse>()
.await
.unwrap();
self.last_block_height.set(block_height);
}

let txs = resp.block.block.data.txs;
let block_height = resp.block.block.header.height;
async fn fetch_last_block(&self) -> i32 {
let resp = reqwest::get("https://platform-explorer-api.rd.dash.org/status")
.await
.unwrap()
.json::<PlatformExplorerStatusResponse>()
.await
.unwrap();

if txs.len() == usize::try_from(0).unwrap() {
println!("No platform transactions at block height {}", &block_height);
}
let blocks_count = resp.blocks_count.parse::<i32>().unwrap();

for tx_base_64 in txs.iter() {
let bytes = general_purpose::STANDARD.decode(tx_base_64).unwrap();
let st_result = self.decoder.decode(bytes).await;

let st_type = match st_result {
Ok(st) => match st {
StateTransition::DataContractCreate(state_transition) => {
self.processor.handle_data_contract_create(state_transition).await;
"DataContractCreate"
}
StateTransition::DataContractUpdate(_) => "DataContractUpdate",
StateTransition::DocumentsBatch(_) => "DocumentsBatch",
StateTransition::IdentityCreate(_) => "IdentityCreate",
StateTransition::IdentityTopUp(_) => "IdentityTopUp",
StateTransition::IdentityCreditWithdrawal(_) => "DataContractCreate",
StateTransition::IdentityUpdate(_) => "IdentityUpdate",
StateTransition::IdentityCreditTransfer(_) => "IdentityCreditTransfer"
}
Err(e) => "UnknownStateTransition"
};
println!("{} happened at block height {}", &st_type, &block_height);
}
}
return blocks_count;
}
}

6 changes: 3 additions & 3 deletions packages/indexer/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::io;
use dotenv::dotenv;

mod indexer;
mod decoder;
Expand All @@ -7,10 +8,9 @@ mod processor;

#[tokio::main]
async fn main() {
dotenv().ok(); // This line loads the environment variables from the ".env" file

let indexer = indexer::Indexer::new();

indexer.start().await;

println!("Press enter to stop daemon");
io::stdin().read_line(&mut String::new()).unwrap();
}
17 changes: 17 additions & 0 deletions packages/indexer/src/models/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use deadpool_postgres::tokio_postgres::Row;
use serde::{Deserialize};

#[derive(Debug, Deserialize)]
Expand Down Expand Up @@ -42,3 +43,19 @@ pub struct BlockWrapper {
pub struct PlatformExplorerSearchResponse {
pub block: BlockWrapper,
}

#[derive(Clone)]
pub struct TDBlockHeader {
pub hash: String,
pub block_height: i32,
pub tx_count: i32,
}

pub struct TDBlock {
pub header: TDBlockHeader,
pub txs: Vec<String>,
}

pub struct PlatformStateTransition {

}
16 changes: 0 additions & 16 deletions packages/indexer/src/processor/mod.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,5 @@
pub mod psql;

use async_trait::async_trait;

use dpp::state_transition::data_contract_create_transition::DataContractCreateTransition;
use dpp::state_transition::data_contract_update_transition::DataContractUpdateTransition;
use dpp::state_transition::StateTransition;
use crate::processor::psql::PSQLProcessor;

pub enum STProcessorType {
PSQL,
}

pub trait STProcessorLike<T> {
fn handle(state_transition: StateTransition) -> ();
}

#[async_trait]
pub trait STProcessorHandlerSet {
async fn handle_data_contract_create(&self, state_transition: DataContractCreateTransition) -> bool;
}
103 changes: 103 additions & 0 deletions packages/indexer/src/processor/psql/dao/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
use std::env;
use deadpool_postgres::{Config, Manager, ManagerConfig, Pool, RecyclingMethod, Runtime, tokio_postgres, Transaction};
use deadpool_postgres::tokio_postgres::{Error, IsolationLevel, NoTls, Row};
use dpp::platform_value::string_encoding::Encoding;
use dpp::state_transition::data_contract_create_transition::accessors::DataContractCreateTransitionAccessorsV0;
use dpp::state_transition::data_contract_create_transition::DataContractCreateTransition;
use dpp::state_transition::{StateTransition, StateTransitionType};
use crate::models::{TDBlock, TDBlockHeader};
use sha256::{digest, try_digest};
use base64::{Engine as _, engine::{general_purpose}};

pub struct PostgresDAO {
connection_pool: Pool,
}

impl PostgresDAO {
pub fn new() -> PostgresDAO {
let mut cfg = Config::new();

let postgres_host = env::var("POSTGRES_HOST").expect("You've not set the POSTGRES_HOST");
let postgres_db = env::var("POSTGRES_DB").expect("You've not set the POSTGRES_DB");
let postgres_user = env::var("POSTGRES_USER").expect("You've not set the POSTGRES_USER");
let postgres_pass = env::var("POSTGRES_PASS").expect("You've not set the POSTGRES_HOST");

cfg.host = Some(postgres_host);
cfg.dbname = Some(postgres_db);
cfg.user = Some(postgres_user);
cfg.password = Some(postgres_pass);
cfg.manager = Some(ManagerConfig { recycling_method: RecyclingMethod::Fast });

let connection_pool = cfg.create_pool(Some(Runtime::Tokio1), NoTls).unwrap();

return PostgresDAO { connection_pool };
}

pub async fn create_state_transition(&self, block_id: i32, st_type: i32, bytes: Vec<u8>) {
let data = general_purpose::STANDARD.encode(&bytes);
let hash = digest(bytes.clone()).to_uppercase();

let query = "INSERT INTO state_transitions(hash, data, type, block_id) VALUES ($1, $2, $3, $4);";

let client = self.connection_pool.get().await.unwrap();
let stmt = client.prepare_cached(query).await.unwrap();

client.query(&stmt, &[&hash, &data, &st_type, &block_id]).await.unwrap();
}

pub async fn create_data_contract(&self, state_transition: DataContractCreateTransition) {
let id = state_transition.data_contract().id();
let id_str = id.to_string(Encoding::Base58);

let query = "INSERT INTO data_contracts(identifier) VALUES ($1);";

let client = self.connection_pool.get().await.unwrap();
let stmt = client.prepare_cached(query).await.unwrap();
client.query(&stmt, &[&id_str]).await.unwrap();
}

pub async fn get_latest_block(&self) -> i32 {
return 0;
}

pub async fn get_block_header_by_height(&self, block_height: i32) -> Option<TDBlockHeader> {
let client = self.connection_pool.get().await.unwrap();

let stmt = client.prepare_cached("SELECT hash,block_height FROM blocks where block_height = $1;").await.unwrap();

let rows: Vec<Row> = client.query(&stmt, &[&block_height])
.await.unwrap();

let blocks: Vec<TDBlockHeader> = rows
.into_iter()
.map(|row| {
row.into()
}).collect::<Vec<TDBlockHeader>>();

let block = blocks.first();

return block.cloned();
}

pub async fn create_block(&self, block_header: TDBlockHeader) -> i32 {
let client = self.connection_pool.get().await.unwrap();

let stmt = client.prepare_cached("INSERT INTO blocks(block_height, hash) VALUES ($1, $2) RETURNING id;").await.unwrap();

let rows = client.query(&stmt, &[&block_header.block_height, &block_header.hash]).await.unwrap();

let block_id: i32 = rows[0].get(0);

return block_id;
}
}

impl From<Row> for TDBlockHeader {
fn from(row: Row) -> Self {
// id,hash,block_height
let hash: String = row.get(0);
let block_height: i32 = row.get(1);

return TDBlockHeader { hash, block_height, tx_count: 0 };
}
}
Loading