Skip to content

Commit

Permalink
feat: Add support for bi-directional RPC guest services by exposing t…
Browse files Browse the repository at this point in the history
…he registry through hooks in agent client and server

Signed-off-by: Felicitas Pojtinger <[email protected]>
  • Loading branch information
pojntfx committed Sep 27, 2024
1 parent 54bfea8 commit dc12384
Show file tree
Hide file tree
Showing 8 changed files with 41 additions and 3 deletions.
1 change: 1 addition & 0 deletions cmd/drafter-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func main() {
uint32(*vsockPort),

agentClient,
ipc.StartAgentClientHooks[struct{}]{},
)
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions cmd/drafter-peer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ func main() {
*rescueTimeout,

struct{}{},
ipc.AgentServerAcceptHooks[ipc.AgentServerRemote[struct{}], struct{}]{},

runner.SnapshotLoadConfiguration{
ExperimentalMapPrivate: *experimentalMapPrivate,
Expand Down
1 change: 1 addition & 0 deletions cmd/drafter-runner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,7 @@ func main() {
packageConfig.AgentVSockPort,

struct{}{},
ipc.AgentServerAcceptHooks[ipc.AgentServerRemote[struct{}], struct{}]{},

runner.SnapshotLoadConfiguration{
ExperimentalMapPrivate: *experimentalMapPrivate,
Expand Down
9 changes: 9 additions & 0 deletions pkg/ipc/agent_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ type ConnectedAgentClient[L *AgentClientLocal[G], R AgentClientRemote, G any] st
Close func()
}

type StartAgentClientHooks[R AgentClientRemote] struct {
OnAfterRegistrySetup func(forRemotes func(cb func(remoteID string, remote R) error) error) error
}

func StartAgentClient[L *AgentClientLocal[G], R AgentClientRemote, G any](
dialCtx context.Context,
remoteCtx context.Context,
Expand All @@ -69,6 +73,7 @@ func StartAgentClient[L *AgentClientLocal[G], R AgentClientRemote, G any](
vsockPort uint32,

agentClientLocal L,
hooks StartAgentClientHooks[R],
) (connectedAgentClient *ConnectedAgentClient[L, R, G], errs error) {
connectedAgentClient = &ConnectedAgentClient[L, R, G]{
Wait: func() error {
Expand Down Expand Up @@ -144,6 +149,10 @@ func StartAgentClient[L *AgentClientLocal[G], R AgentClientRemote, G any](
},
)

if hook := hooks.OnAfterRegistrySetup; hook != nil {
hook(registry.ForRemotes)
}

connectedAgentClient.Wait = sync.OnceValue(func() error {
// We don't `defer conn.Close` here since Firecracker handles resetting active VSock connections for us
defer cancelLinkCtx(nil)
Expand Down
15 changes: 14 additions & 1 deletion pkg/ipc/agent_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,23 @@ func StartAgentServer[L AgentServerLocal, R AgentServerRemote[G], G any](
return
}

type AgentServerAcceptHooks[R AgentServerRemote[G], G any] struct {
OnAfterRegistrySetup func(forRemotes func(cb func(remoteID string, remote R) error) error) error
}

type AcceptingAgentServer[L AgentServerLocal, R AgentServerRemote[G], G any] struct {
Remote R

Wait func() error
Close func() error
}

func (agentServer *AgentServer[L, R, G]) Accept(acceptCtx context.Context, remoteCtx context.Context) (acceptingAgentServer *AcceptingAgentServer[L, R, G], errs error) {
func (agentServer *AgentServer[L, R, G]) Accept(
acceptCtx context.Context,
remoteCtx context.Context,

hooks AgentServerAcceptHooks[R, G],
) (acceptingAgentServer *AcceptingAgentServer[L, R, G], errs error) {
acceptingAgentServer = &AcceptingAgentServer[L, R, G]{
Wait: func() error {
return nil
Expand Down Expand Up @@ -198,6 +207,10 @@ func (agentServer *AgentServer[L, R, G]) Accept(acceptCtx context.Context, remot
},
)

if hook := hooks.OnAfterRegistrySetup; hook != nil {
hook(registry.ForRemotes)
}

acceptingAgentServer.Wait = sync.OnceValue(func() error {
// We don't `defer conn.Close` here since Firecracker handles resetting active VSock connections for us
defer cancelLinkCtx(nil)
Expand Down
2 changes: 2 additions & 0 deletions pkg/peer/resume.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func (migratedPeer *MigratedPeer[L, R, G]) Resume(
rescueTimeout time.Duration,

agentServerLocal L,
agentServerHooks ipc.AgentServerAcceptHooks[R, G],

snapshotLoadConfiguration runner.SnapshotLoadConfiguration,
) (resumedPeer *ResumedPeer[L, R, G], errs error) {
Expand Down Expand Up @@ -77,6 +78,7 @@ func (migratedPeer *MigratedPeer[L, R, G]) Resume(
packageConfig.AgentVSockPort,

agentServerLocal,
agentServerHooks,

snapshotLoadConfiguration,
)
Expand Down
8 changes: 7 additions & 1 deletion pkg/runner/resume.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func (runner *Runner[L, R, G]) Resume(
agentVSockPort uint32,

agentServerLocal L,
agentServerHooks ipc.AgentServerAcceptHooks[R, G],

snapshotLoadConfiguration SnapshotLoadConfiguration,
) (
Expand Down Expand Up @@ -235,7 +236,12 @@ func (runner *Runner[L, R, G]) Resume(

suspendOnPanicWithError = true

resumedRunner.acceptingAgent, err = resumedRunner.agent.Accept(resumeSnapshotAndAcceptCtx, ctx)
resumedRunner.acceptingAgent, err = resumedRunner.agent.Accept(
resumeSnapshotAndAcceptCtx,
ctx,

agentServerHooks,
)
if err != nil {
panic(errors.Join(ErrCouldNotAcceptAgent, err))
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/snapshotter/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,12 @@ func CreateSnapshot(
acceptCtx, cancel := context.WithTimeout(goroutineManager.Context(), agentConfiguration.ResumeTimeout)
defer cancel()

acceptingAgent, err = agent.Accept(acceptCtx, goroutineManager.Context())
acceptingAgent, err = agent.Accept(
acceptCtx,
goroutineManager.Context(),

ipc.AgentServerAcceptHooks[ipc.AgentServerRemote[struct{}], struct{}]{},
)
if err != nil {
panic(errors.Join(ErrCouldNotAcceptAgentConnection, err))
}
Expand Down

0 comments on commit dc12384

Please sign in to comment.