From 001e5599b6d67d28b2b1a256e3d46a4a480bcb93 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Fri, 17 Jan 2025 00:17:23 +0800 Subject: [PATCH 1/6] multi: add debug logs for edge policy flow This commit adds more logs around the ChannelUpdate->edge policy process flow. --- discovery/gossiper.go | 14 +++++++++----- discovery/sync_manager.go | 4 ++-- discovery/syncer.go | 6 +++--- graph/builder.go | 3 +++ graph/db/graph.go | 4 ++++ graph/db/graph_cache.go | 8 ++++++++ graph/db/models/channel_edge_policy.go | 8 ++++++++ routing/unified_edges.go | 6 ++++++ 8 files changed, 43 insertions(+), 10 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index 9c51734396..ef5c55bd67 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -808,6 +808,8 @@ func (d *AuthenticatedGossiper) stop() { func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message, peer lnpeer.Peer) chan error { + log.Debugf("Processing remote msg %T from peer=%x", msg, peer.PubKey()) + errChan := make(chan error, 1) // For messages in the known set of channel series queries, we'll @@ -2371,7 +2373,8 @@ func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg, timestamp := time.Unix(int64(nodeAnn.Timestamp), 0) log.Debugf("Processing NodeAnnouncement: peer=%v, timestamp=%v, "+ - "node=%x", nMsg.peer, timestamp, nodeAnn.NodeID) + "node=%x, source=%x", nMsg.peer, timestamp, nodeAnn.NodeID, + nMsg.source.SerializeCompressed()) // We'll quickly ask the router if it already has a newer update for // this node so we can skip validating signatures if not required. @@ -2430,7 +2433,8 @@ func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg, // TODO(roasbeef): get rid of the above log.Debugf("Processed NodeAnnouncement: peer=%v, timestamp=%v, "+ - "node=%x", nMsg.peer, timestamp, nodeAnn.NodeID) + "node=%x, source=%x", nMsg.peer, timestamp, nodeAnn.NodeID, + nMsg.source.SerializeCompressed()) return announcements, true } @@ -3034,9 +3038,9 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg, edgeToUpdate = e2 } - log.Debugf("Validating ChannelUpdate: channel=%v, from node=%x, has "+ - "edge=%v", chanInfo.ChannelID, pubKey.SerializeCompressed(), - edgeToUpdate != nil) + log.Debugf("Validating ChannelUpdate: channel=%v, for node=%x, has "+ + "edge policy=%v", chanInfo.ChannelID, + pubKey.SerializeCompressed(), edgeToUpdate != nil) // Validate the channel announcement with the expected public key and // channel capacity. In the case of an invalid channel update, we'll diff --git a/discovery/sync_manager.go b/discovery/sync_manager.go index 70d28784b8..9c589d638e 100644 --- a/discovery/sync_manager.go +++ b/discovery/sync_manager.go @@ -529,8 +529,8 @@ func (m *SyncManager) createGossipSyncer(peer lnpeer.Peer) *GossipSyncer { s.setSyncState(chansSynced) s.setSyncType(PassiveSync) - log.Debugf("Created new GossipSyncer[state=%s type=%s] for peer=%v", - s.syncState(), s.SyncType(), peer) + log.Debugf("Created new GossipSyncer[state=%s type=%s] for peer=%x", + s.syncState(), s.SyncType(), peer.PubKey()) return s } diff --git a/discovery/syncer.go b/discovery/syncer.go index 745fda24b3..eda757ceed 100644 --- a/discovery/syncer.go +++ b/discovery/syncer.go @@ -1342,9 +1342,9 @@ func (g *GossipSyncer) ApplyGossipFilter(filter *lnwire.GossipTimestampRange) er return err } - log.Infof("GossipSyncer(%x): applying new update horizon: start=%v, "+ - "end=%v, backlog_size=%v", g.cfg.peerPub[:], startTime, endTime, - len(newUpdatestoSend)) + log.Infof("GossipSyncer(%x): applying new remote update horizon: "+ + "start=%v, end=%v, backlog_size=%v", g.cfg.peerPub[:], + startTime, endTime, len(newUpdatestoSend)) // If we don't have any to send, then we can return early. if len(newUpdatestoSend) == 0 { diff --git a/graph/builder.go b/graph/builder.go index d6984af709..0eb8b59337 100644 --- a/graph/builder.go +++ b/graph/builder.go @@ -1400,6 +1400,9 @@ func (b *Builder) processUpdate(msg interface{}, msg.ChannelID) } + log.Debugf("Found edge1Timestamp=%v, edge2Timestamp=%v", + edge1Timestamp, edge2Timestamp) + // As edges are directional edge node has a unique policy for // the direction of the edge they control. Therefore, we first // check if we already have the most up-to-date information for diff --git a/graph/db/graph.go b/graph/db/graph.go index 2a91398c62..d5a876a79a 100644 --- a/graph/db/graph.go +++ b/graph/db/graph.go @@ -2797,6 +2797,10 @@ func (c *ChannelGraph) UpdateEdgePolicy(edge *models.ChannelEdgePolicy, tx, edge, c.graphCache, ) + if err != nil { + log.Errorf("UpdateEdgePolicy faild: %v", err) + } + // Silence ErrEdgeNotFound so that the batch can // succeed, but propagate the error via local state. if errors.Is(err, ErrEdgeNotFound) { diff --git a/graph/db/graph_cache.go b/graph/db/graph_cache.go index 2b10a0a15a..a034e23af2 100644 --- a/graph/db/graph_cache.go +++ b/graph/db/graph_cache.go @@ -228,6 +228,9 @@ func (c *GraphCache) UpdatePolicy(policy *models.ChannelEdgePolicy, fromNode, var inboundFee lnwire.Fee _, err := policy.ExtraOpaqueData.ExtractRecords(&inboundFee) if err != nil { + log.Errorf("Failed to extract records from edge policy %v: %v", + policy.ChannelID, err) + return } @@ -236,11 +239,16 @@ func (c *GraphCache) UpdatePolicy(policy *models.ChannelEdgePolicy, fromNode, updatePolicy := func(nodeKey route.Vertex) { if len(c.nodeChannels[nodeKey]) == 0 { + log.Warnf("Node=%v not found in graph cache", nodeKey) + return } channel, ok := c.nodeChannels[nodeKey][policy.ChannelID] if !ok { + log.Warnf("Channel=%v not found in graph cache", + policy.ChannelID) + return } diff --git a/graph/db/models/channel_edge_policy.go b/graph/db/models/channel_edge_policy.go index 322ce3cd09..365acbfe13 100644 --- a/graph/db/models/channel_edge_policy.go +++ b/graph/db/models/channel_edge_policy.go @@ -1,6 +1,7 @@ package models import ( + "fmt" "time" "github.com/btcsuite/btcd/btcec/v2/ecdsa" @@ -113,3 +114,10 @@ func (c *ChannelEdgePolicy) ComputeFee( return c.FeeBaseMSat + (amt*c.FeeProportionalMillionths)/feeRateParts } + +// String returns a human-readable version of the channel edge policy. +func (c *ChannelEdgePolicy) String() string { + return fmt.Sprintf("ChannelID=%v, MessageFlags=%v, ChannelFlags=%v, "+ + "LastUpdate=%v", c.ChannelID, c.MessageFlags, c.ChannelFlags, + c.LastUpdate) +} diff --git a/routing/unified_edges.go b/routing/unified_edges.go index c2e008e473..c0e1c10440 100644 --- a/routing/unified_edges.go +++ b/routing/unified_edges.go @@ -59,6 +59,9 @@ func (u *nodeEdgeUnifier) addPolicy(fromNode route.Vertex, // Skip channels if there is an outgoing channel restriction. if localChan && u.outChanRestr != nil { if _, ok := u.outChanRestr[edge.ChannelID]; !ok { + log.Debugf("Skipped adding policy for restricted edge "+ + "%v", edge.ChannelID) + return } } @@ -100,6 +103,9 @@ func (u *nodeEdgeUnifier) addGraphPolicies(g Graph) error { // Note that we are searching backwards so this node would have // come prior to the pivot node in the route. if channel.InPolicy == nil { + log.Debugf("Skipped adding edge %v due to nil policy", + channel.ChannelID) + return nil } From eb2b0c783fdbf635140d1caae779bf91ca7f8fe7 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Fri, 17 Jan 2025 00:18:47 +0800 Subject: [PATCH 2/6] graph: fix `staticcheck` suggestion From staticcheck: QF1002 - Convert untagged switch to tagged switch. --- graph/builder.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/graph/builder.go b/graph/builder.go index 0eb8b59337..1aa9488007 100644 --- a/graph/builder.go +++ b/graph/builder.go @@ -1408,11 +1408,10 @@ func (b *Builder) processUpdate(msg interface{}, // check if we already have the most up-to-date information for // that edge. If this message has a timestamp not strictly // newer than what we already know of we can exit early. - switch { + switch msg.ChannelFlags & lnwire.ChanUpdateDirection { // A flag set of 0 indicates this is an announcement for the // "first" node in the channel. - case msg.ChannelFlags&lnwire.ChanUpdateDirection == 0: - + case 0: // Ignore outdated message. if !edge1Timestamp.Before(msg.LastUpdate) { return NewErrf(ErrOutdated, "Ignoring "+ @@ -1423,8 +1422,7 @@ func (b *Builder) processUpdate(msg interface{}, // Similarly, a flag set of 1 indicates this is an announcement // for the "second" node in the channel. - case msg.ChannelFlags&lnwire.ChanUpdateDirection == 1: - + case 1: // Ignore outdated message. if !edge2Timestamp.Before(msg.LastUpdate) { return NewErrf(ErrOutdated, "Ignoring "+ From 4b30b09d1cb772e726ea6de89c93ddffeee38bb9 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Thu, 16 Jan 2025 22:36:09 +0800 Subject: [PATCH 3/6] discovery: add new method `handleSyncingChans` This is a pure refactor to add a dedicated handler when the gossiper is in state syncingChans. --- discovery/syncer.go | 44 +++++++++++++++++++++++--------------------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/discovery/syncer.go b/discovery/syncer.go index eda757ceed..e98281d1c2 100644 --- a/discovery/syncer.go +++ b/discovery/syncer.go @@ -475,6 +475,28 @@ func (g *GossipSyncer) Stop() { }) } +// handleSyncingChans handles the state syncingChans for the GossipSyncer. When +// in this state, we will send a QueryChannelRange msg to our peer and advance +// the syncer's state to waitingQueryRangeReply. +func (g *GossipSyncer) handleSyncingChans() { + // Prepare the query msg. + queryRangeMsg, err := g.genChanRangeQuery(g.genHistoricalChanRangeQuery) + if err != nil { + log.Errorf("Unable to gen chan range query: %v", err) + return + } + + err = g.cfg.sendToPeer(queryRangeMsg) + if err != nil { + log.Errorf("Unable to send chan range query: %v", err) + return + } + + // With the message sent successfully, we'll transition into the next + // state where we wait for their reply. + g.setSyncState(waitingQueryRangeReply) +} + // channelGraphSyncer is the main goroutine responsible for ensuring that we // properly channel graph state with the remote peer, and also that we only // send them messages which actually pass their defined update horizon. @@ -495,27 +517,7 @@ func (g *GossipSyncer) channelGraphSyncer() { // understand, as we'll as responding to any other queries by // them. case syncingChans: - // If we're in this state, then we'll send the remote - // peer our opening QueryChannelRange message. - queryRangeMsg, err := g.genChanRangeQuery( - g.genHistoricalChanRangeQuery, - ) - if err != nil { - log.Errorf("Unable to gen chan range "+ - "query: %v", err) - return - } - - err = g.cfg.sendToPeer(queryRangeMsg) - if err != nil { - log.Errorf("Unable to send chan range "+ - "query: %v", err) - return - } - - // With the message sent successfully, we'll transition - // into the next state where we wait for their reply. - g.setSyncState(waitingQueryRangeReply) + g.handleSyncingChans() // In this state, we've sent out our initial channel range // query and are waiting for the final response from the remote From 9fecfed3b5ba41dd0666fbbbf46d9eba52306129 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Thu, 16 Jan 2025 22:49:25 +0800 Subject: [PATCH 4/6] discovery: fix race access to syncer's state This commit fixes the following race, 1. syncer(state=syncingChans) sends QueryChannelRange 2. remote peer replies ReplyChannelRange 3. ProcessQueryMsg fails to process the remote peer's msg as its state is neither waitingQueryChanReply nor waitingQueryRangeReply. 4. syncer marks its new state waitingQueryChanReply, but too late. The historical sync will now fail, and the syncer will be stuck at this state. What's worse is it cannot forward channel announcements to other connected peers now as it will skip the broadcasting during initial graph sync. This is now fixed to make sure the following two steps are atomic, 1. syncer(state=syncingChans) sends QueryChannelRange 2. syncer marks its new state waitingQueryChanReply. --- discovery/gossiper.go | 8 ++++++-- discovery/syncer.go | 16 ++++++++++++++-- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/discovery/gossiper.go b/discovery/gossiper.go index ef5c55bd67..23874fed29 100644 --- a/discovery/gossiper.go +++ b/discovery/gossiper.go @@ -832,9 +832,13 @@ func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message, // If we've found the message target, then we'll dispatch the // message directly to it. - syncer.ProcessQueryMsg(m, peer.QuitSignal()) + err := syncer.ProcessQueryMsg(m, peer.QuitSignal()) + if err != nil { + log.Errorf("Process query msg from peer %x got %v", + peer.PubKey(), err) + } - errChan <- nil + errChan <- err return errChan // If a peer is updating its current update horizon, then we'll dispatch diff --git a/discovery/syncer.go b/discovery/syncer.go index e98281d1c2..79794271a0 100644 --- a/discovery/syncer.go +++ b/discovery/syncer.go @@ -486,6 +486,15 @@ func (g *GossipSyncer) handleSyncingChans() { return } + // Acquire a lock so the following state transition is atomic. + // + // NOTE: We must lock the following steps as it's possible we get an + // immediate response (ReplyChannelRange) after sending the query msg. + // The response is handled in ProcessQueryMsg, which requires the + // current state to be waitingQueryRangeReply. + g.Lock() + defer g.Unlock() + err = g.cfg.sendToPeer(queryRangeMsg) if err != nil { log.Errorf("Unable to send chan range query: %v", err) @@ -1517,12 +1526,15 @@ func (g *GossipSyncer) ProcessQueryMsg(msg lnwire.Message, peerQuit <-chan struc // Reply messages should only be expected in states where we're waiting // for a reply. case *lnwire.ReplyChannelRange, *lnwire.ReplyShortChanIDsEnd: + g.Lock() syncState := g.syncState() + g.Unlock() + if syncState != waitingQueryRangeReply && syncState != waitingQueryChanReply { - return fmt.Errorf("received unexpected query reply "+ - "message %T", msg) + return fmt.Errorf("unexpected msg %T received in "+ + "state %v", msg, syncState) } msgChan = g.gossipMsgs From 772a9d5f42a90c7c7dc8a8c9b713cb371226dfa8 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Fri, 17 Jan 2025 17:56:22 +0800 Subject: [PATCH 5/6] discovery: fix mocked peer in unit tests The mocked peer used here blocks on `sendToPeer`, which is not the behavior of the `SendMessageLazy` of `lnpeer.Peer`. To reflect the reality, we now make sure the `sendToPeer` is non-blocking in the tests. --- discovery/sync_manager_test.go | 6 ++++-- discovery/syncer.go | 2 ++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/discovery/sync_manager_test.go b/discovery/sync_manager_test.go index f71d1728ec..f4e35bddd6 100644 --- a/discovery/sync_manager_test.go +++ b/discovery/sync_manager_test.go @@ -28,7 +28,7 @@ func randPeer(t *testing.T, quit chan struct{}) *mockPeer { func peerWithPubkey(pk *btcec.PublicKey, quit chan struct{}) *mockPeer { return &mockPeer{ pk: pk, - sentMsgs: make(chan lnwire.Message), + sentMsgs: make(chan lnwire.Message, 1), quit: quit, } } @@ -483,7 +483,9 @@ func TestSyncManagerWaitUntilInitialHistoricalSync(t *testing.T) { // transition it to chansSynced to ensure the remaining syncers // aren't started as active. if i == 0 { - assertSyncerStatus(t, s, syncingChans, PassiveSync) + assertSyncerStatus( + t, s, waitingQueryRangeReply, PassiveSync, + ) continue } diff --git a/discovery/syncer.go b/discovery/syncer.go index 79794271a0..f0ce4ffcb5 100644 --- a/discovery/syncer.go +++ b/discovery/syncer.go @@ -495,6 +495,8 @@ func (g *GossipSyncer) handleSyncingChans() { g.Lock() defer g.Unlock() + // Send the msg to the remote peer, which is non-blocking as + // `sendToPeer` only queues the msg in Brontide. err = g.cfg.sendToPeer(queryRangeMsg) if err != nil { log.Errorf("Unable to send chan range query: %v", err) From 56ff6d1fe03d6c49888ec31345808ddf1217c8c9 Mon Sep 17 00:00:00 2001 From: yyforyongyu Date: Fri, 17 Jan 2025 18:58:20 +0800 Subject: [PATCH 6/6] docs: update release notes --- docs/release-notes/release-notes-0.19.0.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/release-notes/release-notes-0.19.0.md b/docs/release-notes/release-notes-0.19.0.md index 52879fcc0e..ad994423f1 100644 --- a/docs/release-notes/release-notes-0.19.0.md +++ b/docs/release-notes/release-notes-0.19.0.md @@ -74,6 +74,10 @@ This is a protocol gadget required for Dynamic Commitments and Splicing that will be added later. +* [Fixed](https://github.com/lightningnetwork/lnd/pull/9424) a case where the + initial historical sync may be blocked due to a race condition in handling the + syncer's internal state. + ## Functional Enhancements * [Add ability](https://github.com/lightningnetwork/lnd/pull/8998) to paginate wallet transactions.