1- use std:: mem:: ManuallyDrop ;
21use std:: ops:: RangeInclusive ;
32use std:: path:: Path ;
43use std:: sync:: Arc ;
5- use std:: time:: Instant ;
64
7- use anyhow:: Context ;
8- use metrics:: histogram;
95use rocksdb:: ReadOptions ;
106use sov_rollup_interface:: block:: L2Block ;
117use sov_rollup_interface:: da:: SequencerCommitment ;
@@ -14,9 +10,7 @@ use sov_rollup_interface::stf::StateDiff;
1410use sov_rollup_interface:: zk:: { Proof , StorageRootHash } ;
1511use sov_schema_db:: schema:: { KeyCodec , ValueCodec } ;
1612pub use sov_schema_db:: SchemaBatch ;
17- use sov_schema_db:: {
18- ScanDirection , Schema , SchemaIterator , SchemaIteratorTx , SeekKeyEncoder , TransactionDB , DB ,
19- } ;
13+ use sov_schema_db:: { ScanDirection , Schema , SchemaIterator , SeekKeyEncoder , DB } ;
2014use tracing:: instrument;
2115use uuid:: Uuid ;
2216
@@ -786,270 +780,3 @@ impl ForkMigration for LedgerDB {
786780 Ok ( ( ) )
787781 }
788782}
789-
790- /// A transaction for batching multiple ledger operations together.
791- pub struct LedgerTx {
792- /// The batch of schema changes to apply.
793- /// Using ManuallyDrop to avoid silent drop of SchemaBatch which would lose the changes.
794- /// The batch must be explicitly written to the DB using `::commit` method.
795- batch : ManuallyDrop < SchemaBatch > ,
796- }
797-
798- impl Default for LedgerTx {
799- fn default ( ) -> Self {
800- Self :: new ( )
801- }
802- }
803-
804- impl LedgerTx {
805- /// Create a new ledger transaction.
806- pub fn new ( ) -> Self {
807- Self {
808- batch : ManuallyDrop :: new ( SchemaBatch :: new ( ) ) ,
809- }
810- }
811-
812- /// Put a state diff into the transaction to be saved.
813- pub fn put_state_diff (
814- & mut self ,
815- l2_height : L2BlockNumber ,
816- state_diff : & StateDiff ,
817- ) -> Result < & mut Self , anyhow:: Error > {
818- self . batch
819- . put :: < StateDiffByBlockNumber > ( & l2_height, state_diff)
820- . context ( "Failed to add StateDiffByBlockNumber" ) ?;
821- Ok ( self )
822- }
823-
824- /// Put an L2 block into the transaction to be saved.
825- pub fn put_l2_block ( & mut self , l2_block : & StoredL2Block ) -> Result < & mut Self , anyhow:: Error > {
826- let l2_block_number = L2BlockNumber ( l2_block. height ) ;
827- self . batch
828- . put :: < L2BlockByNumber > ( & l2_block_number, l2_block)
829- . context ( "Failed to add L2BlockByNumber" ) ?;
830- self . batch
831- . put :: < L2BlockByHash > ( & l2_block. hash , & l2_block_number)
832- . context ( "Failed to add L2BlockByHash" ) ?;
833-
834- Ok ( self )
835- }
836-
837- /// Commit the transaction to the given ledger DB.
838- #[ must_use = "LedgerTx must be committed to apply changes" ]
839- pub fn commit ( self , db : & LedgerDB ) -> Result < ( ) , anyhow:: Error > {
840- let Self { batch } = self ;
841- let batch = ManuallyDrop :: into_inner ( batch) ;
842- db. db
843- . write_schemas ( batch)
844- . context ( "Failed to write LedgerTx to DB" )
845- }
846-
847- /// Reject the transaction, dropping all changes.
848- pub fn reject ( self ) {
849- let Self { batch } = self ;
850- let _ = ManuallyDrop :: into_inner ( batch) ;
851- }
852- }
853-
854- #[ derive( Clone ) ]
855- /// An instance of the ledger database capable of transactional operations.
856- pub struct TransactionLedgerDB {
857- /// The underlying database instance.
858- pub ( crate ) db : Arc < TransactionDB > ,
859- }
860-
861- /// A transaction for batching multiple ledger operations together.
862- pub struct LedgerDBTransaction < ' a > {
863- db : Arc < TransactionDB > ,
864- tx : rocksdb:: Transaction < ' a , rocksdb:: TransactionDB > ,
865- }
866-
867- impl TransactionLedgerDB {
868- /// LedgerDB path suffix
869- pub const DB_PATH_SUFFIX : & ' static str = "ledger" ;
870-
871- /// Open a [`LedgerDB`] (backed by RocksDB) at the specified path.
872- /// Will take optional column families, used for migration purposes.
873- /// The returned instance will be at the path `{path}/ledger`.
874- #[ instrument( level = "trace" , skip_all, err) ]
875- pub fn with_config ( cfg : & RocksdbConfig ) -> Result < Self , anyhow:: Error > {
876- let path = cfg. path . join ( Self :: DB_PATH_SUFFIX ) ;
877- let raw_options = cfg. as_raw_options ( false ) ;
878- let tables = cfg
879- . column_families
880- . clone ( )
881- . unwrap_or_else ( || LEDGER_TABLES . iter ( ) . map ( |e| e. to_string ( ) ) . collect ( ) ) ;
882- let inner = DB :: open_transaction_db ( path, tables, & raw_options) ?;
883-
884- Ok ( Self {
885- db : Arc :: new ( inner) ,
886- } )
887- }
888-
889- /// Create a new transaction
890- pub fn transaction ( & self ) -> LedgerDBTransaction {
891- LedgerDBTransaction {
892- db : Arc :: clone ( & self . db ) ,
893- tx : self . db . transaction ( ) ,
894- }
895- }
896- }
897-
898- /// Low-level operations
899- /// 1. to put/get/delete data using KeyCodec/ValueCodec.
900- /// 2. to commit the transaction.
901- impl LedgerDBTransaction < ' _ > {
902- /// Put an instance by key and value.
903- pub fn put < S : Schema > (
904- & self ,
905- key : & impl KeyCodec < S > ,
906- value : & impl ValueCodec < S > ,
907- ) -> anyhow:: Result < ( ) > {
908- let start = Instant :: now ( ) ;
909-
910- let cf_handle = self . get_cf_handle ( S :: COLUMN_FAMILY_NAME ) ?;
911- let key = key. encode_key ( ) ?;
912- let value = value. encode_value ( ) ?;
913-
914- self . tx . put_cf ( cf_handle, key, value) ?;
915-
916- histogram ! ( "ledger_tx_put_latency_seconds" ) . record (
917- Instant :: now ( )
918- . saturating_duration_since ( start)
919- . as_secs_f64 ( ) ,
920- ) ;
921- Ok ( ( ) )
922- }
923-
924- /// Returns the handle for a rocksdb column family.
925- fn get_cf_handle ( & self , cf_name : & str ) -> anyhow:: Result < & rocksdb:: ColumnFamily > {
926- self . db . get_cf_handle ( cf_name)
927- }
928-
929- /// Get an instance by key.
930- pub fn get < S : Schema > (
931- & self ,
932- schema_key : & impl KeyCodec < S > ,
933- ) -> anyhow:: Result < Option < S :: Value > > {
934- let start = Instant :: now ( ) ;
935-
936- let k = schema_key. encode_key ( ) ?;
937- let cf_handle = self . get_cf_handle ( S :: COLUMN_FAMILY_NAME ) ?;
938-
939- let result = self . tx . get_pinned_cf ( cf_handle, k) ?;
940-
941- histogram ! ( "schemadb_get_bytes" , "cf_name" => S :: COLUMN_FAMILY_NAME )
942- . record ( result. as_ref ( ) . map_or ( 0.0 , |v| v. len ( ) as f64 ) ) ;
943-
944- let result = result
945- . map ( |raw_value| S :: Value :: decode_value ( & raw_value) )
946- . transpose ( )
947- . map_err ( |err| err. into ( ) ) ;
948-
949- histogram ! ( "schemadb_get_latency_seconds" , "cf_name" => S :: COLUMN_FAMILY_NAME ) . record (
950- Instant :: now ( )
951- . saturating_duration_since ( start)
952- . as_secs_f64 ( ) ,
953- ) ;
954- result
955- }
956-
957- /// Delete an instance by key.
958- pub fn delete < S : Schema > ( & self , key : & impl KeyCodec < S > ) -> anyhow:: Result < ( ) > {
959- let cf_handle = self . get_cf_handle ( S :: COLUMN_FAMILY_NAME ) ?;
960- let key = key. encode_key ( ) ?;
961- self . tx . delete_cf ( cf_handle, key) ?;
962-
963- Ok ( ( ) )
964- }
965-
966- /// Returns a [`SchemaIteratorTx`] on a certain schema with the provided read options and direction.
967- pub fn iter_with_direction < S : Schema > (
968- & self ,
969- opts : ReadOptions ,
970- direction : ScanDirection ,
971- ) -> anyhow:: Result < SchemaIteratorTx < S > > {
972- let cf_handle = self . get_cf_handle ( S :: COLUMN_FAMILY_NAME ) ?;
973- Ok ( SchemaIteratorTx :: new (
974- self . tx . raw_iterator_cf_opt ( cf_handle, opts) ,
975- direction,
976- ) )
977- }
978-
979- /// Returns a forward [`SchemaIteratorTx`] on a certain schema with the default read options.
980- pub fn iter < S : Schema > ( & self ) -> anyhow:: Result < SchemaIteratorTx < S > > {
981- let mut read_options = ReadOptions :: default ( ) ;
982- read_options. set_async_io ( true ) ;
983- self . iter_with_direction :: < S > ( read_options, ScanDirection :: Forward )
984- }
985-
986- /// Commit the transaction
987- pub fn commit ( self ) -> anyhow:: Result < ( ) > {
988- let start = Instant :: now ( ) ;
989- self . tx . commit ( ) ?;
990-
991- histogram ! ( "ledger_tx_commit_latency_seconds" ) . record (
992- Instant :: now ( )
993- . saturating_duration_since ( start)
994- . as_secs_f64 ( ) ,
995- ) ;
996- Ok ( ( ) )
997- }
998- }
999-
1000- /// Implementation of high-level ledger operations for LedgerDBTransaction.
1001- /// TODO: consider removing it and use raw put/get/delete methods instead.
1002- impl LedgerDBTransaction < ' _ > {
1003- /// Sets the state diff by block number
1004- #[ instrument( level = "trace" , skip( self ) , err, ret) ]
1005- pub fn set_state_diff (
1006- & self ,
1007- l2_height : L2BlockNumber ,
1008- state_diff : & StateDiff ,
1009- ) -> anyhow:: Result < ( ) > {
1010- self . put :: < StateDiffByBlockNumber > ( & l2_height, state_diff) ?;
1011-
1012- Ok ( ( ) )
1013- }
1014-
1015- /// Removes the state diff by block range
1016- #[ instrument( level = "trace" , skip( self ) , err, ret) ]
1017- pub fn delete_state_diff_by_range (
1018- & self ,
1019- l2_height_range : RangeInclusive < L2BlockNumber > ,
1020- ) -> anyhow:: Result < ( ) > {
1021- for l2_height in l2_height_range. start ( ) . 0 ..=l2_height_range. end ( ) . 0 {
1022- self . delete :: < StateDiffByBlockNumber > ( & L2BlockNumber ( l2_height) ) ?;
1023- }
1024-
1025- Ok ( ( ) )
1026- }
1027-
1028- /// Gets the state diff by block number
1029- #[ instrument( level = "trace" , skip( self ) , err, ret) ]
1030- pub fn get_state_diff ( & self , l2_height : L2BlockNumber ) -> Result < StateDiff , anyhow:: Error > {
1031- self . get :: < StateDiffByBlockNumber > ( & l2_height)
1032- . map ( |diff| diff. unwrap_or_default ( ) )
1033- }
1034-
1035- /// Put an L2 block into the transaction to be saved.
1036- #[ instrument( level = "trace" , skip( self ) , err) ]
1037- pub fn set_l2_block ( & self , l2_block : & StoredL2Block ) -> Result < ( ) , anyhow:: Error > {
1038- let l2_block_number = L2BlockNumber ( l2_block. height ) ;
1039- self . put :: < L2BlockByNumber > ( & l2_block_number, l2_block)
1040- . context ( "Failed to add L2BlockByNumber" ) ?;
1041- self . put :: < L2BlockByHash > ( & l2_block. hash , & l2_block_number)
1042- . context ( "Failed to add L2BlockByHash" ) ?;
1043-
1044- Ok ( ( ) )
1045- }
1046-
1047- /// Gets l2 block by number
1048- #[ instrument( level = "trace" , skip( self ) , err) ]
1049- fn get_l2_block_by_number (
1050- & self ,
1051- number : & L2BlockNumber ,
1052- ) -> Result < Option < StoredL2Block > , anyhow:: Error > {
1053- self . get :: < L2BlockByNumber > ( number)
1054- }
1055- }
0 commit comments