multi: fix inconsistent state in gossip syncer #9424

Merged on Jan 23, 2025 (6 commits)
22 changes: 15 additions & 7 deletions discovery/gossiper.go
@@ -808,6 +808,8 @@ func (d *AuthenticatedGossiper) stop() {
func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message,
peer lnpeer.Peer) chan error {

log.Debugf("Processing remote msg %T from peer=%x", msg, peer.PubKey())

errChan := make(chan error, 1)

// For messages in the known set of channel series queries, we'll
@@ -830,9 +832,13 @@ func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message,

// If we've found the message target, then we'll dispatch the
// message directly to it.
- syncer.ProcessQueryMsg(m, peer.QuitSignal())
+ err := syncer.ProcessQueryMsg(m, peer.QuitSignal())
Collaborator:

could you explain why the syncer remaining in the waitingQueryChanReply state causes the chan_announcements received directly from the peer to not be relayed?

Member Author:

yeah it's very intertwined...so we skip broadcasting channel anns from our peers here,

lnd/discovery/gossiper.go, lines 1574–1580 (e0a920a):

if newAnns != nil && shouldBroadcast {
// TODO(roasbeef): exclude peer that sent.
deDuped.AddMsgs(newAnns...)
} else if newAnns != nil {
log.Trace("Skipping broadcast of announcements received " +
"during initial graph sync")
}

and the shouldBroadcast is determined here,

lnd/discovery/gossiper.go, lines 1533–1535 (e0a920a):

// We should only broadcast this message forward if it originated from
// us or it wasn't received as part of our initial historical sync.
shouldBroadcast := !nMsg.isRemote || d.syncMgr.IsGraphSynced()

which relies on this method,

// IsGraphSynced determines whether we've completed our initial historical sync.
// The initial historical sync is done to ensure we've ingested as much of the
// public graph as possible.
func (m *SyncManager) IsGraphSynced() bool {
return atomic.LoadInt32(&m.initialHistoricalSyncCompleted) == 1
}

and the var initialHistoricalSyncCompleted is marked via,

// markGraphSynced allows us to report that the initial historical sync has
// completed.
func (m *SyncManager) markGraphSynced() {
atomic.StoreInt32(&m.initialHistoricalSyncCompleted, 1)
}

and the above method is called inside processChanRangeReply here,

lnd/discovery/syncer.go, lines 951–954 (e0a920a):

// Ensure that the sync manager becomes aware that the
// historical sync completed so synced_to_graph is updated over
// rpc.
g.cfg.markGraphSynced()

and the processChanRangeReply is called here,

lnd/discovery/syncer.go, lines 528–537 (e0a920a):

select {
case msg := <-g.gossipMsgs:
// The remote peer is sending a response to our
// initial query, we'll collate this response,
// and see if it's the final one in the series.
// If so, we can then transition to querying
// for the new channels.
queryReply, ok := msg.(*lnwire.ReplyChannelRange)
if ok {
err := g.processChanRangeReply(queryReply)

Note that it will be blocked on case msg := <-g.gossipMsgs:, as the msg is never sent to the channel here,

lnd/discovery/syncer.go, lines 1515–1532 (e0a920a):

// Reply messages should only be expected in states where we're waiting
// for a reply.
case *lnwire.ReplyChannelRange, *lnwire.ReplyShortChanIDsEnd:
syncState := g.syncState()
if syncState != waitingQueryRangeReply &&
syncState != waitingQueryChanReply {
return fmt.Errorf("received unexpected query reply "+
"message %T", msg)
}
msgChan = g.gossipMsgs
default:
msgChan = g.gossipMsgs
}
select {
case msgChan <- msg:

The above case *lnwire.ReplyChannelRange, *lnwire.ReplyShortChanIDsEnd: errors out because the syncer's state has not yet been updated to waitingQueryRangeReply.
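
To make the ordering concrete, here is a minimal, self-contained Go sketch of the race (toy names and states, not the actual lnd types): the old flow sent the query first and transitioned the state second, so a reply that arrives in between fails the state check.

package main

import (
	"fmt"
	"sync"
	"time"
)

// toySyncer stands in for GossipSyncer: a mutex-guarded state string.
type toySyncer struct {
	mu    sync.Mutex
	state string
}

func (s *toySyncer) setState(st string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.state = st
}

func (s *toySyncer) getState() string {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.state
}

func main() {
	s := &toySyncer{state: "syncingChans"}
	querySent := make(chan struct{})
	done := make(chan struct{})

	// Remote peer: replies as soon as it sees our query.
	go func() {
		defer close(done)
		<-querySent

		// Mirrors the state check ProcessQueryMsg performs on a
		// ReplyChannelRange.
		if st := s.getState(); st != "waitingQueryRangeReply" {
			fmt.Printf("reply rejected: unexpected state %q\n", st)
			return
		}
		fmt.Println("reply accepted")
	}()

	// Old (buggy) ordering: send the query, then transition the state.
	close(querySent)                     // sendToPeer(queryRangeMsg)
	time.Sleep(time.Millisecond)         // a fast reply lands in this window
	s.setState("waitingQueryRangeReply") // too late

	<-done
}

Holding the syncer's mutex across both the send and the state transition, as the new handleSyncingChans does, closes that window.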

Collaborator:

Thanks for the detailed explanation; agreed, it's really nested.

if err != nil {
log.Errorf("Process query msg from peer %x got %v",
peer.PubKey(), err)
}

- errChan <- nil
+ errChan <- err
return errChan

// If a peer is updating its current update horizon, then we'll dispatch
@@ -2371,7 +2377,8 @@ func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg,
timestamp := time.Unix(int64(nodeAnn.Timestamp), 0)

log.Debugf("Processing NodeAnnouncement: peer=%v, timestamp=%v, "+
"node=%x", nMsg.peer, timestamp, nodeAnn.NodeID)
"node=%x, source=%x", nMsg.peer, timestamp, nodeAnn.NodeID,
nMsg.source.SerializeCompressed())

// We'll quickly ask the router if it already has a newer update for
// this node so we can skip validating signatures if not required.
@@ -2430,7 +2437,8 @@ func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg,
// TODO(roasbeef): get rid of the above

log.Debugf("Processed NodeAnnouncement: peer=%v, timestamp=%v, "+
Collaborator:

I'll try to get around to updating the gossiper to use structured logging. That way we only need to add all this to the context once and can then just log.DebugS(ctx, "Processed NodeAnnouncement").
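
For illustration, here is a runnable sketch of that attach-once pattern using the stdlib log/slog (lnd's btclog v2 API differs in its details, and the field values below are made up):

package main

import (
	"log/slog"
	"os"
)

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stdout,
		&slog.HandlerOptions{Level: slog.LevelDebug}))

	// Attach the announcement's fields to a logger (or context) once...
	annLog := logger.With(
		slog.String("peer", "03abc...@127.0.0.1:9735"),
		slog.String("node", "02def..."),
	)

	// ...and every subsequent log line carries them automatically.
	annLog.Debug("Processing NodeAnnouncement")
	annLog.Debug("Processed NodeAnnouncement")
}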

"node=%x", nMsg.peer, timestamp, nodeAnn.NodeID)
"node=%x, source=%x", nMsg.peer, timestamp, nodeAnn.NodeID,
nMsg.source.SerializeCompressed())

return announcements, true
}
@@ -3034,9 +3042,9 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
edgeToUpdate = e2
}

log.Debugf("Validating ChannelUpdate: channel=%v, from node=%x, has "+
"edge=%v", chanInfo.ChannelID, pubKey.SerializeCompressed(),
edgeToUpdate != nil)
log.Debugf("Validating ChannelUpdate: channel=%v, for node=%x, has "+
"edge policy=%v", chanInfo.ChannelID,
pubKey.SerializeCompressed(), edgeToUpdate != nil)

// Validate the channel announcement with the expected public key and
// channel capacity. In the case of an invalid channel update, we'll
4 changes: 2 additions & 2 deletions discovery/sync_manager.go
@@ -529,8 +529,8 @@ func (m *SyncManager) createGossipSyncer(peer lnpeer.Peer) *GossipSyncer {
s.setSyncState(chansSynced)
s.setSyncType(PassiveSync)

log.Debugf("Created new GossipSyncer[state=%s type=%s] for peer=%v",
s.syncState(), s.SyncType(), peer)
log.Debugf("Created new GossipSyncer[state=%s type=%s] for peer=%x",
s.syncState(), s.SyncType(), peer.PubKey())

return s
}
6 changes: 4 additions & 2 deletions discovery/sync_manager_test.go
@@ -28,7 +28,7 @@ func randPeer(t *testing.T, quit chan struct{}) *mockPeer {
func peerWithPubkey(pk *btcec.PublicKey, quit chan struct{}) *mockPeer {
return &mockPeer{
pk: pk,
- sentMsgs: make(chan lnwire.Message),
+ sentMsgs: make(chan lnwire.Message, 1),
quit: quit,
}
}
@@ -483,7 +483,9 @@ func TestSyncManagerWaitUntilInitialHistoricalSync(t *testing.T) {
// transition it to chansSynced to ensure the remaining syncers
// aren't started as active.
if i == 0 {
- assertSyncerStatus(t, s, syncingChans, PassiveSync)
+ assertSyncerStatus(
+ t, s, waitingQueryRangeReply, PassiveSync,
+ )
continue
}

68 changes: 42 additions & 26 deletions discovery/syncer.go
@@ -475,6 +475,39 @@ func (g *GossipSyncer) Stop() {
})
}

// handleSyncingChans handles the state syncingChans for the GossipSyncer. When
// in this state, we will send a QueryChannelRange msg to our peer and advance
// the syncer's state to waitingQueryRangeReply.
func (g *GossipSyncer) handleSyncingChans() {
// Prepare the query msg.
queryRangeMsg, err := g.genChanRangeQuery(g.genHistoricalChanRangeQuery)
if err != nil {
log.Errorf("Unable to gen chan range query: %v", err)
return
}

// Acquire a lock so the following state transition is atomic.
//
// NOTE: We must lock the following steps as it's possible we get an
// immediate response (ReplyChannelRange) after sending the query msg.
// The response is handled in ProcessQueryMsg, which requires the
// current state to be waitingQueryRangeReply.
g.Lock()
defer g.Unlock()

// Send the msg to the remote peer, which is non-blocking as
// `sendToPeer` only queues the msg in Brontide.
err = g.cfg.sendToPeer(queryRangeMsg)
if err != nil {
log.Errorf("Unable to send chan range query: %v", err)
return
}

// With the message sent successfully, we'll transition into the next
// state where we wait for their reply.
g.setSyncState(waitingQueryRangeReply)
}

// channelGraphSyncer is the main goroutine responsible for ensuring that we
// properly sync channel graph state with the remote peer, and also that we only
// send them messages which actually pass their defined update horizon.
@@ -495,27 +528,7 @@ func (g *GossipSyncer) channelGraphSyncer() {
// understand, as well as responding to any other queries by
// them.
case syncingChans:
- // If we're in this state, then we'll send the remote
- // peer our opening QueryChannelRange message.
- queryRangeMsg, err := g.genChanRangeQuery(
- g.genHistoricalChanRangeQuery,
- )
- if err != nil {
- log.Errorf("Unable to gen chan range "+
- "query: %v", err)
- return
- }
-
- err = g.cfg.sendToPeer(queryRangeMsg)
- if err != nil {
- log.Errorf("Unable to send chan range "+
- "query: %v", err)
- return
- }
-
- // With the message sent successfully, we'll transition
- // into the next state where we wait for their reply.
- g.setSyncState(waitingQueryRangeReply)
+ g.handleSyncingChans()

// In this state, we've sent out our initial channel range
// query and are waiting for the final response from the remote
@@ -1342,9 +1355,9 @@ func (g *GossipSyncer) ApplyGossipFilter(filter *lnwire.GossipTimestampRange) error {
return err
}

log.Infof("GossipSyncer(%x): applying new update horizon: start=%v, "+
"end=%v, backlog_size=%v", g.cfg.peerPub[:], startTime, endTime,
len(newUpdatestoSend))
log.Infof("GossipSyncer(%x): applying new remote update horizon: "+
"start=%v, end=%v, backlog_size=%v", g.cfg.peerPub[:],
startTime, endTime, len(newUpdatestoSend))

// If we don't have any to send, then we can return early.
if len(newUpdatestoSend) == 0 {
@@ -1515,12 +1528,15 @@ func (g *GossipSyncer) ProcessQueryMsg(msg lnwire.Message, peerQuit <-chan struct{}) error {
// Reply messages should only be expected in states where we're waiting
// for a reply.
case *lnwire.ReplyChannelRange, *lnwire.ReplyShortChanIDsEnd:
g.Lock()
syncState := g.syncState()
g.Unlock()

if syncState != waitingQueryRangeReply &&
syncState != waitingQueryChanReply {

return fmt.Errorf("received unexpected query reply "+
"message %T", msg)
return fmt.Errorf("unexpected msg %T received in "+
"state %v", msg, syncState)
}
msgChan = g.gossipMsgs

4 changes: 4 additions & 0 deletions docs/release-notes/release-notes-0.19.0.md
@@ -74,6 +74,10 @@
This is a protocol gadget required for Dynamic Commitments and Splicing that
will be added later.

* [Fixed](https://github.com/lightningnetwork/lnd/pull/9424) a case where the
  initial historical sync may be blocked due to a race condition in handling the
  syncer's internal state.

## Functional Enhancements
* [Add ability](https://github.com/lightningnetwork/lnd/pull/8998) to paginate
wallet transactions.
11 changes: 6 additions & 5 deletions graph/builder.go
@@ -1400,16 +1400,18 @@ func (b *Builder) processUpdate(msg interface{},
msg.ChannelID)
}

log.Debugf("Found edge1Timestamp=%v, edge2Timestamp=%v",
edge1Timestamp, edge2Timestamp)

// As edges are directional, each node has a unique policy for
// the direction of the edge it controls. Therefore, we first
// check if we already have the most up-to-date information for
// that edge. If this message has a timestamp not strictly
// newer than what we already know of we can exit early.
- switch {
+ switch msg.ChannelFlags & lnwire.ChanUpdateDirection {
// A flag set of 0 indicates this is an announcement for the
// "first" node in the channel.
case msg.ChannelFlags&lnwire.ChanUpdateDirection == 0:

case 0:
// Ignore outdated message.
if !edge1Timestamp.Before(msg.LastUpdate) {
return NewErrf(ErrOutdated, "Ignoring "+
@@ -1420,8 +1422,7 @@

// Similarly, a flag set of 1 indicates this is an announcement
// for the "second" node in the channel.
case msg.ChannelFlags&lnwire.ChanUpdateDirection == 1:

case 1:
// Ignore outdated message.
if !edge2Timestamp.Before(msg.LastUpdate) {
return NewErrf(ErrOutdated, "Ignoring "+
4 changes: 4 additions & 0 deletions graph/db/graph.go
@@ -2797,6 +2797,10 @@ func (c *ChannelGraph) UpdateEdgePolicy(edge *models.ChannelEdgePolicy,
tx, edge, c.graphCache,
)

if err != nil {
log.Errorf("UpdateEdgePolicy faild: %v", err)
}

// Silence ErrEdgeNotFound so that the batch can
// succeed, but propagate the error via local state.
if errors.Is(err, ErrEdgeNotFound) {
8 changes: 8 additions & 0 deletions graph/db/graph_cache.go
@@ -228,6 +228,9 @@ func (c *GraphCache) UpdatePolicy(policy *models.ChannelEdgePolicy, fromNode,
var inboundFee lnwire.Fee
_, err := policy.ExtraOpaqueData.ExtractRecords(&inboundFee)
if err != nil {
log.Errorf("Failed to extract records from edge policy %v: %v",
policy.ChannelID, err)

return
}

@@ -236,11 +239,16 @@

updatePolicy := func(nodeKey route.Vertex) {
if len(c.nodeChannels[nodeKey]) == 0 {
log.Warnf("Node=%v not found in graph cache", nodeKey)

return
}

channel, ok := c.nodeChannels[nodeKey][policy.ChannelID]
if !ok {
log.Warnf("Channel=%v not found in graph cache",
policy.ChannelID)

return
}

8 changes: 8 additions & 0 deletions graph/db/models/channel_edge_policy.go
@@ -1,6 +1,7 @@
package models

import (
"fmt"
"time"

"github.com/btcsuite/btcd/btcec/v2/ecdsa"
@@ -113,3 +114,10 @@ func (c *ChannelEdgePolicy) ComputeFee(

return c.FeeBaseMSat + (amt*c.FeeProportionalMillionths)/feeRateParts
}

// String returns a human-readable version of the channel edge policy.
func (c *ChannelEdgePolicy) String() string {
return fmt.Sprintf("ChannelID=%v, MessageFlags=%v, ChannelFlags=%v, "+
"LastUpdate=%v", c.ChannelID, c.MessageFlags, c.ChannelFlags,
c.LastUpdate)
}
6 changes: 6 additions & 0 deletions routing/unified_edges.go
@@ -59,6 +59,9 @@ func (u *nodeEdgeUnifier) addPolicy(fromNode route.Vertex,
// Skip channels if there is an outgoing channel restriction.
if localChan && u.outChanRestr != nil {
if _, ok := u.outChanRestr[edge.ChannelID]; !ok {
log.Debugf("Skipped adding policy for restricted edge "+
"%v", edge.ChannelID)

return
}
}
@@ -100,6 +103,9 @@ func (u *nodeEdgeUnifier) addGraphPolicies(g Graph) error {
// Note that we are searching backwards so this node would have
// come prior to the pivot node in the route.
if channel.InPolicy == nil {
log.Debugf("Skipped adding edge %v due to nil policy",
channel.ChannelID)

return nil
}
