Skip to content

Commit

Permalink
[Backport] Tweak logs around network (#3784)
Browse files Browse the repository at this point in the history
This pull request backports
#3777 to the
`releases/mainnet/v2.0.0-m7` branch.
  • Loading branch information
lukasz-zimnoch authored Feb 12, 2024
2 parents efbb583 + 4052de0 commit fc6ee92
Show file tree
Hide file tree
Showing 8 changed files with 190 additions and 17 deletions.
4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ const (

// LogLevelEnvVariable can be used to define logging configuration.
LogLevelEnvVariable = "LOG_LEVEL"

// PubsubLogLevelEnvVariable can be used to define logging configuration
// for the pubsub implementation.
PubsubLogLevelEnvVariable = "PUBSUB_LOG_LEVEL"
)

// Config is the top level config structure.
Expand Down
28 changes: 24 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,34 @@ import (
var logger = log.Logger("keep-main")

func main() {
err := logging.Configure(os.Getenv(config.LogLevelEnvVariable))
if err != nil {
fmt.Fprintf(os.Stderr, "failed to configure logging: [%v]\n", err)
}
configureLogging()

rootCmd := cmd.Initialize(build.Version, build.Revision)

if err := rootCmd.Execute(); err != nil {
logger.Fatal(err)
}
}

func configureLogging() {
logLevel := "info"
if env := os.Getenv(config.LogLevelEnvVariable); len(env) > 0 {
logLevel = env
}

pubsubLogLevel := "warn"
if env := os.Getenv(config.PubsubLogLevelEnvVariable); len(env) > 0 {
pubsubLogLevel = env
}

levelDirective := fmt.Sprintf(
"%s pubsub=%s",
logLevel,
pubsubLogLevel,
)

err := logging.Configure(levelDirective)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to configure logging: [%v]\n", err)
}
}
64 changes: 56 additions & 8 deletions pkg/net/libp2p/libp2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
rhost "github.com/libp2p/go-libp2p/p2p/host/routed"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
"github.com/libp2p/go-libp2p/p2p/net/upgrader"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"

ma "github.com/multiformats/go-multiaddr"
)
Expand Down Expand Up @@ -68,6 +69,10 @@ const (
// to prevent uncontrolled message propagation.
const MaximumDisseminationTime = 90

// pingTimeout is the maximum duration of the ping test performed for
// freshly connected peers.
const pingTestTimeout = 60 * time.Second

// Config defines the configuration for the libp2p network provider.
type Config struct {
Bootstrap bool
Expand Down Expand Up @@ -320,7 +325,7 @@ func Connect(
return nil, err
}

host.Network().Notify(buildNotifiee())
host.Network().Notify(buildNotifiee(host))

broadcastChannelManager, err := newChannelManager(ctx, identity, host, ticker)
if err != nil {
Expand Down Expand Up @@ -528,17 +533,20 @@ func extractMultiAddrFromPeers(peers []string) ([]peer.AddrInfo, error) {
return peerInfos, nil
}

func buildNotifiee() libp2pnet.Notifiee {
func buildNotifiee(libp2pHost host.Host) libp2pnet.Notifiee {
notifyBundle := &libp2pnet.NotifyBundle{}

notifyBundle.ConnectedF = func(_ libp2pnet.Network, connection libp2pnet.Conn) {
logger.Infof(
"established connection to [%v]",
multiaddressWithIdentity(
connection.RemoteMultiaddr(),
connection.RemotePeer(),
),
peerID := connection.RemotePeer()

peerMultiaddress := multiaddressWithIdentity(
connection.RemoteMultiaddr(),
peerID,
)

logger.Infof("established connection to [%v]", peerMultiaddress)

go executePingTest(libp2pHost, peerID, peerMultiaddress)
}
notifyBundle.DisconnectedF = func(_ libp2pnet.Network, connection libp2pnet.Conn) {
logger.Infof(
Expand All @@ -553,6 +561,46 @@ func buildNotifiee() libp2pnet.Notifiee {
return notifyBundle
}

func executePingTest(
libp2pHost host.Host,
peerID peer.ID,
peerMultiaddress string,
) {
logger.Infof("starting ping test for [%v]", peerMultiaddress)

ctx, cancelCtx := context.WithTimeout(
context.Background(),
pingTestTimeout,
)
defer cancelCtx()

resultChan := ping.Ping(ctx, libp2pHost, peerID)

select {
case result := <-resultChan:
if result.Error != nil {
logger.Warnf(
"ping test for [%v] failed: [%v]",
peerMultiaddress,
result.Error,
)
} else if result.Error == nil && result.RTT == 0 {
logger.Warnf(
"peer test for [%v] failed without clear reason",
peerMultiaddress,
)
} else {
logger.Infof(
"ping test for [%v] completed with success (RTT [%v])",
peerMultiaddress,
result.RTT,
)
}
case <-ctx.Done():
logger.Warnf("ping test for [%v] timed out", peerMultiaddress)
}
}

func multiaddressWithIdentity(
multiaddress ma.Multiaddr,
peerID peer.ID,
Expand Down
32 changes: 32 additions & 0 deletions pkg/protocol/announcer/announcer.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,35 @@ loop:

return readyMembersIndexes, nil
}

// UnreadyMembers returns a list of member indexes that turned out to be unready
// during the announcement. The list is sorted in ascending order.
func UnreadyMembers(
readyMembers []group.MemberIndex,
groupSize int,
) []group.MemberIndex {
if len(readyMembers) == groupSize {
return []group.MemberIndex{}
}

readyMembersSet := make(map[group.MemberIndex]bool)
for _, memberIndex := range readyMembers {
readyMembersSet[memberIndex] = true
}

unreadyMembers := make([]group.MemberIndex, 0)

for i := 0; i < groupSize; i++ {
memberIndex := group.MemberIndex(i + 1)

if _, isReady := readyMembersSet[memberIndex]; !isReady {
unreadyMembers = append(unreadyMembers, memberIndex)
}
}

sort.Slice(unreadyMembers, func(i, j int) bool {
return unreadyMembers[i] < unreadyMembers[j]
})

return unreadyMembers
}
43 changes: 43 additions & 0 deletions pkg/protocol/announcer/announcer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,3 +227,46 @@ func TestAnnouncer(t *testing.T) {
})
}
}

func TestUnreadyMembers(t *testing.T) {
tests := map[string]struct {
readyMembers []group.MemberIndex
groupSize int
expected []group.MemberIndex
}{
"all members are ready": {
readyMembers: []group.MemberIndex{1, 2, 3, 4, 5},
groupSize: 5,
expected: []group.MemberIndex{},
},
"some members are not ready": {
readyMembers: []group.MemberIndex{1, 3, 5},
groupSize: 5,
expected: []group.MemberIndex{2, 4},
},
"no members are ready": {
readyMembers: []group.MemberIndex{},
groupSize: 5,
expected: []group.MemberIndex{1, 2, 3, 4, 5},
},
"group size is zero": {
readyMembers: []group.MemberIndex{},
groupSize: 0,
expected: []group.MemberIndex{},
},
}

for testName, test := range tests {
t.Run(testName, func(t *testing.T) {
result := UnreadyMembers(test.readyMembers, test.groupSize)

if !reflect.DeepEqual(test.expected, result) {
t.Errorf(
"unexpected result\nexpected: %v\nactual: %v",
test.expected,
result,
)
}
})
}
}
8 changes: 7 additions & 1 deletion pkg/tbtc/dkg.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"golang.org/x/exp/maps"
"math/big"
"sort"

Expand Down Expand Up @@ -181,10 +182,15 @@ func (de *dkgExecutor) checkEligibility(
}

dkgLogger.Infof(
"selected group members for DKG = %s",
"selected group members (seats) for DKG: [%s]",
groupSelectionResult.OperatorsAddresses,
)

dkgLogger.Infof(
"distinct operators participating in DKG: [%s]",
maps.Keys(groupSelectionResult.OperatorsAddresses.Set()),
)

if len(groupSelectionResult.OperatorsAddresses) > de.groupParameters.GroupSize {
return nil, nil, fmt.Errorf(
"group size larger than supported: [%v]",
Expand Down
14 changes: 12 additions & 2 deletions pkg/tbtc/dkg_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/sha256"
"encoding/binary"
"fmt"
"github.com/keep-network/keep-core/pkg/protocol/announcer"
"math/big"

"github.com/ipfs/go-log/v2"
Expand Down Expand Up @@ -209,6 +210,11 @@ func (drl *dkgRetryLoop) start(
continue
}

unreadyMembersIndexes := announcer.UnreadyMembers(
readyMembersIndexes,
drl.groupParameters.GroupSize,
)

// Check the loop stop signal.
if ctx.Err() != nil {
return nil, ctx.Err()
Expand All @@ -217,19 +223,23 @@ func (drl *dkgRetryLoop) start(
if len(readyMembersIndexes) >= drl.groupParameters.GroupQuorum {
drl.logger.Infof(
"[member:%v] completed announcement phase for attempt [%v] "+
"with quorum of [%v] members ready to perform DKG",
"with quorum of [%v] members ready to perform DKG; "+
"following members are not ready: [%v]",
drl.memberIndex,
drl.attemptCounter,
len(readyMembersIndexes),
unreadyMembersIndexes,
)
} else {
drl.logger.Warnf(
"[member:%v] completed announcement phase for attempt [%v] "+
"with non-quorum of [%v] members ready to perform DKG; "+
"starting next attempt",
"following members are not ready: [%v]; "+
"moving to the next attempt",
drl.memberIndex,
drl.attemptCounter,
len(readyMembersIndexes),
unreadyMembersIndexes,
)
continue
}
Expand Down
14 changes: 12 additions & 2 deletions pkg/tbtc/signing_loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/sha256"
"encoding/binary"
"fmt"
"github.com/keep-network/keep-core/pkg/protocol/announcer"
"math/big"
"math/rand"
"sort"
Expand Down Expand Up @@ -261,6 +262,11 @@ func (srl *signingRetryLoop) start(
continue
}

unreadyMembersIndexes := announcer.UnreadyMembers(
readyMembersIndexes,
len(srl.signingGroupOperators),
)

// Check the loop stop signal again. The announcement took some time
// and the context may be done now.
if ctx.Err() != nil {
Expand All @@ -270,19 +276,23 @@ func (srl *signingRetryLoop) start(
if len(readyMembersIndexes) >= srl.groupParameters.HonestThreshold {
srl.logger.Infof(
"[member:%v] completed announcement phase for attempt [%v] "+
"with honest majority of [%v] members ready to sign",
"with honest majority of [%v] members ready to sign; "+
"following members are not ready: [%v]",
srl.signingGroupMemberIndex,
srl.attemptCounter,
len(readyMembersIndexes),
unreadyMembersIndexes,
)
} else {
srl.logger.Warnf(
"[member:%v] completed announcement phase for attempt [%v] "+
"with minority of [%v] members ready to sign; "+
"starting next attempt",
"following members are not ready: [%v]; "+
"moving to the next attempt",
srl.signingGroupMemberIndex,
srl.attemptCounter,
len(readyMembersIndexes),
unreadyMembersIndexes,
)
continue
}
Expand Down

0 comments on commit fc6ee92

Please sign in to comment.