Skip to content

Commit f6d9f2c

Browse files
committed
integrate watch-api and sanakirja crate
1 parent 67518cf commit f6d9f2c

File tree

6 files changed

+153
-139
lines changed

6 files changed

+153
-139
lines changed

explorer/src/api/graphql/certificates.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ use super::{
33
scalars::{PayloadType, VotePlanId},
44
BlockDate, Proposal,
55
};
6-
use crate::db::{self, chain_storable::VotePlanMeta, schema::Txn};
76
use async_graphql::{FieldResult, Object, Union};
7+
use chain_explorer::{self, chain_storable, chain_storable::VotePlanMeta, schema::Txn};
88
use std::sync::Arc;
99
use tokio::sync::Mutex;
1010

@@ -17,17 +17,17 @@ pub enum Certificate {
1717
}
1818

1919
pub struct VotePlanCertificate {
20-
pub data: db::chain_storable::StorableHash,
20+
pub data: chain_storable::StorableHash,
2121
pub txn: Arc<Txn>,
2222
pub meta: Mutex<Option<VotePlanMeta>>,
2323
}
2424

2525
pub struct PublicVoteCastCertificate {
26-
pub data: db::chain_storable::PublicVoteCast,
26+
pub data: chain_storable::PublicVoteCast,
2727
}
2828

2929
pub struct PrivateVoteCastCertificate {
30-
pub data: db::chain_storable::PrivateVoteCast,
30+
pub data: chain_storable::PrivateVoteCast,
3131
}
3232

3333
impl VotePlanCertificate {
@@ -75,8 +75,8 @@ impl VotePlanCertificate {
7575

7676
pub async fn payload_type(&self) -> FieldResult<PayloadType> {
7777
match self.get_meta().await?.payload_type {
78-
db::chain_storable::PayloadType::Public => Ok(PayloadType::Public),
79-
db::chain_storable::PayloadType::Private => Ok(PayloadType::Private),
78+
chain_explorer::chain_storable::PayloadType::Public => Ok(PayloadType::Public),
79+
chain_explorer::chain_storable::PayloadType::Private => Ok(PayloadType::Private),
8080
}
8181
}
8282

explorer/src/api/graphql/mod.rs

+65-68
Original file line numberDiff line numberDiff line change
@@ -15,28 +15,22 @@ use self::{
1515
TransactionCount, TransactionInputCount, TransactionOutputCount, Value, VoteOptionRange,
1616
},
1717
};
18-
use crate::db::{
19-
self,
20-
chain_storable::{BlockId, CertificateTag},
21-
schema::{BlockMeta, Txn},
22-
ExplorerDb,
23-
};
24-
use async_graphql::{
25-
connection::{query, Connection, Edge, EmptyFields},
26-
FieldError,
27-
};
18+
use async_graphql::connection::{query, Connection, Edge, EmptyFields};
2819
use async_graphql::{
2920
Context, EmptyMutation, FieldResult, Object, SimpleObject, Subscription, Union,
3021
};
31-
use chain_impl_mockchain::certificate;
32-
use chain_impl_mockchain::{block::HeaderId as HeaderHash, transaction};
22+
use chain_explorer::chain_storable::{self, BlockId};
23+
use chain_explorer::schema::{BlockMeta, Txn};
24+
use chain_explorer::ExplorerDb;
25+
use chain_impl_mockchain::block::HeaderId as HeaderHash;
26+
use chain_impl_mockchain::{certificate, transaction};
3327
use std::convert::{TryFrom, TryInto};
3428
use std::str::FromStr;
3529
use std::sync::Arc;
3630
use tokio::sync::Mutex;
3731

3832
pub struct Branch {
39-
id: db::chain_storable::BlockId,
33+
id: chain_explorer::chain_storable::BlockId,
4034
txn: Arc<Txn>,
4135
}
4236

@@ -181,14 +175,14 @@ impl Branch {
181175
}
182176

183177
pub struct Block {
184-
hash: db::chain_storable::BlockId,
178+
hash: chain_explorer::chain_storable::BlockId,
185179
txn: Arc<Txn>,
186180
block_meta: Mutex<Option<BlockMeta>>,
187181
}
188182

189183
impl Block {
190184
async fn from_string_hash(hash: String, txn: Arc<Txn>) -> FieldResult<Block> {
191-
let hash: db::chain_storable::BlockId = HeaderHash::from_str(&hash)?.into();
185+
let hash: chain_storable::BlockId = HeaderHash::from_str(&hash)?.into();
192186

193187
if let Some(block_meta) = Self::try_get_block_meta(hash.clone(), &txn).await? {
194188
let block = Block {
@@ -202,7 +196,7 @@ impl Block {
202196
}
203197
}
204198

205-
fn from_valid_hash(hash: db::chain_storable::BlockId, txn: Arc<Txn>) -> Block {
199+
fn from_valid_hash(hash: chain_storable::BlockId, txn: Arc<Txn>) -> Block {
206200
Block {
207201
hash,
208202
txn,
@@ -391,8 +385,8 @@ pub struct BlockDate {
391385
slot: Slot,
392386
}
393387

394-
impl From<db::chain_storable::BlockDate> for BlockDate {
395-
fn from(date: db::chain_storable::BlockDate) -> BlockDate {
388+
impl From<chain_explorer::chain_storable::BlockDate> for BlockDate {
389+
fn from(date: chain_explorer::chain_storable::BlockDate) -> BlockDate {
396390
BlockDate {
397391
epoch: date.epoch.get().into(),
398392
slot: Slot(date.slot_id.get()),
@@ -402,7 +396,7 @@ impl From<db::chain_storable::BlockDate> for BlockDate {
402396

403397
#[derive(Clone)]
404398
pub struct Transaction {
405-
id: db::chain_storable::FragmentId,
399+
id: chain_explorer::chain_storable::FragmentId,
406400
block_hashes: Vec<BlockId>,
407401
txn: Arc<Txn>,
408402
}
@@ -466,7 +460,7 @@ impl Transaction {
466460
|h, input| {
467461
let single_account = matches!(
468462
input.input_type(),
469-
db::chain_storable::InputType::AccountSingle
463+
chain_explorer::chain_storable::InputType::AccountSingle
470464
);
471465

472466
Edge::new(
@@ -598,24 +592,29 @@ impl Transaction {
598592
.get_fragment_certificate(&id)
599593
.map_err(|_| ApiError::InternalDbError)?;
600594

601-
Ok(certificate.map(|cert| match cert.tag {
602-
CertificateTag::VotePlan => {
603-
certificates::Certificate::VotePlan(VotePlanCertificate {
604-
txn: Arc::clone(&txn),
605-
meta: Mutex::new(None),
606-
data: cert.clone().into_vote_plan().unwrap(),
595+
Ok(certificate.and_then(|cert| {
596+
cert.as_vote_plan()
597+
.map(|data| {
598+
certificates::Certificate::VotePlan(VotePlanCertificate {
599+
txn: Arc::clone(&txn),
600+
meta: Mutex::new(None),
601+
data: *data,
602+
})
607603
})
608-
}
609-
CertificateTag::PublicVoteCast => {
610-
certificates::Certificate::PublicVoteCast(PublicVoteCastCertificate {
611-
data: cert.clone().into_public_vote_cast().unwrap(),
604+
.or_else(|| {
605+
cert.as_private_vote_cast().map(|data| {
606+
certificates::Certificate::PrivateVoteCast(PrivateVoteCastCertificate {
607+
data: *data,
608+
})
609+
})
612610
})
613-
}
614-
CertificateTag::PrivateVoteCast => {
615-
certificates::Certificate::PrivateVoteCast(PrivateVoteCastCertificate {
616-
data: cert.clone().into_private_vote_cast().unwrap(),
611+
.or_else(|| {
612+
cert.as_public_vote_cast().map(|data| {
613+
certificates::Certificate::PublicVoteCast(PublicVoteCastCertificate {
614+
data: *data,
615+
})
616+
})
617617
})
618-
}
619618
}))
620619
})
621620
.await
@@ -649,7 +648,7 @@ pub struct TransactionOutput {
649648

650649
#[derive(Clone)]
651650
pub struct Address {
652-
id: db::chain_storable::Address,
651+
id: chain_explorer::chain_storable::Address,
653652
}
654653

655654
impl Address {
@@ -700,8 +699,7 @@ impl Query {
700699
let txn = Arc::new(
701700
extract_context(context)
702701
.db
703-
.get_txn()
704-
.await
702+
.txn_begin()
705703
.map_err(|_| ApiError::InternalDbError)?,
706704
);
707705

@@ -713,37 +711,35 @@ impl Query {
713711
context: &Context<'_>,
714712
length: ChainLength,
715713
) -> FieldResult<Vec<Block>> {
716-
let txn = Arc::new(
717-
extract_context(context)
718-
.db
719-
.get_txn()
720-
.await
721-
.map_err(|_| ApiError::InternalDbError)?,
722-
);
714+
let db = extract_context(context).db.clone();
715+
tokio::task::spawn_blocking(move || {
716+
let txn = Arc::new(db.txn_begin().map_err(|_| ApiError::InternalDbError)?);
723717

724-
let mut result = vec![];
725-
let chain_length = db::chain_storable::ChainLength::new(u32::from(length.0));
718+
let mut result = vec![];
719+
let chain_length = chain_storable::ChainLength::new(u32::from(length.0));
726720

727-
let blocks = txn
728-
.get_blocks_by_chain_length(&chain_length)
729-
.map_err(|_| ApiError::InternalDbError)?;
721+
let blocks = txn
722+
.get_blocks_by_chain_length(&chain_length)
723+
.map_err(|_| ApiError::InternalDbError)?;
730724

731-
for block in blocks {
732-
result.push(Block::from_valid_hash(block?.clone(), Arc::clone(&txn)));
733-
}
725+
for block in blocks {
726+
result.push(Block::from_valid_hash(block?.clone(), Arc::clone(&txn)));
727+
}
734728

735-
Ok(result)
729+
Ok(result)
730+
})
731+
.await?
736732
}
737733

738734
async fn transaction(&self, context: &Context<'_>, id: String) -> FieldResult<Transaction> {
739-
let db = &extract_context(context).db;
735+
let db = extract_context(context).db.clone();
740736

741737
let id = chain_impl_mockchain::fragment::FragmentId::from_str(&id)?;
742738

743-
let txn = db.get_txn().await.map_err(|_| ApiError::InternalDbError)?;
744-
745739
tokio::task::spawn_blocking(move || {
746-
let id = db::chain_storable::FragmentId::from(id);
740+
let txn = db.txn_begin().map_err(|_| ApiError::InternalDbError)?;
741+
742+
let id = chain_storable::FragmentId::from(id);
747743
let block_hashes = txn.transaction_blocks(&id)?;
748744
if block_hashes.is_empty() {
749745
Err(ApiError::NotFound(format!("transaction: {}", &id,)).into())
@@ -760,11 +756,11 @@ impl Query {
760756

761757
/// get all current branch heads, sorted (descending) by their length
762758
pub async fn branches(&self, context: &Context<'_>) -> FieldResult<Vec<Branch>> {
763-
let db = &extract_context(context).db;
764-
765-
let txn = Arc::new(db.get_txn().await.map_err(|_| ApiError::InternalDbError)?);
759+
let db = extract_context(context).db.clone();
766760

767761
tokio::task::spawn_blocking(move || {
762+
let txn = Arc::new(db.txn_begin().map_err(|_| ApiError::InternalDbError)?);
763+
768764
let branches = txn
769765
.get_branches()
770766
.map_err(|_| ApiError::InternalDbError)?
@@ -783,10 +779,11 @@ impl Query {
783779

784780
/// get the state that the ledger currently considers as the main branch
785781
async fn tip(&self, context: &Context<'_>) -> FieldResult<Branch> {
786-
let db = &extract_context(context).db;
787-
let txn = Arc::new(db.get_txn().await.map_err(|_| ApiError::InternalDbError)?);
782+
let db = extract_context(context).db.clone();
788783

789784
tokio::task::spawn_blocking(move || {
785+
let txn = Arc::new(db.txn_begin().map_err(|_| ApiError::InternalDbError)?);
786+
790787
let branch = txn
791788
.get_tip()
792789
.map_err(|_| ApiError::InternalDbError)
@@ -826,7 +823,7 @@ impl Subscription {
826823
let db = db.clone();
827824

828825
async move {
829-
let txn = Arc::new(db.get_txn().await.unwrap());
826+
let txn = Arc::new(db.txn_begin().unwrap());
830827
tip.ok().map(|id| Branch { id: id.into(), txn })
831828
}
832829
})
@@ -856,23 +853,23 @@ pub enum EdgeOrder {
856853
// currently the types are complex because it has both the type bounds of db::pagination and the
857854
// ones from graphql::pagination, and some extra ones to do conversion between them
858855
fn get_page<'a, N, K, C, I, Item, F, M, Output>(
859-
mut edges: db::pagination::SanakirjaCursorIter<'a, K, I, C, M>,
856+
mut edges: chain_explorer::pagination::SanakirjaCursorIter<'a, K, I, C, M>,
860857
edge_order: EdgeOrder,
861858
first: Option<usize>,
862859
last: Option<usize>,
863860
before: Option<IndexCursor>,
864861
after: Option<IndexCursor>,
865862
map_item: F,
866863
// TODO: extract this to an enum, or better yet, to two different functions
867-
) -> Result<Connection<IndexCursor, Item, ConnectionFields<N>>, FieldError>
864+
) -> Result<Connection<IndexCursor, Item, ConnectionFields<N>>, async_graphql::FieldError>
868865
where
869-
C: Clone + From<N> + sanakirja::Storable + db::pagination::PaginationCursor,
866+
C: Clone + From<N> + sanakirja::Storable + chain_explorer::pagination::PaginationCursor,
870867
K: sanakirja::Storable + PartialEq,
871868
N: From<C> + Clone + TryFrom<IndexCursor> + TryFrom<u64> + async_graphql::OutputType,
872869
I: sanakirja::Storable,
873870
u64: std::convert::From<N>,
874871
F: Fn(C, &Output) -> Edge<IndexCursor, Item, EmptyFields>,
875-
M: db::pagination::MapEntry<'a, K, I, C, Output = &'a Output>,
872+
M: chain_explorer::pagination::MapEntry<'a, K, I, C, Output = &'a Output>,
876873
Output: 'a,
877874
{
878875
let boundaries = edges

explorer/src/api/graphql/scalars.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::db::{chain_storable::StorableHash, SeqNum};
1+
use chain_explorer::{chain_storable::StorableHash, SeqNum};
22

33
use super::error::ApiError;
44
use async_graphql::{Enum, InputValueError, InputValueResult, Scalar, ScalarType, SimpleObject};

explorer/src/api/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
pub mod graphql;
22

33
use self::graphql::EContext;
4-
use crate::db::ExplorerDb;
54
use async_graphql::http::{playground_source, GraphQLPlaygroundConfig};
5+
use chain_explorer::ExplorerDb;
66
use futures::Future;
77
use jormungandr_lib::interfaces::{Cors, Tls};
88
use std::{net::SocketAddr, time::Duration};

explorer/src/indexer.rs

+16-11
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::db::ExplorerDb;
1+
use chain_explorer::ExplorerDb;
22
use chain_impl_mockchain::block::Block;
33
use chain_impl_mockchain::block::HeaderId as HeaderHash;
44
use std::sync::Arc;
@@ -17,7 +17,7 @@ pub enum IndexerError {
1717
#[error("url error")]
1818
UrlError(#[from] url::ParseError),
1919
#[error(transparent)]
20-
DbError(#[from] crate::db::error::DbError),
20+
DbError(#[from] chain_explorer::error::DbError),
2121
}
2222

2323
#[derive(Clone)]
@@ -28,10 +28,7 @@ pub struct Indexer {
2828
}
2929

3030
impl Indexer {
31-
pub fn new(
32-
db: crate::db::ExplorerDb,
33-
tip_broadcast: tokio::sync::broadcast::Sender<HeaderHash>,
34-
) -> Self {
31+
pub fn new(db: ExplorerDb, tip_broadcast: tokio::sync::broadcast::Sender<HeaderHash>) -> Self {
3532
let tip_candidate = Arc::new(Mutex::new(None));
3633
Indexer {
3734
db,
@@ -46,14 +43,19 @@ impl Indexer {
4643
async move {
4744
tracing::info!(
4845
"applying {} {}",
49-
block.header.id(),
50-
block.header.chain_length()
46+
block.header().id(),
47+
block.header().chain_length()
5148
);
5249

53-
self.db.apply_block(block.clone()).await?;
50+
let block_id = block.header().id();
51+
52+
let db = self.db.clone();
53+
tokio::task::spawn_blocking(move || db.apply_block(block.clone()))
54+
.await
55+
.unwrap()?;
5456

5557
let mut guard = self.tip_candidate.lock().await;
56-
if guard.map(|hash| hash == block.header.id()).unwrap_or(false) {
58+
if guard.map(|hash| hash == block_id).unwrap_or(false) {
5759
let hash = guard.take().unwrap();
5860
self.set_tip(hash).await?;
5961
}
@@ -68,7 +70,10 @@ impl Indexer {
6870
let span = span!(Level::INFO, "Indexer::set_tip");
6971

7072
async move {
71-
let successful = self.db.set_tip(tip).await?;
73+
let db = self.db.clone();
74+
let successful = tokio::task::spawn_blocking(move || db.set_tip(tip))
75+
.await
76+
.unwrap()?;
7277

7378
if !successful {
7479
let mut guard = self.tip_candidate.lock().await;

0 commit comments

Comments
 (0)