diff --git a/discovery/gossiper.go b/discovery/gossiper.go
index 9c51734396..23874fed29 100644
--- a/discovery/gossiper.go
+++ b/discovery/gossiper.go
@@ -808,6 +808,8 @@ func (d *AuthenticatedGossiper) stop() {
 func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message,
 	peer lnpeer.Peer) chan error {
 
+	log.Debugf("Processing remote msg %T from peer=%x", msg, peer.PubKey())
+
 	errChan := make(chan error, 1)
 
 	// For messages in the known set of channel series queries, we'll
@@ -830,9 +832,13 @@ func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message,
 
 		// If we've found the message target, then we'll dispatch the
 		// message directly to it.
-		syncer.ProcessQueryMsg(m, peer.QuitSignal())
+		err := syncer.ProcessQueryMsg(m, peer.QuitSignal())
+		if err != nil {
+			log.Errorf("Process query msg from peer %x got %v",
+				peer.PubKey(), err)
+		}
 
-		errChan <- nil
+		errChan <- err
 		return errChan
 
 	// If a peer is updating its current update horizon, then we'll dispatch
@@ -2371,7 +2377,8 @@ func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg,
 	timestamp := time.Unix(int64(nodeAnn.Timestamp), 0)
 
 	log.Debugf("Processing NodeAnnouncement: peer=%v, timestamp=%v, "+
-		"node=%x", nMsg.peer, timestamp, nodeAnn.NodeID)
+		"node=%x, source=%x", nMsg.peer, timestamp, nodeAnn.NodeID,
+		nMsg.source.SerializeCompressed())
 
 	// We'll quickly ask the router if it already has a newer update for
 	// this node so we can skip validating signatures if not required.
@@ -2430,7 +2437,8 @@ func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg,
 	// TODO(roasbeef): get rid of the above
 
 	log.Debugf("Processed NodeAnnouncement: peer=%v, timestamp=%v, "+
-		"node=%x", nMsg.peer, timestamp, nodeAnn.NodeID)
+		"node=%x, source=%x", nMsg.peer, timestamp, nodeAnn.NodeID,
+		nMsg.source.SerializeCompressed())
 
 	return announcements, true
 }
@@ -3034,9 +3042,9 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
 		edgeToUpdate = e2
 	}
 
-	log.Debugf("Validating ChannelUpdate: channel=%v, from node=%x, has "+
-		"edge=%v", chanInfo.ChannelID, pubKey.SerializeCompressed(),
-		edgeToUpdate != nil)
+	log.Debugf("Validating ChannelUpdate: channel=%v, for node=%x, has "+
+		"edge policy=%v", chanInfo.ChannelID,
+		pubKey.SerializeCompressed(), edgeToUpdate != nil)
 
 	// Validate the channel announcement with the expected public key and
 	// channel capacity. In the case of an invalid channel update, we'll
diff --git a/discovery/sync_manager.go b/discovery/sync_manager.go
index 70d28784b8..9c589d638e 100644
--- a/discovery/sync_manager.go
+++ b/discovery/sync_manager.go
@@ -529,8 +529,8 @@ func (m *SyncManager) createGossipSyncer(peer lnpeer.Peer) *GossipSyncer {
 	s.setSyncState(chansSynced)
 	s.setSyncType(PassiveSync)
 
-	log.Debugf("Created new GossipSyncer[state=%s type=%s] for peer=%v",
-		s.syncState(), s.SyncType(), peer)
+	log.Debugf("Created new GossipSyncer[state=%s type=%s] for peer=%x",
+		s.syncState(), s.SyncType(), peer.PubKey())
 
 	return s
 }
diff --git a/discovery/sync_manager_test.go b/discovery/sync_manager_test.go
index f71d1728ec..f4e35bddd6 100644
--- a/discovery/sync_manager_test.go
+++ b/discovery/sync_manager_test.go
@@ -28,7 +28,7 @@ func randPeer(t *testing.T, quit chan struct{}) *mockPeer {
 func peerWithPubkey(pk *btcec.PublicKey, quit chan struct{}) *mockPeer {
 	return &mockPeer{
 		pk:       pk,
-		sentMsgs: make(chan lnwire.Message),
+		sentMsgs: make(chan lnwire.Message, 1),
 		quit:     quit,
 	}
 }
@@ -483,7 +483,9 @@ func TestSyncManagerWaitUntilInitialHistoricalSync(t *testing.T) {
 		// transition it to chansSynced to ensure the remaining syncers
 		// aren't started as active.
 		if i == 0 {
-			assertSyncerStatus(t, s, syncingChans, PassiveSync)
+			assertSyncerStatus(
+				t, s, waitingQueryRangeReply, PassiveSync,
+			)
 			continue
 		}
 
diff --git a/discovery/syncer.go b/discovery/syncer.go
index 745fda24b3..f0ce4ffcb5 100644
--- a/discovery/syncer.go
+++ b/discovery/syncer.go
@@ -475,6 +475,39 @@ func (g *GossipSyncer) Stop() {
 	})
 }
 
+// handleSyncingChans handles the state syncingChans for the GossipSyncer. When
+// in this state, we will send a QueryChannelRange msg to our peer and advance
+// the syncer's state to waitingQueryRangeReply.
+func (g *GossipSyncer) handleSyncingChans() {
+	// Prepare the query msg.
+	queryRangeMsg, err := g.genChanRangeQuery(g.genHistoricalChanRangeQuery)
+	if err != nil {
+		log.Errorf("Unable to gen chan range query: %v", err)
+		return
+	}
+
+	// Acquire a lock so the following state transition is atomic.
+	//
+	// NOTE: We must lock the following steps as it's possible we get an
+	// immediate response (ReplyChannelRange) after sending the query msg.
+	// The response is handled in ProcessQueryMsg, which requires the
+	// current state to be waitingQueryRangeReply.
+	g.Lock()
+	defer g.Unlock()
+
+	// Send the msg to the remote peer, which is non-blocking as
+	// `sendToPeer` only queues the msg in Brontide.
+	err = g.cfg.sendToPeer(queryRangeMsg)
+	if err != nil {
+		log.Errorf("Unable to send chan range query: %v", err)
+		return
+	}
+
+	// With the message sent successfully, we'll transition into the next
+	// state where we wait for their reply.
+	g.setSyncState(waitingQueryRangeReply)
+}
+
 // channelGraphSyncer is the main goroutine responsible for ensuring that we
 // properly channel graph state with the remote peer, and also that we only
 // send them messages which actually pass their defined update horizon.
@@ -495,27 +528,7 @@ func (g *GossipSyncer) channelGraphSyncer() {
 		// understand, as we'll as responding to any other queries by
 		// them.
 		case syncingChans:
-			// If we're in this state, then we'll send the remote
-			// peer our opening QueryChannelRange message.
-			queryRangeMsg, err := g.genChanRangeQuery(
-				g.genHistoricalChanRangeQuery,
-			)
-			if err != nil {
-				log.Errorf("Unable to gen chan range "+
-					"query: %v", err)
-				return
-			}
-
-			err = g.cfg.sendToPeer(queryRangeMsg)
-			if err != nil {
-				log.Errorf("Unable to send chan range "+
-					"query: %v", err)
-				return
-			}
-
-			// With the message sent successfully, we'll transition
-			// into the next state where we wait for their reply.
-			g.setSyncState(waitingQueryRangeReply)
+			g.handleSyncingChans()
 
 		// In this state, we've sent out our initial channel range
 		// query and are waiting for the final response from the remote
@@ -1342,9 +1355,9 @@ func (g *GossipSyncer) ApplyGossipFilter(filter *lnwire.GossipTimestampRange) er
 		return err
 	}
 
-	log.Infof("GossipSyncer(%x): applying new update horizon: start=%v, "+
-		"end=%v, backlog_size=%v", g.cfg.peerPub[:], startTime, endTime,
-		len(newUpdatestoSend))
+	log.Infof("GossipSyncer(%x): applying new remote update horizon: "+
+		"start=%v, end=%v, backlog_size=%v", g.cfg.peerPub[:],
+		startTime, endTime, len(newUpdatestoSend))
 
 	// If we don't have any to send, then we can return early.
 	if len(newUpdatestoSend) == 0 {
@@ -1515,12 +1528,15 @@ func (g *GossipSyncer) ProcessQueryMsg(msg lnwire.Message, peerQuit <-chan struc
 	// Reply messages should only be expected in states where we're waiting
 	// for a reply.
 	case *lnwire.ReplyChannelRange, *lnwire.ReplyShortChanIDsEnd:
+		g.Lock()
 		syncState := g.syncState()
+		g.Unlock()
+
 		if syncState != waitingQueryRangeReply &&
 			syncState != waitingQueryChanReply {
 
-			return fmt.Errorf("received unexpected query reply "+
-				"message %T", msg)
+			return fmt.Errorf("unexpected msg %T received in "+
+				"state %v", msg, syncState)
 		}
 
 		msgChan = g.gossipMsgs
diff --git a/docs/release-notes/release-notes-0.19.0.md b/docs/release-notes/release-notes-0.19.0.md
index 52879fcc0e..ad994423f1 100644
--- a/docs/release-notes/release-notes-0.19.0.md
+++ b/docs/release-notes/release-notes-0.19.0.md
@@ -74,6 +74,10 @@ This is a protocol gadget required for Dynamic Commitments and Splicing that
   will be added later.
 
+* [Fixed](https://github.com/lightningnetwork/lnd/pull/9424) a case where the
+  initial historical sync may be blocked due to a race condition in handling
+  the syncer's internal state.
+
 ## Functional Enhancements
 
 * [Add ability](https://github.com/lightningnetwork/lnd/pull/8998) to paginate
   wallet transactions.
diff --git a/graph/builder.go b/graph/builder.go
index d6984af709..1aa9488007 100644
--- a/graph/builder.go
+++ b/graph/builder.go
@@ -1400,16 +1400,18 @@ func (b *Builder) processUpdate(msg interface{},
 				msg.ChannelID)
 		}
 
+		log.Debugf("Found edge1Timestamp=%v, edge2Timestamp=%v",
+			edge1Timestamp, edge2Timestamp)
+
 		// As edges are directional edge node has a unique policy for
 		// the direction of the edge they control. Therefore, we first
 		// check if we already have the most up-to-date information for
 		// that edge. If this message has a timestamp not strictly
 		// newer than what we already know of we can exit early.
-		switch {
+		switch msg.ChannelFlags & lnwire.ChanUpdateDirection {
 		// A flag set of 0 indicates this is an announcement for the
 		// "first" node in the channel.
-		case msg.ChannelFlags&lnwire.ChanUpdateDirection == 0:
-
+		case 0:
 			// Ignore outdated message.
 			if !edge1Timestamp.Before(msg.LastUpdate) {
 				return NewErrf(ErrOutdated, "Ignoring "+
@@ -1420,8 +1422,7 @@ func (b *Builder) processUpdate(msg interface{},
 
 		// Similarly, a flag set of 1 indicates this is an announcement
 		// for the "second" node in the channel.
-		case msg.ChannelFlags&lnwire.ChanUpdateDirection == 1:
-
+		case 1:
 			// Ignore outdated message.
 			if !edge2Timestamp.Before(msg.LastUpdate) {
 				return NewErrf(ErrOutdated, "Ignoring "+
diff --git a/graph/db/graph.go b/graph/db/graph.go
index 2a91398c62..d5a876a79a 100644
--- a/graph/db/graph.go
+++ b/graph/db/graph.go
@@ -2797,6 +2797,10 @@ func (c *ChannelGraph) UpdateEdgePolicy(edge *models.ChannelEdgePolicy,
 			tx, edge, c.graphCache,
 		)
 
+		if err != nil {
+			log.Errorf("UpdateEdgePolicy failed: %v", err)
+		}
+
 		// Silence ErrEdgeNotFound so that the batch can
 		// succeed, but propagate the error via local state.
 		if errors.Is(err, ErrEdgeNotFound) {
diff --git a/graph/db/graph_cache.go b/graph/db/graph_cache.go
index 2b10a0a15a..a034e23af2 100644
--- a/graph/db/graph_cache.go
+++ b/graph/db/graph_cache.go
@@ -228,6 +228,9 @@ func (c *GraphCache) UpdatePolicy(policy *models.ChannelEdgePolicy, fromNode,
 	var inboundFee lnwire.Fee
 	_, err := policy.ExtraOpaqueData.ExtractRecords(&inboundFee)
 	if err != nil {
+		log.Errorf("Failed to extract records from edge policy %v: %v",
+			policy.ChannelID, err)
+
 		return
 	}
 
@@ -236,11 +239,16 @@ func (c *GraphCache) UpdatePolicy(policy *models.ChannelEdgePolicy, fromNode,
 
 	updatePolicy := func(nodeKey route.Vertex) {
 		if len(c.nodeChannels[nodeKey]) == 0 {
+			log.Warnf("Node=%v not found in graph cache", nodeKey)
+
 			return
 		}
 
 		channel, ok := c.nodeChannels[nodeKey][policy.ChannelID]
 		if !ok {
+			log.Warnf("Channel=%v not found in graph cache",
+				policy.ChannelID)
+
 			return
 		}
diff --git a/graph/db/models/channel_edge_policy.go b/graph/db/models/channel_edge_policy.go
index 322ce3cd09..365acbfe13 100644
--- a/graph/db/models/channel_edge_policy.go
+++ b/graph/db/models/channel_edge_policy.go
@@ -1,6 +1,7 @@
 package models
 
 import (
+	"fmt"
 	"time"
 
 	"github.com/btcsuite/btcd/btcec/v2/ecdsa"
@@ -113,3 +114,10 @@ func (c *ChannelEdgePolicy) ComputeFee(
 
 	return c.FeeBaseMSat + (amt*c.FeeProportionalMillionths)/feeRateParts
 }
+
+// String returns a human-readable version of the channel edge policy.
+func (c *ChannelEdgePolicy) String() string {
+	return fmt.Sprintf("ChannelID=%v, MessageFlags=%v, ChannelFlags=%v, "+
+		"LastUpdate=%v", c.ChannelID, c.MessageFlags, c.ChannelFlags,
+		c.LastUpdate)
+}
diff --git a/routing/unified_edges.go b/routing/unified_edges.go
index c2e008e473..c0e1c10440 100644
--- a/routing/unified_edges.go
+++ b/routing/unified_edges.go
@@ -59,6 +59,9 @@ func (u *nodeEdgeUnifier) addPolicy(fromNode route.Vertex,
 	// Skip channels if there is an outgoing channel restriction.
 	if localChan && u.outChanRestr != nil {
 		if _, ok := u.outChanRestr[edge.ChannelID]; !ok {
+			log.Debugf("Skipped adding policy for restricted edge "+
+				"%v", edge.ChannelID)
+
 			return
 		}
 	}
@@ -100,6 +103,9 @@ func (u *nodeEdgeUnifier) addGraphPolicies(g Graph) error {
 		// Note that we are searching backwards so this node would have
 		// come prior to the pivot node in the route.
 		if channel.InPolicy == nil {
+			log.Debugf("Skipped adding edge %v due to nil policy",
+				channel.ChannelID)
+
 			return nil
 		}
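
To make the race this patch closes easier to see in isolation, below is a minimal, self-contained Go sketch. It is not lnd code: `miniSyncer`, `sendQuery`, and `handleReply` are illustrative stand-ins for `GossipSyncer`, `handleSyncingChans`, and `ProcessQueryMsg`. It demonstrates the point made by the NOTE in `handleSyncingChans`: the mutex is held across both the non-blocking send and the `setSyncState` call, so an immediate `ReplyChannelRange` can never observe the stale `syncingChans` state.

```go
package main

import (
	"fmt"
	"sync"
)

type syncerState int

const (
	syncingChans syncerState = iota
	waitingQueryRangeReply
)

type miniSyncer struct {
	sync.Mutex
	state syncerState
}

// sendQuery mirrors the handleSyncingChans pattern: the mutex is held
// across both the (non-blocking) send and the state transition, so a
// reply can never observe the window where the query is already on the
// wire but the state is still syncingChans.
func (s *miniSyncer) sendQuery(send func()) {
	s.Lock()
	defer s.Unlock()

	send()
	s.state = waitingQueryRangeReply
}

// handleReply mirrors ProcessQueryMsg: it reads the state under the
// same mutex before deciding whether the reply is expected.
func (s *miniSyncer) handleReply() error {
	s.Lock()
	state := s.state
	s.Unlock()

	if state != waitingQueryRangeReply {
		return fmt.Errorf("unexpected reply in state %d", state)
	}

	return nil
}

func main() {
	s := &miniSyncer{state: syncingChans}

	var wg sync.WaitGroup
	wg.Add(1)

	// The "peer" replies as soon as the query is handed off, racing
	// with the state transition. Because handleReply blocks on the
	// mutex until sendQuery finishes, the reply is always accepted.
	s.sendQuery(func() {
		go func() {
			defer wg.Done()
			if err := s.handleReply(); err != nil {
				fmt.Println("reply rejected:", err)
				return
			}
			fmt.Println("reply accepted")
		}()
	})

	wg.Wait()
}
```

Without that lock, the reply handler could read the state between the send and the transition and reject the reply, which is the "unexpected msg ... received in state" failure mode the release-note entry refers to.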