@@ -2,15 +2,20 @@ use std::mem::ManuallyDrop;
22use std:: ops:: RangeInclusive ;
33use std:: path:: Path ;
44use std:: sync:: Arc ;
5+ use std:: time:: Instant ;
56
67use anyhow:: Context ;
7- use rocksdb:: { ReadOptions , WriteBatch } ;
8+ use metrics:: histogram;
9+ use rocksdb:: ReadOptions ;
810use sov_rollup_interface:: block:: L2Block ;
911use sov_rollup_interface:: da:: SequencerCommitment ;
1012use sov_rollup_interface:: fork:: { Fork , ForkMigration } ;
1113use sov_rollup_interface:: stf:: StateDiff ;
1214use sov_rollup_interface:: zk:: { Proof , StorageRootHash } ;
13- use sov_schema_db:: { ScanDirection , Schema , SchemaBatch , SchemaIterator , SeekKeyEncoder , DB } ;
15+ use sov_schema_db:: schema:: { KeyCodec , ValueCodec } ;
16+ use sov_schema_db:: {
17+ ScanDirection , Schema , SchemaBatch , SchemaIterator , SeekKeyEncoder , TransactionDB , DB ,
18+ } ;
1419use tracing:: instrument;
1520use uuid:: Uuid ;
1621
@@ -82,39 +87,6 @@ impl LedgerDB {
8287 } )
8388 }
8489
85- /// Returns the handle foe the column family with the given name
86- pub fn get_cf_handle ( & self , cf_name : & str ) -> anyhow:: Result < & rocksdb:: ColumnFamily > {
87- self . db . get_cf_handle ( cf_name)
88- }
89-
90- /// Insert a key-value pair into the database given a column family
91- pub fn insert_into_cf_raw (
92- & self ,
93- cf_handle : & rocksdb:: ColumnFamily ,
94- key : & [ u8 ] ,
95- value : & [ u8 ] ,
96- ) -> anyhow:: Result < ( ) > {
97- self . db . put_cf ( cf_handle, key, value)
98- }
99-
100- /// Deletes a key-value pair from a column family given key and column family.
101- pub fn delete_from_cf_raw (
102- & self ,
103- cf_handle : & rocksdb:: ColumnFamily ,
104- key : & [ u8 ] ,
105- ) -> anyhow:: Result < ( ) > {
106- self . db . delete_cf ( cf_handle, key)
107- }
108-
109- /// Get an iterator for the given column family
110- pub fn get_iterator_for_cf < ' a > (
111- & ' a self ,
112- cf_handle : & rocksdb:: ColumnFamily ,
113- iterator_mode : Option < rocksdb:: IteratorMode > ,
114- ) -> anyhow:: Result < rocksdb:: DBIterator < ' a > > {
115- Ok ( self . db . iter_cf ( cf_handle, iterator_mode) )
116- }
117-
11890 /// Gets all data with identifier in `range.start` to `range.end`. If `range.end` is outside
11991 /// the range of the database, the result will smaller than the requested range.
12092 /// Note that this method blindly preallocates for the requested range, so it should not be exposed
@@ -151,21 +123,6 @@ impl LedgerDB {
151123 }
152124 }
153125
154- fn put_l2_block (
155- & self ,
156- l2_block : & StoredL2Block ,
157- schema_batch : & mut SchemaBatch ,
158- ) -> Result < ( ) , anyhow:: Error > {
159- let l2_block_number = L2BlockNumber ( l2_block. height ) ;
160- schema_batch. put :: < L2BlockByNumber > ( & l2_block_number, l2_block) ?;
161- schema_batch. put :: < L2BlockByHash > ( & l2_block. hash , & l2_block_number)
162- }
163-
164- /// Write raw rocksdb WriteBatch
165- pub fn write ( & self , batch : WriteBatch ) -> anyhow:: Result < ( ) > {
166- self . db . write ( batch)
167- }
168-
169126 /// Reference to underlying sov DB
170127 pub fn db_handle ( & self ) -> Arc < sov_schema_db:: DB > {
171128 self . db . clone ( )
@@ -190,8 +147,6 @@ impl SharedLedgerOps for LedgerDB {
190147 tx_hashes : Vec < [ u8 ; 32 ] > ,
191148 tx_bodies : Option < Vec < Vec < u8 > > > ,
192149 ) -> Result < ( ) , anyhow:: Error > {
193- let mut schema_batch = SchemaBatch :: new ( ) ;
194-
195150 let txs = if let Some ( tx_bodies) = tx_bodies {
196151 assert_eq ! (
197152 tx_bodies. len( ) ,
@@ -227,7 +182,12 @@ impl SharedLedgerOps for LedgerDB {
227182 timestamp : l2_block. timestamp ( ) ,
228183 tx_merkle_root : l2_block. tx_merkle_root ( ) ,
229184 } ;
230- self . put_l2_block ( & l2_block_to_store, & mut schema_batch) ?;
185+
186+ let mut schema_batch = SchemaBatch :: new ( ) ;
187+
188+ let l2_block_number = L2BlockNumber ( height) ;
189+ schema_batch. put :: < L2BlockByNumber > ( & l2_block_number, & l2_block_to_store) ?;
190+ schema_batch. put :: < L2BlockByHash > ( & l2_block. hash ( ) , & l2_block_number) ?;
231191
232192 self . db . write_schemas ( schema_batch) ?;
233193
@@ -999,7 +959,6 @@ impl ForkMigration for LedgerDB {
999959 }
1000960}
1001961
1002-
1003962/// A transaction for batching multiple ledger operations together.
1004963pub struct LedgerTx {
1005964 /// The batch of schema changes to apply.
@@ -1008,6 +967,12 @@ pub struct LedgerTx {
1008967 batch : ManuallyDrop < SchemaBatch > ,
1009968}
1010969
970+ impl Default for LedgerTx {
971+ fn default ( ) -> Self {
972+ Self :: new ( )
973+ }
974+ }
975+
1011976impl LedgerTx {
1012977 /// Create a new ledger transaction.
1013978 pub fn new ( ) -> Self {
@@ -1022,21 +987,21 @@ impl LedgerTx {
1022987 l2_height : L2BlockNumber ,
1023988 state_diff : & StateDiff ,
1024989 ) -> Result < & mut Self , anyhow:: Error > {
1025- self
1026- . batch
990+ self . batch
1027991 . put :: < StateDiffByBlockNumber > ( & l2_height, state_diff)
1028992 . context ( "Failed to add StateDiffByBlockNumber" ) ?;
1029993 Ok ( self )
1030994 }
1031995
1032996 /// Put an L2 block into the transaction to be saved.
1033- pub fn put_l2_block (
1034- & mut self ,
1035- l2_block : & StoredL2Block ,
1036- ) -> Result < & mut Self , anyhow:: Error > {
997+ pub fn put_l2_block ( & mut self , l2_block : & StoredL2Block ) -> Result < & mut Self , anyhow:: Error > {
1037998 let l2_block_number = L2BlockNumber ( l2_block. height ) ;
1038- self . batch . put :: < L2BlockByNumber > ( & l2_block_number, l2_block) . context ( "Failed to add L2BlockByNumber" ) ?;
1039- self . batch . put :: < L2BlockByHash > ( & l2_block. hash , & l2_block_number) . context ( "Failed to add L2BlockByHash" ) ?;
999+ self . batch
1000+ . put :: < L2BlockByNumber > ( & l2_block_number, l2_block)
1001+ . context ( "Failed to add L2BlockByNumber" ) ?;
1002+ self . batch
1003+ . put :: < L2BlockByHash > ( & l2_block. hash , & l2_block_number)
1004+ . context ( "Failed to add L2BlockByHash" ) ?;
10401005
10411006 Ok ( self )
10421007 }
@@ -1046,7 +1011,9 @@ impl LedgerTx {
10461011 pub fn commit ( self , db : & LedgerDB ) -> Result < ( ) , anyhow:: Error > {
10471012 let Self { batch } = self ;
10481013 let batch = ManuallyDrop :: into_inner ( batch) ;
1049- db. db . write_schemas ( batch) . context ( "Failed to write LedgerTx to DB" )
1014+ db. db
1015+ . write_schemas ( batch)
1016+ . context ( "Failed to write LedgerTx to DB" )
10501017 }
10511018
10521019 /// Reject the transaction, dropping all changes.
@@ -1055,3 +1022,176 @@ impl LedgerTx {
10551022 let _ = ManuallyDrop :: into_inner ( batch) ;
10561023 }
10571024}
1025+
1026+ #[ derive( Clone ) ]
1027+ /// Asd
1028+ pub struct TransactionLedgerDB {
1029+ /// Asd
1030+ pub ( crate ) db : Arc < TransactionDB > ,
1031+ }
1032+
1033+ /// Asd
1034+ pub struct LedgerDBTransaction < ' a > {
1035+ db : Arc < TransactionDB > ,
1036+ tx : rocksdb:: Transaction < ' a , rocksdb:: TransactionDB > ,
1037+ }
1038+
1039+ impl TransactionLedgerDB {
1040+ /// LedgerDB path suffix
1041+ pub const DB_PATH_SUFFIX : & ' static str = "ledger" ;
1042+ const DB_NAME : & ' static str = "ledger-db" ;
1043+
1044+ /// Open a [`LedgerDB`] (backed by RocksDB) at the specified path.
1045+ /// Will take optional column families, used for migration purposes.
1046+ /// The returned instance will be at the path `{path}/ledger`.
1047+ #[ instrument( level = "trace" , skip_all, err) ]
1048+ pub fn with_config ( cfg : & RocksdbConfig ) -> Result < Self , anyhow:: Error > {
1049+ let path = cfg. path . join ( Self :: DB_PATH_SUFFIX ) ;
1050+ let raw_options = cfg. as_raw_options ( false ) ;
1051+ let tables = cfg
1052+ . column_families
1053+ . clone ( )
1054+ . unwrap_or_else ( || LEDGER_TABLES . iter ( ) . map ( |e| e. to_string ( ) ) . collect ( ) ) ;
1055+ let inner = DB :: open_transaction_db ( path, Self :: DB_NAME , tables, & raw_options) ?;
1056+
1057+ Ok ( Self {
1058+ db : Arc :: new ( inner) ,
1059+ } )
1060+ }
1061+
1062+ /// Create a new transaction
1063+ pub fn transaction ( & self ) -> LedgerDBTransaction {
1064+ LedgerDBTransaction {
1065+ db : Arc :: clone ( & self . db ) ,
1066+ tx : self . db . transaction ( ) ,
1067+ }
1068+ }
1069+ }
1070+
1071+ /// Low-level operations
1072+ /// 1. to put/get/delete data using KeyCodec/ValueCodec.
1073+ /// 2. to commit the transaction.
1074+ impl LedgerDBTransaction < ' _ > {
1075+ fn put < S : Schema > (
1076+ & self ,
1077+ key : & impl KeyCodec < S > ,
1078+ value : & impl ValueCodec < S > ,
1079+ ) -> anyhow:: Result < ( ) > {
1080+ let start = Instant :: now ( ) ;
1081+ let key = key. encode_key ( ) ?;
1082+ let value = value. encode_value ( ) ?;
1083+
1084+ self . tx . put ( key, value) ?;
1085+
1086+ histogram ! ( "ledger_tx_put_latency_seconds" ) . record (
1087+ Instant :: now ( )
1088+ . saturating_duration_since ( start)
1089+ . as_secs_f64 ( ) ,
1090+ ) ;
1091+ Ok ( ( ) )
1092+ }
1093+
1094+ /// Returns the handle for a rocksdb column family.
1095+ fn get_cf_handle ( & self , cf_name : & str ) -> anyhow:: Result < & rocksdb:: ColumnFamily > {
1096+ self . db . get_cf_handle ( cf_name)
1097+ }
1098+
1099+ fn get < S : Schema > ( & self , schema_key : & impl KeyCodec < S > ) -> anyhow:: Result < Option < S :: Value > > {
1100+ let start = Instant :: now ( ) ;
1101+
1102+ let k = schema_key. encode_key ( ) ?;
1103+ let cf_handle = self . get_cf_handle ( S :: COLUMN_FAMILY_NAME ) ?;
1104+
1105+ let result = self . tx . get_pinned_cf ( cf_handle, k) ?;
1106+
1107+ histogram ! ( "schemadb_get_bytes" , "cf_name" => S :: COLUMN_FAMILY_NAME )
1108+ . record ( result. as_ref ( ) . map_or ( 0.0 , |v| v. len ( ) as f64 ) ) ;
1109+
1110+ let result = result
1111+ . map ( |raw_value| S :: Value :: decode_value ( & raw_value) )
1112+ . transpose ( )
1113+ . map_err ( |err| err. into ( ) ) ;
1114+
1115+ histogram ! ( "schemadb_get_latency_seconds" , "cf_name" => S :: COLUMN_FAMILY_NAME ) . record (
1116+ Instant :: now ( )
1117+ . saturating_duration_since ( start)
1118+ . as_secs_f64 ( ) ,
1119+ ) ;
1120+ result
1121+ }
1122+
1123+ fn delete < S : Schema > ( & self , key : & impl KeyCodec < S > ) -> anyhow:: Result < ( ) > {
1124+ let key = key. encode_key ( ) ?;
1125+ self . tx . delete ( key) ?;
1126+
1127+ Ok ( ( ) )
1128+ }
1129+
1130+ /// Commit the transaction
1131+ pub fn commit ( self ) -> anyhow:: Result < ( ) > {
1132+ let start = Instant :: now ( ) ;
1133+ self . tx . commit ( ) ?;
1134+
1135+ histogram ! ( "ledger_tx_commit_latency_seconds" ) . record (
1136+ Instant :: now ( )
1137+ . saturating_duration_since ( start)
1138+ . as_secs_f64 ( ) ,
1139+ ) ;
1140+ Ok ( ( ) )
1141+ }
1142+ }
1143+
1144+ impl LedgerDBTransaction < ' _ > {
1145+ /// Sets the state diff by block number
1146+ #[ instrument( level = "trace" , skip( self ) , err, ret) ]
1147+ pub fn set_state_diff (
1148+ & self ,
1149+ l2_height : L2BlockNumber ,
1150+ state_diff : & StateDiff ,
1151+ ) -> anyhow:: Result < ( ) > {
1152+ self . put :: < StateDiffByBlockNumber > ( & l2_height, state_diff) ?;
1153+
1154+ Ok ( ( ) )
1155+ }
1156+
1157+ /// Removes the state diff by block range
1158+ #[ instrument( level = "trace" , skip( self ) , err, ret) ]
1159+ pub fn delete_state_diff_by_range (
1160+ & self ,
1161+ l2_height_range : RangeInclusive < L2BlockNumber > ,
1162+ ) -> anyhow:: Result < ( ) > {
1163+ for l2_height in l2_height_range. start ( ) . 0 ..=l2_height_range. end ( ) . 0 {
1164+ self . delete :: < StateDiffByBlockNumber > ( & L2BlockNumber ( l2_height) ) ?;
1165+ }
1166+
1167+ Ok ( ( ) )
1168+ }
1169+
1170+ /// Gets the state diff by block number
1171+ #[ instrument( level = "trace" , skip( self ) , err, ret) ]
1172+ pub fn get_state_diff ( & self , l2_height : L2BlockNumber ) -> Result < StateDiff , anyhow:: Error > {
1173+ self . get :: < StateDiffByBlockNumber > ( & l2_height)
1174+ . map ( |diff| diff. unwrap_or_default ( ) )
1175+ }
1176+
1177+ /// Put an L2 block into the transaction to be saved.
1178+ #[ instrument( level = "trace" , skip( self ) , err) ]
1179+ pub fn set_l2_block ( & self , l2_block : & StoredL2Block ) -> Result < ( ) , anyhow:: Error > {
1180+ let l2_block_number = L2BlockNumber ( l2_block. height ) ;
1181+ self . put :: < L2BlockByNumber > ( & l2_block_number, l2_block)
1182+ . context ( "Failed to add L2BlockByNumber" ) ?;
1183+ self . put :: < L2BlockByHash > ( & l2_block. hash , & l2_block_number)
1184+ . context ( "Failed to add L2BlockByHash" ) ?;
1185+
1186+ Ok ( ( ) )
1187+ }
1188+
1189+ /// Gets l2 block by number
1190+ #[ instrument( level = "trace" , skip( self ) , err) ]
1191+ fn get_l2_block_by_number (
1192+ & self ,
1193+ number : & L2BlockNumber ,
1194+ ) -> Result < Option < StoredL2Block > , anyhow:: Error > {
1195+ self . get :: < L2BlockByNumber > ( number)
1196+ }
1197+ }
0 commit comments