Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 138 additions & 0 deletions services/datamanager/builtin/builtin.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,17 @@ package builtin

import (
"context"
"encoding/json"
"errors"
"fmt"
"image"
"os"
"sync"
"time"

"github.com/benbjohnson/clock"
v1 "go.viam.com/api/app/datasync/v1"
goutils "go.viam.com/utils"
"google.golang.org/grpc"

"go.viam.com/rdk/components/sensor"
Expand Down Expand Up @@ -82,6 +86,11 @@ type builtIn struct {
capture *capture.Capture
sync *datasync.Sync
diskSummaryTracker *diskSummaryTracker

// capture control sensor fields
controlPoller *goutils.StoppableWorkers
controlSensor sensor.Sensor
controlSensorKey string
}

// New returns a new builtin data manager service for the given robot.
Expand Down Expand Up @@ -127,6 +136,17 @@ func New(
func (b *builtIn) Close(ctx context.Context) error {
b.logger.Info("Close START")
defer b.logger.Info("Close END")

// Detach the control poller under b.mu, then stop it after releasing b.mu to avoid deadlock:
// the poller goroutine acquires b.mu around its call to capture.SetCaptureConfigs.
b.mu.Lock()
poller := b.controlPoller
b.controlPoller = nil
b.mu.Unlock()
if poller != nil {
poller.Stop()
}

b.mu.Lock()
defer b.mu.Unlock()
b.diskSummaryTracker.close()
Expand Down Expand Up @@ -199,18 +219,136 @@ func (b *builtIn) Reconfigure(ctx context.Context, deps resource.Dependencies, c
syncSensor, syncSensorEnabled := syncSensorFromDeps(c.SelectiveSyncerName, deps, b.logger)
syncConfig := c.syncConfig(syncSensor, syncSensorEnabled, b.logger)

controlSensor, controlSensorKey := captureControlSensorFromDeps(c.CaptureControlSensor, deps, b.logger)

// Detach the old control poller under b.mu, then stop it after releasing b.mu to avoid deadlock:
// the poller goroutine acquires b.mu around its call to capture.SetCaptureConfigs.
b.mu.Lock()
oldPoller := b.controlPoller
b.controlPoller = nil
b.mu.Unlock()
if oldPoller != nil {
oldPoller.Stop()
}

b.mu.Lock()
defer b.mu.Unlock()
b.controlSensor = controlSensor
b.controlSensorKey = controlSensorKey

// These Reconfigure calls are the only methods in builtin.Reconfigure which create / destroy resources.
// It is important that no errors happen for a given Reconfigure call after we begin calling Reconfigure on capture & sync
// or we could leak goroutines, wasting resources and causing bugs due to duplicate work.
b.diskSummaryTracker.reconfigure(syncConfig.SyncPaths())
b.capture.Reconfigure(ctx, collectorConfigsByResource, captureConfig)
b.sync.Reconfigure(ctx, syncConfig, cloudConnSvc)

if controlSensor != nil && !captureConfig.CaptureDisabled {
b.controlPoller = goutils.NewBackgroundStoppableWorkers(func(ctx context.Context) {
b.runCaptureControlPoller(ctx)
})
}

return nil
}

// captureControlSensorFromDeps looks up the capture control sensor named by cfg in deps.
// On success it returns the sensor together with cfg.Key. It returns (nil, "") when cfg is
// nil, when cfg.Name is empty, or when the lookup fails; a failed lookup is logged as an error.
func captureControlSensorFromDeps(cfg *CaptureControlSensorConfig, deps resource.Dependencies,
	logger logging.Logger,
) (sensor.Sensor, string) {
	// Nothing to resolve when no control sensor is configured.
	noSensorConfigured := cfg == nil || cfg.Name == ""
	if noSensorConfigured {
		return nil, ""
	}

	controlSensor, lookupErr := sensor.FromProvider(deps, cfg.Name)
	if lookupErr == nil {
		return controlSensor, cfg.Key
	}

	// The error is logged rather than returned; the caller receives no sensor.
	logger.Errorw(
		"unable to initialize capture control sensor; controls will not apply until fixed or removed from config",
		"error", lookupErr.Error())
	return nil, ""
}

// parseOverridesFromReadings pulls the capture config overrides stored under key in readings.
// The value is round-tripped through JSON into a list of datamanager.CaptureConfigReading and
// then indexed by "<ResourceName>/<Method>"; if two entries share a key, the later one wins.
// It returns (nil, nil) when key is absent or the decoded list is empty, and a wrapped error
// when the value cannot be marshaled or decoded.
func parseOverridesFromReadings(readings map[string]interface{}, key string) (map[string]datamanager.CaptureConfigReading, error) {
	value, present := readings[key]
	if !present {
		return nil, nil
	}

	// Re-encode the loosely typed reading so it can be decoded into the typed list.
	encoded, err := json.Marshal(value)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal reading: %w", err)
	}

	var entries []datamanager.CaptureConfigReading
	if err = json.Unmarshal(encoded, &entries); err != nil {
		return nil, fmt.Errorf("failed to unmarshal reading: %w", err)
	}
	if len(entries) == 0 {
		return nil, nil
	}

	byResourceMethod := make(map[string]datamanager.CaptureConfigReading, len(entries))
	for i := range entries {
		entry := entries[i]
		lookupKey := fmt.Sprintf("%s/%s", entry.ResourceName, entry.Method)
		byResourceMethod[lookupKey] = entry
	}
	return byResourceMethod, nil
}

// runCaptureControlPoller reads the capture control sensor every 100ms (10 Hz) and hands the
// parsed override configs to capture.SetCaptureConfigs. When the sensor read fails, or the
// reading is missing, empty, or unparsable, nil configs are passed instead, which reverts
// capture to the machine config. The loop exits when ctx is cancelled or when no control
// sensor is set on b.
func (b *builtIn) runCaptureControlPoller(ctx context.Context) {
	const pollInterval = 100 * time.Millisecond

	tick := time.NewTicker(pollInterval)
	defer tick.Stop()

	for {
		// Wait for the next tick, or stop as soon as the context is cancelled.
		select {
		case <-ctx.Done():
			return
		case <-tick.C:
		}

		// Snapshot the sensor and key under a short-lived lock.
		b.mu.Lock()
		ctrl, readingKey := b.controlSensor, b.controlSensorKey
		b.mu.Unlock()
		if ctrl == nil {
			return
		}

		// Readings is called without b.mu held so that Reconfigure is not blocked by it.
		readings, readErr := ctrl.Readings(ctx, nil)
		if ctx.Err() != nil {
			return
		}

		// overrides stays nil unless parsing yields configs.
		var overrides map[string]datamanager.CaptureConfigReading
		if readErr == nil {
			var parseErr error
			overrides, parseErr = parseOverridesFromReadings(readings, readingKey)
			switch {
			case parseErr != nil:
				b.logger.Warnw("failed to parse capture config from sensor reading, reverting to machine config", "error", parseErr)
			case overrides == nil:
				b.logger.Debugw("capture control sensor returned no configs", "key", readingKey, "readings", readings)
			default:
				b.logger.Debugw("capture control sensor parsed configs", "count", len(overrides))
			}
		} else {
			b.logger.Debugw("error getting readings from capture control sensor, reverting to machine config", "error", readErr.Error())
		}

		// Apply while holding b.mu; re-check cancellation first so nothing is applied
		// once the poller has been asked to stop.
		b.mu.Lock()
		cancelled := ctx.Err() != nil
		if !cancelled {
			b.capture.SetCaptureConfigs(ctx, overrides)
		}
		b.mu.Unlock()
		if cancelled {
			return
		}
	}
}

func syncSensorFromDeps(name string, deps resource.Dependencies, logger logging.Logger) (sensor.Sensor, bool) {
if name == "" {
return nil, false
Expand Down
55 changes: 36 additions & 19 deletions services/datamanager/builtin/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ type Capture struct {
maxCaptureFileSize int64
mongoMU sync.Mutex
mongo captureMongo

// baseCollectorConfigs and baseTags are stored after each Reconfigure so
// that SetCaptureConfigs can compute effective configs without requiring callers to pass them.
baseCollectorConfigs CollectorConfigsByResource
baseTags []string
}

type captureMongo struct {
Expand Down Expand Up @@ -238,6 +243,8 @@ func (c *Capture) Reconfigure(
}
c.collectors = newCollectors
c.collectorsMu.Unlock()
c.baseCollectorConfigs = collectorConfigsByResource
c.baseTags = config.Tags
c.captureDir = config.CaptureDir
c.maxCaptureFileSize = config.MaximumCaptureFileSizeBytes
}
Expand Down Expand Up @@ -276,12 +283,6 @@ func (c *Capture) initializeOrUpdateCollector(
config Config,
collection *mongo.Collection,
) (*collectorAndConfig, error) {
// TODO(DATA-451): validate method params
methodParams, err := protoutils.ConvertMapToProtoAny(collectorConfig.AdditionalParams)
if err != nil {
return nil, err
}

maxFileSizeChanged := c.maxCaptureFileSize != config.MaximumCaptureFileSizeBytes
if storedCollectorAndConfig, ok := c.collectors[md]; ok {
if storedCollectorAndConfig.Config.Equals(&collectorConfig) &&
Expand All @@ -294,13 +295,6 @@ func (c *Capture) initializeOrUpdateCollector(
c.logger.Debugf("%s closing collector as config changed", md)
storedCollectorAndConfig.Collector.Close()
}

// Get collector constructor for the component API and method.
collectorConstructor := data.CollectorLookup(md.MethodMetadata)
if collectorConstructor == nil {
return nil, errors.Errorf("failed to find collector constructor for %s", md.MethodMetadata)
}

if collectorConfig.CaptureQueueSize < 0 {
return nil, errors.Errorf("capture_queue_size can't be less than 0, current value: %d", collectorConfig.CaptureQueueSize)
}
Expand All @@ -310,15 +304,38 @@ func (c *Capture) initializeOrUpdateCollector(
}

metadataKey := generateMetadataKey(md.MethodMetadata.API.String(), md.MethodMetadata.MethodName)
additionalParamKey, ok := metadataToAdditionalParamFields[metadataKey]
if ok {
if additionalParamKey, ok := metadataToAdditionalParamFields[metadataKey]; ok {
if _, ok := collectorConfig.AdditionalParams[additionalParamKey]; !ok {
return nil, errors.Errorf("failed to validate additional_params for %s, must supply %s",
md.MethodMetadata.API, additionalParamKey)
}
}

targetDir := targetDir(config.CaptureDir, collectorConfig)
return c.buildCollector(res, md, collectorConfig, c.maxCaptureFileSize, collection)
}

// buildCollector constructs and starts a new collector.
// The override path (SetCaptureConfigs) calls this directly since the base config was already validated.
func (c *Capture) buildCollector(
res resource.Resource,
md collectorMetadata,
collectorConfig datamanager.DataCaptureConfig,
maxCaptureFileSize int64,
collection *mongo.Collection,
) (*collectorAndConfig, error) {
// TODO(DATA-451): validate method params
methodParams, err := protoutils.ConvertMapToProtoAny(collectorConfig.AdditionalParams)
if err != nil {
return nil, err
}

// Get collector constructor for the component API and method.
collectorConstructor := data.CollectorLookup(md.MethodMetadata)
if collectorConstructor == nil {
return nil, errors.Errorf("failed to find collector constructor for %s", md.MethodMetadata)
}

targetDir := targetDir(collectorConfig.CaptureDirectory, collectorConfig)
// Create a collector for this resource and method.
if err := os.MkdirAll(targetDir, 0o700); err != nil {
return nil, errors.Wrapf(err, "failed to create target directory %s with 700 file permissions", targetDir)
Expand All @@ -343,7 +360,7 @@ func (c *Capture) initializeOrUpdateCollector(
MethodName: collectorConfig.Method,
Interval: data.GetDurationFromHz(collectorConfig.CaptureFrequencyHz),
MethodParams: methodParams,
Target: data.NewCaptureBuffer(targetDir, captureMetadata, config.MaximumCaptureFileSizeBytes),
Target: data.NewCaptureBuffer(targetDir, captureMetadata, maxCaptureFileSize),
// Set queue size to defaultCaptureQueueSize if it was not set in the config.
QueueSize: queueSize,
BufferSize: bufferSize,
Expand All @@ -352,11 +369,11 @@ func (c *Capture) initializeOrUpdateCollector(
})
if err != nil {
return nil, errors.Wrapf(err, "constructor for collector %s failed with config: %s",
md, collectorConfigDescription(collectorConfig, targetDir, config.MaximumCaptureFileSizeBytes, queueSize, bufferSize))
md, collectorConfigDescription(collectorConfig, targetDir, maxCaptureFileSize, queueSize, bufferSize))
}

c.logger.Infof("collector initialized; collector: %s, config: %s",
md, collectorConfigDescription(collectorConfig, targetDir, config.MaximumCaptureFileSizeBytes, queueSize, bufferSize))
md, collectorConfigDescription(collectorConfig, targetDir, maxCaptureFileSize, queueSize, bufferSize))
collector.Collect()

return &collectorAndConfig{res, collector, collectorConfig}, nil
Expand Down
Loading