Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

multi: fix inconsistent state in gossip syncer #9424

Merged
merged 6 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions discovery/gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,8 @@ func (d *AuthenticatedGossiper) stop() {
func (d *AuthenticatedGossiper) ProcessRemoteAnnouncement(msg lnwire.Message,
peer lnpeer.Peer) chan error {

log.Debugf("Processing remote msg %T from peer=%x", msg, peer.PubKey())

errChan := make(chan error, 1)

// For messages in the known set of channel series queries, we'll
Expand Down Expand Up @@ -2371,7 +2373,8 @@ func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg,
timestamp := time.Unix(int64(nodeAnn.Timestamp), 0)

log.Debugf("Processing NodeAnnouncement: peer=%v, timestamp=%v, "+
"node=%x", nMsg.peer, timestamp, nodeAnn.NodeID)
"node=%x, source=%x", nMsg.peer, timestamp, nodeAnn.NodeID,
nMsg.source.SerializeCompressed())

// We'll quickly ask the router if it already has a newer update for
// this node so we can skip validating signatures if not required.
Expand Down Expand Up @@ -2430,7 +2433,8 @@ func (d *AuthenticatedGossiper) handleNodeAnnouncement(nMsg *networkMsg,
// TODO(roasbeef): get rid of the above

log.Debugf("Processed NodeAnnouncement: peer=%v, timestamp=%v, "+
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll try get around to updating the gossiper to use structured logging. That way we only need to add all this to the context once and then can just log.DebugS(ctx, "Processed NodeAnnouncement")

"node=%x", nMsg.peer, timestamp, nodeAnn.NodeID)
"node=%x, source=%x", nMsg.peer, timestamp, nodeAnn.NodeID,
nMsg.source.SerializeCompressed())

return announcements, true
}
Expand Down Expand Up @@ -3034,9 +3038,9 @@ func (d *AuthenticatedGossiper) handleChanUpdate(nMsg *networkMsg,
edgeToUpdate = e2
}

log.Debugf("Validating ChannelUpdate: channel=%v, from node=%x, has "+
"edge=%v", chanInfo.ChannelID, pubKey.SerializeCompressed(),
edgeToUpdate != nil)
log.Debugf("Validating ChannelUpdate: channel=%v, for node=%x, has "+
"edge policy=%v", chanInfo.ChannelID,
pubKey.SerializeCompressed(), edgeToUpdate != nil)

// Validate the channel announcement with the expected public key and
// channel capacity. In the case of an invalid channel update, we'll
Expand Down
4 changes: 2 additions & 2 deletions discovery/sync_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,8 +529,8 @@ func (m *SyncManager) createGossipSyncer(peer lnpeer.Peer) *GossipSyncer {
s.setSyncState(chansSynced)
s.setSyncType(PassiveSync)

log.Debugf("Created new GossipSyncer[state=%s type=%s] for peer=%v",
s.syncState(), s.SyncType(), peer)
log.Debugf("Created new GossipSyncer[state=%s type=%s] for peer=%x",
s.syncState(), s.SyncType(), peer.PubKey())

return s
}
Expand Down
6 changes: 3 additions & 3 deletions discovery/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1342,9 +1342,9 @@ func (g *GossipSyncer) ApplyGossipFilter(filter *lnwire.GossipTimestampRange) er
return err
}

log.Infof("GossipSyncer(%x): applying new update horizon: start=%v, "+
"end=%v, backlog_size=%v", g.cfg.peerPub[:], startTime, endTime,
len(newUpdatestoSend))
log.Infof("GossipSyncer(%x): applying new remote update horizon: "+
"start=%v, end=%v, backlog_size=%v", g.cfg.peerPub[:],
startTime, endTime, len(newUpdatestoSend))

// If we don't have any to send, then we can return early.
if len(newUpdatestoSend) == 0 {
Expand Down
3 changes: 3 additions & 0 deletions graph/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1400,6 +1400,9 @@ func (b *Builder) processUpdate(msg interface{},
msg.ChannelID)
}

log.Debugf("Found edge1Timestamp=%v, edge2Timestamp=%v",
edge1Timestamp, edge2Timestamp)

// As edges are directional edge node has a unique policy for
// the direction of the edge they control. Therefore, we first
// check if we already have the most up-to-date information for
Expand Down
4 changes: 4 additions & 0 deletions graph/db/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -2797,6 +2797,10 @@ func (c *ChannelGraph) UpdateEdgePolicy(edge *models.ChannelEdgePolicy,
tx, edge, c.graphCache,
)

if err != nil {
log.Errorf("UpdateEdgePolicy faild: %v", err)
}

// Silence ErrEdgeNotFound so that the batch can
// succeed, but propagate the error via local state.
if errors.Is(err, ErrEdgeNotFound) {
Expand Down
8 changes: 8 additions & 0 deletions graph/db/graph_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,9 @@ func (c *GraphCache) UpdatePolicy(policy *models.ChannelEdgePolicy, fromNode,
var inboundFee lnwire.Fee
_, err := policy.ExtraOpaqueData.ExtractRecords(&inboundFee)
if err != nil {
log.Errorf("Failed to extract records from edge policy %v: %v",
policy.ChannelID, err)

return
}

Expand All @@ -236,11 +239,16 @@ func (c *GraphCache) UpdatePolicy(policy *models.ChannelEdgePolicy, fromNode,

updatePolicy := func(nodeKey route.Vertex) {
if len(c.nodeChannels[nodeKey]) == 0 {
log.Warnf("Node=%v not found in graph cache", nodeKey)

return
}

channel, ok := c.nodeChannels[nodeKey][policy.ChannelID]
if !ok {
log.Warnf("Channel=%v not found in graph cache",
policy.ChannelID)

return
}

Expand Down
8 changes: 8 additions & 0 deletions graph/db/models/channel_edge_policy.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package models

import (
"fmt"
"time"

"github.com/btcsuite/btcd/btcec/v2/ecdsa"
Expand Down Expand Up @@ -113,3 +114,10 @@ func (c *ChannelEdgePolicy) ComputeFee(

return c.FeeBaseMSat + (amt*c.FeeProportionalMillionths)/feeRateParts
}

// String returns a human-readable version of the channel edge policy.
func (c *ChannelEdgePolicy) String() string {
return fmt.Sprintf("ChannelID=%v, MessageFlags=%v, ChannelFlags=%v, "+
"LastUpdate=%v", c.ChannelID, c.MessageFlags, c.ChannelFlags,
c.LastUpdate)
}
6 changes: 6 additions & 0 deletions routing/unified_edges.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ func (u *nodeEdgeUnifier) addPolicy(fromNode route.Vertex,
// Skip channels if there is an outgoing channel restriction.
if localChan && u.outChanRestr != nil {
if _, ok := u.outChanRestr[edge.ChannelID]; !ok {
log.Debugf("Skipped adding policy for restricted edge "+
"%v", edge.ChannelID)

return
}
}
Expand Down Expand Up @@ -100,6 +103,9 @@ func (u *nodeEdgeUnifier) addGraphPolicies(g Graph) error {
// Note that we are searching backwards so this node would have
// come prior to the pivot node in the route.
if channel.InPolicy == nil {
log.Debugf("Skipped adding edge %v due to nil policy",
channel.ChannelID)

return nil
}

Expand Down