Skip to content

Commit c819395

Browse files
authored
Discovery observers Start failures should not stop nor crash the collector (#5299)
* Discovery observers Start failures should not stop nor crash the collector * Changelog * fieldaligment for discoverer_test.go * Add comment to GetExtensions
1 parent 35a4217 commit c819395

File tree

3 files changed

+21
-28
lines changed

3 files changed

+21
-28
lines changed

CHANGELOG.md

+4
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,10 @@
66

77
- (Splunk) Deprecate the nagios monitor ([#5172](https://github.com/signalfx/splunk-otel-collector/pull/5172))
88

9+
### 🧰 Bug fixes 🧰
10+
11+
- (Splunk) Discovery observers start failures should not stop the collector ([#5299](https://github.com/signalfx/splunk-otel-collector/pull/5299))
12+
913
## v0.108.0
1014

1115
This Splunk OpenTelemetry Collector release includes changes from the [opentelemetry-collector v0.108.1](https://github.com/open-telemetry/opentelemetry-collector/releases/tag/v0.108.1) and the [opentelemetry-collector-contrib v0.108.0](https://github.com/open-telemetry/opentelemetry-collector-contrib/releases/tag/v0.108.0) releases where appropriate.

internal/confmapprovider/discovery/discoverer.go

+10-13
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ type discoverer struct {
6767
factories otelcol.Factories
6868
// receiverID -> observerID -> config
6969
unexpandedReceiverEntries map[component.ID]map[component.ID]map[string]any
70-
extensions map[component.ID]otelcolextension.Extension
70+
operationalObservers map[component.ID]otelcolextension.Extension // Only extensions successfully started should be added to this map.
7171
logger *zap.Logger
7272
discoveredReceivers map[component.ID]discovery.StatusType
7373
configs map[string]*Config
@@ -104,7 +104,6 @@ func newDiscoverer(logger *zap.Logger) (*discoverer, error) {
104104
logger: logger,
105105
info: info,
106106
factories: factories,
107-
extensions: map[component.ID]otelcolextension.Extension{},
108107
configs: map[string]*Config{},
109108
duration: duration,
110109
mu: sync.Mutex{},
@@ -182,10 +181,7 @@ func (d *discoverer) discover(cfg *Config) (map[string]any, error) {
182181
return nil, nil
183182
}
184183

185-
err = d.performDiscovery(discoveryReceivers, discoveryObservers)
186-
if err != nil {
187-
return nil, err
188-
}
184+
d.performDiscovery(discoveryReceivers, discoveryObservers)
189185

190186
discoveryConfig, err := d.discoveryConfig(cfg)
191187
if err != nil {
@@ -194,7 +190,7 @@ func (d *discoverer) discover(cfg *Config) (map[string]any, error) {
194190
return discoveryConfig, nil
195191
}
196192

197-
func (d *discoverer) performDiscovery(discoveryReceivers map[component.ID]otelcolreceiver.Logs, discoveryObservers map[component.ID]otelcolextension.Extension) error {
193+
func (d *discoverer) performDiscovery(discoveryReceivers map[component.ID]otelcolreceiver.Logs, discoveryObservers map[component.ID]otelcolextension.Extension) {
198194
var cancels []context.CancelFunc
199195

200196
defer func() {
@@ -203,6 +199,8 @@ func (d *discoverer) performDiscovery(discoveryReceivers map[component.ID]otelco
203199
}
204200
}()
205201

202+
d.operationalObservers = make(map[component.ID]component.Component, len(discoveryObservers))
203+
206204
for observerID, observer := range discoveryObservers {
207205
d.logger.Debug(fmt.Sprintf("starting observer %q", observerID))
208206
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
@@ -212,7 +210,7 @@ func (d *discoverer) performDiscovery(discoveryReceivers map[component.ID]otelco
212210
fmt.Sprintf("%q startup failed. Won't proceed with %q-based discovery", observerID, observerID.Type()),
213211
zap.Error(e),
214212
)
215-
return e
213+
continue
216214
}
217215
defer func(obsID component.ID, obsExt otelcolextension.Extension) {
218216
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
@@ -221,6 +219,7 @@ func (d *discoverer) performDiscovery(discoveryReceivers map[component.ID]otelco
221219
d.logger.Warn(fmt.Sprintf("error shutting down observer %q", obsID), zap.Error(e))
222220
}
223221
}(observerID, observer)
222+
d.operationalObservers[observerID] = observer
224223
}
225224

226225
for receiverID, receiver := range discoveryReceivers {
@@ -232,7 +231,7 @@ func (d *discoverer) performDiscovery(discoveryReceivers map[component.ID]otelco
232231
fmt.Sprintf("%q startup failed.", receiverID),
233232
zap.Error(err),
234233
)
235-
return err
234+
continue
236235
}
237236
defer func(rcvID component.ID, rcv otelcolreceiver.Logs) {
238237
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
@@ -249,8 +248,6 @@ func (d *discoverer) performDiscovery(discoveryReceivers map[component.ID]otelco
249248
case <-context.Background().Done():
250249
}
251250
_, _ = fmt.Fprintf(os.Stderr, "Discovery complete.\n")
252-
253-
return nil
254251
}
255252

256253
func (d *discoverer) createDiscoveryReceiversAndObservers(cfg *Config) (map[component.ID]otelcolreceiver.Logs, map[component.ID]otelcolextension.Extension, error) {
@@ -268,7 +265,6 @@ func (d *discoverer) createDiscoveryReceiversAndObservers(cfg *Config) (map[comp
268265
// disabled by property
269266
continue
270267
}
271-
d.extensions[observerID] = observer
272268
discoveryObservers[observerID] = observer
273269

274270
discoveryReceiverDefaultConfig := discoveryReceiverFactory.CreateDefaultConfig()
@@ -627,8 +623,9 @@ func (d *discoverer) GetFactory(kind component.Kind, componentType component.Typ
627623
}
628624

629625
// GetExtensions is a component.Host method used to forward discovery observers.
626+
// This method only returns operational extensions, i.e., those that have been successfully started.
630627
func (d *discoverer) GetExtensions() map[component.ID]otelcolextension.Extension {
631-
return d.extensions
628+
return d.operationalObservers
632629
}
633630

634631
// GetExporters is a component.Host method.

internal/confmapprovider/discovery/discoverer_test.go

+7-15
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,9 @@ import (
3535

3636
func TestInnerDiscoveryExecution(t *testing.T) {
3737
tests := []struct {
38-
name string
39-
observers map[component.ID]otelcolextension.Extension
40-
receivers map[component.ID]otelcolreceiver.Logs
41-
expectedMsg string
38+
observers map[component.ID]otelcolextension.Extension
39+
receivers map[component.ID]otelcolreceiver.Logs
40+
name string
4241
}{
4342
{
4443
name: "happy_path",
@@ -62,7 +61,6 @@ func TestInnerDiscoveryExecution(t *testing.T) {
6261
receivers: map[component.ID]otelcolreceiver.Logs{
6362
component.MustNewID("receiver00"): &mockReceiverLogs{},
6463
},
65-
expectedMsg: "extension_start_error",
6664
},
6765
{
6866
name: "fail_start_receiver",
@@ -75,10 +73,9 @@ func TestInnerDiscoveryExecution(t *testing.T) {
7573
component.MustNewID("receiver01"): &mockReceiverLogs{mockComponent{startErr: fmt.Errorf("receiver_start_error")}},
7674
component.MustNewID("receiver02"): &mockReceiverLogs{},
7775
},
78-
expectedMsg: "receiver_start_error",
7976
},
8077
{
81-
name: "fail_shutdown_no_error_msg",
78+
name: "fail_shutdown_no_crash",
8279
observers: map[component.ID]otelcolextension.Extension{
8380
component.MustNewID("observer00"): &mockExtension{},
8481
component.MustNewID("observer01"): &mockExtension{},
@@ -98,14 +95,15 @@ func TestInnerDiscoveryExecution(t *testing.T) {
9895
require.NotNil(t, d)
9996

10097
d.duration = 1 * time.Second
101-
err = d.performDiscovery(tt.receivers, tt.observers)
98+
d.performDiscovery(tt.receivers, tt.observers)
10299

103-
for _, observer := range tt.observers {
100+
for id, observer := range tt.observers {
104101
mockExtension := observer.(*mockExtension)
105102
if mockExtension.started {
106103
assert.True(t, mockExtension.shutdown)
107104
} else {
108105
assert.False(t, mockExtension.shutdown)
106+
assert.NotContains(t, d.operationalObservers, id)
109107
}
110108
}
111109

@@ -117,12 +115,6 @@ func TestInnerDiscoveryExecution(t *testing.T) {
117115
assert.False(t, mockReceiver.shutdown)
118116
}
119117
}
120-
121-
if tt.expectedMsg != "" {
122-
assert.ErrorContains(t, err, tt.expectedMsg)
123-
} else {
124-
assert.NoError(t, err)
125-
}
126118
})
127119
}
128120
}

0 commit comments

Comments
 (0)