Skip to content

Commit

Permalink
fix devstack libp2p not connecting
Browse files Browse the repository at this point in the history
  • Loading branch information
wdbaruni committed Jan 7, 2024
1 parent 1516f24 commit 3f48e1f
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 14 deletions.
2 changes: 1 addition & 1 deletion pkg/compute/store/boltdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func NewStore(ctx context.Context, dbPath string) (*Store, error) {
database, err := GetDatabase(dbPath)
if err != nil {
if err == bolt.ErrTimeout {
return nil, fmt.Errorf("timed out while opening database, is file %s in use", dbPath)
return nil, fmt.Errorf("timed out while opening database, is file %s in use?", dbPath)
}
return nil, err
}
Expand Down
20 changes: 11 additions & 9 deletions pkg/devstack/devstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func Setup(
// ////////////////////////////////////
// Transport layer (NATS or Libp2p)
// ////////////////////////////////////
var libp2pPeer []multiaddr.Multiaddr
var swarmPort int
if os.Getenv("PREDICTABLE_API_PORT") != "" {
const startSwarmPort = 4222 // 4222 is the default NATS port
Expand Down Expand Up @@ -174,7 +175,6 @@ func Setup(
clusterPeersAddrs = append(clusterPeersAddrs, fmt.Sprintf("0.0.0.0:%d", clusterPort))
}
} else {
var libp2pPeer []multiaddr.Multiaddr
if i == 0 {
if stackConfig.Peer != "" {
// connect 0'th node to external peer if specified
Expand All @@ -197,7 +197,7 @@ func Setup(
log.Ctx(ctx).Debug().Msgf("Connecting to first libp2p requester node: %s", libp2pPeer)
}

clusterConfig.Libp2pHost, err = createLibp2pHost(ctx, cm, libp2pPeer, swarmPort)
clusterConfig.Libp2pHost, err = createLibp2pHost(ctx, cm, swarmPort)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -289,6 +289,14 @@ func Setup(
return nil, err
}

// Start libp2p connections
if clusterConfig.Libp2pHost != nil {
err = bac_libp2p.ConnectToPeersContinuouslyWithRetryDuration(ctx, cm, clusterConfig.Libp2pHost, libp2pPeer, 2*time.Second)
if err != nil {
return nil, err
}
}

// start the node
err = n.Start(ctx)
if err != nil {
Expand All @@ -310,7 +318,7 @@ func Setup(
}, nil
}

func createLibp2pHost(ctx context.Context, cm *system.CleanupManager, peers []multiaddr.Multiaddr, port int) (host.Host, error) {
func createLibp2pHost(ctx context.Context, cm *system.CleanupManager, port int) (host.Host, error) {
var err error

// TODO(forrest): [devstack] Refactor the devstack s.t. each node has its own repo and config.
Expand All @@ -331,12 +339,6 @@ func createLibp2pHost(ctx context.Context, cm *system.CleanupManager, peers []mu
return nil, fmt.Errorf("error creating libp2p host: %w", err)
}

ctx = logger.ContextWithNodeIDLogger(ctx, libp2pHost.ID().String())
err = bac_libp2p.ConnectToPeersContinuouslyWithRetryDuration(ctx, cm, libp2pHost, peers, 2*time.Second)
if err != nil {
return nil, err
}

return libp2pHost, nil
}

Expand Down
1 change: 0 additions & 1 deletion pkg/libp2p/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ func connectToPeers(ctx context.Context, h host.Host, peers []multiaddr.Multiadd
Msg("Libp2p transport connected to peer")
}
}
log.Ctx(ctx).Debug().Msgf("Current peers: %s", h.Network().Peers())
if len(errors) > 0 {
return fmt.Errorf("libp2p transport had errors connecting to peers: %s", errors)
}
Expand Down
1 change: 1 addition & 0 deletions pkg/nats/pubsub/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type PubSubParams struct {
// Conn is the NATS connection to use for publishing and subscribing.
Conn *nats.Conn
}

type PubSub[T any] struct {
subject string
subscriptionSubject string
Expand Down
3 changes: 0 additions & 3 deletions pkg/node/config_compute.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,6 @@ type ComputeConfig struct {
BidResourceStrategy bidstrategy.ResourceBidStrategy

ExecutionStore store.ExecutionStore

// NATS config
Servers []string
}

func NewComputeConfigWithDefaults() (ComputeConfig, error) {
Expand Down

0 comments on commit 3f48e1f

Please sign in to comment.