Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitlab/test/e2e/e2e.yml
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ new-e2e-npm-packages:
variables:
TARGETS: ./tests/npm
TEAM: cloud-network-monitoring
EXTRA_PARAMS: --run "TestEC2(VM|VMSELinux|VMWKit)Suite"
EXTRA_PARAMS: --run "TestEC2(VM|VMSELinux|VMWKit|VMDirect)Suite"
ON_NIGHTLY_FIPS: "true"

new-e2e-npm-docker:
Expand Down
2 changes: 1 addition & 1 deletion .gitlab/test/kernel_matrix_testing/common.yml
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@
# ssh into each micro-vm and run initialization script. This script will also run the tests.
- ${CI_PROJECT_DIR}/tools/ci/retry.sh scp "$DD_AGENT_TESTING_DIR/job_env.txt" "metal_instance:/home/ubuntu/job_env-${ARCH}-${TAG}-${TEST_SET}.txt"
- ${CI_PROJECT_DIR}/tools/ci/retry.sh ssh metal_instance "scp /home/ubuntu/job_env-${ARCH}-${TAG}-${TEST_SET}.txt ${MICRO_VM_IP}:/job_env.txt"
- NESTED_VM_CMD="/home/ubuntu/connector -host ${MICRO_VM_IP} -user root -ssh-file /home/kernel-version-testing/ddvm_rsa -vm-cmd 'mount -o remount,size=5G /opt/kmt-ramfs && CI=true /root/fetch_dependencies.sh ${ARCH} && COLLECT_COMPLEXITY=${COLLECT_COMPLEXITY} /opt/micro-vm-init.sh -test-tools /opt/testing-tools -retry ${RETRY} -test-root /opt/${TEST_COMPONENT}-tests -packages-run-config /opt/${TEST_SET}.json'"
- NESTED_VM_CMD="/home/ubuntu/connector -host ${MICRO_VM_IP} -user root -ssh-file /home/kernel-version-testing/ddvm_rsa -vm-cmd 'mount -o remount,size=6G /opt/kmt-ramfs && CI=true /root/fetch_dependencies.sh ${ARCH} && COLLECT_COMPLEXITY=${COLLECT_COMPLEXITY} /opt/micro-vm-init.sh -test-tools /opt/testing-tools -retry ${RETRY} -test-root /opt/${TEST_COMPONENT}-tests -packages-run-config /opt/${TEST_SET}.json'"
- $CI_PROJECT_DIR/connector-$ARCH -host $INSTANCE_IP -user ubuntu -ssh-file $AWS_EC2_SSH_KEY_FILE -vm-cmd "${NESTED_VM_CMD}" -send-env-vars CI_COMMIT_SHA,DD_API_KEY # Allow DD_API_KEY to be passed to the metal instance, so we can use it to send metrics from the connector.
- NO_RETRY_EXIT_CODE=42 ${CI_PROJECT_DIR}/tools/ci/retry.sh ssh metal_instance "ssh ${MICRO_VM_IP} '/opt/testing-tools/test-json-review -flakes /opt/testing-tools/flakes.yaml -codeowners /opt/testing-tools/CODEOWNERS -test-root /opt/${TEST_COMPONENT}-tests'"
- "[ ! -f $CI_PROJECT_DIR/daemon-${ARCH}.log ] && ${CI_PROJECT_DIR}/tools/ci/retry.sh scp metal_instance:/home/ubuntu/daemon.log $CI_PROJECT_DIR/vm-metrics-daemon-${ARCH}.log"
Expand Down
8 changes: 4 additions & 4 deletions cmd/system-probe/modules/eventmonitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,13 @@ func createEventMonitorModule(_ *sysconfigtypes.Config, deps module.FactoryDepen
log.Info("event monitoring network consumer initialized")

if netconfig.DirectSend {
dp, err := sender.NewDirectSenderConsumer(evm, deps.Log, deps.SysprobeConfig)
ds, err := sender.NewDirectSenderConsumer(evm, deps.Log, deps.SysprobeConfig)
if err != nil {
return nil, err
}
if dp != nil {
evm.RegisterEventConsumer(dp)
log.Info("event monitoring docker proxy consumer initialized")
if ds != nil {
evm.RegisterEventConsumer(ds)
log.Info("event monitoring direct sender consumer initialized")
}
}
}
Expand Down
39 changes: 38 additions & 1 deletion cmd/system-probe/modules/network_tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/DataDog/datadog-agent/pkg/network"
networkconfig "github.com/DataDog/datadog-agent/pkg/network/config"
"github.com/DataDog/datadog-agent/pkg/network/encoding/marshal"
"github.com/DataDog/datadog-agent/pkg/network/sender"
"github.com/DataDog/datadog-agent/pkg/network/tracer"
"github.com/DataDog/datadog-agent/pkg/system-probe/api/module"
sysconfigtypes "github.com/DataDog/datadog-agent/pkg/system-probe/config/types"
Expand Down Expand Up @@ -53,8 +54,37 @@ func createNetworkTracerModule(_ *sysconfigtypes.Config, deps module.FactoryDepe
}

t, err := tracer.NewTracer(ncfg, deps.Telemetry, deps.Statsd)
if err != nil {
return nil, err
}

ctx, cancel := context.WithCancel(context.Background())
var connsSender sender.Sender
if ncfg.DirectSend {
connsSender, err = sender.New(ctx, t, sender.Dependencies{
Config: deps.CoreConfig,
Logger: deps.Log,
Sysprobeconfig: deps.SysprobeConfig,
Tagger: deps.Tagger,
Wmeta: deps.WMeta,
Hostname: deps.Hostname,
Forwarder: deps.ConnectionsForwarder,
NPCollector: deps.NPCollector,
})
if err != nil {
t.Stop()
cancel()
return nil, fmt.Errorf("create direct sender: %s", err)
}
}

return &networkTracer{tracer: t, cfg: ncfg}, err
return &networkTracer{
tracer: t,
cfg: ncfg,
connsSender: connsSender,
ctx: ctx,
cancelFunc: cancel,
}, nil
}

var _ module.Module = &networkTracer{}
Expand All @@ -63,6 +93,9 @@ type networkTracer struct {
tracer *tracer.Tracer
cfg *networkconfig.Config
restartTimer *time.Timer
connsSender sender.Sender
ctx context.Context
cancelFunc context.CancelFunc
}

func (nt *networkTracer) GetStats() map[string]interface{} {
Expand Down Expand Up @@ -203,7 +236,11 @@ func (nt *networkTracer) Register(httpMux *module.Router) error {

// Close will stop all system probe activities
func (nt *networkTracer) Close() {
if nt.connsSender != nil {
nt.connsSender.Stop()
}
nt.tracer.Stop()
nt.cancelFunc()
}

func logRequests(client string, count uint64, connectionsCount int, start time.Time) {
Expand Down
10 changes: 10 additions & 0 deletions cmd/system-probe/subcommands/run/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,12 @@ import (
workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def"
workloadmetafx "github.com/DataDog/datadog-agent/comp/core/workloadmeta/fx"
"github.com/DataDog/datadog-agent/comp/dogstatsd/statsd"
connectionsforwarderfx "github.com/DataDog/datadog-agent/comp/forwarder/connectionsforwarder/fx"
"github.com/DataDog/datadog-agent/comp/forwarder/eventplatform/eventplatformimpl"
"github.com/DataDog/datadog-agent/comp/forwarder/eventplatformreceiver/eventplatformreceiverimpl"
"github.com/DataDog/datadog-agent/comp/networkpath/npcollector/npcollectorimpl"
localtraceroute "github.com/DataDog/datadog-agent/comp/networkpath/traceroute/fx-local"
rdnsquerierfx "github.com/DataDog/datadog-agent/comp/rdnsquerier/fx"
"github.com/DataDog/datadog-agent/comp/remote-config/rcclient"
"github.com/DataDog/datadog-agent/comp/remote-config/rcclient/rcclientimpl"
logscompressionfx "github.com/DataDog/datadog-agent/comp/serializer/logscompression/fx"
Expand Down Expand Up @@ -168,6 +173,11 @@ func getSharedFxOption() fx.Option {
remoteagentfx.Module(),
fxinstrumentation.Module(),
localtraceroute.Module(),
connectionsforwarderfx.Module(),
eventplatformreceiverimpl.Module(),
eventplatformimpl.Module(eventplatformimpl.NewDefaultParams()),
rdnsquerierfx.Module(),
npcollectorimpl.Module(),
)
}

Expand Down
2 changes: 2 additions & 0 deletions comp/api/api/def/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ var AuthorizedConfigPathsCore = buildAuthorizedSet(
"runtime_security_config.endpoints.additional_endpoints",
"runtime_security_config.activity_dump.remote_storage.endpoints",
"compliance_config.endpoints",
"process_config.process_dd_url",
"process_config.additional_endpoints",
)

func buildAuthorizedSet(paths ...string) AuthorizedSet {
Expand Down
3 changes: 2 additions & 1 deletion comp/core/profiler/impl/profiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,9 @@ func (p profiler) processAgentEnabled() bool {
processChecksEnabled
npmEnabled := p.sysProbeCfg.GetBool("network_config.enabled")
usmEnabled := p.sysProbeCfg.GetBool("service_monitoring_config.enabled")
directSendEnabled := p.sysProbeCfg.GetBool("network_config.direct_send")

return processChecksInProcessAgent || npmEnabled || usmEnabled
return processChecksInProcessAgent || ((npmEnabled || usmEnabled) && !directSendEnabled)
}

func (p profiler) apmEnabled() bool {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,17 @@ import (
// MockModule defines the fx options for the mock component.
func MockModule() fxutil.Module {
return fxutil.Component(
fx.Provide(newMock),
fx.Provide(NewMock),
)
}

type npCollectorMock struct{}

func (s *npCollectorMock) ScheduleNetworkPathTests(_conns iter.Seq[npmodel.NetworkPathConnection]) {
panic("implement me")
}
func (s *npCollectorMock) ScheduleNetworkPathTests(_conns iter.Seq[npmodel.NetworkPathConnection]) {}

func newMock() provides {
func NewMock() Provides {
// Mock initialization
return provides{
return Provides{
Comp: &npCollectorMock{},
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ type dependencies struct {
Statsd statsd.ClientInterface
}

type provides struct {
// Provides defines the output of the npcollector component
type Provides struct {
fx.Out

Comp npcollector.Component
Expand All @@ -45,7 +46,7 @@ func Module() fxutil.Module {
)
}

func newNpCollector(deps dependencies) provides {
func newNpCollector(deps dependencies) Provides {
var collector *npCollectorImpl

configs := newConfig(deps.AgentConfig, deps.Logger)
Expand Down Expand Up @@ -86,7 +87,7 @@ func newNpCollector(deps dependencies) provides {
collector = newNoopNpCollectorImpl()
}

return provides{
return Provides{
Comp: collector,
}
}
3 changes: 2 additions & 1 deletion pkg/network/encoding/marshal/usm.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package marshal

import (
"bytes"
"fmt"
"sync"

Expand All @@ -25,7 +26,7 @@ type USMEncoder interface {
// EncodeConnection encodes USM data for a given connection into the given builder. Returns static tags and dynamic tags.
EncodeConnection(network.ConnectionStats, *model.ConnectionBuilder) (uint64, map[string]struct{})
// EncodeConnectionDirect encodes USM data for a given connection directly onto the model.Connection object. Returns static tags and dynamic tags.
EncodeConnectionDirect(network.ConnectionStats, *model.Connection) (uint64, map[string]struct{})
EncodeConnectionDirect(network.ConnectionStats, *model.Connection, *bytes.Buffer) (uint64, map[string]struct{})
}

// USMConnectionIndex provides a generic container for USM data pre-aggregated by connection
Expand Down
5 changes: 2 additions & 3 deletions pkg/network/encoding/marshal/usm_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,8 @@ func newHTTPEncoder(httpPayloads map[http.Key]*http.RequestStats) *httpEncoder {
}
}

func (e *httpEncoder) EncodeConnectionDirect(c network.ConnectionStats, conn *model.Connection) (staticTags uint64, dynamicTags map[string]struct{}) {
var buf bytes.Buffer
staticTags, dynamicTags = e.encodeData(c, &buf)
func (e *httpEncoder) EncodeConnectionDirect(c network.ConnectionStats, conn *model.Connection, buf *bytes.Buffer) (staticTags uint64, dynamicTags map[string]struct{}) {
staticTags, dynamicTags = e.encodeData(c, buf)
conn.HttpAggregations = buf.Bytes()
return
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/network/encoding/marshal/usm_http2.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,8 @@ func newHTTP2Encoder(http2Payloads map[http.Key]*http.RequestStats) *http2Encode
}
}

func (e *http2Encoder) EncodeConnectionDirect(c network.ConnectionStats, conn *model.Connection) (staticTags uint64, dynamicTags map[string]struct{}) {
var buf bytes.Buffer
staticTags, dynamicTags = e.encodeData(c, &buf)
func (e *http2Encoder) EncodeConnectionDirect(c network.ConnectionStats, conn *model.Connection, buf *bytes.Buffer) (staticTags uint64, dynamicTags map[string]struct{}) {
staticTags, dynamicTags = e.encodeData(c, buf)
conn.Http2Aggregations = buf.Bytes()
return
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/network/encoding/marshal/usm_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,8 @@ func newKafkaEncoder(kafkaPayloads map[kafka.Key]*kafka.RequestStats) *kafkaEnco
}
}

func (e *kafkaEncoder) EncodeConnectionDirect(c network.ConnectionStats, conn *model.Connection) (staticTags uint64, dynamicTags map[string]struct{}) {
var buf bytes.Buffer
staticTags = e.encodeData(c, &buf)
func (e *kafkaEncoder) EncodeConnectionDirect(c network.ConnectionStats, conn *model.Connection, buf *bytes.Buffer) (staticTags uint64, dynamicTags map[string]struct{}) {
staticTags = e.encodeData(c, buf)
conn.DataStreamsAggregations = buf.Bytes()
return
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/network/encoding/marshal/usm_postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,8 @@ func newPostgresEncoder(postgresPayloads map[postgres.Key]*postgres.RequestStat)
}
}

func (e *postgresEncoder) EncodeConnectionDirect(c network.ConnectionStats, conn *model.Connection) (staticTags uint64, dynamicTags map[string]struct{}) {
var buf bytes.Buffer
staticTags = e.encodeData(c, &buf)
func (e *postgresEncoder) EncodeConnectionDirect(c network.ConnectionStats, conn *model.Connection, buf *bytes.Buffer) (staticTags uint64, dynamicTags map[string]struct{}) {
staticTags = e.encodeData(c, buf)
conn.DatabaseAggregations = buf.Bytes()
return
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/network/encoding/marshal/usm_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,8 @@ func newRedisEncoder(redisPayloads map[redis.Key]*redis.RequestStats) *redisEnco
}
}

func (e *redisEncoder) EncodeConnectionDirect(c network.ConnectionStats, conn *model.Connection) (staticTags uint64, dynamicTags map[string]struct{}) {
var buf bytes.Buffer
staticTags = e.encodeData(c, &buf)
func (e *redisEncoder) EncodeConnectionDirect(c network.ConnectionStats, conn *model.Connection, buf *bytes.Buffer) (staticTags uint64, dynamicTags map[string]struct{}) {
staticTags = e.encodeData(c, buf)
conn.DatabaseAggregations = buf.Bytes()
return
}
Expand Down
Loading