diff --git a/cmd/drafter-mounter/main.go b/cmd/drafter-mounter/main.go index 15a98b9..596ece0 100644 --- a/cmd/drafter-mounter/main.go +++ b/cmd/drafter-mounter/main.go @@ -355,22 +355,30 @@ func main() { if err != nil { panic(err) } - defer lis.Close() defer func() { + defer goroutineManager.CreateForegroundPanicCollector()() + closeLock.Lock() - defer closeLock.Unlock() closed = true + + closeLock.Unlock() + + if err := lis.Close(); err != nil { + panic(err) + } }() log.Println("Serving on", lis.Addr()) l: for { - // We use `context.Background` here because we want to distinguish between a cancellation and a successful accept - // We select between `acceptedCtx` and `ctx` on all code paths so we don't leak the context - acceptedCtx, cancelAcceptedCtx := context.WithCancel(context.Background()) - defer cancelAcceptedCtx() + var ( + ready = make(chan struct{}) + signalReady = sync.OnceFunc(func() { + close(ready) // We can safely close() this channel since the caller only runs once/is `sync.OnceFunc`d + }) + ) var conn net.Conn goroutineManager.StartForegroundGoroutine(func(_ context.Context) { @@ -390,7 +398,7 @@ l: panic(err) } - cancelAcceptedCtx() + signalReady() }) bubbleSignals = true @@ -403,7 +411,7 @@ l: case <-done: break l - case <-acceptedCtx.Done(): + case <-ready: break s } diff --git a/cmd/drafter-peer/main.go b/cmd/drafter-peer/main.go index 2001815..1e74389 100644 --- a/cmd/drafter-peer/main.go +++ b/cmd/drafter-peer/main.go @@ -481,20 +481,28 @@ func main() { if err != nil { panic(err) } - defer lis.Close() defer func() { + defer goroutineManager.CreateForegroundPanicCollector()() + closeLock.Lock() - defer closeLock.Unlock() closed = true + + closeLock.Unlock() + + if err := lis.Close(); err != nil { + panic(err) + } }() log.Println("Serving on", lis.Addr()) - // We use `context.Background` here because we want to distinguish between a cancellation and a successful accept - // We select between `acceptedCtx` and `ctx` on all code paths so we don't leak the context - acceptedCtx, cancelAcceptedCtx := context.WithCancel(context.Background()) - defer cancelAcceptedCtx() + var ( + ready = make(chan struct{}) + signalReady = sync.OnceFunc(func() { + close(ready) // We can safely close() this channel since the caller only runs once/is `sync.OnceFunc`d + }) + ) var conn net.Conn goroutineManager.StartForegroundGoroutine(func(_ context.Context) { @@ -514,7 +522,7 @@ func main() { panic(err) } - cancelAcceptedCtx() + signalReady() }) bubbleSignals = true @@ -536,7 +544,7 @@ func main() { return - case <-acceptedCtx.Done(): + case <-ready: break } diff --git a/os/configs/drafteros-firecracker-x86_64_pvm_defconfig b/os/configs/drafteros-firecracker-x86_64_pvm_defconfig index 731dc27..023bde4 100644 --- a/os/configs/drafteros-firecracker-x86_64_pvm_defconfig +++ b/os/configs/drafteros-firecracker-x86_64_pvm_defconfig @@ -1,6 +1,7 @@ BR2_x86_64=y BR2_KERNEL_HEADERS_VERSION=y -BR2_DEFAULT_KERNEL_VERSION="6.8.12" +BR2_DEFAULT_KERNEL_VERSION="6.7.0-rc6" +BR2_KERNEL_HEADERS_6_6=y BR2_GNU_MIRROR="https://mirrors.kernel.org/gnu" BR2_CCACHE=y BR2_TARGET_GENERIC_HOSTNAME="drafterhost" diff --git a/pkg/forwarder/forward_ports.go b/pkg/forwarder/forward_ports.go index 8a1babf..773a55d 100644 --- a/pkg/forwarder/forward_ports.go +++ b/pkg/forwarder/forward_ports.go @@ -8,6 +8,7 @@ import ( "os" "path/filepath" "runtime" + "sync" "github.com/coreos/go-iptables/iptables" "github.com/loopholelabs/drafter/internal/utils" @@ -252,9 +253,14 @@ func ForwardPorts( }, ) - closeInProgress := make(chan any) + var ( + closeInProgress = make(chan struct{}) + signalCloseInProgress = sync.OnceFunc(func() { + close(closeInProgress) // We can safely close() this channel since the caller only runs once/is `sync.OnceFunc`d + }) + ) forwardedPorts.Close = func() (errs error) { - defer close(closeInProgress) + defer signalCloseInProgress() for _, closeFuncs := range deferFuncs { for _, closeFunc := range closeFuncs { diff --git a/pkg/ipc/agent_client.go b/pkg/ipc/agent_client.go index 35af45f..d663182 100644 --- a/pkg/ipc/agent_client.go +++ b/pkg/ipc/agent_client.go @@ -96,7 +96,12 @@ func StartAgentClient( cancelLinkCtx(goroutineManager.GetErrGoroutineStopped()) } - ready := make(chan any) + var ( + ready = make(chan struct{}) + signalReady = sync.OnceFunc(func() { + close(ready) // We can safely close() this channel since the caller only runs once/is `sync.OnceFunc`d + }) + ) // This goroutine will not leak on function return because it selects on `goroutineManager.Context().Done()` // internally and we return a wait function goroutineManager.StartBackgroundGoroutine(func(ctx context.Context) { @@ -120,7 +125,7 @@ func StartAgentClient( &rpc.RegistryHooks{ OnClientConnect: func(remoteID string) { - close(ready) + signalReady() }, }, ) diff --git a/pkg/ipc/agent_server.go b/pkg/ipc/agent_server.go index 19ac365..5f94362 100644 --- a/pkg/ipc/agent_server.go +++ b/pkg/ipc/agent_server.go @@ -94,7 +94,12 @@ func (agentServer *AgentServer) Accept(acceptCtx context.Context, remoteCtx cont defer goroutineManager.StopAllGoroutines() defer goroutineManager.CreateBackgroundPanicCollector()() - ready := make(chan any) + var ( + ready = make(chan struct{}) + signalReady = sync.OnceFunc(func() { + close(ready) // We can safely close() this channel since the caller only runs once/is `sync.OnceFunc`d + }) + ) // This goroutine will not leak on function return because it selects on `goroutineManager.Context().Done()` internally goroutineManager.StartBackgroundGoroutine(func(ctx context.Context) { select { @@ -170,7 +175,7 @@ func (agentServer *AgentServer) Accept(acceptCtx context.Context, remoteCtx cont &rpc.RegistryHooks{ OnClientConnect: func(remoteID string) { - close(ready) + signalReady() }, }, ) diff --git a/pkg/mounter/migrate_from.go b/pkg/mounter/migrate_from.go index 154eaff..28ff78e 100644 --- a/pkg/mounter/migrate_from.go +++ b/pkg/mounter/migrate_from.go @@ -76,8 +76,15 @@ func MigrateFromAndMount( } var ( - allRemoteDevicesReceived = make(chan any) - allRemoteDevicesReady = make(chan any) + allRemoteDevicesReceived = make(chan struct{}) + signalAllRemoteDevicesReceived = sync.OnceFunc(func() { + close(allRemoteDevicesReceived) // We can safely close() this channel since the caller only runs once/is `sync.OnceFunc`d + }) + + allRemoteDevicesReady = make(chan struct{}) + signalAllRemoteDevicesReady = sync.OnceFunc(func() { + close(allRemoteDevicesReady) // We can safely close() this channel since the caller only runs once/is `sync.OnceFunc`d + }) ) // We don't `defer cancelProtocolCtx()` this because we cancel in the wait function @@ -244,7 +251,7 @@ func MigrateFromAndMount( case packets.EventCustom: switch e.CustomType { case byte(registry.EventCustomAllDevicesSent): - close(allRemoteDevicesReceived) + signalAllRemoteDevicesReceived() if hook := hooks.OnRemoteAllDevicesReceived; hook != nil { hook() @@ -252,7 +259,7 @@ func MigrateFromAndMount( case byte(registry.EventCustomTransferAuthority): if receivedButNotReadyRemoteDevices.Add(-1) <= 0 { - close(allRemoteDevicesReady) + signalAllRemoteDevicesReady() } if hook := hooks.OnRemoteDeviceAuthorityReceived; hook != nil { @@ -298,7 +305,7 @@ func MigrateFromAndMount( select { case <-allRemoteDevicesReceived: default: - close(allRemoteDevicesReceived) + signalAllRemoteDevicesReceived() // We need to call the hook manually too since we would otherwise only call if we received at least one device if hook := hooks.OnRemoteAllDevicesReceived; hook != nil { @@ -306,7 +313,7 @@ func MigrateFromAndMount( } } - close(allRemoteDevicesReady) + signalAllRemoteDevicesReady() if hook := hooks.OnRemoteAllMigrationsCompleted; hook != nil { hook() diff --git a/pkg/mounter/migrate_to.go b/pkg/mounter/migrate_to.go index 8963f5b..3a0d879 100644 --- a/pkg/mounter/migrate_to.go +++ b/pkg/mounter/migrate_to.go @@ -89,14 +89,14 @@ func (migratableMounter *MigratableMounter) MigrateTo( suspendedVM bool ) - suspendedVMCh := make(chan any) + suspendedVMCh := make(chan struct{}) suspendAndMsyncVM := sync.OnceValue(func() error { suspendedVMLock.Lock() suspendedVM = true suspendedVMLock.Unlock() - close(suspendedVMCh) + close(suspendedVMCh) // We can safely close() this channel since the caller only runs once/is `sync.OnceFunc`d return nil }) diff --git a/pkg/nat/nat.go b/pkg/nat/nat.go index 53092bc..a198e60 100644 --- a/pkg/nat/nat.go +++ b/pkg/nat/nat.go @@ -159,7 +159,7 @@ func CreateNAT( return nil } - ready := make(chan any) + ready := make(chan struct{}) // This goroutine will not leak on function return because it selects on `goroutineManager.Context().Done()` internally goroutineManager.StartBackgroundGoroutine(func(internalCtx context.Context) { select { @@ -299,7 +299,7 @@ func CreateNAT( } } - close(ready) + close(ready) // We can safely close() this channel since this code path will only run once return } diff --git a/pkg/peer/migrate_from.go b/pkg/peer/migrate_from.go index 7cea285..de476bb 100644 --- a/pkg/peer/migrate_from.go +++ b/pkg/peer/migrate_from.go @@ -69,8 +69,15 @@ func (peer *Peer) MigrateFrom( } var ( - allRemoteDevicesReceived = make(chan any) - allRemoteDevicesReady = make(chan any) + allRemoteDevicesReceived = make(chan struct{}) + signalAllRemoteDevicesReceived = sync.OnceFunc(func() { + close(allRemoteDevicesReceived) // We can safely close() this channel since the caller only runs once/is `sync.OnceFunc`d + }) + + allRemoteDevicesReady = make(chan struct{}) + signalAllRemoteDevicesReady = sync.OnceFunc(func() { + close(allRemoteDevicesReady) // We can safely close() this channel since the caller only runs once/is `sync.OnceFunc`d + }) ) // We don't `defer cancelProtocolCtx()` this because we cancel in the wait function @@ -268,7 +275,7 @@ func (peer *Peer) MigrateFrom( case packets.EventCustom: switch e.CustomType { case byte(registry.EventCustomAllDevicesSent): - close(allRemoteDevicesReceived) + signalAllRemoteDevicesReceived() if hook := hooks.OnRemoteAllDevicesReceived; hook != nil { hook() @@ -276,7 +283,7 @@ func (peer *Peer) MigrateFrom( case byte(registry.EventCustomTransferAuthority): if receivedButNotReadyRemoteDevices.Add(-1) <= 0 { - close(allRemoteDevicesReady) + signalAllRemoteDevicesReady() } if hook := hooks.OnRemoteDeviceAuthorityReceived; hook != nil { @@ -322,7 +329,7 @@ func (peer *Peer) MigrateFrom( select { case <-allRemoteDevicesReceived: default: - close(allRemoteDevicesReceived) + signalAllRemoteDevicesReceived() // We need to call the hook manually too since we would otherwise only call if we received at least one device if hook := hooks.OnRemoteAllDevicesReceived; hook != nil { @@ -330,7 +337,7 @@ func (peer *Peer) MigrateFrom( } } - close(allRemoteDevicesReady) + signalAllRemoteDevicesReady() if hook := hooks.OnRemoteAllMigrationsCompleted; hook != nil { hook() diff --git a/pkg/peer/migrate_to.go b/pkg/peer/migrate_to.go index 67739f7..561df47 100644 --- a/pkg/peer/migrate_to.go +++ b/pkg/peer/migrate_to.go @@ -88,7 +88,7 @@ func (migratablePeer *MigratablePeer) MigrateTo( suspendedVM bool ) - suspendedVMCh := make(chan any) + suspendedVMCh := make(chan struct{}) suspendAndMsyncVM := sync.OnceValue(func() error { if hook := hooks.OnBeforeSuspend; hook != nil { @@ -111,7 +111,7 @@ func (migratablePeer *MigratablePeer) MigrateTo( suspendedVM = true suspendedVMLock.Unlock() - close(suspendedVMCh) + close(suspendedVMCh) // We can safely close() this channel since the caller only runs once/is `sync.OnceValue`d return nil })