Skip to content

Commit

Permalink
Split out IO management into standalone app.
Browse files Browse the repository at this point in the history
Signed-off-by: Ian Kaneshiro <[email protected]>
  • Loading branch information
ikaneshiro committed Feb 20, 2020
1 parent 7bc2cdb commit 89ba514
Show file tree
Hide file tree
Showing 7 changed files with 181 additions and 35 deletions.
34 changes: 31 additions & 3 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
"github.com/sylabs/compute-service/internal/app/iomanager"
"github.com/sylabs/compute-service/internal/app/server"
"github.com/sylabs/compute-service/internal/pkg/mongodb"
"github.com/sylabs/compute-service/internal/pkg/rediskv"
Expand Down Expand Up @@ -42,7 +43,7 @@ var (
)

// signalHandler catches SIGINT/SIGTERM to perform an orderly shutdown.
func signalHandler(s server.Server) {
func signalHandler(nc *nats.Conn, s server.Server, m iomanager.IOManager) {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)

Expand All @@ -51,7 +52,20 @@ func signalHandler(s server.Server) {
}).Info("shutting down due to signal")

if err := s.Stop(context.Background()); err != nil {
logrus.WithError(err).Warning("shutdown failed")
logrus.WithError(err).Warning("server shutdown failed")
}

if err := m.Stop(); err != nil {
logrus.WithError(err).Warning("IO manager shutdown failed")
}

// Drain nats connection before closing.
if err := nc.Drain(); err != nil {
logrus.WithError(err).Warning("starting nats connection draining failed")
}

// Wait for connection to drain and close.
for nc.IsDraining() {
}
}

Expand Down Expand Up @@ -151,6 +165,20 @@ func main() {
rc.Disconnect()
}()

// Spin up IO Manager.
ioc := iomanager.Config{
Version: version,
NATSConn: nc,
RedisConn: rc,
}

m, err := iomanager.New(ioc)
if err != nil {
logrus.WithError(err).Error("failed to create IO manager")
return
}
m.Start()

// Spin up server.
c := server.Config{
Version: version,
Expand All @@ -170,7 +198,7 @@ func main() {
}

// Spin off signal handler to do graceful shutdown.
go signalHandler(s)
go signalHandler(nc, s, m)

// Main server routine.
s.Run()
Expand Down
9 changes: 5 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,22 @@ require (
github.com/friendsofgo/graphiql v0.2.2
github.com/go-redis/redis v6.15.7+incompatible
github.com/golang/snappy v0.0.1 // indirect
github.com/gorilla/mux v1.7.4 // indirect

github.com/graph-gophers/graphql-go v0.0.0-20191115155744-f33e81362277
github.com/nats-io/nats-server/v2 v2.1.4 // indirect

github.com/nats-io/nats.go v1.9.1
github.com/onsi/ginkgo v1.12.0 // indirect
github.com/onsi/gomega v1.9.0 // indirect

github.com/pbnjay/memory v0.0.0-20190104145345-974d429e7ae4
github.com/prometheus/client_golang v1.3.0
github.com/rs/cors v1.7.0
github.com/sirupsen/logrus v1.4.2
github.com/smartystreets/goconvey v1.6.4 // indirect
github.com/tidwall/pretty v1.0.0 // indirect
github.com/urfave/negroni v1.0.0 // indirect
github.com/tidwall/pretty v1.0.1 // indirect
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c // indirect
github.com/xdg/stringprep v1.0.0 // indirect

go.mongodb.org/mongo-driver v1.2.1
gopkg.in/square/go-jose.v2 v2.4.1
)
18 changes: 2 additions & 16 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,12 @@ github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW
github.com/google/go-cmp v0.3.1 h1:Xye71clBPdm5HgqGwUkwhbynsUJZhDbS20FvLhQ2izg=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/mux v1.7.4 h1:VuZ8uybHlWmqV03+zRzdwKL4tUnIp1MAQtp1mIFE1bc=
github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/graph-gophers/graphql-go v0.0.0-20191115155744-f33e81362277 h1:E0whKxgp2ojts0FDgUA8dl62bmH0LxKanMoBr6MDTDM=
github.com/graph-gophers/graphql-go v0.0.0-20191115155744-f33e81362277/go.mod h1:9CQHMSxwO4MprSdzoIEobiHpoLtHm77vfxsvsIN5Vuc=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.8/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
Expand Down Expand Up @@ -110,19 +104,13 @@ github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/urfave/negroni v1.0.0 h1:kIimOitoypq34K7TG7DUaJ9kq/N4Ofuwi1sjz0KipXc=
github.com/urfave/negroni v1.0.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4=
github.com/tidwall/pretty v1.0.1 h1:WE4RBSZ1x6McVVC8S/Md+Qse8YUv6HRObAx6ke00NY8=
github.com/tidwall/pretty v1.0.1/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
Expand All @@ -136,7 +124,6 @@ golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 h1:HuIa8hRrWRSrqYzx1qI49N
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980 h1:dfGZHvZk057jK2MCeWus/TowKpJ8y4AmooUzdBSR9GU=
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
Expand All @@ -157,7 +144,6 @@ golang.org/x/sys v0.0.0-20191220142924-d4481acd189f h1:68K/z8GLUxV76xGSqwTWw2gyk
golang.org/x/sys v0.0.0-20191220142924-d4481acd189f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 h1:9zdDQZ7Thm29KFXgAX/+yaf3eVbP7djjWp/dXAppNCc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
Expand Down
100 changes: 100 additions & 0 deletions internal/app/iomanager/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright (c) 2020, Sylabs, Inc. All rights reserved.

package iomanager

import (
"strings"

"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
"github.com/sylabs/compute-service/internal/pkg/rediskv"
)

// Config describes the IO manager configuration.
type Config struct {
Version string
NATSConn *nats.Conn
RedisConn *rediskv.Connection
}

// IOManager contains the state of the IO Manager.
type IOManager struct {
nc *nats.Conn
rc *rediskv.Connection
subs []*nats.Subscription
}

// New returns a new IO Manager.
func New(c Config) (m IOManager, err error) {
return IOManager{
c.NATSConn,
c.RedisConn,
nil,
}, nil
}

// Start starts the IO Manager by initializing handlers for
// NATS subscriptions.
func (m IOManager) Start() {
// Subscribe to relevant topics.
if err := m.subscribe(); err != nil {
logrus.WithError(err).Warn("failed to subscribe")
return
}
}

// Stop stops the IO Manager by putting NATS subscriptions
// in a draining state.
func (m IOManager) Stop() error {
return m.unsubscribe()
}

// subscribe expresses interest in subjects that are relevant to the IO Manager.
func (m IOManager) subscribe() error {
subs := []struct {
subject string
handler nats.MsgHandler
}{
{"job.*.output", m.jobOutputHandler},
}
for _, s := range subs {
sub, err := m.nc.Subscribe(s.subject, s.handler)
if err != nil {
logrus.WithField("subject", s.subject).WithError(err).Warn("failed to subscribe")
return err
}
logrus.WithField("subject", s.subject).Info("subscribed")

m.subs = append(m.subs, sub)
}
return nil
}

// subscribe removes interest in subjects that are relevant to the IO Manager.
// NOTE: NATS will continue to handle callbacks until queue is empty.
func (m IOManager) unsubscribe() error {
for _, s := range m.subs {
err := s.Drain()
if err != nil {
logrus.WithField("subject", s.Subject).WithError(err).Warn("failed to unsubscribe")
return err
}
}

return nil
}

// NOTE: If multiple jobOutputHandlers are spun off, output for a job couple be placed
// out of order in Redis.
func (m IOManager) jobOutputHandler(msg *nats.Msg) {
// Parse subject for job ID
s := strings.Split(msg.Subject, ".")
if len(s) != 3 {
logrus.Errorf("malformed job output subject: %s, skipping", msg.Subject)
}

id := s[1]
if err := m.rc.Append(id, string(msg.Data)); err != nil {
logrus.Errorf("failed to append job %s output: %v", id, err)
}
}
11 changes: 11 additions & 0 deletions internal/pkg/rediskv/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,23 @@ func (c *Connection) Set(key, value string) error {

}

// Append will append the value to the existing entry for the
// supplied key, or create a new one.
func (c *Connection) Append(key, value string) error {
return c.rc.Append(key, value).Err()
}

// Get will retrieve the value at the supplied key.
// If the key is not found, "" is returned without an error.
func (c *Connection) Get(key string) (string, error) {
v, err := c.rc.Get(key).Result()
if err != nil {
if err == redis.Nil {
return "", nil
}
return "", err
}

return v, nil
}

Expand Down
40 changes: 32 additions & 8 deletions internal/pkg/rediskv/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
package rediskv

import (
"crypto/rand"
"flag"
"fmt"
"log"
"math/rand"
"math"
"math/big"
"os"
"testing"
)
Expand Down Expand Up @@ -38,25 +40,47 @@ func TestMain(m *testing.M) {
os.Exit(run(m))
}

func TestGetSet(t *testing.T) {
i := rand.Int()
func TestGetSetAppend(t *testing.T) {
n, err := rand.Int(rand.Reader, big.NewInt(int64(math.MaxInt32)))
if err != nil {
t.Fatalf("failed to generate random int: %v", err)
}
i := int32(n.Int64())
testkey, testvalue := fmt.Sprintf("testkey-%d", i), fmt.Sprintf("testvalue-%d", i)
_, err := testConnection.Get(testkey)
if err == nil {
t.Fatal("unexpected success")
val, err := testConnection.Get(testkey)
if err != nil {
t.Fatal("unexpected failure")
}
if val != "" {
t.Fatalf("want %q, got %q", "", val)
}

err = testConnection.Set(testkey, testvalue)
if err != nil {
t.Fatalf("unexpected failure: %v", err)
}

val, err := testConnection.Get(testkey)
val, err = testConnection.Get(testkey)
if err != nil {
t.Fatalf("unexpected failure: %v", err)
}

if testvalue != val {
t.Fatalf("want %s, got %s", string(testvalue), string(val))
t.Fatalf("want %s, got %s", testvalue, val)
}

err = testConnection.Append(testkey, testvalue)
if err != nil {
t.Fatalf("unexpected failure: %v", err)
}

val, err = testConnection.Get(testkey)
if err != nil {
t.Fatalf("unexpected failure: %v", err)
}

want := testvalue + testvalue
if want != val {
t.Fatalf("want %q, got %q", want, val)
}
}
4 changes: 0 additions & 4 deletions internal/pkg/scheduler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,8 @@ func (s *Scheduler) runJob(ctx context.Context, j model.Job) error {
_, err := s.m.Subscribe(fmt.Sprintf("job.%v.finished", j.ID), func(msg struct {
Status string
RC int
Out string
}) {
s.p.SetJobStatus(ctx, j.ID, msg.Status, msg.RC)
if err := s.iop.Set(j.ID, msg.Out); err != nil {
log.Errorf("failed to record job output: %v", err)
}
close(jobFinished)
})
if err != nil {
Expand Down

0 comments on commit 89ba514

Please sign in to comment.