Skip to content

Commit f936504

Browse files
committed
Impl a basic db tx using rocksdb transaction
1 parent 2774d59 commit f936504

File tree

5 files changed

+255
-170
lines changed

5 files changed

+255
-170
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/sovereign-sdk/full-node/db/sov-db/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ anyhow = { workspace = true, default-features = true }
2525
bincode = { workspace = true }
2626
borsh = { workspace = true, default-features = true, features = ["bytes", "rc"] }
2727
byteorder = { workspace = true, default-features = true }
28+
metrics = { workspace = true }
2829
rlimit = { workspace = true }
2930
rocksdb = { workspace = true }
3031
serde = { workspace = true, default-features = true, features = ["rc"] }

crates/sovereign-sdk/full-node/db/sov-db/src/ledger_db/mod.rs

Lines changed: 206 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,20 @@ use std::mem::ManuallyDrop;
22
use std::ops::RangeInclusive;
33
use std::path::Path;
44
use std::sync::Arc;
5+
use std::time::Instant;
56

67
use anyhow::Context;
7-
use rocksdb::{ReadOptions, WriteBatch};
8+
use metrics::histogram;
9+
use rocksdb::ReadOptions;
810
use sov_rollup_interface::block::L2Block;
911
use sov_rollup_interface::da::SequencerCommitment;
1012
use sov_rollup_interface::fork::{Fork, ForkMigration};
1113
use sov_rollup_interface::stf::StateDiff;
1214
use sov_rollup_interface::zk::{Proof, StorageRootHash};
13-
use sov_schema_db::{ScanDirection, Schema, SchemaBatch, SchemaIterator, SeekKeyEncoder, DB};
15+
use sov_schema_db::schema::{KeyCodec, ValueCodec};
16+
use sov_schema_db::{
17+
ScanDirection, Schema, SchemaBatch, SchemaIterator, SeekKeyEncoder, TransactionDB, DB,
18+
};
1419
use tracing::instrument;
1520
use uuid::Uuid;
1621

@@ -82,39 +87,6 @@ impl LedgerDB {
8287
})
8388
}
8489

85-
/// Returns the handle foe the column family with the given name
86-
pub fn get_cf_handle(&self, cf_name: &str) -> anyhow::Result<&rocksdb::ColumnFamily> {
87-
self.db.get_cf_handle(cf_name)
88-
}
89-
90-
/// Insert a key-value pair into the database given a column family
91-
pub fn insert_into_cf_raw(
92-
&self,
93-
cf_handle: &rocksdb::ColumnFamily,
94-
key: &[u8],
95-
value: &[u8],
96-
) -> anyhow::Result<()> {
97-
self.db.put_cf(cf_handle, key, value)
98-
}
99-
100-
/// Deletes a key-value pair from a column family given key and column family.
101-
pub fn delete_from_cf_raw(
102-
&self,
103-
cf_handle: &rocksdb::ColumnFamily,
104-
key: &[u8],
105-
) -> anyhow::Result<()> {
106-
self.db.delete_cf(cf_handle, key)
107-
}
108-
109-
/// Get an iterator for the given column family
110-
pub fn get_iterator_for_cf<'a>(
111-
&'a self,
112-
cf_handle: &rocksdb::ColumnFamily,
113-
iterator_mode: Option<rocksdb::IteratorMode>,
114-
) -> anyhow::Result<rocksdb::DBIterator<'a>> {
115-
Ok(self.db.iter_cf(cf_handle, iterator_mode))
116-
}
117-
11890
/// Gets all data with identifier in `range.start` to `range.end`. If `range.end` is outside
11991
/// the range of the database, the result will smaller than the requested range.
12092
/// Note that this method blindly preallocates for the requested range, so it should not be exposed
@@ -151,21 +123,6 @@ impl LedgerDB {
151123
}
152124
}
153125

154-
fn put_l2_block(
155-
&self,
156-
l2_block: &StoredL2Block,
157-
schema_batch: &mut SchemaBatch,
158-
) -> Result<(), anyhow::Error> {
159-
let l2_block_number = L2BlockNumber(l2_block.height);
160-
schema_batch.put::<L2BlockByNumber>(&l2_block_number, l2_block)?;
161-
schema_batch.put::<L2BlockByHash>(&l2_block.hash, &l2_block_number)
162-
}
163-
164-
/// Write raw rocksdb WriteBatch
165-
pub fn write(&self, batch: WriteBatch) -> anyhow::Result<()> {
166-
self.db.write(batch)
167-
}
168-
169126
/// Reference to underlying sov DB
170127
pub fn db_handle(&self) -> Arc<sov_schema_db::DB> {
171128
self.db.clone()
@@ -190,8 +147,6 @@ impl SharedLedgerOps for LedgerDB {
190147
tx_hashes: Vec<[u8; 32]>,
191148
tx_bodies: Option<Vec<Vec<u8>>>,
192149
) -> Result<(), anyhow::Error> {
193-
let mut schema_batch = SchemaBatch::new();
194-
195150
let txs = if let Some(tx_bodies) = tx_bodies {
196151
assert_eq!(
197152
tx_bodies.len(),
@@ -227,7 +182,12 @@ impl SharedLedgerOps for LedgerDB {
227182
timestamp: l2_block.timestamp(),
228183
tx_merkle_root: l2_block.tx_merkle_root(),
229184
};
230-
self.put_l2_block(&l2_block_to_store, &mut schema_batch)?;
185+
186+
let mut schema_batch = SchemaBatch::new();
187+
188+
let l2_block_number = L2BlockNumber(height);
189+
schema_batch.put::<L2BlockByNumber>(&l2_block_number, &l2_block_to_store)?;
190+
schema_batch.put::<L2BlockByHash>(&l2_block.hash(), &l2_block_number)?;
231191

232192
self.db.write_schemas(schema_batch)?;
233193

@@ -999,7 +959,6 @@ impl ForkMigration for LedgerDB {
999959
}
1000960
}
1001961

1002-
1003962
/// A transaction for batching multiple ledger operations together.
1004963
pub struct LedgerTx {
1005964
/// The batch of schema changes to apply.
@@ -1008,6 +967,12 @@ pub struct LedgerTx {
1008967
batch: ManuallyDrop<SchemaBatch>,
1009968
}
1010969

970+
impl Default for LedgerTx {
971+
fn default() -> Self {
972+
Self::new()
973+
}
974+
}
975+
1011976
impl LedgerTx {
1012977
/// Create a new ledger transaction.
1013978
pub fn new() -> Self {
@@ -1022,21 +987,21 @@ impl LedgerTx {
1022987
l2_height: L2BlockNumber,
1023988
state_diff: &StateDiff,
1024989
) -> Result<&mut Self, anyhow::Error> {
1025-
self
1026-
.batch
990+
self.batch
1027991
.put::<StateDiffByBlockNumber>(&l2_height, state_diff)
1028992
.context("Failed to add StateDiffByBlockNumber")?;
1029993
Ok(self)
1030994
}
1031995

1032996
/// Put an L2 block into the transaction to be saved.
1033-
pub fn put_l2_block(
1034-
&mut self,
1035-
l2_block: &StoredL2Block,
1036-
) -> Result<&mut Self, anyhow::Error> {
997+
pub fn put_l2_block(&mut self, l2_block: &StoredL2Block) -> Result<&mut Self, anyhow::Error> {
1037998
let l2_block_number = L2BlockNumber(l2_block.height);
1038-
self.batch.put::<L2BlockByNumber>(&l2_block_number, l2_block).context("Failed to add L2BlockByNumber")?;
1039-
self.batch.put::<L2BlockByHash>(&l2_block.hash, &l2_block_number).context("Failed to add L2BlockByHash")?;
999+
self.batch
1000+
.put::<L2BlockByNumber>(&l2_block_number, l2_block)
1001+
.context("Failed to add L2BlockByNumber")?;
1002+
self.batch
1003+
.put::<L2BlockByHash>(&l2_block.hash, &l2_block_number)
1004+
.context("Failed to add L2BlockByHash")?;
10401005

10411006
Ok(self)
10421007
}
@@ -1046,7 +1011,9 @@ impl LedgerTx {
10461011
pub fn commit(self, db: &LedgerDB) -> Result<(), anyhow::Error> {
10471012
let Self { batch } = self;
10481013
let batch = ManuallyDrop::into_inner(batch);
1049-
db.db.write_schemas(batch).context("Failed to write LedgerTx to DB")
1014+
db.db
1015+
.write_schemas(batch)
1016+
.context("Failed to write LedgerTx to DB")
10501017
}
10511018

10521019
/// Reject the transaction, dropping all changes.
@@ -1055,3 +1022,179 @@ impl LedgerTx {
10551022
let _ = ManuallyDrop::into_inner(batch);
10561023
}
10571024
}
1025+
1026+
#[derive(Clone)]
1027+
/// Asd
1028+
pub struct TransactionLedgerDB {
1029+
/// Asd
1030+
pub(crate) db: Arc<TransactionDB>,
1031+
}
1032+
1033+
/// Asd
1034+
pub struct LedgerDBTransaction<'a> {
1035+
db: Arc<TransactionDB>,
1036+
tx: rocksdb::Transaction<'a, rocksdb::TransactionDB>,
1037+
}
1038+
1039+
impl TransactionLedgerDB {
1040+
/// LedgerDB path suffix
1041+
pub const DB_PATH_SUFFIX: &'static str = "ledger";
1042+
const DB_NAME: &'static str = "ledger-db";
1043+
1044+
/// Open a [`LedgerDB`] (backed by RocksDB) at the specified path.
1045+
/// Will take optional column families, used for migration purposes.
1046+
/// The returned instance will be at the path `{path}/ledger`.
1047+
#[instrument(level = "trace", skip_all, err)]
1048+
pub fn with_config(cfg: &RocksdbConfig) -> Result<Self, anyhow::Error> {
1049+
let path = cfg.path.join(Self::DB_PATH_SUFFIX);
1050+
let raw_options = cfg.as_raw_options(false);
1051+
let tables = cfg
1052+
.column_families
1053+
.clone()
1054+
.unwrap_or_else(|| LEDGER_TABLES.iter().map(|e| e.to_string()).collect());
1055+
let inner = DB::open_transaction_db(path, Self::DB_NAME, tables, &raw_options)?;
1056+
1057+
Ok(Self {
1058+
db: Arc::new(inner),
1059+
})
1060+
}
1061+
1062+
/// Create a new transaction
1063+
pub fn transaction(&self) -> LedgerDBTransaction {
1064+
LedgerDBTransaction {
1065+
db: Arc::clone(&self.db),
1066+
tx: self.db.transaction(),
1067+
}
1068+
}
1069+
}
1070+
1071+
/// Low-level operations
1072+
/// 1. to put/get/delete data using KeyCodec/ValueCodec.
1073+
/// 2. to commit the transaction.
1074+
impl LedgerDBTransaction<'_> {
1075+
fn put<S: Schema>(
1076+
&self,
1077+
key: &impl KeyCodec<S>,
1078+
value: &impl ValueCodec<S>,
1079+
) -> anyhow::Result<()> {
1080+
let start = Instant::now();
1081+
1082+
let cf_handle = self.get_cf_handle(S::COLUMN_FAMILY_NAME)?;
1083+
let key = key.encode_key()?;
1084+
let value = value.encode_value()?;
1085+
1086+
self.tx.put_cf(cf_handle, key, value)?;
1087+
1088+
histogram!("ledger_tx_put_latency_seconds").record(
1089+
Instant::now()
1090+
.saturating_duration_since(start)
1091+
.as_secs_f64(),
1092+
);
1093+
Ok(())
1094+
}
1095+
1096+
/// Returns the handle for a rocksdb column family.
1097+
fn get_cf_handle(&self, cf_name: &str) -> anyhow::Result<&rocksdb::ColumnFamily> {
1098+
self.db.get_cf_handle(cf_name)
1099+
}
1100+
1101+
fn get<S: Schema>(&self, schema_key: &impl KeyCodec<S>) -> anyhow::Result<Option<S::Value>> {
1102+
let start = Instant::now();
1103+
1104+
let k = schema_key.encode_key()?;
1105+
let cf_handle = self.get_cf_handle(S::COLUMN_FAMILY_NAME)?;
1106+
1107+
let result = self.tx.get_pinned_cf(cf_handle, k)?;
1108+
1109+
histogram!("schemadb_get_bytes", "cf_name" => S::COLUMN_FAMILY_NAME)
1110+
.record(result.as_ref().map_or(0.0, |v| v.len() as f64));
1111+
1112+
let result = result
1113+
.map(|raw_value| S::Value::decode_value(&raw_value))
1114+
.transpose()
1115+
.map_err(|err| err.into());
1116+
1117+
histogram!("schemadb_get_latency_seconds", "cf_name" => S::COLUMN_FAMILY_NAME).record(
1118+
Instant::now()
1119+
.saturating_duration_since(start)
1120+
.as_secs_f64(),
1121+
);
1122+
result
1123+
}
1124+
1125+
fn delete<S: Schema>(&self, key: &impl KeyCodec<S>) -> anyhow::Result<()> {
1126+
let cf_handle = self.get_cf_handle(S::COLUMN_FAMILY_NAME)?;
1127+
let key = key.encode_key()?;
1128+
self.tx.delete_cf(cf_handle, key)?;
1129+
1130+
Ok(())
1131+
}
1132+
1133+
/// Commit the transaction
1134+
pub fn commit(self) -> anyhow::Result<()> {
1135+
let start = Instant::now();
1136+
self.tx.commit()?;
1137+
1138+
histogram!("ledger_tx_commit_latency_seconds").record(
1139+
Instant::now()
1140+
.saturating_duration_since(start)
1141+
.as_secs_f64(),
1142+
);
1143+
Ok(())
1144+
}
1145+
}
1146+
1147+
impl LedgerDBTransaction<'_> {
1148+
/// Sets the state diff by block number
1149+
#[instrument(level = "trace", skip(self), err, ret)]
1150+
pub fn set_state_diff(
1151+
&self,
1152+
l2_height: L2BlockNumber,
1153+
state_diff: &StateDiff,
1154+
) -> anyhow::Result<()> {
1155+
self.put::<StateDiffByBlockNumber>(&l2_height, state_diff)?;
1156+
1157+
Ok(())
1158+
}
1159+
1160+
/// Removes the state diff by block range
1161+
#[instrument(level = "trace", skip(self), err, ret)]
1162+
pub fn delete_state_diff_by_range(
1163+
&self,
1164+
l2_height_range: RangeInclusive<L2BlockNumber>,
1165+
) -> anyhow::Result<()> {
1166+
for l2_height in l2_height_range.start().0..=l2_height_range.end().0 {
1167+
self.delete::<StateDiffByBlockNumber>(&L2BlockNumber(l2_height))?;
1168+
}
1169+
1170+
Ok(())
1171+
}
1172+
1173+
/// Gets the state diff by block number
1174+
#[instrument(level = "trace", skip(self), err, ret)]
1175+
pub fn get_state_diff(&self, l2_height: L2BlockNumber) -> Result<StateDiff, anyhow::Error> {
1176+
self.get::<StateDiffByBlockNumber>(&l2_height)
1177+
.map(|diff| diff.unwrap_or_default())
1178+
}
1179+
1180+
/// Put an L2 block into the transaction to be saved.
1181+
#[instrument(level = "trace", skip(self), err)]
1182+
pub fn set_l2_block(&self, l2_block: &StoredL2Block) -> Result<(), anyhow::Error> {
1183+
let l2_block_number = L2BlockNumber(l2_block.height);
1184+
self.put::<L2BlockByNumber>(&l2_block_number, l2_block)
1185+
.context("Failed to add L2BlockByNumber")?;
1186+
self.put::<L2BlockByHash>(&l2_block.hash, &l2_block_number)
1187+
.context("Failed to add L2BlockByHash")?;
1188+
1189+
Ok(())
1190+
}
1191+
1192+
/// Gets l2 block by number
1193+
#[instrument(level = "trace", skip(self), err)]
1194+
fn get_l2_block_by_number(
1195+
&self,
1196+
number: &L2BlockNumber,
1197+
) -> Result<Option<StoredL2Block>, anyhow::Error> {
1198+
self.get::<L2BlockByNumber>(number)
1199+
}
1200+
}

0 commit comments

Comments
 (0)