diff --git a/services/datamanager/builtin/builtin.go b/services/datamanager/builtin/builtin.go index 5ecda0b8df0..a93b1977867 100644 --- a/services/datamanager/builtin/builtin.go +++ b/services/datamanager/builtin/builtin.go @@ -11,13 +11,17 @@ package builtin import ( "context" + "encoding/json" "errors" + "fmt" "image" "os" "sync" + "time" "github.com/benbjohnson/clock" v1 "go.viam.com/api/app/datasync/v1" + goutils "go.viam.com/utils" "google.golang.org/grpc" "go.viam.com/rdk/components/sensor" @@ -82,6 +86,11 @@ type builtIn struct { capture *capture.Capture sync *datasync.Sync diskSummaryTracker *diskSummaryTracker + + // capture control sensor fields + controlPoller *goutils.StoppableWorkers + controlSensor sensor.Sensor + controlSensorKey string } // New returns a new builtin data manager service for the given robot. @@ -127,6 +136,17 @@ func New( func (b *builtIn) Close(ctx context.Context) error { b.logger.Info("Close START") defer b.logger.Info("Close END") + + // Stop the control poller before acquiring b.mu to avoid deadlock: + // the poller goroutine holds b.mu while calling capture.SetCaptureConfigs. + b.mu.Lock() + poller := b.controlPoller + b.controlPoller = nil + b.mu.Unlock() + if poller != nil { + poller.Stop() + } + b.mu.Lock() defer b.mu.Unlock() b.diskSummaryTracker.close() @@ -199,8 +219,23 @@ func (b *builtIn) Reconfigure(ctx context.Context, deps resource.Dependencies, c syncSensor, syncSensorEnabled := syncSensorFromDeps(c.SelectiveSyncerName, deps, b.logger) syncConfig := c.syncConfig(syncSensor, syncSensorEnabled, b.logger) + controlSensor, controlSensorKey := captureControlSensorFromDeps(c.CaptureControlSensor, deps, b.logger) + + // Stop the old control poller before acquiring b.mu to avoid deadlock: + // the poller goroutine holds b.mu while calling capture.SetCaptureConfigs. + b.mu.Lock() + oldPoller := b.controlPoller + b.controlPoller = nil + b.mu.Unlock() + if oldPoller != nil { + oldPoller.Stop() + } + b.mu.Lock() defer b.mu.Unlock() + b.controlSensor = controlSensor + b.controlSensorKey = controlSensorKey + // These Reconfigure calls are the only methods in builtin.Reconfigure which create / destroy resources. // It is important that no errors happen for a given Reconfigure call after we being callin Reconfigure on capture & sync // or we could leak goroutines, wasting resources and causing bugs due to duplicate work. @@ -208,9 +243,112 @@ func (b *builtIn) Reconfigure(ctx context.Context, deps resource.Dependencies, c b.capture.Reconfigure(ctx, collectorConfigsByResource, captureConfig) b.sync.Reconfigure(ctx, syncConfig, cloudConnSvc) + if controlSensor != nil && !captureConfig.CaptureDisabled { + b.controlPoller = goutils.NewBackgroundStoppableWorkers(func(ctx context.Context) { + b.runCaptureControlPoller(ctx) + }) + } + return nil } +// captureControlSensorFromDeps resolves the control sensor from dependencies. +// Returns nil if no sensor is configured or the sensor cannot be found. +func captureControlSensorFromDeps(cfg *CaptureControlSensorConfig, deps resource.Dependencies, + logger logging.Logger, +) (sensor.Sensor, string) { + if cfg == nil || cfg.Name == "" { + return nil, "" + } + s, err := sensor.FromProvider(deps, cfg.Name) + if err != nil { + logger.Errorw( + "unable to initialize capture control sensor; controls will not apply until fixed or removed from config", + "error", err.Error()) + return nil, "" + } + return s, cfg.Key +} + +// parseOverridesFromReadings extracts capture config overrides from sensor readings. +func parseOverridesFromReadings(readings map[string]interface{}, key string) (map[string]datamanager.CaptureConfigReading, error) { + raw, ok := readings[key] + if !ok { + return nil, nil + } + jsonBytes, err := json.Marshal(raw) + if err != nil { + return nil, fmt.Errorf("failed to marshal reading: %w", err) + } + var controlList []datamanager.CaptureConfigReading + if err := json.Unmarshal(jsonBytes, &controlList); err != nil { + return nil, fmt.Errorf("failed to unmarshal reading: %w", err) + } + if len(controlList) == 0 { + return nil, nil + } + result := make(map[string]datamanager.CaptureConfigReading, len(controlList)) + for _, o := range controlList { + result[fmt.Sprintf("%s/%s", o.ResourceName, o.Method)] = o + } + return result, nil +} + +// runCaptureControlPoller polls the capture control sensor at 10 Hz and calls capture.SetCaptureConfigs +// whenever the parsed configs change. On invalid or missing readings, it reverts to +// the machine config by passing nil configs. +func (b *builtIn) runCaptureControlPoller(ctx context.Context) { + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + } + + // Read the sensor reference under a brief lock. + b.mu.Lock() + s := b.controlSensor + key := b.controlSensorKey + b.mu.Unlock() + + if s == nil { + return + } + + // Call Readings without holding b.mu to avoid blocking Reconfigure. + readings, err := s.Readings(ctx, nil) + if ctx.Err() != nil { + return + } + + var newConfigs map[string]datamanager.CaptureConfigReading + if err != nil { + b.logger.Debugw("error getting readings from capture control sensor, reverting to machine config", "error", err.Error()) + } else { + var parseErr error + newConfigs, parseErr = parseOverridesFromReadings(readings, key) + if parseErr != nil { + b.logger.Warnw("failed to parse capture config from sensor reading, reverting to machine config", "error", parseErr) + } else if newConfigs == nil { + b.logger.Debugw("capture control sensor returned no configs", "key", key, "readings", readings) + } else { + b.logger.Debugw("capture control sensor parsed configs", "count", len(newConfigs)) + } + } + + // Apply under lock; SetCaptureConfigs is a no-op if configs haven't changed. + b.mu.Lock() + if ctx.Err() != nil { + b.mu.Unlock() + return + } + b.capture.SetCaptureConfigs(ctx, newConfigs) + b.mu.Unlock() + } +} + func syncSensorFromDeps(name string, deps resource.Dependencies, logger logging.Logger) (sensor.Sensor, bool) { if name == "" { return nil, false diff --git a/services/datamanager/builtin/capture/capture.go b/services/datamanager/builtin/capture/capture.go index 92557ef53a0..3b303dedead 100644 --- a/services/datamanager/builtin/capture/capture.go +++ b/services/datamanager/builtin/capture/capture.go @@ -64,6 +64,11 @@ type Capture struct { maxCaptureFileSize int64 mongoMU sync.Mutex mongo captureMongo + + // baseCollectorConfigs and baseTags are stored after each Reconfigure so + // that SetCaptureConfigs can compute effective configs without requiring callers to pass them. + baseCollectorConfigs CollectorConfigsByResource + baseTags []string } type captureMongo struct { @@ -238,6 +243,8 @@ func (c *Capture) Reconfigure( } c.collectors = newCollectors c.collectorsMu.Unlock() + c.baseCollectorConfigs = collectorConfigsByResource + c.baseTags = config.Tags c.captureDir = config.CaptureDir c.maxCaptureFileSize = config.MaximumCaptureFileSizeBytes } @@ -276,12 +283,6 @@ func (c *Capture) initializeOrUpdateCollector( config Config, collection *mongo.Collection, ) (*collectorAndConfig, error) { - // TODO(DATA-451): validate method params - methodParams, err := protoutils.ConvertMapToProtoAny(collectorConfig.AdditionalParams) - if err != nil { - return nil, err - } - maxFileSizeChanged := c.maxCaptureFileSize != config.MaximumCaptureFileSizeBytes if storedCollectorAndConfig, ok := c.collectors[md]; ok { if storedCollectorAndConfig.Config.Equals(&collectorConfig) && @@ -294,13 +295,6 @@ func (c *Capture) initializeOrUpdateCollector( c.logger.Debugf("%s closing collector as config changed", md) storedCollectorAndConfig.Collector.Close() } - - // Get collector constructor for the component API and method. - collectorConstructor := data.CollectorLookup(md.MethodMetadata) - if collectorConstructor == nil { - return nil, errors.Errorf("failed to find collector constructor for %s", md.MethodMetadata) - } - if collectorConfig.CaptureQueueSize < 0 { return nil, errors.Errorf("capture_queue_size can't be less than 0, current value: %d", collectorConfig.CaptureQueueSize) } @@ -310,15 +304,38 @@ func (c *Capture) initializeOrUpdateCollector( } metadataKey := generateMetadataKey(md.MethodMetadata.API.String(), md.MethodMetadata.MethodName) - additionalParamKey, ok := metadataToAdditionalParamFields[metadataKey] - if ok { + if additionalParamKey, ok := metadataToAdditionalParamFields[metadataKey]; ok { if _, ok := collectorConfig.AdditionalParams[additionalParamKey]; !ok { return nil, errors.Errorf("failed to validate additional_params for %s, must supply %s", md.MethodMetadata.API, additionalParamKey) } } - targetDir := targetDir(config.CaptureDir, collectorConfig) + return c.buildCollector(res, md, collectorConfig, c.maxCaptureFileSize, collection) +} + +// buildCollector constructs and starts a new collector. +// The override path (SetCaptureConfigs) calls this directly since the base config was already validated. +func (c *Capture) buildCollector( + res resource.Resource, + md collectorMetadata, + collectorConfig datamanager.DataCaptureConfig, + maxCaptureFileSize int64, + collection *mongo.Collection, +) (*collectorAndConfig, error) { + // TODO(DATA-451): validate method params + methodParams, err := protoutils.ConvertMapToProtoAny(collectorConfig.AdditionalParams) + if err != nil { + return nil, err + } + + // Get collector constructor for the component API and method. + collectorConstructor := data.CollectorLookup(md.MethodMetadata) + if collectorConstructor == nil { + return nil, errors.Errorf("failed to find collector constructor for %s", md.MethodMetadata) + } + + targetDir := targetDir(collectorConfig.CaptureDirectory, collectorConfig) // Create a collector for this resource and method. if err := os.MkdirAll(targetDir, 0o700); err != nil { return nil, errors.Wrapf(err, "failed to create target directory %s with 700 file permissions", targetDir) @@ -343,7 +360,7 @@ func (c *Capture) initializeOrUpdateCollector( MethodName: collectorConfig.Method, Interval: data.GetDurationFromHz(collectorConfig.CaptureFrequencyHz), MethodParams: methodParams, - Target: data.NewCaptureBuffer(targetDir, captureMetadata, config.MaximumCaptureFileSizeBytes), + Target: data.NewCaptureBuffer(targetDir, captureMetadata, maxCaptureFileSize), // Set queue size to defaultCaptureQueueSize if it was not set in the config. QueueSize: queueSize, BufferSize: bufferSize, @@ -352,11 +369,11 @@ func (c *Capture) initializeOrUpdateCollector( }) if err != nil { return nil, errors.Wrapf(err, "constructor for collector %s failed with config: %s", - md, collectorConfigDescription(collectorConfig, targetDir, config.MaximumCaptureFileSizeBytes, queueSize, bufferSize)) + md, collectorConfigDescription(collectorConfig, targetDir, maxCaptureFileSize, queueSize, bufferSize)) } c.logger.Infof("collector initialized; collector: %s, config: %s", - md, collectorConfigDescription(collectorConfig, targetDir, config.MaximumCaptureFileSizeBytes, queueSize, bufferSize)) + md, collectorConfigDescription(collectorConfig, targetDir, maxCaptureFileSize, queueSize, bufferSize)) collector.Collect() return &collectorAndConfig{res, collector, collectorConfig}, nil diff --git a/services/datamanager/builtin/capture/capture_control.go b/services/datamanager/builtin/capture/capture_control.go new file mode 100644 index 00000000000..6345ebc3a32 --- /dev/null +++ b/services/datamanager/builtin/capture/capture_control.go @@ -0,0 +1,115 @@ +package capture + +import ( + "context" + "fmt" + + "go.viam.com/rdk/services/datamanager" +) + +// captureConfigKey returns the lookup key for a per-resource capture config map. +func captureConfigKey(resourceString, method string) string { + return fmt.Sprintf("%s/%s", resourceString, method) +} + +// fmtFloat32Ptr formats a *float32 for logging. +func fmtFloat32Ptr(f *float32) string { + if f == nil { + return "" + } + return fmt.Sprintf("%f", *f) +} + +// SetCaptureConfigs applies dynamic per-resource capture configs without triggering a full Reconfigure. +// Only collectors whose effective config (base + override) has changed are updated. +// Passing nil or an empty map reverts all collectors to their base machine configs. +// configs is keyed by "resourceName/method" (e.g. "camera-1/GetImages"). +func (c *Capture) SetCaptureConfigs(ctx context.Context, configs map[string]datamanager.CaptureConfigReading) { + type collectorUpdate struct { + md collectorMetadata + cac *collectorAndConfig // nil means remove + } + var toClose []*collectorAndConfig + var updates []collectorUpdate + + for res, cfgs := range c.baseCollectorConfigs { + for _, cfg := range cfgs { + key := captureConfigKey(cfg.Name.ShortName(), cfg.Method) + + // Apply service-level tags. + cfg.Tags = c.baseTags + + // Apply per-resource override if present, otherwise use base config as-is. + if config, ok := configs[key]; ok { + c.logger.Infof("applying capture config for %s: capture_frequency_hz=%s tags=%v", + key, fmtFloat32Ptr(config.CaptureFrequencyHz), config.Tags) + wasDisabled := cfg.Disabled + if config.CaptureFrequencyHz != nil { + oldFreq := cfg.CaptureFrequencyHz + cfg.CaptureFrequencyHz = *config.CaptureFrequencyHz + if cfg.CaptureFrequencyHz != oldFreq { + c.logger.Infof("capture config changing capture_frequency_hz for %s: %f -> %f", + key, oldFreq, cfg.CaptureFrequencyHz) + } + } + + if cfg.CaptureFrequencyHz > 0 { + cfg.Disabled = false + } + if wasDisabled && !cfg.Disabled { + c.logger.Infof("capture config enabling previously disabled collector for %s", key) + } + if config.Tags != nil { + c.logger.Infof("capture config changing tags for %s: %v -> %v", key, cfg.Tags, config.Tags) + cfg.Tags = config.Tags + } + } + + md := newCollectorMetadata(cfg) + existing := c.collectors[md] + + if cfg.Disabled || cfg.CaptureFrequencyHz <= 0 { + if existing != nil { + c.logger.Infof("capture config disabling collector for %s", key) + toClose = append(toClose, existing) + updates = append(updates, collectorUpdate{md, nil}) + } + continue + } + + // Skip if the effective config is unchanged. + if existing != nil && existing.Config.Equals(&cfg) && res == existing.Resource { + continue + } + + coll, err := c.buildCollector(res, md, cfg, c.maxCaptureFileSize, c.mongo.collection) + if err != nil { + c.logger.Warnw("failed to build collector for capture config", "error", err, "key", key) + continue + } + if existing != nil { + toClose = append(toClose, existing) + } + updates = append(updates, collectorUpdate{md, coll}) + } + } + + if len(updates) == 0 { + return + } + + // Update the collectors map atomically, then close replaced collectors. + c.collectorsMu.Lock() + for _, u := range updates { + if u.cac != nil { + c.collectors[u.md] = u.cac + } else { + delete(c.collectors, u.md) + } + } + c.collectorsMu.Unlock() + + for _, old := range toClose { + old.Collector.Close() + } +} diff --git a/services/datamanager/builtin/capture/capture_control_test.go b/services/datamanager/builtin/capture/capture_control_test.go new file mode 100644 index 00000000000..90e3c2acbaa --- /dev/null +++ b/services/datamanager/builtin/capture/capture_control_test.go @@ -0,0 +1,143 @@ +package capture + +import ( + "context" + "sync" + "testing" + + "github.com/benbjohnson/clock" + "go.viam.com/test" + + "go.viam.com/rdk/data" + "go.viam.com/rdk/logging" + "go.viam.com/rdk/resource" + "go.viam.com/rdk/services/datamanager" +) + +func float32Ptr(f float32) *float32 { return &f } + +// mockCollector implements data.Collector for testing without real capture infrastructure. +type mockCollector struct { + closed bool +} + +func (m *mockCollector) Close() { m.closed = true } +func (m *mockCollector) Flush() {} +func (m *mockCollector) Collect() {} + +// cameraAPI is constructed without importing the camera package to avoid triggering its +// init(), which would conflict with our mock collector registration below. +var cameraAPI = resource.APINamespaceRDK.WithComponentType("camera") + +// registerCameraCollector registers a no-op collector constructor for camera/GetImages so that +// tests which trigger collector rebuilds don't fail on a missing constructor lookup. +var registerCameraCollectorOnce sync.Once + +func registerCameraCollector() { + registerCameraCollectorOnce.Do(func() { + data.RegisterCollector( + data.MethodMetadata{API: cameraAPI, MethodName: "GetImages"}, + func(_ interface{}, _ data.CollectorParams) (data.Collector, error) { + return &mockCollector{}, nil + }, + ) + }) +} + +// newTestCapture returns a Capture with the given baseCollectorConfigs and +// an optional pre-populated collectors map. captureDir is always a temp dir. +func newTestCapture( + t *testing.T, + baseCollectorConfigs CollectorConfigsByResource, + existingCollectors collectors, +) *Capture { + t.Helper() + if existingCollectors == nil { + existingCollectors = make(collectors) + } + return &Capture{ + logger: logging.NewTestLogger(t), + clk: clock.New(), + collectors: existingCollectors, + captureDir: t.TempDir(), + maxCaptureFileSize: 256 * 1024, + baseCollectorConfigs: baseCollectorConfigs, + } +} + +func TestSetCaptureConfig(t *testing.T) { + cameraCfg := datamanager.DataCaptureConfig{ + Name: resource.NewName(cameraAPI, "camera-1"), + Method: "GetImages", + CaptureFrequencyHz: 1.0, + } + cameraMD := newCollectorMetadata(cameraCfg) + mock1 := &mockCollector{} // used by "disables collector" case + mock2 := &mockCollector{} // used by "service-level tags" case + mock3 := &mockCollector{} // used by "no-op" case + + for _, tc := range []struct { + name string + baseConfigs CollectorConfigsByResource + existingColls collectors + baseTags []string + input map[string]datamanager.CaptureConfigReading + expectedClosed *mockCollector + expectedNotClosed *mockCollector + expectedCollectorCount int + expectedNewTags []string + }{ + { + name: "no-op when effective config is unchanged", + baseConfigs: CollectorConfigsByResource{nil: []datamanager.DataCaptureConfig{cameraCfg}}, + existingColls: collectors{cameraMD: {Collector: mock3, Config: cameraCfg}}, + input: map[string]datamanager.CaptureConfigReading{"camera-1/GetImages": {CaptureFrequencyHz: float32Ptr(1.0)}}, + expectedNotClosed: mock3, + expectedCollectorCount: 1, + }, + { + name: "disables collector on zero frequency", + baseConfigs: CollectorConfigsByResource{nil: []datamanager.DataCaptureConfig{cameraCfg}}, + existingColls: collectors{cameraMD: {Collector: mock1, Config: cameraCfg}}, + input: map[string]datamanager.CaptureConfigReading{"camera-1/GetImages": {CaptureFrequencyHz: float32Ptr(0)}}, + expectedClosed: mock1, + expectedCollectorCount: 0, + }, + { + name: "reverts to base config on nil input", + baseConfigs: CollectorConfigsByResource{nil: []datamanager.DataCaptureConfig{ + {Name: resource.NewName(cameraAPI, "camera-1"), Method: "GetImages", CaptureFrequencyHz: 1.0, Disabled: true}, + }}, + input: nil, + expectedCollectorCount: 0, + }, + { + name: "service-level tags are overridden by capture config tags", + baseConfigs: CollectorConfigsByResource{nil: []datamanager.DataCaptureConfig{cameraCfg}}, + existingColls: collectors{cameraMD: {Collector: mock2, Config: cameraCfg}}, + baseTags: []string{"service-tag"}, + input: map[string]datamanager.CaptureConfigReading{"camera-1/GetImages": {Tags: []string{"override-tag"}}}, + expectedClosed: mock2, + expectedCollectorCount: 1, + expectedNewTags: []string{"override-tag"}, + }, + } { + t.Run(tc.name, func(t *testing.T) { + registerCameraCollector() + c := newTestCapture(t, tc.baseConfigs, tc.existingColls) + c.baseTags = tc.baseTags + c.SetCaptureConfigs(context.Background(), tc.input) + + if tc.expectedClosed != nil { + test.That(t, tc.expectedClosed.closed, test.ShouldBeTrue) + } + if tc.expectedNotClosed != nil { + test.That(t, tc.expectedNotClosed.closed, test.ShouldBeFalse) + } + test.That(t, len(c.collectors), test.ShouldEqual, tc.expectedCollectorCount) + if tc.expectedNewTags != nil { + test.That(t, c.collectors[cameraMD].Config.Tags, test.ShouldResemble, tc.expectedNewTags) + } + }) + } +} diff --git a/services/datamanager/builtin/config.go b/services/datamanager/builtin/config.go index 50a4959d9f6..2acecfb23e2 100644 --- a/services/datamanager/builtin/config.go +++ b/services/datamanager/builtin/config.go @@ -41,6 +41,14 @@ const ( // Default maximum size in bytes of a data capture file. var defaultMaxCaptureSize = int64(256 * 1024) +// CaptureControlSensorConfig describes a sensor that provides dynamic capture controls. +type CaptureControlSensorConfig struct { + // Name is the resource name of the sensor. + Name string `json:"name"` + // Key is the key in the sensor's Readings map that contains the controls array. + Key string `json:"key"` +} + // Config describes how to configure the service. // See sync.Config and capture.Config for docs on what each field does // to both sync & capture respectively. @@ -63,6 +71,9 @@ type Config struct { ScheduledSyncDisabled bool `json:"sync_disabled"` SelectiveSyncerName string `json:"selective_syncer_name"` SyncIntervalMins float64 `json:"sync_interval_mins"` + // CaptureControlSensor when set specifies a sensor to poll for dynamic + // capture configurations. + CaptureControlSensor *CaptureControlSensorConfig `json:"capture_control_sensor,omitempty"` } // Validate returns components which will be depended upon weakly due to the above matcher. diff --git a/services/datamanager/data_manager.go b/services/datamanager/data_manager.go index 1088cd8d0ba..974f564e63e 100644 --- a/services/datamanager/data_manager.go +++ b/services/datamanager/data_manager.go @@ -195,3 +195,17 @@ func CreateShouldSyncReading(toSync bool) map[string]interface{} { readings[ShouldSyncKey] = toSync return readings } + +// CaptureConfigReading defines a dynamic capture config for a specific resource/method pair, +// as emitted by the capture_control_sensor. +type CaptureConfigReading struct { + // ResourceName is the name of the resource (e.g. "camera-1"). + ResourceName string `json:"resource_name"` + // Method is the capture method name (e.g. "GetImages"). + Method string `json:"method"` + // CaptureFrequencyHz, when non-nil, overrides the capture frequency for this resource/method pair. + // A value of 0 disables capture. + CaptureFrequencyHz *float32 `json:"capture_frequency_hz,omitempty"` + // Tags, when non-nil, overrides the data manager's tags for this resource/method pair. + Tags []string `json:"tags"` +}