Skip to content

Commit

Permalink
introduce nats transport (#3188)
Browse files Browse the repository at this point in the history
This PR introduces a new transport layer for inter-node connectivity
based on NATS, in addition to the existing libp2p transport.

## Integration Architecture
NATS is embedded into Bacalhau nodes. Each Orchestrator node (aka
Requester node) also runs a NATS server and connects with other
Orchestrator nodes to form a NATS cluster. Compute nodes on the other
hand only run a NATS client and connect to any of the Orchestrator nodes
to listen for job requests and publish their node info to the network.

With this approach, only the Orchestrator nodes need to be reachable,
which simplifies the network requirements for the Compute nodes. Also, a
Compute node only needs to know the address of a single
Orchestrator node, and will learn about the other Orchestrators at
runtime. This simplifies bootstrapping, and also enables a Compute
node to fail over and reconnect to other Orchestrators at runtime.

For completeness, the Orchestrator nodes also run a NATS client to
subscribe for Compute node info, publish job requests and listen for
responses.


### NATS Subjects
NATS follows subject based addressing where clients publish and
subscribe to subjects, and the servers take care of routing of those
messages. These are the current subject patterns used to orchestrate
jobs:

- `node.compute.<node-id>.*`
Every compute node subscribes to a topic in the format of
`node.compute.<node-id>.*` to handle job requests from the orchestrator,
such as `AskForBid` and `BidAccepted`. For example, if the Orchestrator
selects node `QmUg1MoAUMEbpzg6jY45zpfNo2dmTBJS6CzZCTMbniYsvC` to run a
job, it will send a message to
`node.compute.QmUg1MoAUMEbpzg6jY45zpfNo2dmTBJS6CzZCTMbniYsvC.AskForBid/1`,
which will land on the correct compute node. The suffix `/1` is just for
versioning in case we change the `AskForBid` message format.

- `node.orchestrator.<node-id>.*`
Similarly, Orchestrator nodes listen to a dedicated subject to handle
compute callbacks, such as `OnRunComplete` and `OnComputeFailure`.

- `node.info.<node-id>`
Compute nodes periodically publish their node info to the
`node.info.<node-id>` subject, whereas Orchestrator nodes listen to
`node.info.*` so they can handle node info from all compute nodes.


## Running with NATS
NATS is currently opt-in, and libp2p is still the default transport
layer. To run a network with NATS, use the following commands:
```
# Run Orchestrator node
bacalhau serve --node-type=requester --use-nats

# Run a second Orchestrator node
bacalhau serve --node-type=requester --use-nats --cluster-peers=<HOST>:6222


# Run a compute node
bacalhau serve --node-type=compute --use-nats --orchestrators=<HOST>:4222

# Run devstack with NATS
bacalhau devstack --use-nats
```

## Testing Done
- Deployed and tested in `development` environment
- All existing tests are passing when using NATS transport 

## Remaining Work
1. Logstream support for NATS transport
1. Auth, as currently any node can join the network
1. Devstack tests based on NATS and deployed as part of circleci checks
  • Loading branch information
wdbaruni authored Jan 22, 2024
1 parent eec366e commit a02d156
Show file tree
Hide file tree
Showing 100 changed files with 2,725 additions and 786 deletions.
4 changes: 2 additions & 2 deletions cmd/cli/agent/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (s *NodeSuite) TestNodeJSONOutput() {
nodeInfo := &models.NodeInfo{}
err = marshaller.JSONUnmarshalWithMax([]byte(out), &nodeInfo)
s.Require().NoError(err, "Could not unmarshall the output into json - %+v", err)
s.Require().Equal(s.Node.Host.ID(), nodeInfo.PeerInfo.ID, "Node ID does not match in json.")
s.Require().Equal(s.Node.ID, nodeInfo.ID(), "Node ID does not match in json.")
}

func (s *NodeSuite) TestNodeYAMLOutput() {
Expand All @@ -46,5 +46,5 @@ func (s *NodeSuite) TestNodeYAMLOutput() {
nodeInfo := &models.NodeInfo{}
err = marshaller.YAMLUnmarshalWithMax([]byte(out), &nodeInfo)
s.Require().NoError(err, "Could not unmarshall the output into yaml - %+v", err)
s.Require().Equal(s.Node.Host.ID(), nodeInfo.PeerInfo.ID, "Node ID does not match in yaml.")
s.Require().Equal(s.Node.ID, nodeInfo.ID(), "Node ID does not match in yaml.")
}
4 changes: 3 additions & 1 deletion cmd/cli/devstack/devstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ func NewCmd() *cobra.Command {
&ODs.ConfigurationRepo, "stack-repo", ODs.ConfigurationRepo,
"Folder to act as the devstack configuration repo",
)

devstackCmd.PersistentFlags().StringVar(
&ODs.NetworkType, "network", ODs.NetworkType,
"Type of inter-node network layer. e.g. nats and libp2p")
return devstackCmd
}

Expand Down
12 changes: 6 additions & 6 deletions cmd/cli/get/get_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (s *GetSuite) TestDockerRunWriteToJobFolderAutoDownload() {
_, runOutput, err := cmdtesting.ExecuteTestCobraCommand(args...)
require.NoError(s.T(), err, "Error submitting job")
jobID := system.FindJobIDInTestOutputLegacy(runOutput)
hostID := s.Node.Host.ID().String()
hostID := s.Node.ID
outputFolder := filepath.Join(tempDir, util.GetDefaultJobFolder(jobID))
testDownloadOutput(s.T(), runOutput, jobID, tempDir)
testResultsFolderStructure(s.T(), outputFolder, hostID, nil)
Expand All @@ -157,7 +157,7 @@ func (s *GetSuite) TestDockerRunWriteToJobFolderNamedDownload() {
_, runOutput, err := cmdtesting.ExecuteTestCobraCommand(args...)
require.NoError(s.T(), err, "Error submitting job")
jobID := system.FindJobIDInTestOutputLegacy(runOutput)
hostID := s.Node.Host.ID().String()
hostID := s.Node.ID
testDownloadOutput(s.T(), runOutput, jobID, tempDir)
testResultsFolderStructure(s.T(), tempDir, hostID, nil)
}
Expand All @@ -177,7 +177,7 @@ func (s *GetSuite) TestGetWriteToJobFolderAutoDownload() {
_, out, err := cmdtesting.ExecuteTestCobraCommand(args...)
require.NoError(s.T(), err, "Error submitting job")
jobID := system.FindJobIDInTestOutputLegacy(out)
hostID := s.Node.Host.ID().String()
hostID := s.Node.ID

_, getOutput, err := cmdtesting.ExecuteTestCobraCommand("get",
"--api-host", s.Node.APIServer.Address,
Expand Down Expand Up @@ -224,7 +224,7 @@ func (s *GetSuite) TestGetSingleFileFromOutput() {
_, out, err := cmdtesting.ExecuteTestCobraCommand(args...)
require.NoError(s.T(), err, "Error submitting job")
jobID := system.FindJobIDInTestOutputLegacy(out)
hostID := s.Node.Host.ID().String()
hostID := s.Node.ID

_, getOutput, err := cmdtesting.ExecuteTestCobraCommand("get",
"--api-host", s.Node.APIServer.Address,
Expand All @@ -250,7 +250,7 @@ func (s *GetSuite) TestGetSingleNestedFileFromOutput() {
_, out, err := cmdtesting.ExecuteTestCobraCommand(args...)
require.NoError(s.T(), err, "Error submitting job")
jobID := system.FindJobIDInTestOutputLegacy(out)
hostID := s.Node.Host.ID().String()
hostID := s.Node.ID

_, getOutput, err := cmdtesting.ExecuteTestCobraCommand("get",
"--api-host", s.Node.APIServer.Address,
Expand Down Expand Up @@ -288,7 +288,7 @@ func (s *GetSuite) TestGetWriteToJobFolderNamedDownload() {

require.NoError(s.T(), err, "Error submitting job")
jobID := system.FindJobIDInTestOutputLegacy(out)
hostID := s.Node.Host.ID().String()
hostID := s.Node.ID

_, getOutput, err := cmdtesting.ExecuteTestCobraCommand("get",
"--api-host", s.Node.APIServer.Address,
Expand Down
18 changes: 11 additions & 7 deletions cmd/cli/list/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@ func TestListSuite(t *testing.T) {
suite.Run(t, new(ListSuite))
}

// setupRun recreates the test node between sub-test runs so that jobs
// submitted by a previous run do not show up in the next run's listings.
// It must run teardown first, then clear the repo path env var, and only
// then set up again — the order is load-bearing.
func (suite *ListSuite) setupRun() {
	// have to create a fresh node for each test case to avoid jobs of different runs to be mixed up
	suite.TearDownTest()
	// Clear the repo that was created by the previous run so a fresh one is created
	// TODO: find a better solution to set the repo path for tests in pkg/setup/setup.go:49 instead of env vars to avoid such hacks
	suite.T().Setenv("BACALHAU_DIR", "")
	suite.SetupTest()
}

func (suite *ListSuite) TestList_NumberOfJobs() {
tests := []struct {
numberOfJobs int
Expand Down Expand Up @@ -167,9 +176,7 @@ func (suite *ListSuite) TestList_AnnotationFilter() {
for _, tc := range testCases {
suite.Run(tc.Name, func() {
ctx := context.Background()
// have to create a fresh node for each test case to avoid jobs of different runs to be mixed up
suite.TearDownTest()
suite.SetupTest()
suite.setupRun()

testJob := testutils.MakeJobWithOpts(suite.T(),
jobutils.WithAnnotations(tc.JobLabels...),
Expand Down Expand Up @@ -257,10 +264,7 @@ func (suite *ListSuite) TestList_SortFlags() {
for _, sortFlags := range sortFlagsToTest {
suite.Run(fmt.Sprintf("%+v/%+v", tc, sortFlags), func() {
ctx := context.Background()

// have to create a fresh node for each test case to avoid jobs of different runs to be mixed up
suite.TearDownTest()
suite.SetupTest()
suite.setupRun()

var jobIDs []string
for i := 0; i < tc.numberOfJobs; i++ {
Expand Down
Loading

0 comments on commit a02d156

Please sign in to comment.