Skip to content

Commit c865194

Browse files
committed
add store pub-sub
Add StoreWithPubSub to emit store update events. It uses ractor OutputPort for pub-sub. This is a rework of StoreWithHooks in nervosnetwork#511 - Use decorator pattern to add additional features on Store - Minimize the interface change that original codes relying on Store traits dot not need to change. - Avoid using an Actor to fetch extra data asynchronously. This resolves the issue mentioned in nervosnetwork#615 (comment) . The channel actor cleans preimage on success payment automatically. Thus we may fail to get the preimage when settling the cch payment in the BTC end.
1 parent 5832f70 commit c865194

File tree

16 files changed

+1125
-8
lines changed

16 files changed

+1125
-8
lines changed

crates/fiber-bin/src/main.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use fnn::rpc::watchtower::{
1414
UpdateLocalSettlementParams, UpdateRevocationParams, WatchtowerRpcClient,
1515
};
1616
use fnn::store::Store;
17+
use fnn::store::StoreWithPubSub;
1718
use fnn::tasks::{
1819
cancel_tasks_and_wait_for_completion, new_tokio_cancellation_token, new_tokio_task_tracker,
1920
};
@@ -84,6 +85,7 @@ pub async fn main() -> Result<(), ExitMessage> {
8485
.store_path();
8586

8687
let store = Store::new(store_path).map_err(|err| ExitMessage(err.to_string()))?;
88+
let store = StoreWithPubSub::new(store);
8789

8890
let tracker = new_tokio_task_tracker();
8991
let token = new_tokio_cancellation_token();

crates/fiber-lib/src/fiber/channel.rs

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7599,6 +7599,62 @@ pub trait ChannelActorStateStore {
75997599
fn get_payment_custom_records(&self, payment_hash: &Hash256) -> Option<PaymentCustomRecords>;
76007600
}
76017601

7602+
/// Used for delegating the store trait
7603+
pub trait ChannelActorStateStoreDeref {
7604+
type Target: ChannelActorStateStore;
7605+
fn channel_actor_state_store_deref(&self) -> &Self::Target;
7606+
}
7607+
7608+
impl<T: ChannelActorStateStoreDeref> ChannelActorStateStore for T {
7609+
fn get_channel_actor_state(&self, id: &Hash256) -> Option<ChannelActorState> {
7610+
self.channel_actor_state_store_deref()
7611+
.get_channel_actor_state(id)
7612+
}
7613+
fn insert_channel_actor_state(&self, state: ChannelActorState) {
7614+
self.channel_actor_state_store_deref()
7615+
.insert_channel_actor_state(state);
7616+
}
7617+
fn delete_channel_actor_state(&self, id: &Hash256) {
7618+
self.channel_actor_state_store_deref()
7619+
.delete_channel_actor_state(id);
7620+
}
7621+
fn get_channel_ids_by_peer(&self, peer_id: &PeerId) -> Vec<Hash256> {
7622+
self.channel_actor_state_store_deref()
7623+
.get_channel_ids_by_peer(peer_id)
7624+
}
7625+
fn get_active_channel_ids_by_peer(&self, peer_id: &PeerId) -> Vec<Hash256> {
7626+
self.channel_actor_state_store_deref()
7627+
.get_active_channel_ids_by_peer(peer_id)
7628+
}
7629+
fn get_channel_states(&self, peer_id: Option<PeerId>) -> Vec<(PeerId, Hash256, ChannelState)> {
7630+
self.channel_actor_state_store_deref()
7631+
.get_channel_states(peer_id)
7632+
}
7633+
fn get_active_channel_states(
7634+
&self,
7635+
peer_id: Option<PeerId>,
7636+
) -> Vec<(PeerId, Hash256, ChannelState)> {
7637+
self.channel_actor_state_store_deref()
7638+
.get_active_channel_states(peer_id)
7639+
}
7640+
fn get_channel_state_by_outpoint(&self, id: &OutPoint) -> Option<ChannelActorState> {
7641+
self.channel_actor_state_store_deref()
7642+
.get_channel_state_by_outpoint(id)
7643+
}
7644+
fn insert_payment_custom_records(
7645+
&self,
7646+
payment_hash: &Hash256,
7647+
custom_records: PaymentCustomRecords,
7648+
) {
7649+
self.channel_actor_state_store_deref()
7650+
.insert_payment_custom_records(payment_hash, custom_records);
7651+
}
7652+
fn get_payment_custom_records(&self, payment_hash: &Hash256) -> Option<PaymentCustomRecords> {
7653+
self.channel_actor_state_store_deref()
7654+
.get_payment_custom_records(payment_hash)
7655+
}
7656+
}
7657+
76027658
/// A wrapper on CommitmentTransaction that has a partial signature along with
76037659
/// the ckb transaction.
76047660
#[derive(Clone, Debug)]

crates/fiber-lib/src/fiber/gossip.rs

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,131 @@ pub trait GossipMessageStore {
242242
fn delete_channel_timestamps(&self, outpoint: &OutPoint);
243243
}
244244

245+
/// Used for delegating the store trait
246+
pub trait GossipMessageStoreDeref {
247+
type Target: GossipMessageStore;
248+
fn gossip_message_store_deref(&self) -> &Self::Target;
249+
}
250+
251+
impl<T: GossipMessageStoreDeref> GossipMessageStore for T {
252+
fn get_broadcast_messages_iter(
253+
&self,
254+
after_cursor: &Cursor,
255+
) -> impl IntoIterator<Item = BroadcastMessageWithTimestamp> {
256+
self.gossip_message_store_deref()
257+
.get_broadcast_messages_iter(after_cursor)
258+
}
259+
260+
fn get_broadcast_messages(
261+
&self,
262+
after_cursor: &Cursor,
263+
count: Option<u16>,
264+
) -> Vec<BroadcastMessageWithTimestamp> {
265+
self.gossip_message_store_deref()
266+
.get_broadcast_messages(after_cursor, count)
267+
}
268+
269+
fn query_broadcast_messages<I: IntoIterator<Item = BroadcastMessageQuery>>(
270+
&self,
271+
queries: I,
272+
) -> (Vec<BroadcastMessageWithTimestamp>, Vec<u16>) {
273+
self.gossip_message_store_deref()
274+
.query_broadcast_messages(queries)
275+
}
276+
277+
fn query_broadcast_message(
278+
&self,
279+
query: BroadcastMessageQuery,
280+
) -> Option<BroadcastMessageWithTimestamp> {
281+
self.gossip_message_store_deref()
282+
.query_broadcast_message(query)
283+
}
284+
285+
fn get_broadcast_message_with_cursor(
286+
&self,
287+
cursor: &Cursor,
288+
) -> Option<BroadcastMessageWithTimestamp> {
289+
self.gossip_message_store_deref()
290+
.get_broadcast_message_with_cursor(cursor)
291+
}
292+
293+
fn get_latest_broadcast_message_cursor(&self) -> Option<Cursor> {
294+
self.gossip_message_store_deref()
295+
.get_latest_broadcast_message_cursor()
296+
}
297+
298+
fn get_latest_channel_announcement_timestamp(&self, outpoint: &OutPoint) -> Option<u64> {
299+
self.gossip_message_store_deref()
300+
.get_latest_channel_announcement_timestamp(outpoint)
301+
}
302+
303+
fn get_latest_channel_update_timestamp(
304+
&self,
305+
outpoint: &OutPoint,
306+
is_node1: bool,
307+
) -> Option<u64> {
308+
self.gossip_message_store_deref()
309+
.get_latest_channel_update_timestamp(outpoint, is_node1)
310+
}
311+
312+
fn get_latest_node_announcement_timestamp(&self, pk: &Pubkey) -> Option<u64> {
313+
self.gossip_message_store_deref()
314+
.get_latest_node_announcement_timestamp(pk)
315+
}
316+
317+
fn get_latest_channel_announcement(
318+
&self,
319+
outpoint: &OutPoint,
320+
) -> Option<(u64, ChannelAnnouncement)> {
321+
self.gossip_message_store_deref()
322+
.get_latest_channel_announcement(outpoint)
323+
}
324+
325+
fn get_latest_channel_update(
326+
&self,
327+
outpoint: &OutPoint,
328+
is_node1: bool,
329+
) -> Option<ChannelUpdate> {
330+
self.gossip_message_store_deref()
331+
.get_latest_channel_update(outpoint, is_node1)
332+
}
333+
334+
fn get_latest_node_announcement(&self, pk: &Pubkey) -> Option<NodeAnnouncement> {
335+
self.gossip_message_store_deref()
336+
.get_latest_node_announcement(pk)
337+
}
338+
339+
fn delete_broadcast_message(&self, cursor: &Cursor) {
340+
self.gossip_message_store_deref()
341+
.delete_broadcast_message(cursor);
342+
}
343+
344+
fn save_channel_announcement(&self, timestamp: u64, channel_announcement: ChannelAnnouncement) {
345+
self.gossip_message_store_deref()
346+
.save_channel_announcement(timestamp, channel_announcement);
347+
}
348+
349+
fn save_channel_update(&self, channel_update: ChannelUpdate) {
350+
self.gossip_message_store_deref()
351+
.save_channel_update(channel_update)
352+
}
353+
354+
fn save_node_announcement(&self, node_announcement: NodeAnnouncement) {
355+
self.gossip_message_store_deref()
356+
.save_node_announcement(node_announcement);
357+
}
358+
359+
fn get_channel_timestamps_iter(&self) -> impl IntoIterator<Item = (OutPoint, [u64; 3])> {
360+
self.gossip_message_store_deref()
361+
.get_channel_timestamps_iter()
362+
}
363+
364+
fn delete_channel_timestamps(&self, outpoint: &OutPoint) {
365+
self.gossip_message_store_deref()
366+
.delete_channel_timestamps(outpoint);
367+
}
368+
}
369+
245370
// A batch of gossip messages has been added to the store since the last time
246371
// we pulled new messages/messages are pushed to us.
247372
#[derive(Clone, Debug)]

crates/fiber-lib/src/fiber/graph.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1567,7 +1567,7 @@ pub trait NetworkGraphStateStore {
15671567
}
15681568

15691569
/// The status of a payment, will update as the payment progresses.
1570-
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
1570+
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
15711571
pub enum PaymentSessionStatus {
15721572
/// initial status, payment session is created, no HTLC is sent
15731573
Created,

crates/fiber-lib/src/fiber/network.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2337,6 +2337,23 @@ pub trait NetworkActorStateStore {
23372337
fn insert_network_actor_state(&self, id: &PeerId, state: PersistentNetworkActorState);
23382338
}
23392339

2340+
/// Used for delegating the store trait
2341+
pub trait NetworkActorStateStoreDeref {
2342+
type Target: NetworkActorStateStore;
2343+
fn network_actor_state_store_deref(&self) -> &Self::Target;
2344+
}
2345+
2346+
impl<T: NetworkActorStateStoreDeref> NetworkActorStateStore for T {
2347+
fn get_network_actor_state(&self, id: &PeerId) -> Option<PersistentNetworkActorState> {
2348+
self.network_actor_state_store_deref()
2349+
.get_network_actor_state(id)
2350+
}
2351+
fn insert_network_actor_state(&self, id: &PeerId, state: PersistentNetworkActorState) {
2352+
self.network_actor_state_store_deref()
2353+
.insert_network_actor_state(id, state);
2354+
}
2355+
}
2356+
23402357
static CHANNEL_ACTOR_NAME_PREFIX: AtomicU64 = AtomicU64::new(0u64);
23412358

23422359
// ractor requires that the actor name is unique, so we add a prefix to the actor name.

crates/fiber-lib/src/invoice/store.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,3 +29,29 @@ pub trait PreimageStore {
2929
/// Search for the stored preimage with the given payment hash prefix, should be the first 20 bytes of the payment hash.
3030
fn search_preimage(&self, payment_hash_prefix: &[u8]) -> Option<Hash256>;
3131
}
32+
33+
/// Used for delegating the store trait
34+
pub trait PreimageStoreDeref {
35+
type Target: PreimageStore;
36+
fn preimage_store_deref(&self) -> &Self::Target;
37+
}
38+
39+
impl<T: PreimageStoreDeref> PreimageStore for T {
40+
fn insert_preimage(&self, payment_hash: Hash256, preimage: Hash256) {
41+
self.preimage_store_deref()
42+
.insert_preimage(payment_hash, preimage);
43+
}
44+
45+
fn remove_preimage(&self, payment_hash: &Hash256) {
46+
self.preimage_store_deref().remove_preimage(payment_hash);
47+
}
48+
49+
fn get_preimage(&self, payment_hash: &Hash256) -> Option<Hash256> {
50+
self.preimage_store_deref().get_preimage(payment_hash)
51+
}
52+
53+
fn search_preimage(&self, payment_hash_prefix: &[u8]) -> Option<Hash256> {
54+
self.preimage_store_deref()
55+
.search_preimage(payment_hash_prefix)
56+
}
57+
}

crates/fiber-lib/src/store/mod.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@ pub mod db_migrate;
22
pub mod migration;
33
mod schema;
44
pub mod store_impl;
5-
pub use store_impl::Store;
5+
pub mod subscription;
6+
7+
pub use store_impl::{Store, StoreWithPubSub};
8+
69
#[cfg(test)]
710
mod tests;

crates/fiber-lib/src/store/store_impl/browser.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
pub struct Store {}
2+
pub struct StoreWithPubSub {}
23
pub struct Batch {}
34
pub enum IteratorMode<'a> {
45
Start,

0 commit comments

Comments
 (0)