Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Split out IO management into standalone app. #34

Merged
merged 1 commit into from
Feb 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 31 additions & 3 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
"github.com/sylabs/compute-service/internal/app/iomanager"
"github.com/sylabs/compute-service/internal/app/server"
"github.com/sylabs/compute-service/internal/pkg/mongodb"
"github.com/sylabs/compute-service/internal/pkg/rediskv"
Expand Down Expand Up @@ -42,7 +43,7 @@ var (
)

// signalHandler catches SIGINT/SIGTERM to perform an orderly shutdown.
func signalHandler(s server.Server) {
func signalHandler(nc *nats.Conn, s server.Server, m iomanager.IOManager) {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)

Expand All @@ -51,7 +52,20 @@ func signalHandler(s server.Server) {
}).Info("shutting down due to signal")

if err := s.Stop(context.Background()); err != nil {
logrus.WithError(err).Warning("shutdown failed")
logrus.WithError(err).Warning("server shutdown failed")
}

if err := m.Stop(); err != nil {
logrus.WithError(err).Warning("IO manager shutdown failed")
}

// Drain nats connection before closing.
if err := nc.Drain(); err != nil {
logrus.WithError(err).Warning("starting nats connection draining failed")
}

// Wait for connection to drain and close.
for nc.IsDraining() {
}
}

Expand Down Expand Up @@ -151,6 +165,20 @@ func main() {
rc.Disconnect()
}()

// Spin up IO Manager.
ioc := iomanager.Config{
Version: version,
NATSConn: nc,
RedisConn: rc,
}

m, err := iomanager.New(ioc)
if err != nil {
logrus.WithError(err).Error("failed to create IO manager")
return
}
m.Start()

// Spin up server.
c := server.Config{
Version: version,
Expand All @@ -170,7 +198,7 @@ func main() {
}

// Spin off signal handler to do graceful shutdown.
go signalHandler(s)
go signalHandler(nc, s, m)

// Main server routine.
s.Run()
Expand Down
9 changes: 5 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,22 @@ require (
github.com/friendsofgo/graphiql v0.2.2
github.com/go-redis/redis v6.15.7+incompatible
github.com/golang/snappy v0.0.1 // indirect
github.com/gorilla/mux v1.7.4 // indirect

github.com/graph-gophers/graphql-go v0.0.0-20191115155744-f33e81362277
github.com/nats-io/nats-server/v2 v2.1.4 // indirect

github.com/nats-io/nats.go v1.9.1
github.com/onsi/ginkgo v1.12.0 // indirect
github.com/onsi/gomega v1.9.0 // indirect

github.com/pbnjay/memory v0.0.0-20190104145345-974d429e7ae4
github.com/prometheus/client_golang v1.3.0
github.com/rs/cors v1.7.0
github.com/sirupsen/logrus v1.4.2
github.com/smartystreets/goconvey v1.6.4 // indirect
github.com/tidwall/pretty v1.0.0 // indirect
github.com/urfave/negroni v1.0.0 // indirect
github.com/tidwall/pretty v1.0.1 // indirect
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c // indirect
github.com/xdg/stringprep v1.0.0 // indirect

go.mongodb.org/mongo-driver v1.2.1
gopkg.in/square/go-jose.v2 v2.4.1
)
18 changes: 2 additions & 16 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,12 @@ github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEW
github.com/google/go-cmp v0.3.1 h1:Xye71clBPdm5HgqGwUkwhbynsUJZhDbS20FvLhQ2izg=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/mux v1.7.4 h1:VuZ8uybHlWmqV03+zRzdwKL4tUnIp1MAQtp1mIFE1bc=
github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/graph-gophers/graphql-go v0.0.0-20191115155744-f33e81362277 h1:E0whKxgp2ojts0FDgUA8dl62bmH0LxKanMoBr6MDTDM=
github.com/graph-gophers/graphql-go v0.0.0-20191115155744-f33e81362277/go.mod h1:9CQHMSxwO4MprSdzoIEobiHpoLtHm77vfxsvsIN5Vuc=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.8/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
Expand Down Expand Up @@ -110,19 +104,13 @@ github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIKYqbNC9s=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/urfave/negroni v1.0.0 h1:kIimOitoypq34K7TG7DUaJ9kq/N4Ofuwi1sjz0KipXc=
github.com/urfave/negroni v1.0.0/go.mod h1:Meg73S6kFm/4PpbYdq35yYWoCZ9mS/YSx+lKnmiohz4=
github.com/tidwall/pretty v1.0.1 h1:WE4RBSZ1x6McVVC8S/Md+Qse8YUv6HRObAx6ke00NY8=
github.com/tidwall/pretty v1.0.1/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
Expand All @@ -136,7 +124,6 @@ golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 h1:HuIa8hRrWRSrqYzx1qI49N
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980 h1:dfGZHvZk057jK2MCeWus/TowKpJ8y4AmooUzdBSR9GU=
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
Expand All @@ -157,7 +144,6 @@ golang.org/x/sys v0.0.0-20191220142924-d4481acd189f h1:68K/z8GLUxV76xGSqwTWw2gyk
golang.org/x/sys v0.0.0-20191220142924-d4481acd189f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 h1:9zdDQZ7Thm29KFXgAX/+yaf3eVbP7djjWp/dXAppNCc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
Expand Down
100 changes: 100 additions & 0 deletions internal/app/iomanager/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// Copyright (c) 2020, Sylabs, Inc. All rights reserved.

package iomanager

import (
"strings"

"github.com/nats-io/nats.go"
"github.com/sirupsen/logrus"
"github.com/sylabs/compute-service/internal/pkg/rediskv"
)

// Config describes the IO manager configuration.
type Config struct {
Version string
NATSConn *nats.Conn
RedisConn *rediskv.Connection
}

// IOManager contains the state of the IO Manager.
type IOManager struct {
nc *nats.Conn
rc *rediskv.Connection
subs []*nats.Subscription
}

// New returns a new IO Manager.
func New(c Config) (m IOManager, err error) {
return IOManager{
c.NATSConn,
c.RedisConn,
nil,
}, nil
}

// Start starts the IO Manager by initializing handlers for
// NATS subscriptions.
func (m IOManager) Start() {
// Subscribe to relevant topics.
if err := m.subscribe(); err != nil {
logrus.WithError(err).Warn("failed to subscribe")
return
}
}

// Stop stops the IO Manager by putting NATS subscriptions
// in a draining state.
func (m IOManager) Stop() error {
return m.unsubscribe()
}

// subscribe expresses interest in subjects that are relevant to the IO Manager.
func (m IOManager) subscribe() error {
subs := []struct {
subject string
handler nats.MsgHandler
}{
{"job.*.output", m.jobOutputHandler},
}
for _, s := range subs {
sub, err := m.nc.Subscribe(s.subject, s.handler)
if err != nil {
logrus.WithField("subject", s.subject).WithError(err).Warn("failed to subscribe")
return err
}
logrus.WithField("subject", s.subject).Info("subscribed")

m.subs = append(m.subs, sub)
}
return nil
}

// subscribe removes interest in subjects that are relevant to the IO Manager.
// NOTE: NATS will continue to handle callbacks until queue is empty.
func (m IOManager) unsubscribe() error {
for _, s := range m.subs {
err := s.Drain()
if err != nil {
logrus.WithField("subject", s.Subject).WithError(err).Warn("failed to unsubscribe")
return err
}
}

return nil
}

// NOTE: If multiple jobOutputHandlers are spun off, output for a job could be placed
// out of order in Redis.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this we might need to solve at some point, as we'll hit it during rolling upgrades. Maybe we should open an issue for that?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also see this being an issue when scaling horizontally.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Captured in #35

func (m IOManager) jobOutputHandler(msg *nats.Msg) {
// Parse subject for job ID
s := strings.Split(msg.Subject, ".")
if len(s) != 3 {
logrus.Errorf("malformed job output subject: %s, skipping", msg.Subject)
}

id := s[1]
if err := m.rc.Append(id, string(msg.Data)); err != nil {
logrus.Errorf("failed to append job %s output: %v", id, err)
}
}
11 changes: 11 additions & 0 deletions internal/pkg/rediskv/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,23 @@ func (c *Connection) Set(key, value string) error {

}

// Append will append the value to the existing entry for the
// supplied key, or create a new one.
func (c *Connection) Append(key, value string) error {
return c.rc.Append(key, value).Err()
}

// Get will retrieve the value at the supplied key.
// If the key is not found, "" is returned without an error.
func (c *Connection) Get(key string) (string, error) {
v, err := c.rc.Get(key).Result()
if err != nil {
if err == redis.Nil {
return "", nil
}
return "", err
}

return v, nil
}

Expand Down
40 changes: 32 additions & 8 deletions internal/pkg/rediskv/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
package rediskv

import (
"crypto/rand"
"flag"
"fmt"
"log"
"math/rand"
"math"
"math/big"
"os"
"testing"
)
Expand Down Expand Up @@ -38,25 +40,47 @@ func TestMain(m *testing.M) {
os.Exit(run(m))
}

func TestGetSet(t *testing.T) {
i := rand.Int()
func TestGetSetAppend(t *testing.T) {
ikaneshiro marked this conversation as resolved.
Show resolved Hide resolved
n, err := rand.Int(rand.Reader, big.NewInt(int64(math.MaxInt32)))
if err != nil {
t.Fatalf("failed to generate random int: %v", err)
}
i := int32(n.Int64())
testkey, testvalue := fmt.Sprintf("testkey-%d", i), fmt.Sprintf("testvalue-%d", i)
_, err := testConnection.Get(testkey)
if err == nil {
t.Fatal("unexpected success")
val, err := testConnection.Get(testkey)
if err != nil {
t.Fatal("unexpected failure")
}
if val != "" {
t.Fatalf("want %q, got %q", "", val)
}

err = testConnection.Set(testkey, testvalue)
if err != nil {
t.Fatalf("unexpected failure: %v", err)
}

val, err := testConnection.Get(testkey)
val, err = testConnection.Get(testkey)
if err != nil {
t.Fatalf("unexpected failure: %v", err)
}

if testvalue != val {
t.Fatalf("want %s, got %s", string(testvalue), string(val))
t.Fatalf("want %s, got %s", testvalue, val)
}

err = testConnection.Append(testkey, testvalue)
if err != nil {
t.Fatalf("unexpected failure: %v", err)
}

val, err = testConnection.Get(testkey)
if err != nil {
t.Fatalf("unexpected failure: %v", err)
}

want := testvalue + testvalue
if want != val {
t.Fatalf("want %q, got %q", want, val)
}
}
4 changes: 0 additions & 4 deletions internal/pkg/scheduler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,8 @@ func (s *Scheduler) runJob(ctx context.Context, j model.Job) error {
_, err := s.m.Subscribe(fmt.Sprintf("job.%v.finished", j.ID), func(msg struct {
Status string
RC int
Out string
}) {
s.p.SetJobStatus(ctx, j.ID, msg.Status, msg.RC)
if err := s.iop.Set(j.ID, msg.Out); err != nil {
log.Errorf("failed to record job output: %v", err)
}
close(jobFinished)
})
if err != nil {
Expand Down