From 79528087cd4cef2c4c44d7fb854c64762edd275f Mon Sep 17 00:00:00 2001 From: Andrei Stan Date: Wed, 10 Apr 2024 18:18:48 +0300 Subject: [PATCH] feat(machine/xen): Introduce Xen support Signed-off-by: Andrei Stan --- go.mod | 1 + go.sum | 2 + machine/platform/register_linux.go | 28 ++ machine/xen/init.go | 15 + machine/xen/v1alpha1.go | 519 +++++++++++++++++++++++++++++ machine/xen/xenstore_client.go | 190 +++++++++++ 6 files changed, 755 insertions(+) create mode 100644 machine/xen/init.go create mode 100644 machine/xen/v1alpha1.go create mode 100644 machine/xen/xenstore_client.go diff --git a/go.mod b/go.mod index 062aafdadc..c14bc9e552 100644 --- a/go.mod +++ b/go.mod @@ -78,6 +78,7 @@ require ( oras.land/oras-go/v2 v2.5.0 sdk.kraft.cloud v0.5.5-0.20240410102038-8d0f0333b17a sigs.k8s.io/kustomize/kyaml v0.14.3 + xenbits.xenproject.org/git-http/xen.git/tools/golang/xenlight v0.0.0-20240402142354-17cf285d87e2 ) require ( diff --git a/go.sum b/go.sum index eeccbcd888..4b9c67cd46 100644 --- a/go.sum +++ b/go.sum @@ -1695,3 +1695,5 @@ sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o= sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc= sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo= sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8= +xenbits.xenproject.org/git-http/xen.git/tools/golang/xenlight v0.0.0-20240402142354-17cf285d87e2 h1:f3OAMM0NgzlqWqZnuTIz6B6HPK1pGGfgKH6S94kYEWY= +xenbits.xenproject.org/git-http/xen.git/tools/golang/xenlight v0.0.0-20240402142354-17cf285d87e2/go.mod h1:tbZ4iMnk8RWkXPxTiCGdAw3hCOa3feShlf3sBh50uIc= diff --git a/machine/platform/register_linux.go b/machine/platform/register_linux.go index 3db05fc646..fdb3366051 100644 --- a/machine/platform/register_linux.go +++ b/machine/platform/register_linux.go @@ -14,6 +14,7 @@ import ( "kraftkit.sh/internal/set" "kraftkit.sh/machine/firecracker" "kraftkit.sh/store" + "kraftkit.sh/machine/xen" ) var firecrackerV1alpha1Driver = func(ctx context.Context, opts ...any) (machinev1alpha1.MachineService, error) { @@ -43,6 +44,30 @@ var firecrackerV1alpha1Driver = func(ctx context.Context, opts ...any) (machinev ) } +var xenV1alpha1Driver = func(ctx context.Context, opts ...any) (machinev1alpha1.MachineService, error) { + service, err := xen.NewMachineV1alpha1Service(ctx) + if err != nil { + return nil, err + } + + embeddedStore, err := store.NewEmbeddedStore[machinev1alpha1.MachineSpec, machinev1alpha1.MachineStatus]( + filepath.Join( + config.G[config.KraftKit](ctx).RuntimeDir, + "machinev1alpha1", + ), + ) + if err != nil { + return nil, err + } + + return machinev1alpha1.NewMachineServiceHandler( + ctx, + service, + zip.WithStore[machinev1alpha1.MachineSpec, machinev1alpha1.MachineStatus](embeddedStore, zip.StoreRehydrationSpecNil), + zip.WithBefore(storePlatformFilter(PlatformXen)), + ) +} + func unixVariantStrategies() map[Platform]*Strategy { // TODO(jake-ciolek): The firecracker driver has a dependency on github.com/containernetworking/plugins/pkg/ns via // github.com/firecracker-microvm/firecracker-go-sdk @@ -51,5 +76,8 @@ func unixVariantStrategies() map[Platform]*Strategy { PlatformFirecracker: { NewMachineV1alpha1: firecrackerV1alpha1Driver, }, + PlatformXen: { + NewMachineV1alpha1: xenV1alpha1Driver, + }, } } diff --git a/machine/xen/init.go b/machine/xen/init.go new file mode 100644 index 0000000000..620ca3c3d8 --- /dev/null +++ b/machine/xen/init.go @@ -0,0 +1,15 @@ +// SPDX-License-Identifier: BSD-3-Clause +// Copyright (c) 2024, Unikraft GmbH and The KraftKit Authors. +// Licensed under the BSD-3-Clause License (the "License"). +// You may not use this file except in compliance with the License. +package xen + +import ( + "encoding/gob" + + "xenbits.xenproject.org/git-http/xen.git/tools/golang/xenlight" +) + +func init() { + gob.Register(xenlight.Domid(0)) +} diff --git a/machine/xen/v1alpha1.go b/machine/xen/v1alpha1.go new file mode 100644 index 0000000000..49b9f28680 --- /dev/null +++ b/machine/xen/v1alpha1.go @@ -0,0 +1,519 @@ +// SPDX-License-Identifier: BSD-3-Clause +// Copyright (c) 2022, Unikraft GmbH and The KraftKit Authors. +// Licensed under the BSD-3-Clause License (the "License"). +// You may not use this file except in compliance with the License. +package xen + +import ( + "context" + "fmt" + "io/fs" + "os" + "path/filepath" + "slices" + "strings" + "time" + + zip "api.zip" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/uuid" + + machinev1alpha1 "kraftkit.sh/api/machine/v1alpha1" + "kraftkit.sh/config" + "kraftkit.sh/internal/logtail" + "kraftkit.sh/log" + "kraftkit.sh/machine/network/macaddr" + "kraftkit.sh/unikraft/export/v0/ukargparse" + "kraftkit.sh/unikraft/export/v0/vfscore" + "xenbits.xenproject.org/git-http/xen.git/tools/golang/xenlight" +) + +const ( + XenMemoryScale = 1024 + XenMemoryDefault = 64 + XenCPUsDefault = 1 +) + +type machineV1alpha1Service struct{} + +func NewMachineV1alpha1Service(ctx context.Context) (machinev1alpha1.MachineService, error) { + return &machineV1alpha1Service{}, nil +} + +func (service *machineV1alpha1Service) Create(ctx context.Context, machine *machinev1alpha1.Machine) (*machinev1alpha1.Machine, error) { + cfg, err := NewXenConfig() + if err != nil { + return nil, err + } + + if machine.Status.KernelPath == "" { + return machine, fmt.Errorf("cannot create xen instace without a kernel") + } + + if !slices.Contains([]string{"arm64", "x86_64", "amd64", "arm"}, machine.Spec.Architecture) { + return machine, fmt.Errorf("unsupported architecture %s", machine.Spec.Architecture) + } + + if _, err := os.Stat(machine.Status.KernelPath); err != nil && os.IsNotExist(err) { + return machine, fmt.Errorf("supplied kernel path does not exist: %s", machine.Status.KernelPath) + } + + if machine.ObjectMeta.UID == "" { + machine.ObjectMeta.UID = uuid.NewUUID() + } + + machine.Status.State = machinev1alpha1.MachineStateUnknown + + if len(machine.Status.StateDir) == 0 { + machine.Status.StateDir = filepath.Join(config.G[config.KraftKit](ctx).RuntimeDir, string(machine.ObjectMeta.UID)) + } + + if err := os.MkdirAll(machine.Status.StateDir, fs.ModeSetgid|0o775); err != nil { + return machine, err + } + + if len(machine.Status.LogFile) == 0 { + machine.Status.LogFile = filepath.Join(machine.Status.StateDir, "machine.log") + } + + fd, err := os.Create(machine.Status.LogFile) + if err != nil { + return machine, err + } + defer fd.Close() + + if machine.Spec.Resources.Requests == nil { + machine.Spec.Resources.Requests = make(corev1.ResourceList, 2) + } + + if machine.Spec.Resources.Requests.Memory().Value() == 0 { + quantity, err := resource.ParseQuantity(fmt.Sprintf("%d", XenMemoryDefault) + "Mi") + if err != nil { + machine.Status.State = machinev1alpha1.MachineStateFailed + return machine, err + } + + machine.Spec.Resources.Requests[corev1.ResourceMemory] = quantity + } + + if machine.Spec.Resources.Requests.Cpu().Value() == 0 { + quantity, err := resource.ParseQuantity(fmt.Sprintf("%d", XenCPUsDefault)) + if err != nil { + machine.Status.State = machinev1alpha1.MachineStateFailed + return machine, err + } + + machine.Spec.Resources.Requests[corev1.ResourceCPU] = quantity + } + + cfg.BInfo.MaxVcpus = int(machine.Spec.Resources.Requests.Cpu().Value()) + cfg.BInfo.MaxMemkb = uint64(machine.Spec.Resources.Requests.Memory().Value() / XenMemoryScale) + cfg.BInfo.Kernel = machine.Status.KernelPath + cfg.CInfo.Name = string(machine.ObjectMeta.Name) + cfg.CInfo.Type = xenlight.DomainTypePv + + if len(machine.Status.InitrdPath) > 0 { + cfg.BInfo.Ramdisk = machine.Status.InitrdPath + } + + if len(machine.Spec.Ports) > 0 { + return machine, fmt.Errorf("mapping ports is not supported for xen") + } + + kernelArgs, err := ukargparse.Parse(machine.Spec.KernelArgs...) + if err != nil { + return machine, err + } + + if len(machine.Spec.Networks) > 0 { + startMac, err := macaddr.GenerateMacAddress(true) + if err != nil { + return machine, err + } + + i := 0 + for _, network := range machine.Spec.Networks { + for _, iface := range network.Interfaces { + mac := iface.Spec.MacAddress + if mac == "" { + startMac = macaddr.IncrementMacAddress(startMac) + mac = startMac.String() + } + + nic, err := xenlight.NewDeviceNic() + if err != nil { + return nil, err + } + + nic.Ip = fmt.Sprintf("%s %s %s", iface.Spec.IP, network.Netmask, network.Gateway) + nic.Bridge = network.IfName + nic.Mac = xenlight.Mac([]byte(mac)) + + cfg.Nics = append(cfg.Nics, *nic) + i++ + } + } + } + + var fstab []string + + // TODO(andreistan26): Check if installed xen supports 9pfs + for i, vol := range machine.Spec.Volumes { + switch vol.Spec.Driver { + case "9pfs": + mounttag := fmt.Sprintf("fs%d", i+1) + p9Dev, err := xenlight.NewDeviceP9() + if err != nil { + return nil, err + } + + p9Dev.Tag = mounttag + p9Dev.Path = vol.Spec.Source + p9Dev.SecurityModel = "none" + + cfg.P9S = append(cfg.P9S, *p9Dev) + + fstab = append(fstab, vfscore.NewFstabEntry( + mounttag, + vol.Spec.Destination, + vol.Spec.Driver, + "", + "", + ).String()) + default: + return machine, fmt.Errorf("unsupported Xen volume driver: %v", vol.Spec.Driver) + } + } + + if len(fstab) > 0 { + kernelArgs = append(kernelArgs, vfscore.ParamVfsFstab.WithValue(fstab)) + } + + args := kernelArgs.Strings() + if len(args) > 0 { + args = append(args, "--") + } + + args = append(args, machine.Spec.ApplicationArgs...) + cfg.BInfo.Cmdline = strings.Join(args, " ") + + xenCtx, err := xenlight.NewContext() + if err != nil { + return nil, fmt.Errorf("could not create xen context: %v", err) + } + defer xenCtx.Close() + + machine.CreationTimestamp = metav1.Now() + + domID, err := xenCtx.DomainCreateNew(cfg) + if err != nil { + return machine, fmt.Errorf("could not create xen domain: %v", err) + } + + machine.Status.PlatformConfig = domID + + machine.Status.State = machinev1alpha1.MachineStateCreated + + return machine, nil +} +func (service *machineV1alpha1Service) Start(ctx context.Context, machine *machinev1alpha1.Machine) (*machinev1alpha1.Machine, error) { + if machine.Status.PlatformConfig == nil { + return machine, fmt.Errorf("machine has no platform config") + } + + domId, ok := machine.Status.PlatformConfig.(xenlight.Domid) + if !ok { + return machine, fmt.Errorf("machine has no platform config") + } + + xenCtx, err := xenlight.NewContext() + if err != nil { + return nil, fmt.Errorf("could not create xen context: %v", err) + } + defer xenCtx.Close() + + err = xenCtx.DomainUnpause(domId) + if err != nil { + return machine, fmt.Errorf("could not unpause xen domain: %v", err) + } + + machine.Status.State = machinev1alpha1.MachineStateRunning + machine.Status.StartedAt = time.Now() + + return machine, nil +} +func (service *machineV1alpha1Service) Pause(ctx context.Context, machine *machinev1alpha1.Machine) (*machinev1alpha1.Machine, error) { + if machine.Status.PlatformConfig == nil { + return machine, fmt.Errorf("machine has no platform config") + } + + domId, ok := machine.Status.PlatformConfig.(xenlight.Domid) + if !ok { + return machine, fmt.Errorf("machine has no platform config") + } + + xenCtx, err := xenlight.NewContext() + if err != nil { + return nil, fmt.Errorf("could not create xen context: %v", err) + } + defer xenCtx.Close() + + if err := xenCtx.DomainUnpause(domId); err != nil { + return machine, fmt.Errorf("could not unpause xen domain: %v", err) + } + + machine.Status.State = machinev1alpha1.MachineStatePaused + + return machine, nil +} +func (service *machineV1alpha1Service) Stop(ctx context.Context, machine *machinev1alpha1.Machine) (*machinev1alpha1.Machine, error) { + if machine.Status.State == machinev1alpha1.MachineStateExited { + return machine, nil + } + + domId, ok := machine.Status.PlatformConfig.(xenlight.Domid) + if !ok { + return machine, fmt.Errorf("machine has no platform config") + } + + xenCtx, err := xenlight.NewContext() + if err != nil { + return nil, fmt.Errorf("could not create xen context: %v", err) + } + defer xenCtx.Close() + if err := xenCtx.DomainDestroy(domId); err != nil { + return machine, fmt.Errorf("could not destroy xen domain: %v", err) + } + + machine.Status.State = machinev1alpha1.MachineStateExited + machine.Status.ExitedAt = time.Now() + + return machine, nil +} +func (service *machineV1alpha1Service) Update(ctx context.Context, machine *machinev1alpha1.Machine) (*machinev1alpha1.Machine, error) { + panic("not implemented: kraftkit.sh/machine/xen.machineV1alpha1Service.Update") +} +func (service *machineV1alpha1Service) Delete(ctx context.Context, machine *machinev1alpha1.Machine) (*machinev1alpha1.Machine, error) { + err := os.RemoveAll(machine.Status.StateDir) + + return nil, err +} +func (service *machineV1alpha1Service) Get(ctx context.Context, machine *machinev1alpha1.Machine) (*machinev1alpha1.Machine, error) { + domId, ok := machine.Status.PlatformConfig.(xenlight.Domid) + if !ok { + return machine, fmt.Errorf("machine has no platform config") + } + + machine.Status.State = machinev1alpha1.MachineStateUnknown + machine.Status.ExitCode = -1 + + xenCtx, err := xenlight.NewContext() + if err != nil { + return nil, fmt.Errorf("could not create xen context: %v", err) + } + defer xenCtx.Close() + + dominfo := &xenlight.Dominfo{} + + // Should be done with xenCtx.DomainInfo, but it currently does not work + doms := xenCtx.ListDomain() + if err != nil { + return machine, fmt.Errorf("could not list xen domains: %v", err) + } + + index := slices.IndexFunc[[]xenlight.Dominfo, xenlight.Dominfo](doms, func(dominfo xenlight.Dominfo) bool { + return dominfo.Domid == domId + }) + + // if index is not present in the list probably it crashed + if index == -1 { + dominfo = &xenlight.Dominfo{Shutdown: true, ShutdownReason: xenlight.ShutdownReasonPoweroff} + } else { + dominfo = &doms[index] + } + + machine.Status.ExitCode, machine.Status.State = getXenState(dominfo) + + if machine.Status.ExitCode != -1 && machine.Status.ExitedAt.IsZero() { + machine.Status.ExitedAt = time.Now() + } + + return machine, nil + +} + +func (service *machineV1alpha1Service) List(ctx context.Context, machines *machinev1alpha1.MachineList) (*machinev1alpha1.MachineList, error) { + cached := machines.Items + machines.Items = make([]zip.Object[machinev1alpha1.MachineSpec, machinev1alpha1.MachineStatus], len(cached)) + for i, machine := range cached { + machine, err := service.Get(ctx, &machine) + if err != nil { + machines.Items = cached + return machines, err + } + + machines.Items[i] = *machine + } + + return machines, nil +} + +func (service *machineV1alpha1Service) Watch(ctx context.Context, machine *machinev1alpha1.Machine) (chan *machinev1alpha1.Machine, chan error, error) { + domId, ok := machine.Status.PlatformConfig.(xenlight.Domid) + if !ok { + return nil, nil, fmt.Errorf("machine has no platform config") + } + + w, err := NewWatcher(domId) + if err != nil { + return nil, nil, err + } + + // signals when xenstore tree was updated for domain + watch, err := w.Watch(ctx) + if err != nil { + return nil, nil, err + } + + events := make(chan *machinev1alpha1.Machine) + errs := make(chan error) + + go func() { + intialMachine, err := service.Get(ctx, machine) + if err != nil { + errs <- err + } + events <- intialMachine + + for { + select { + case <-ctx.Done(): + w.Close() + return + case <-watch: + machine, err := service.Get(ctx, machine) + if err != nil { + errs <- err + continue + } + + events <- machine + } + } + }() + + return events, errs, nil +} +func (service *machineV1alpha1Service) Logs(ctx context.Context, machine *machinev1alpha1.Machine) (chan string, chan error, error) { + domId, ok := machine.Status.PlatformConfig.(xenlight.Domid) + if !ok { + return nil, nil, fmt.Errorf("machine has no platform config") + } + + xenCtx, err := xenlight.NewContext() + if err != nil { + return nil, nil, fmt.Errorf("could not create xen context: %v", err) + } + defer xenCtx.Close() + + pts, err := xenCtx.PrimaryConsoleGetTty(uint32(domId)) + if err != nil { + return nil, nil, fmt.Errorf("could not get xen domain pts: %v", err) + } + + // Start appending pts output to logfile: pts -> chan -> log file + go func() { + ptsChan := make(chan []byte) + errChan := make(chan error) + + ptsFD, err := os.OpenFile(pts, os.O_RDONLY, 0o644) + if err != nil { + log.G(ctx).Errorf("could not open xen domain pts: %v", err) + return + } + + go func() { + buf := make([]byte, 1024) + for { + n, err := ptsFD.Read(buf) + if err != nil { + if err != os.ErrClosed { + errChan <- err + } + return + } + ptsChan <- buf[:n] + } + }() + + logFD, err := os.OpenFile(machine.Status.LogFile, os.O_APPEND|os.O_WRONLY, 0o644) + if err != nil { + log.G(ctx).Errorf("log file not found after create: %v", err) + return + } + + for { + select { + case err := <-errChan: + log.G(ctx).Errorf("could not read from pts: %v", err) + case line := <-ptsChan: + _, err := logFD.Write(line) + if err != nil { + log.G(ctx).Errorf("could not write to log file: %v", err) + } + case <-ctx.Done(): + logFD.Close() + ptsFD.Close() + return + } + } + }() + + return logtail.NewLogTail(ctx, machine.Status.LogFile) +} + +func getXenState(domInfo *xenlight.Dominfo) (int, machinev1alpha1.MachineState) { + if domInfo.Blocked || domInfo.Running { + return -1, machinev1alpha1.MachineStateRunning + } else if domInfo.Paused { + return -1, machinev1alpha1.MachineStatePaused + } else if domInfo.Dying { + return 0, machinev1alpha1.MachineStateExited + } else if domInfo.Shutdown { + switch domInfo.ShutdownReason { + case xenlight.ShutdownReasonCrash: + return 1, machinev1alpha1.MachineStateErrored + case xenlight.ShutdownReasonPoweroff: + return 0, machinev1alpha1.MachineStateExited + } + } + + return -1, machinev1alpha1.MachineStateUnknown +} + +func NewXenConfig() (*xenlight.DomainConfig, error) { + xcfg, err := xenlight.NewDomainConfig() + if err != nil { + return nil, err + } + + binfoPtr, err := xenlight.NewDomainBuildInfo(xenlight.DomainTypePv) + if err != nil { + return nil, err + } + + xcfg.BInfo = *binfoPtr + + cinfoPtr, err := xenlight.NewDomainCreateInfo() + if err != nil { + return nil, err + } + + xcfg.CInfo = *cinfoPtr + + xcfg.CInfo.Type = xenlight.DomainTypePv + + return xcfg, nil +} diff --git a/machine/xen/xenstore_client.go b/machine/xen/xenstore_client.go new file mode 100644 index 0000000000..7412df3080 --- /dev/null +++ b/machine/xen/xenstore_client.go @@ -0,0 +1,190 @@ +// SPDX-License-Identifier: BSD-3-Clause +// Copyright (c) 2022, Unikraft GmbH and The KraftKit Authors. +// Licensed under the BSD-3-Clause License (the "License"). +// You may not use this file except in compliance with the License. +package xen + +import ( + "bytes" + "context" + "encoding/binary" + "errors" + "fmt" + "net" + "os" + "strings" + + "kraftkit.sh/log" + "xenbits.xenproject.org/git-http/xen.git/tools/golang/xenlight" +) + +type XenstoreOperation uint32 + +const ( + WatchOp XenstoreOperation = 4 + UnwatchOp XenstoreOperation = 5 + WatchEvent XenstoreOperation = 15 + Error XenstoreOperation = 16 + + XenStorePathFmt = "/local/domain/%d" +) + +func XenstoreSocketPath() string { + xenstorepath := "/var/run/xenstored" + if path := os.Getenv("XENSTORED_PATH"); path != "" { + xenstorepath = path + } + + return strings.Join([]string{xenstorepath, "socket"}, "/") +} + +type XsHeader struct { + Op XenstoreOperation + ReqID uint32 + TxID uint32 + Lenght uint32 +} + +type XsPacket struct { + Header XsHeader + Data []byte +} + +type baseWatcher struct { + closeSignal chan struct{} + + domID uint32 + conn net.Conn + xsPath string + token string +} + +type Watcher interface { + Watch(ctx context.Context) (chan struct{}, error) + Close() +} + +func NewWatcher(domID xenlight.Domid) (Watcher, error) { + conn, err := net.Dial("unix", XenstoreSocketPath()) + if err != nil { + return nil, err + } + + return &baseWatcher{ + domID: uint32(domID), + conn: conn, + xsPath: fmt.Sprintf(XenStorePathFmt, uint32(domID)), + token: "kraftkit" + fmt.Sprintf("%d", uint32(domID)), + closeSignal: make(chan struct{}), + }, nil +} + +func (packet *XsPacket) pack() []byte { + data := make([]byte, 0) + data = binary.LittleEndian.AppendUint32(data, uint32(packet.Header.Op)) + data = binary.LittleEndian.AppendUint32(data, packet.Header.ReqID) + data = binary.LittleEndian.AppendUint32(data, packet.Header.TxID) + data = binary.LittleEndian.AppendUint32(data, packet.Header.Lenght) + + data = append(data, packet.Data...) + + return data +} + +func unpack(data []byte) XsPacket { + header := XsHeader{ + Op: XenstoreOperation(binary.LittleEndian.Uint32(data[0:4])), + ReqID: binary.LittleEndian.Uint32(data[4:8]), + TxID: binary.LittleEndian.Uint32(data[8:12]), + Lenght: binary.LittleEndian.Uint32(data[12:16]), + } + packet := XsPacket{ + Header: header, + Data: data[16 : 16+header.Lenght], + } + return packet +} + +func (w *baseWatcher) xsWatchRequest() error { + + data := append(append(append([]byte(w.xsPath), '\x00'), []byte(w.token)...), '\x00') + packet := XsPacket{ + Header: XsHeader{ + Op: WatchOp, + ReqID: 0, + TxID: 0, + Lenght: uint32(len(data)), + }, + Data: data, + } + + if _, err := w.conn.Write(packet.pack()); err != nil { + return err + } + + buffer := make([]byte, 4096) + if _, err := w.conn.Read(buffer); err != nil { + return err + } + + packet = unpack(buffer) + if packet.Header.Op == Error { + return fmt.Errorf("could not establish communication with xenstore") + } + + return nil +} + +func (w *baseWatcher) Watch(ctx context.Context) (chan struct{}, error) { + err := w.xsWatchRequest() + if err != nil { + return nil, err + } + + event := make(chan struct{}) + + go func() { + buffer := make([]byte, 4096) + for { + select { + case <-w.closeSignal: + close(w.closeSignal) + return + default: + if _, err := w.conn.Read(buffer); err != nil { + if !errors.Is(err, os.ErrClosed) { + log.G(ctx).Debugf("error reading from xenstore socket while listening for vm status events: %v", err) + } + continue + } + + packet := unpack(buffer) + strs := SplitData(packet) + + if packet.Header.Op != WatchEvent { + continue + } + + if w.token == string(strs[1]) && w.xsPath == string(strs[0]) { + event <- struct{}{} + } + } + } + }() + + return event, nil +} + +func (w *baseWatcher) Close() { + w.closeSignal <- struct{}{} + w.conn.Close() +} + +func SplitData(packet XsPacket) []string { + splitPayload := []string{} + for _, byteSl := range bytes.Split(packet.Data, []byte("\x00")) { + splitPayload = append(splitPayload, string(byteSl)) + } + + return splitPayload +}