From 22e8ba60fcaf2204781c27580ee72ef042423689 Mon Sep 17 00:00:00 2001 From: Paulin Todev Date: Tue, 3 Dec 2024 15:22:18 +0000 Subject: [PATCH] Cherry pick changes for 1.5.1 (#2207) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Fixed an issue in the `otlp.exporter.prometheus` component (#2102) * Fixed an issue in the `otlp.exporter.prometheus` component * Fixed an issue in the `otlp.exporter.prometheus` component * Fix potential deadlock in import statements (#2129) * Fix potential deadlock in import statements * change * typo * fix: race condition UI remotecfg (#2160) * Refactor ui remtoecfg components to avoid race condition * Fix accidental cast to pointer that should have been struct * Update changelog * fix: fully prevent panic in remotecfg ui (#2164) * Fully prevent panic in remotecfg ui * Address PR feedback * Fix deadlock due to infinite retry (#2174) * Fix deadlock due to infinite retry * changelog * Update ckit to fix memberlist logging issues (#2186) * Upgrade ckit and changelog * go mod tidy * `loki.source.podlogs`: Fix issue which disables clustering unintentionally. (#2187) * Fix issue which disables clustering unintentionally. * prometheus.operator.*: allow setting informer_sync_timeout (#2161) * prometheus.operator.*: allow setting informer_sync_timeout * default to 1m * docs * fix(pyroscope): allow slashes in tag name (#2172) * loki.source.podlogs: For clustering only take into account some labels (#2190) * Only take into account some labels * Reword docs * fix: crash when updating import.http config (#2204) * fix: crash when updating import.http config * fix key/pattern logic for the attribute processor (#2124) * fix: Update postgres exporter (#2019) * Update postgres exporter * Update changelog * Use postgres exporter branch that implements exporter package * Add TODO for future maintainers * Update VERSION file * Add missing changelog entry * Fix pyroscope.write issues with pyroscope.receive_http (#2201) * Fix pyroscope.write issues with pyroscope.receive_http The nodejs Pyroscope SDK sends profiles with a `Connection: close` header. This header was copied to the upstream request, causing connection churn towards Pyroscope, which can be quite bad on the CPU when using TLS. Do not copy the `Connection` header from the incoming request to fix this issue. Additionally, `pyroscope.write` had a single `http.Client` used for forwarding data from `pyroscope.receive_http`, which may not work if multiple endpoints are configured with different options. To fix this, store a `http.Client` for each endpoint. --------- Co-authored-by: YusifAghalar <41161340+YusifAghalar@users.noreply.github.com> Co-authored-by: Piotr <17101802+thampiotr@users.noreply.github.com> Co-authored-by: Sam DeHaan Co-authored-by: Craig Peterson <192540+captncraig@users.noreply.github.com> Co-authored-by: Marc Sanmiquel Co-authored-by: Sergei Nikolaev Co-authored-by: William Dumont Co-authored-by: Sam DeHaan Co-authored-by: Gergely Madarász --- CHANGELOG.md | 36 +++++ VERSION | 2 +- docs/sources/_index.md | 2 +- .../components/loki/loki.source.kubernetes.md | 2 +- .../components/loki/loki.source.podlogs.md | 15 ++ .../prometheus.operator.podmonitors.md | 1 + .../prometheus/prometheus.operator.probes.md | 1 + .../prometheus.operator.servicemonitors.md | 1 + go.mod | 23 +-- go.sum | 39 ++--- .../component/loki/source/podlogs/podlogs.go | 5 +- .../loki/source/podlogs/reconciler.go | 19 ++- .../component/otelcol/config_attraction.go | 46 +++++- .../otelcol/config_attraction_test.go | 54 +++++++ .../prometheus/internal/convert/convert.go | 2 +- .../processor/attributes/attributes.go | 4 + .../processor/attributes/attributes_test.go | 17 ++ .../prometheus/operator/common/crdmanager.go | 5 +- .../component/prometheus/operator/types.go | 3 + internal/component/pyroscope/write/parser.go | 2 +- .../component/pyroscope/write/parser_test.go | 150 ++++++++++++++++++ internal/component/pyroscope/write/write.go | 57 ++++--- .../runtime/internal/controller/loader.go | 32 ++-- .../internal/controller/node_config_import.go | 10 +- .../internal/importsource/import_http.go | 21 +-- internal/service/ui/ui.go | 5 +- internal/web/api/api.go | 113 +++++++++---- 27 files changed, 545 insertions(+), 122 deletions(-) create mode 100644 internal/component/pyroscope/write/parser_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 55bd9e2bbb..c56683a11a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,42 @@ This document contains a historical list of changes between releases. Only changes that impact end-user behavior are listed; changes to documentation or internal API changes are not present. +v1.5.1 +----------------- + +### Enhancements + +- Logs from underlying clustering library `memberlist` are now surfaced with correct level (@thampiotr) + +- Allow setting `informer_sync_timeout` in prometheus.operator.* components. (@captncraig) + +- For sharding targets during clustering, `loki.source.podlogs` now only takes into account some labels. (@ptodev) + +### Bugfixes + +- Fixed an issue in the `pyroscope.write` component to prevent TLS connection churn to Pyroscope when the `pyroscope.receive_http` clients don't request keepalive (@madaraszg-tulip) + +- Fixed an issue in the `pyroscope.write` component with multiple endpoints not working correctly for forwarding profiles from `pyroscope.receive_http` (@madaraszg-tulip) + +- Fixed issue with reloading configuration and prometheus metrics duplication in `prometheus.write.queue`. (@mattdurham) + +- Fixed a few race conditions that could lead to a deadlock when using `import` statements, which could lead to a memory leak on `/metrics` endpoint of an Alloy instance. (@thampiotr) + +- Fix a race condition where the ui service was dependent on starting after the remotecfg service, which is not guaranteed. (@dehaansa & @erikbaranowski) + +- Fixed an issue in the `otelcol.exporter.prometheus` component that would set series value incorrectly for stale metrics (@YusifAghalar) + +- `loki.source.podlogs`: Fixed a bug which prevented clustering from working and caused duplicate logs to be sent. + The bug only happened when no `selector` or `namespace_selector` blocks were specified in the Alloy configuration. (@ptodev) + +- Fixed an issue in the `pyroscope.write` component to allow slashes in application names in the same way it is done in the Pyroscope push API (@marcsanmi) + +- Fixed a crash when updating the configuration of `remote.http`. (@kinolaev) + +- Fixed an issue in the `otelcol.processor.attribute` component where the actions `delete` and `hash` could not be used with the `pattern` argument. (@wildum) + +- Fixed an issue in the `prometheus.exporter.postgres` component that would leak goroutines when the target was not reachable (@dehaansa) + v1.5.0 ----------------- diff --git a/VERSION b/VERSION index ca452642ff..b964911a61 100644 --- a/VERSION +++ b/VERSION @@ -20,4 +20,4 @@ # # Lines starting with "#" and blank lines are ignored. -v1.5.0 +v1.5.1 diff --git a/docs/sources/_index.md b/docs/sources/_index.md index b16eecb592..5bed5577b6 100644 --- a/docs/sources/_index.md +++ b/docs/sources/_index.md @@ -4,7 +4,7 @@ title: Grafana Alloy description: Grafana Alloy is a a vendor-neutral distribution of the OTel Collector weight: 350 cascade: - ALLOY_RELEASE: v1.5.0 + ALLOY_RELEASE: v1.5.1 OTEL_VERSION: v0.112.0 PROM_WIN_EXP_VERSION: v0.27.3 SNMP_VERSION: v0.26.0 diff --git a/docs/sources/reference/components/loki/loki.source.kubernetes.md b/docs/sources/reference/components/loki/loki.source.kubernetes.md index e0e3896a99..4cb86cc620 100644 --- a/docs/sources/reference/components/loki/loki.source.kubernetes.md +++ b/docs/sources/reference/components/loki/loki.source.kubernetes.md @@ -139,7 +139,7 @@ When {{< param "PRODUCT_NAME" >}} is [using clustering][], and `enabled` is set If {{< param "PRODUCT_NAME" >}} is _not_ running in clustered mode, then the block is a no-op and `loki.source.kubernetes` collects logs from every target it receives in its arguments. -Clustering only looks at the following labels for determining the shard key: +Clustering looks only at the following labels for determining the shard key: * `__pod_namespace__` * `__pod_name__` diff --git a/docs/sources/reference/components/loki/loki.source.podlogs.md b/docs/sources/reference/components/loki/loki.source.podlogs.md index 0ce4caac49..ae4703f675 100644 --- a/docs/sources/reference/components/loki/loki.source.podlogs.md +++ b/docs/sources/reference/components/loki/loki.source.podlogs.md @@ -231,6 +231,21 @@ cluster to distribute the load of log collection between all cluster nodes. If {{< param "PRODUCT_NAME" >}} is _not_ running in clustered mode, then the block is a no-op and `loki.source.podlogs` collects logs based on every PodLogs resource discovered. +Clustering looks only at the following labels for determining the shard key: + +* `__pod_namespace__` +* `__pod_name__` +* `__pod_container_name__` +* `__pod_uid__` +* `__meta_kubernetes_namespace` +* `__meta_kubernetes_pod_name` +* `__meta_kubernetes_pod_container_name` +* `__meta_kubernetes_pod_uid` +* `container` +* `pod` +* `job` +* `namespace` + [using clustering]: ../../../../get-started/clustering/ ## Exported fields diff --git a/docs/sources/reference/components/prometheus/prometheus.operator.podmonitors.md b/docs/sources/reference/components/prometheus/prometheus.operator.podmonitors.md index e39dcabf51..2ef2ca30b3 100644 --- a/docs/sources/reference/components/prometheus/prometheus.operator.podmonitors.md +++ b/docs/sources/reference/components/prometheus/prometheus.operator.podmonitors.md @@ -34,6 +34,7 @@ Name | Type | Description | Default | Required ---- | ---- | ----------- | ------- | -------- `forward_to` | `list(MetricsReceiver)` | List of receivers to send scraped metrics to. | | yes `namespaces` | `list(string)` | List of namespaces to search for PodMonitor resources. If not specified, all namespaces will be searched. || no +`informer_sync_timeout` | `duration` | Timeout for initial sync of PodMonitor resources. | `1m` | no ## Blocks diff --git a/docs/sources/reference/components/prometheus/prometheus.operator.probes.md b/docs/sources/reference/components/prometheus/prometheus.operator.probes.md index ab05421c5e..d1d6421173 100644 --- a/docs/sources/reference/components/prometheus/prometheus.operator.probes.md +++ b/docs/sources/reference/components/prometheus/prometheus.operator.probes.md @@ -37,6 +37,7 @@ Name | Type | Description | Default | Required ---- | ---- | ----------- | ------- | -------- `forward_to` | `list(MetricsReceiver)` | List of receivers to send scraped metrics to. | | yes `namespaces` | `list(string)` | List of namespaces to search for Probe resources. If not specified, all namespaces will be searched. || no +`informer_sync_timeout` | `duration` | Timeout for initial sync of Probe resources. | `1m` | no ## Blocks diff --git a/docs/sources/reference/components/prometheus/prometheus.operator.servicemonitors.md b/docs/sources/reference/components/prometheus/prometheus.operator.servicemonitors.md index c5182b7171..2e9c712c85 100644 --- a/docs/sources/reference/components/prometheus/prometheus.operator.servicemonitors.md +++ b/docs/sources/reference/components/prometheus/prometheus.operator.servicemonitors.md @@ -36,6 +36,7 @@ Name | Type | Description | Default | Required ---- | ---- | ----------- | ------- | -------- `forward_to` | `list(MetricsReceiver)` | List of receivers to send scraped metrics to. | | yes `namespaces` | `list(string)` | List of namespaces to search for ServiceMonitor resources. If not specified, all namespaces will be searched. || no +`informer_sync_timeout` | `duration` | Timeout for initial sync of ServiceMonitor resources. | `1m` | no ## Blocks diff --git a/go.mod b/go.mod index b8af8ca0cb..747aac86df 100644 --- a/go.mod +++ b/go.mod @@ -57,7 +57,7 @@ require ( github.com/grafana/alloy/syntax v0.1.0 github.com/grafana/beyla v1.8.7 github.com/grafana/catchpoint-prometheus-exporter v0.0.0-20240606062944-e55f3668661d - github.com/grafana/ckit v0.0.0-20241001124237-ee134485edd3 + github.com/grafana/ckit v0.0.0-20241128150632-1e6dfe1c6183 github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2 github.com/grafana/dskit v0.0.0-20240104111617-ea101a3b86eb github.com/grafana/go-gelf/v2 v2.0.1 @@ -94,6 +94,7 @@ require ( github.com/klauspost/compress v1.17.11 github.com/leodido/go-syslog/v4 v4.2.0 github.com/lib/pq v1.10.9 + github.com/mackerelio/go-osstat v0.2.5 github.com/magefile/mage v1.15.0 // indirect github.com/miekg/dns v1.1.61 github.com/mitchellh/mapstructure v1.5.1-0.20231216201459-8508981c8b6c @@ -177,7 +178,7 @@ require ( github.com/spaolacci/murmur3 v1.1.0 github.com/spf13/cobra v1.8.1 github.com/spf13/pflag v1.0.5 - github.com/stretchr/testify v1.9.0 + github.com/stretchr/testify v1.10.0 github.com/testcontainers/testcontainers-go v0.33.0 github.com/tilinna/clock v1.1.0 github.com/ua-parser/uap-go v0.0.0-20240611065828-3a4781585db6 // indirect @@ -243,13 +244,13 @@ require ( go.uber.org/goleak v1.3.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 - golang.org/x/crypto v0.28.0 + golang.org/x/crypto v0.29.0 golang.org/x/crypto/x509roots/fallback v0.0.0-20240208163226-62c9f1799c91 golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 - golang.org/x/net v0.30.0 + golang.org/x/net v0.31.0 golang.org/x/oauth2 v0.23.0 - golang.org/x/sys v0.26.0 - golang.org/x/text v0.19.0 + golang.org/x/sys v0.27.0 + golang.org/x/text v0.20.0 golang.org/x/time v0.6.0 golang.org/x/tools v0.25.0 google.golang.org/api v0.188.0 @@ -795,8 +796,8 @@ require ( go4.org/netipx v0.0.0-20230125063823-8449b0a6169f // indirect golang.org/x/arch v0.7.0 // indirect golang.org/x/mod v0.21.0 // indirect - golang.org/x/sync v0.8.0 - golang.org/x/term v0.25.0 // indirect + golang.org/x/sync v0.9.0 + golang.org/x/term v0.26.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect gonum.org/v1/gonum v0.15.1 // indirect @@ -842,8 +843,6 @@ require ( go.opentelemetry.io/otel/exporters/stdout/stdoutlog v0.7.0 // indirect ) -require github.com/mackerelio/go-osstat v0.2.5 - // NOTE: replace directives below must always be *temporary*. // // Adding a replace directive to change a module to a fork of a module will @@ -898,7 +897,9 @@ replace ( // https://github.com/grafana/cadvisor/tree/grafana-v0.47-noglobals github.com/google/cadvisor => github.com/grafana/cadvisor v0.0.0-20240729082359-1f04a91701e2 - github.com/prometheus-community/postgres_exporter => github.com/grafana/postgres_exporter v0.15.1-0.20240417113938-9358270470dd + // TODO(dehaansa): integrate the changes from the exporter-package-v0.15.0 branch into at least the + // grafana fork of the exporter, or completely into upstream + github.com/prometheus-community/postgres_exporter => github.com/grafana/postgres_exporter v0.15.1-0.20241105053755-e0a51174f168 // TODO(marctc): remove once this PR is merged upstream: https://github.com/prometheus/mysqld_exporter/pull/774 github.com/prometheus/mysqld_exporter => github.com/grafana/mysqld_exporter v0.12.2-0.20231005125903-364b9c41e595 diff --git a/go.sum b/go.sum index 424d0eb4c7..45bcbea3fc 100644 --- a/go.sum +++ b/go.sum @@ -180,8 +180,8 @@ github.com/ClickHouse/clickhouse-go v1.5.4/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhHg github.com/Code-Hex/go-generics-cache v1.5.1 h1:6vhZGc5M7Y/YD8cIUcY8kcuQLB4cHR7U+0KMqAA0KcU= github.com/Code-Hex/go-generics-cache v1.5.1/go.mod h1:qxcC9kRVrct9rHeiYpFWSoW1vxyillCVzX13KZG8dl4= github.com/DATA-DOG/go-sqlmock v1.4.1/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= -github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60= -github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= +github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU= +github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= github.com/DataDog/agent-payload/v5 v5.0.134 h1:h0oP3vDTOsjW1uKIZxKsCjOV/40jkY2Y+42GKAVH9ig= github.com/DataDog/agent-payload/v5 v5.0.134/go.mod h1:FgVQKmVdqdmZTbxIptqJC/l+xEzdiXsaAOs/vGAvWzs= github.com/DataDog/datadog-agent/cmd/agent/common/path v0.57.1 h1:9WpqKeK4bAc8pSb0sK4fY03bUOqhWUZdGoVh55KBaNI= @@ -1215,8 +1215,8 @@ github.com/grafana/cadvisor v0.0.0-20240729082359-1f04a91701e2 h1:ju6EcY2aEobeBg github.com/grafana/cadvisor v0.0.0-20240729082359-1f04a91701e2/go.mod h1:8sLW/G7rcFe1CKMaA4pYT4mX3P1xQVGqM6luzEzx/2g= github.com/grafana/catchpoint-prometheus-exporter v0.0.0-20240606062944-e55f3668661d h1:6sNPBwOokfCxAyateu7iLdtyWDUzaLLShPs7F4eTLfw= github.com/grafana/catchpoint-prometheus-exporter v0.0.0-20240606062944-e55f3668661d/go.mod h1:aGPSALDAkw18nn8M7gumhM/MbJG+zgOA3jNWTwPYtLg= -github.com/grafana/ckit v0.0.0-20241001124237-ee134485edd3 h1:t1oO5eBWAwnryyW5e+MvJocf88HRtR0x/UsR4M0z290= -github.com/grafana/ckit v0.0.0-20241001124237-ee134485edd3/go.mod h1:h3W376FaLy2VAhqm2w6IAkJjJxr2bGkdsdIcjadWMbs= +github.com/grafana/ckit v0.0.0-20241128150632-1e6dfe1c6183 h1:9ubc4+JkJ6lChOOxXsFalg0LMAqUX6eXZqu80luIvlA= +github.com/grafana/ckit v0.0.0-20241128150632-1e6dfe1c6183/go.mod h1:+Ddq4Cmuz2NnrPzhlZyEdxcDzsPhT6CW270t1aiCj5k= github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2 h1:qhugDMdQ4Vp68H0tp/0iN17DM2ehRo1rLEdOFe/gB8I= github.com/grafana/cloudflare-go v0.0.0-20230110200409-c627cf6792f2/go.mod h1:w/aiO1POVIeXUQyl0VQSZjl5OAGDTL5aX+4v0RA1tcw= github.com/grafana/dskit v0.0.0-20240104111617-ea101a3b86eb h1:AWE6+kvtE18HP+lRWNUCyvymyrFSXs6TcS2vXIXGIuw= @@ -1254,8 +1254,8 @@ github.com/grafana/opentelemetry-collector/processor/batchprocessor v0.0.0-20241 github.com/grafana/opentelemetry-collector/processor/batchprocessor v0.0.0-20241104164848-8ea9d0a3e17a/go.mod h1:QLQ31rGjPuMc/nGw4rL4HzQI9F0jVAPEmC342chxoqA= github.com/grafana/opentelemetry-collector/service v0.0.0-20241104164848-8ea9d0a3e17a h1:ZycgUSrrwtB2x1fMdLD88J2k8886/HvX1tYHlaOH/hg= github.com/grafana/opentelemetry-collector/service v0.0.0-20241104164848-8ea9d0a3e17a/go.mod h1:VTLnax+DjHal3q7WKQO0ITjWdfPTq2txaoNRcVXYzgE= -github.com/grafana/postgres_exporter v0.15.1-0.20240417113938-9358270470dd h1:vNHdecaOmYgSHMEQRgyzWacV++N38Jp8qLZg0RCsfFo= -github.com/grafana/postgres_exporter v0.15.1-0.20240417113938-9358270470dd/go.mod h1:kR16GJ0ZwWVQ2osW3pgtDJU1a/GXpufrwio0kLG14cg= +github.com/grafana/postgres_exporter v0.15.1-0.20241105053755-e0a51174f168 h1:I7FyVTtge/3G5YHVOMDG0l4If6W+kXbFDqtzj5gCSGs= +github.com/grafana/postgres_exporter v0.15.1-0.20241105053755-e0a51174f168/go.mod h1:dMrETGkSetWByp2XGsm8g6pRVh/ibnrDxKsN4BqnGNg= github.com/grafana/prometheus v1.8.2-0.20240514135907-13889ba362e6 h1:kih3d3M3dxAmrpFLvnIxFzWx8KMQyKxQwKgWP67C/Fg= github.com/grafana/prometheus v1.8.2-0.20240514135907-13889ba362e6/go.mod h1:yv4MwOn3yHMQ6MZGHPg/U7Fcyqf+rxqiZfSur6myVtc= github.com/grafana/pyroscope-go/godeltaprof v0.1.8 h1:iwOtYXeeVSAeYefJNaxDytgjKtUuKQbJqgAIjlnicKg= @@ -2412,8 +2412,9 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/stvp/go-udp-testing v0.0.0-20201019212854-469649b16807 h1:LUsDduamlucuNnWcaTbXQ6aLILFcLXADpOzeEH3U+OI= github.com/stvp/go-udp-testing v0.0.0-20201019212854-469649b16807/go.mod h1:7jxmlfBCDBXRzr0eAQJ48XC1hBu1np4CS5+cHEYfwpc= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= @@ -2832,8 +2833,8 @@ golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= -golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw= -golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U= +golang.org/x/crypto v0.29.0 h1:L5SG1JTTXupVV3n6sUqMTeWbjAyfPwoda2DLX8J8FrQ= +golang.org/x/crypto v0.29.0/go.mod h1:+F4F4N5hv6v38hfeYwTdx20oUvLLc+QfrE9Ax9HtgRg= golang.org/x/crypto/x509roots/fallback v0.0.0-20240208163226-62c9f1799c91 h1:Lyizcy9jX02jYR0ceBkL6S+jRys8Uepf7wt1vrz6Ras= golang.org/x/crypto/x509roots/fallback v0.0.0-20240208163226-62c9f1799c91/go.mod h1:kNa9WdvYnzFwC79zRpLRMJbdEFlhyM5RPFBBZp/wWH8= golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -2956,8 +2957,8 @@ golang.org/x/net v0.11.0/go.mod h1:2L/ixqYpgIVXmeoSA/4Lu7BzTG4KIyPIryS4IsOd1oQ= golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= -golang.org/x/net v0.30.0 h1:AcW1SDZMkb8IpzCdQUaIq2sP4sZ4zw+55h6ynffypl4= -golang.org/x/net v0.30.0/go.mod h1:2wGyMJ5iFasEhkwi13ChkO/t1ECNC4X4eBKkVFyYFlU= +golang.org/x/net v0.31.0 h1:68CPQngjLL0r2AlUKiSxtQFKvzRVbnzLwMUn5SzcLHo= +golang.org/x/net v0.31.0/go.mod h1:P4fl1q7dY2hnZFxEk4pPSkDHF+QqjitcnDjUQyMM+pM= golang.org/x/oauth2 v0.0.0-20170807180024-9a379c6b3e95/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -2990,8 +2991,8 @@ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= -golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= -golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.9.0 h1:fEo0HyrW1GIgZdpbhCRO0PkJajUS5H9IFUztCgEo2jQ= +golang.org/x/sync v0.9.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -3113,8 +3114,8 @@ golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= -golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s= +golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= @@ -3128,8 +3129,8 @@ golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0= golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY= -golang.org/x/term v0.25.0 h1:WtHI/ltw4NvSUig5KARz9h521QvRC8RmF/cuYqifU24= -golang.org/x/term v0.25.0/go.mod h1:RPyXicDX+6vLxogjjRxjgD2TKtmAO6NZBsBRfrOLu7M= +golang.org/x/term v0.26.0 h1:WEQa6V3Gja/BhNxg540hBip/kkaYtRg3cxg4oXSw4AU= +golang.org/x/term v0.26.0/go.mod h1:Si5m1o57C5nBNQo5z1iq+XDijt21BDBDp2bK0QI8e3E= golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -3150,8 +3151,8 @@ golang.org/x/text v0.10.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= -golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/text v0.20.0 h1:gK/Kv2otX8gz+wn7Rmb3vT96ZwuoxnQlY+HlJVj7Qug= +golang.org/x/text v0.20.0/go.mod h1:D4IsuqiFMhST5bX19pQ9ikHC2GsaKyk/oF+pn3ducp4= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= diff --git a/internal/component/loki/source/podlogs/podlogs.go b/internal/component/loki/source/podlogs/podlogs.go index db1bca5d99..d903c5eaed 100644 --- a/internal/component/loki/source/podlogs/podlogs.go +++ b/internal/component/loki/source/podlogs/podlogs.go @@ -258,6 +258,10 @@ func (c *Component) updateTailer(args Arguments) error { // updateReconciler updates the state of the reconciler. This must only be // called after updateTailer. mut must be held when calling. func (c *Component) updateReconciler(args Arguments) error { + // The clustering settings should always be updated, + // even if the selectors haven't changed. + c.reconciler.SetDistribute(args.Clustering.Enabled) + var ( selectorChanged = !reflect.DeepEqual(c.args.Selector, args.Selector) namespaceSelectorChanged = !reflect.DeepEqual(c.args.NamespaceSelector, args.NamespaceSelector) @@ -276,7 +280,6 @@ func (c *Component) updateReconciler(args Arguments) error { } c.reconciler.UpdateSelectors(sel, nsSel) - c.reconciler.SetDistribute(args.Clustering.Enabled) // Request a reconcile so the new selectors get applied. c.controller.RequestReconcile() diff --git a/internal/component/loki/source/podlogs/reconciler.go b/internal/component/loki/source/podlogs/reconciler.go index 977cb57bba..65c74b49b3 100644 --- a/internal/component/loki/source/podlogs/reconciler.go +++ b/internal/component/loki/source/podlogs/reconciler.go @@ -3,6 +3,7 @@ package podlogs import ( "context" "fmt" + "slices" "sort" "strings" "sync" @@ -15,6 +16,7 @@ import ( "github.com/grafana/alloy/internal/service/cluster" "github.com/grafana/ckit/shard" "github.com/prometheus/common/model" + prom_lbls "github.com/prometheus/prometheus/model/labels" promlabels "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/relabel" "github.com/prometheus/prometheus/util/strutil" @@ -126,6 +128,17 @@ func (r *reconciler) Reconcile(ctx context.Context, cli client.Client) error { return nil } +func filterLabels(lbls prom_lbls.Labels, keysToKeep []string) prom_lbls.Labels { + var res prom_lbls.Labels + for _, k := range lbls { + if slices.Contains(keysToKeep, k.Name) { + res = append(res, prom_lbls.Label{Name: k.Name, Value: k.Value}) + } + } + sort.Sort(res) + return res +} + func distributeTargets(c cluster.Cluster, targets []*kubetail.Target) []*kubetail.Target { if c == nil { return targets @@ -140,7 +153,11 @@ func distributeTargets(c cluster.Cluster, targets []*kubetail.Target) []*kubetai res := make([]*kubetail.Target, 0, resCap) for _, target := range targets { - peers, err := c.Lookup(shard.StringKey(target.Labels().String()), 1, shard.OpReadWrite) + // Only take into account the labels necessary to uniquely identify a pod/container instance. + // If we take into account more labels than necessary, there may be issues due to labels changing + // over the lifetime of the pod. + clusteringLabels := filterLabels(target.DiscoveryLabels(), kubetail.ClusteringLabels) + peers, err := c.Lookup(shard.StringKey(clusteringLabels.String()), 1, shard.OpReadWrite) if err != nil { // This can only fail in case we ask for more owners than the // available peers. This will never happen, but in any case we fall diff --git a/internal/component/otelcol/config_attraction.go b/internal/component/otelcol/config_attraction.go index b009397112..b6c8b4082c 100644 --- a/internal/component/otelcol/config_attraction.go +++ b/internal/component/otelcol/config_attraction.go @@ -1,5 +1,16 @@ package otelcol +import ( + "errors" + "fmt" + "strings" +) + +const ( + delete = "delete" + hash = "hash" +) + type AttrActionKeyValueSlice []AttrActionKeyValue func (actions AttrActionKeyValueSlice) Convert() []interface{} { @@ -15,10 +26,27 @@ func (actions AttrActionKeyValueSlice) Convert() []interface{} { return res } +func (actions AttrActionKeyValueSlice) Validate() error { + var validationErrors []error + + for i, action := range actions { + if err := action.validate(); err != nil { + wrappedErr := fmt.Errorf("validation failed for action block number %d: %w", i+1, err) + validationErrors = append(validationErrors, wrappedErr) + } + } + + if len(validationErrors) > 0 { + return errors.Join(validationErrors...) + } + return nil +} + type AttrActionKeyValue struct { // Key specifies the attribute to act upon. - // This is a required field. - Key string `alloy:"key,attr"` + // The actions `delete` and `hash` can use the `pattern`` argument instead of/with the `key` argument. + // The field is required for all other actions. + Key string `alloy:"key,attr,optional"` // Value specifies the value to populate for the key. // The type of the value is inferred from the configuration. @@ -91,3 +119,17 @@ func (args *AttrActionKeyValue) convert() map[string]interface{} { "converted_type": args.ConvertedType, } } + +func (args *AttrActionKeyValue) validate() error { + switch strings.ToLower(args.Action) { + case delete, hash: + if args.Key == "" && args.RegexPattern == "" { + return fmt.Errorf("the action %s requires at least the key argument or the pattern argument to be set", args.Action) + } + default: + if args.Key == "" { + return fmt.Errorf("the action %s requires the key argument to be set", args.Action) + } + } + return nil +} diff --git a/internal/component/otelcol/config_attraction_test.go b/internal/component/otelcol/config_attraction_test.go index 5879bde815..3e6655b03f 100644 --- a/internal/component/otelcol/config_attraction_test.go +++ b/internal/component/otelcol/config_attraction_test.go @@ -1,6 +1,7 @@ package otelcol_test import ( + "strings" "testing" "github.com/grafana/alloy/internal/component/otelcol" @@ -58,3 +59,56 @@ func TestConvertAttrAction(t *testing.T) { result := inputActions.Convert() require.Equal(t, expectedActions, result) } + +func TestValidateAttrAction(t *testing.T) { + inputActions := otelcol.AttrActionKeyValueSlice{ + { + // ok - only key + Action: "insert", + Value: 123, + Key: "attribute1", + }, + { + // not ok - missing key + Action: "insert", + Value: 123, + RegexPattern: "pattern", // pattern is useless here + }, + { + // ok - only key + Action: "delete", + Key: "key", + }, + { + // ok - only pattern + Action: "delete", + RegexPattern: "pattern", + }, + { + // ok - both + Action: "delete", + Key: "key", + RegexPattern: "pattern", + }, + { + // not ok - missing key and pattern + Action: "delete", + }, + { + // ok - only pattern + Action: "hash", + RegexPattern: "pattern", + }, + { + // ok - with uppercase + Action: "HaSH", + RegexPattern: "pattern", + }, + } + + expectedErrors := []string{ + "validation failed for action block number 2: the action insert requires the key argument to be set", + "validation failed for action block number 6: the action delete requires at least the key argument or the pattern argument to be set", + } + require.EqualError(t, inputActions.Validate(), strings.Join(expectedErrors, "\n")) +} diff --git a/internal/component/otelcol/exporter/prometheus/internal/convert/convert.go b/internal/component/otelcol/exporter/prometheus/internal/convert/convert.go index 364f94e6ce..ad3928bd64 100644 --- a/internal/component/otelcol/exporter/prometheus/internal/convert/convert.go +++ b/internal/component/otelcol/exporter/prometheus/internal/convert/convert.go @@ -339,7 +339,7 @@ func writeSeries(app storage.Appender, series *memorySeries, dp otelcolDataPoint series.SetTimestamp(ts) if dp.Flags().NoRecordedValue() { - val = float64(value.StaleNaN) + val = math.Float64frombits(value.StaleNaN) } series.SetValue(val) diff --git a/internal/component/otelcol/processor/attributes/attributes.go b/internal/component/otelcol/processor/attributes/attributes.go index 48495b3152..4301f77927 100644 --- a/internal/component/otelcol/processor/attributes/attributes.go +++ b/internal/component/otelcol/processor/attributes/attributes.go @@ -55,6 +55,10 @@ func (args *Arguments) SetToDefault() { args.DebugMetrics.SetToDefault() } +func (args *Arguments) Validate() error { + return args.Actions.Validate() +} + // Convert implements processor.Arguments. func (args Arguments) Convert() (otelcomponent.Config, error) { input := make(map[string]interface{}) diff --git a/internal/component/otelcol/processor/attributes/attributes_test.go b/internal/component/otelcol/processor/attributes/attributes_test.go index e2019982cd..6412730700 100644 --- a/internal/component/otelcol/processor/attributes/attributes_test.go +++ b/internal/component/otelcol/processor/attributes/attributes_test.go @@ -134,6 +134,23 @@ func testRunProcessorWithContext(ctx context.Context, t *testing.T, processorCon processortest.TestRunProcessor(prc) } +// Test that the validate function is called. The validation logic for the actions is tested in the otelcol pkg. +func Test_Validate(t *testing.T) { + cfg := ` + action { + value = 111111 + action = "insert" + } + + output { + // no-op: will be overridden by test code. + } + ` + expectedErr := "validation failed for action block number 1: the action insert requires the key argument to be set" + var args attributes.Arguments + require.ErrorContains(t, syntax.Unmarshal([]byte(cfg), &args), expectedErr) +} + func Test_Insert(t *testing.T) { cfg := ` action { diff --git a/internal/component/prometheus/operator/common/crdmanager.go b/internal/component/prometheus/operator/common/crdmanager.go index f5c13da577..18e462c6b2 100644 --- a/internal/component/prometheus/operator/common/crdmanager.go +++ b/internal/component/prometheus/operator/common/crdmanager.go @@ -37,9 +37,6 @@ import ( "github.com/grafana/alloy/internal/util" ) -// Generous timeout period for configuring all informers -const informerSyncTimeout = 10 * time.Second - type crdManagerInterface interface { Run(ctx context.Context) error ClusteringUpdated() @@ -332,7 +329,7 @@ func (c *crdManager) configureInformers(ctx context.Context, informers cache.Inf return fmt.Errorf("unknown kind to configure Informers: %s", c.kind) } - informerCtx, cancel := context.WithTimeout(ctx, informerSyncTimeout) + informerCtx, cancel := context.WithTimeout(ctx, c.args.InformerSyncTimeout) defer cancel() informer, err := informers.GetInformer(informerCtx, prototype) diff --git a/internal/component/prometheus/operator/types.go b/internal/component/prometheus/operator/types.go index 8ed8cc4149..74dc55e882 100644 --- a/internal/component/prometheus/operator/types.go +++ b/internal/component/prometheus/operator/types.go @@ -32,6 +32,8 @@ type Arguments struct { RelabelConfigs []*alloy_relabel.Config `alloy:"rule,block,optional"` Scrape ScrapeOptions `alloy:"scrape,block,optional"` + + InformerSyncTimeout time.Duration `alloy:"informer_sync_timeout,attr,optional"` } // ScrapeOptions holds values that configure scraping behavior. @@ -54,6 +56,7 @@ var DefaultArguments = Arguments{ Client: kubernetes.ClientArguments{ HTTPClientConfig: config.DefaultHTTPClientConfig, }, + InformerSyncTimeout: time.Minute, } // SetToDefault implements syntax.Defaulter. diff --git a/internal/component/pyroscope/write/parser.go b/internal/component/pyroscope/write/parser.go index 385a241862..3bd97583b3 100644 --- a/internal/component/pyroscope/write/parser.go +++ b/internal/component/pyroscope/write/parser.go @@ -192,7 +192,7 @@ func validateAppName(n string) error { } func isAppNameRuneAllowed(r rune) bool { - return r == '-' || r == '.' || isTagKeyRuneAllowed(r) + return r == '-' || r == '.' || r == '/' || isTagKeyRuneAllowed(r) } func isTagKeyReserved(k string) bool { diff --git a/internal/component/pyroscope/write/parser_test.go b/internal/component/pyroscope/write/parser_test.go new file mode 100644 index 0000000000..66cd6e9cd0 --- /dev/null +++ b/internal/component/pyroscope/write/parser_test.go @@ -0,0 +1,150 @@ +package write + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestParseKey(t *testing.T) { + tests := []struct { + name string + input string + expected *Key + wantErr bool + }{ + { + name: "basic app name", + input: "simple-app", + expected: &Key{ + labels: map[string]string{ + "__name__": "simple-app", + }, + }, + }, + { + name: "app name with slashes and tags", + input: "my/service/name{environment=prod,version=1.0}", + expected: &Key{ + labels: map[string]string{ + "__name__": "my/service/name", + "environment": "prod", + "version": "1.0", + }, + }, + }, + { + name: "multiple slashes and special characters", + input: "app/service/v1.0-beta/component{region=us-west}", + expected: &Key{ + labels: map[string]string{ + "__name__": "app/service/v1.0-beta/component", + "region": "us-west", + }, + }, + }, + { + name: "empty app name", + input: "{}", + wantErr: true, + }, + { + name: "invalid characters in tag key", + input: "my/service/name{invalid@key=value}", + wantErr: true, + }, + { + name: "whitespace handling", + input: "my/service/name{ tag1 = value1 , tag2 = value2 }", + expected: &Key{ + labels: map[string]string{ + "__name__": "my/service/name", + "tag1": "value1", + "tag2": "value2", + }, + }, + }, + { + name: "dots in service name", + input: "my/service.name/v1.0{environment=prod}", + expected: &Key{ + labels: map[string]string{ + "__name__": "my/service.name/v1.0", + "environment": "prod", + }, + }, + }, + { + name: "app name with slashes", + input: "my/service/name{}", + expected: &Key{ + labels: map[string]string{ + "__name__": "my/service/name", + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := ParseKey(tt.input) + + if tt.wantErr { + assert.Error(t, err) + return + } + + require.NoError(t, err) + assert.Equal(t, tt.expected, got) + }) + } +} + +func TestKey_Normalized(t *testing.T) { + tests := []struct { + name string + key *Key + expected string + }{ + { + name: "simple normalization", + key: &Key{ + labels: map[string]string{ + "__name__": "my/service/name", + }, + }, + expected: "my/service/name{}", + }, + { + name: "normalization with tags", + key: &Key{ + labels: map[string]string{ + "__name__": "my/service/name", + "environment": "prod", + "version": "1.0", + }, + }, + expected: "my/service/name{environment=prod,version=1.0}", + }, + { + name: "tags should be sorted", + key: &Key{ + labels: map[string]string{ + "__name__": "my/service/name", + "c": "3", + "b": "2", + "a": "1", + }, + }, + expected: "my/service/name{a=1,b=2,c=3}", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := tt.key.Normalized() + assert.Equal(t, tt.expected, got) + }) + } +} diff --git a/internal/component/pyroscope/write/write.go b/internal/component/pyroscope/write/write.go index 157287f539..780fbe80d1 100644 --- a/internal/component/pyroscope/write/write.go +++ b/internal/component/pyroscope/write/write.go @@ -38,6 +38,17 @@ var ( return Arguments{} } _ component.Component = (*Component)(nil) + + // List of headers to ignore when copying headers from client to server connection + // https://datatracker.ietf.org/doc/html/rfc9113#name-connection-specific-header- + ignoreProxyHeaders = map[string]bool{ + "Connection": true, + "Proxy-Connection": true, + "Keep-Alive": true, + "Transfer-Encoding": true, + "Upgrade": true, + "TE": true, + } ) func init() { @@ -155,40 +166,38 @@ func (c *Component) Update(newConfig component.Arguments) error { type fanOutClient struct { // The list of push clients to fan out to. - clients []pushv1connect.PusherServiceClient - httpClient *http.Client - config Arguments - opts component.Options - metrics *metrics + pushClients []pushv1connect.PusherServiceClient + ingestClients map[*EndpointOptions]*http.Client + config Arguments + opts component.Options + metrics *metrics } // NewFanOut creates a new fan out client that will fan out to all endpoints. func NewFanOut(opts component.Options, config Arguments, metrics *metrics) (*fanOutClient, error) { - clients := make([]pushv1connect.PusherServiceClient, 0, len(config.Endpoints)) + pushClients := make([]pushv1connect.PusherServiceClient, 0, len(config.Endpoints)) + ingestClients := make(map[*EndpointOptions]*http.Client) uid := alloyseed.Get().UID - var httpClient *http.Client for _, endpoint := range config.Endpoints { if endpoint.Headers == nil { endpoint.Headers = map[string]string{} } endpoint.Headers[alloyseed.LegacyHeaderName] = uid endpoint.Headers[alloyseed.HeaderName] = uid - client, err := commonconfig.NewClientFromConfig(*endpoint.HTTPClientConfig.Convert(), endpoint.Name) + httpClient, err := commonconfig.NewClientFromConfig(*endpoint.HTTPClientConfig.Convert(), endpoint.Name) if err != nil { return nil, err } - clients = append(clients, pushv1connect.NewPusherServiceClient(client, endpoint.URL, WithUserAgent(userAgent))) - if httpClient == nil { - httpClient = client - } + pushClients = append(pushClients, pushv1connect.NewPusherServiceClient(httpClient, endpoint.URL, WithUserAgent(userAgent))) + ingestClients[endpoint] = httpClient } return &fanOutClient{ - clients: clients, - httpClient: httpClient, - config: config, - opts: opts, - metrics: metrics, + pushClients: pushClients, + ingestClients: ingestClients, + config: config, + opts: opts, + metrics: metrics, }, nil } @@ -202,7 +211,7 @@ func (f *fanOutClient) Push(ctx context.Context, req *connect.Request[pushv1.Pus reqSize, profileCount = requestSize(req) ) - for i, client := range f.clients { + for i, client := range f.pushClients { var ( client = client i = i @@ -395,6 +404,11 @@ func (f *fanOutClient) AppendIngest(ctx context.Context, profile *pyroscope.Inco // First set profile headers as defaults for k, v := range profile.Headers { + // Ignore this header as it may interfere with keepalives in the connection to pyroscope + // which may cause huge load due to tls renegotiation + if _, exists := ignoreProxyHeaders[k]; exists { + continue + } req.Header[k] = v } @@ -403,12 +417,17 @@ func (f *fanOutClient) AppendIngest(ctx context.Context, profile *pyroscope.Inco req.Header.Set(k, v) } - resp, err := f.httpClient.Do(req) + resp, err := f.ingestClients[endpoint].Do(req) if err != nil { return fmt.Errorf("do request: %w", err) } defer resp.Body.Close() + _, err = io.Copy(io.Discard, resp.Body) + if err != nil { + return fmt.Errorf("read response body: %w", err) + } + if resp.StatusCode != http.StatusOK { return &PyroscopeWriteError{StatusCode: resp.StatusCode} } diff --git a/internal/runtime/internal/controller/loader.go b/internal/runtime/internal/controller/loader.go index fae75f5865..9af5919722 100644 --- a/internal/runtime/internal/controller/loader.go +++ b/internal/runtime/internal/controller/loader.go @@ -10,6 +10,12 @@ import ( "time" "github.com/go-kit/log" + "github.com/grafana/dskit/backoff" + "github.com/hashicorp/go-multierror" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" + "github.com/grafana/alloy/internal/featuregate" "github.com/grafana/alloy/internal/runtime/internal/dag" "github.com/grafana/alloy/internal/runtime/internal/worker" @@ -19,11 +25,6 @@ import ( "github.com/grafana/alloy/syntax/ast" "github.com/grafana/alloy/syntax/diag" "github.com/grafana/alloy/syntax/vm" - "github.com/grafana/dskit/backoff" - "github.com/hashicorp/go-multierror" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" - "go.opentelemetry.io/otel/trace" ) // The Loader builds and evaluates ComponentNodes from Alloy blocks. @@ -92,10 +93,11 @@ func NewLoader(opts LoaderOptions) *Loader { componentNodeManager: NewComponentNodeManager(globals, reg), // This is a reasonable default which should work for most cases. If a component is completely stuck, we would - // retry and log an error every 10 seconds, at most. + // retry and log an error every 10 seconds, at most. We give up after some time to prevent lasting deadlocks. backoffConfig: backoff.Config{ MinBackoff: 1 * time.Millisecond, MaxBackoff: 10 * time.Second, + MaxRetries: 20, // Give up after 20 attempts - it could be a deadlock instead of an overload. }, graph: &dag.Graph{}, @@ -736,19 +738,31 @@ func (l *Loader) EvaluateDependants(ctx context.Context, updatedNodes []*QueuedN l.concurrentEvalFn(nodeRef, dependantCtx, tracer, parentRef) }) if err != nil { - level.Error(l.log).Log( - "msg", "failed to submit node for evaluation - Alloy is likely overloaded "+ - "and cannot keep up with evaluating components - will retry", + level.Warn(l.log).Log( + "msg", "failed to submit node for evaluation - will retry", "err", err, "node_id", n.NodeID(), "originator_id", parent.Node.NodeID(), "retries", retryBackoff.NumRetries(), ) + // When backing off, release the mut in case the evaluation requires to interact with the loader itself. + l.mut.RUnlock() retryBackoff.Wait() + l.mut.RLock() } else { break } } + if err != nil && !retryBackoff.Ongoing() { + level.Error(l.log).Log( + "msg", "retry attempts exhausted when submitting node for evaluation to the worker pool - "+ + "this could be a deadlock, performance bottleneck or severe overload leading to goroutine starvation", + "err", err, + "node_id", n.NodeID(), + "originator_id", parent.Node.NodeID(), + "retries", retryBackoff.NumRetries(), + ) + } span.SetAttributes(attribute.Int("retries", retryBackoff.NumRetries())) if err != nil { span.SetStatus(codes.Error, err.Error()) diff --git a/internal/runtime/internal/controller/node_config_import.go b/internal/runtime/internal/controller/node_config_import.go index 5d6e5a200a..30a480fa5b 100644 --- a/internal/runtime/internal/controller/node_config_import.go +++ b/internal/runtime/internal/controller/node_config_import.go @@ -47,12 +47,14 @@ type ImportConfigNode struct { importChildrenUpdateChan chan struct{} // used to trigger an update of the running children + // NOTE: To avoid deadlocks, whenever we need both locks we must always first lock the mut, then healthMut. mut sync.RWMutex importedContent map[string]string importConfigNodesChildren map[string]*ImportConfigNode importChildrenRunning bool importedDeclares map[string]ast.Body + // NOTE: To avoid deadlocks, whenever we need both locks we must always first lock the mut, then healthMut. healthMut sync.RWMutex evalHealth component.Health // Health of the last source evaluation runHealth component.Health // Health of running @@ -156,10 +158,14 @@ func (cn *ImportConfigNode) setContentHealth(t component.HealthType, msg string) // 4. Health reported from the source. // 5. Health reported from the nested imports. func (cn *ImportConfigNode) CurrentHealth() component.Health { - cn.healthMut.RLock() - defer cn.healthMut.RUnlock() + // NOTE: Since other code paths such as onContentUpdate -> setContentHealth will + // also end up acquiring both of these mutexes, it's _essential_ to keep the + // order in which they're locked consistent to avoid deadlocks. We must always first + // lock the mut, then healthMut. cn.mut.RLock() defer cn.mut.RUnlock() + cn.healthMut.RLock() + defer cn.healthMut.RUnlock() health := component.LeastHealthy( cn.runHealth, diff --git a/internal/runtime/internal/importsource/import_http.go b/internal/runtime/internal/importsource/import_http.go index f8cbfd469b..815748f8f6 100644 --- a/internal/runtime/internal/importsource/import_http.go +++ b/internal/runtime/internal/importsource/import_http.go @@ -66,17 +66,18 @@ func (im *ImportHTTP) Evaluate(scope *vm.Scope) error { if err := im.eval.Evaluate(scope, &arguments); err != nil { return fmt.Errorf("decoding configuration: %w", err) } + remoteHttpArguments := remote_http.Arguments{ + URL: arguments.URL, + PollFrequency: arguments.PollFrequency, + PollTimeout: arguments.PollTimeout, + Method: arguments.Method, + Headers: arguments.Headers, + Body: arguments.Body, + Client: arguments.Client, + } if im.managedRemoteHTTP == nil { var err error - im.managedRemoteHTTP, err = remote_http.New(im.managedOpts, remote_http.Arguments{ - URL: arguments.URL, - PollFrequency: arguments.PollFrequency, - PollTimeout: arguments.PollTimeout, - Method: arguments.Method, - Headers: arguments.Headers, - Body: arguments.Body, - Client: arguments.Client, - }) + im.managedRemoteHTTP, err = remote_http.New(im.managedOpts, remoteHttpArguments) if err != nil { return fmt.Errorf("creating http component: %w", err) } @@ -88,7 +89,7 @@ func (im *ImportHTTP) Evaluate(scope *vm.Scope) error { } // Update the existing managed component - if err := im.managedRemoteHTTP.Update(arguments); err != nil { + if err := im.managedRemoteHTTP.Update(remoteHttpArguments); err != nil { return fmt.Errorf("updating component: %w", err) } im.arguments = arguments diff --git a/internal/service/ui/ui.go b/internal/service/ui/ui.go index ae462ddfe7..ea69448a82 100644 --- a/internal/service/ui/ui.go +++ b/internal/service/ui/ui.go @@ -78,10 +78,7 @@ func (s *Service) Data() any { func (s *Service) ServiceHandler(host service.Host) (base string, handler http.Handler) { r := mux.NewRouter() - remotecfgSvc, _ := host.GetService(remotecfg_service.ServiceName) - remotecfgHost := remotecfgSvc.Data().(remotecfg_service.Data).Host - - fa := api.NewAlloyAPI(host, remotecfgHost, s.opts.CallbackManager) + fa := api.NewAlloyAPI(host, s.opts.CallbackManager) fa.RegisterRoutes(path.Join(s.opts.UIPrefix, "/api/v0/web"), r) ui.RegisterRoutes(s.opts.UIPrefix, r) diff --git a/internal/web/api/api.go b/internal/web/api/api.go index 3c6688d04e..53592b744a 100644 --- a/internal/web/api/api.go +++ b/internal/web/api/api.go @@ -18,19 +18,19 @@ import ( "github.com/grafana/alloy/internal/service" "github.com/grafana/alloy/internal/service/cluster" "github.com/grafana/alloy/internal/service/livedebugging" + "github.com/grafana/alloy/internal/service/remotecfg" "github.com/prometheus/prometheus/util/httputil" ) // AlloyAPI is a wrapper around the component API. type AlloyAPI struct { alloy service.Host - remotecfg service.Host CallbackManager livedebugging.CallbackManager } // NewAlloyAPI instantiates a new Alloy API. -func NewAlloyAPI(alloy, remotecfg service.Host, CallbackManager livedebugging.CallbackManager) *AlloyAPI { - return &AlloyAPI{alloy: alloy, remotecfg: remotecfg, CallbackManager: CallbackManager} +func NewAlloyAPI(alloy service.Host, CallbackManager livedebugging.CallbackManager) *AlloyAPI { + return &AlloyAPI{alloy: alloy, CallbackManager: CallbackManager} } // RegisterRoutes registers all the API's routes. @@ -40,13 +40,13 @@ func (a *AlloyAPI) RegisterRoutes(urlPrefix string, r *mux.Router) { // component IDs. r.Handle(path.Join(urlPrefix, "/modules/{moduleID:.+}/components"), httputil.CompressionHandler{Handler: listComponentsHandler(a.alloy)}) - r.Handle(path.Join(urlPrefix, "/remotecfg/modules/{moduleID:.+}/components"), httputil.CompressionHandler{Handler: listComponentsHandler(a.remotecfg)}) + r.Handle(path.Join(urlPrefix, "/remotecfg/modules/{moduleID:.+}/components"), httputil.CompressionHandler{Handler: listComponentsHandlerRemoteCfg(a.alloy)}) r.Handle(path.Join(urlPrefix, "/components"), httputil.CompressionHandler{Handler: listComponentsHandler(a.alloy)}) - r.Handle(path.Join(urlPrefix, "/remotecfg/components"), httputil.CompressionHandler{Handler: listComponentsHandler(a.remotecfg)}) + r.Handle(path.Join(urlPrefix, "/remotecfg/components"), httputil.CompressionHandler{Handler: listComponentsHandlerRemoteCfg(a.alloy)}) r.Handle(path.Join(urlPrefix, "/components/{id:.+}"), httputil.CompressionHandler{Handler: getComponentHandler(a.alloy)}) - r.Handle(path.Join(urlPrefix, "/remotecfg/components/{id:.+}"), httputil.CompressionHandler{Handler: getComponentHandler(a.remotecfg)}) + r.Handle(path.Join(urlPrefix, "/remotecfg/components/{id:.+}"), httputil.CompressionHandler{Handler: getComponentHandlerRemoteCfg(a.alloy)}) r.Handle(path.Join(urlPrefix, "/peers"), httputil.CompressionHandler{Handler: getClusteringPeersHandler(a.alloy)}) r.Handle(path.Join(urlPrefix, "/debug/{id:.+}"), liveDebugging(a.alloy, a.CallbackManager)) @@ -54,53 +54,96 @@ func (a *AlloyAPI) RegisterRoutes(urlPrefix string, r *mux.Router) { func listComponentsHandler(host service.Host) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - // moduleID is set from the /modules/{moduleID:.+}/components route above - // but not from the /components route. - var moduleID string - if vars := mux.Vars(r); vars != nil { - moduleID = vars["moduleID"] - } + listComponentsHandlerInternal(host, w, r) + } +} - components, err := host.ListComponents(moduleID, component.InfoOptions{ - GetHealth: true, - }) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) +func listComponentsHandlerRemoteCfg(host service.Host) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + svc, found := host.GetService(remotecfg.ServiceName) + if !found { + http.Error(w, "remote config service not available", http.StatusInternalServerError) return } - bb, err := json.Marshal(components) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) + data := svc.Data().(remotecfg.Data) + if data.Host == nil { + http.Error(w, "remote config service startup in progress", http.StatusInternalServerError) return } - _, _ = w.Write(bb) + listComponentsHandlerInternal(data.Host, w, r) + } +} + +func listComponentsHandlerInternal(host service.Host, w http.ResponseWriter, r *http.Request) { + // moduleID is set from the /modules/{moduleID:.+}/components route above + // but not from the /components route. + var moduleID string + if vars := mux.Vars(r); vars != nil { + moduleID = vars["moduleID"] + } + + components, err := host.ListComponents(moduleID, component.InfoOptions{ + GetHealth: true, + }) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + bb, err := json.Marshal(components) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return } + _, _ = w.Write(bb) } func getComponentHandler(host service.Host) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - vars := mux.Vars(r) - requestedComponent := component.ParseID(vars["id"]) + getComponentHandlerInternal(host, w, r) + } +} - component, err := host.GetComponent(requestedComponent, component.InfoOptions{ - GetHealth: true, - GetArguments: true, - GetExports: true, - GetDebugInfo: true, - }) - if err != nil { - http.NotFound(w, r) +func getComponentHandlerRemoteCfg(host service.Host) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + svc, found := host.GetService(remotecfg.ServiceName) + if !found { + http.Error(w, "remote config service not available", http.StatusInternalServerError) return } - bb, err := json.Marshal(component) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) + data := svc.Data().(remotecfg.Data) + if data.Host == nil { + http.Error(w, "remote config service startup in progress", http.StatusInternalServerError) return } - _, _ = w.Write(bb) + + getComponentHandlerInternal(data.Host, w, r) + } +} + +func getComponentHandlerInternal(host service.Host, w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + requestedComponent := component.ParseID(vars["id"]) + + component, err := host.GetComponent(requestedComponent, component.InfoOptions{ + GetHealth: true, + GetArguments: true, + GetExports: true, + GetDebugInfo: true, + }) + if err != nil { + http.NotFound(w, r) + return + } + + bb, err := json.Marshal(component) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return } + _, _ = w.Write(bb) } func getClusteringPeersHandler(host service.Host) http.HandlerFunc {