@@ -1212,3 +1212,245 @@ func TestFlowAggregator_getSequenceDelta(t *testing.T) {
12121212 })
12131213 }
12141214}
1215+
1216+ func TestAggregatorFlushing (t * testing.T ) {
1217+ t .Run ("it respects FlowCollectionDuration when rescheduling flows" , func (t * testing.T ) {
1218+ // This test verifies that the aggregator correctly passes the flush config to the flow scheduler
1219+ // by checking that flows are rescheduled with the correct interval after being flushed.
1220+ //
1221+ // Context: The bug that prompted this test was in aggregator.go:98-100 where the ImmediateFlowScheduler
1222+ // was created without passing the flushConfig. This caused RefreshFlushTime() to use a zero-valued
1223+ // FlowCollectionDuration, breaking the flow scheduling logic.
1224+ //
1225+ // This is a behavior-driven test that verifies:
1226+ // 1. A flow is flushed immediately on first occurrence
1227+ // 2. When the same flow arrives again, it's not flushed until FlowCollectionDuration has elapsed
1228+ // 3. After FlowCollectionDuration has elapsed, the flow is flushed correctly
1229+
1230+ flushTime , _ := time .Parse (time .RFC3339 , "2019-02-18T16:00:00Z" )
1231+ sender := mocksender .NewMockSender ("" )
1232+ sender .On ("Gauge" , mock .Anything , mock .Anything , mock .Anything , mock .Anything ).Return ()
1233+ sender .On ("Count" , mock .Anything , mock .Anything , mock .Anything , mock .Anything ).Return ()
1234+ sender .On ("MonotonicCount" , mock .Anything , mock .Anything , mock .Anything , mock .Anything ).Return ()
1235+ sender .On ("Commit" ).Return ()
1236+
1237+ conf := config.NetflowConfig {
1238+ StopTimeout : 10 ,
1239+ AggregatorBufferSize : 20 ,
1240+ AggregatorFlushInterval : 2 , // 2 seconds FlowCollectionDuration
1241+ AggregatorPortRollupThreshold : 10 ,
1242+ AggregatorRollupTrackerRefreshInterval : 3600 ,
1243+ AggregatorMaxFlowsPerPeriod : 0 , // Use ImmediateFlowScheduler
1244+ }
1245+
1246+ ctrl := gomock .NewController (t )
1247+ epForwarder := eventplatformimpl .NewMockEventPlatformForwarder (ctrl )
1248+ epForwarder .EXPECT ().SendEventPlatformEventBlocking (gomock .Any (), gomock .Any ()).Return (nil ).AnyTimes ()
1249+
1250+ logger := logmock .New (t )
1251+ rdnsQuerier := fxutil .Test [rdnsquerier.Component ](t , rdnsquerierfxmock .MockModule ())
1252+
1253+ aggregator := NewFlowAggregator (sender , epForwarder , & conf , "test-hostname" , logger , rdnsQuerier )
1254+ aggregator .TimeNowFunction = func () time.Time {
1255+ return flushTime
1256+ }
1257+
1258+ // Create a flow that will be sent multiple times
1259+ flow := & common.Flow {
1260+ Namespace : "test-ns" ,
1261+ FlowType : common .TypeNetFlow9 ,
1262+ ExporterAddr : []byte {127 , 0 , 0 , 1 },
1263+ StartTimestamp : 1234568 ,
1264+ EndTimestamp : 1234569 ,
1265+ Bytes : 100 ,
1266+ Packets : 10 ,
1267+ SrcAddr : []byte {10 , 10 , 10 , 10 },
1268+ DstAddr : []byte {10 , 10 , 10 , 20 },
1269+ IPProtocol : uint32 (6 ),
1270+ SrcPort : 2000 ,
1271+ DstPort : 80 ,
1272+ EtherType : uint32 (0x0800 ),
1273+ }
1274+
1275+ // First flush: Add flow and flush immediately
1276+ setMockTimeNow (flushTime )
1277+ aggregator .flowAcc .add (flow )
1278+
1279+ flushCtx1 := common.FlushContext {
1280+ FlushTime : flushTime ,
1281+ LastFlushedAt : time.Time {},
1282+ NumFlushes : 1 ,
1283+ }
1284+ flushedCount := aggregator .flush (flushCtx1 )
1285+ assert .Equal (t , 1 , flushedCount , "First flush should return 1 flow" )
1286+
1287+ // Second flush: Add the same flow again and attempt to flush before FlowCollectionDuration
1288+ // The flow should NOT be flushed yet because it's scheduled for later
1289+ earlyFlushTime := flushTime .Add (1 * time .Second ) // Only 1 second passed, but FlowCollectionDuration is 2 seconds
1290+ setMockTimeNow (earlyFlushTime )
1291+
1292+ flow2 := * flow // Copy the flow
1293+ flow2 .Bytes = 200
1294+ flow2 .Packets = 20
1295+ aggregator .flowAcc .add (& flow2 )
1296+
1297+ flushCtx2 := common.FlushContext {
1298+ FlushTime : earlyFlushTime ,
1299+ LastFlushedAt : flushTime ,
1300+ NumFlushes : 1 ,
1301+ }
1302+ flushedCount = aggregator .flush (flushCtx2 )
1303+ assert .Equal (t , 0 , flushedCount , "Second flush should return 0 flows because FlowCollectionDuration hasn't elapsed yet" )
1304+
1305+ // Third flush: Flush after FlowCollectionDuration has passed
1306+ // Now the flow should be flushed
1307+ correctFlushTime := flushTime .Add (2 * time .Second ) // FlowCollectionDuration = 2 seconds
1308+ setMockTimeNow (correctFlushTime )
1309+
1310+ flushCtx3 := common.FlushContext {
1311+ FlushTime : correctFlushTime ,
1312+ LastFlushedAt : earlyFlushTime ,
1313+ NumFlushes : 1 ,
1314+ }
1315+ flushedCount = aggregator .flush (flushCtx3 )
1316+ assert .Equal (t , 1 , flushedCount , "Third flush should return 1 flow after FlowCollectionDuration has elapsed" )
1317+ })
1318+
1319+ t .Run ("it respects FlowCollectionDuration when using TopN/JitterFlowScheduler" , func (t * testing.T ) {
1320+ // This test verifies that when Top-N is enabled, flushConfig is properly passed to JitterFlowScheduler.
1321+ //
1322+ // Test approach:
1323+ // 1. Add a flow and tick through flushes until it gets flushed
1324+ // 2. Record the flush time (t_flush)
1325+ // 3. Add another flow with the same key
1326+ // 4. Verify it's NOT ready before t_flush + FlowCollectionDuration
1327+ // 5. Verify it IS ready at t_flush + FlowCollectionDuration
1328+ //
1329+ // This directly tests RefreshFlushTime behavior: after flushing, flows should be
1330+ // rescheduled for nextFlush + FlowCollectionDuration (with NO jitter).
1331+
1332+ startTime , _ := time .Parse (time .RFC3339 , "2019-02-18T16:00:00Z" )
1333+ sender := mocksender .NewMockSender ("" )
1334+ sender .On ("Gauge" , mock .Anything , mock .Anything , mock .Anything , mock .Anything ).Return ()
1335+ sender .On ("Count" , mock .Anything , mock .Anything , mock .Anything , mock .Anything ).Return ()
1336+ sender .On ("MonotonicCount" , mock .Anything , mock .Anything , mock .Anything , mock .Anything ).Return ()
1337+ sender .On ("Histogram" , mock .Anything , mock .Anything , mock .Anything , mock .Anything ).Return ()
1338+ sender .On ("Commit" ).Return ()
1339+
1340+ conf := config.NetflowConfig {
1341+ StopTimeout : 10 ,
1342+ AggregatorBufferSize : 20 ,
1343+ AggregatorFlushInterval : 30 , // 30 seconds FlowCollectionDuration
1344+ AggregatorPortRollupThreshold : 10 ,
1345+ AggregatorRollupTrackerRefreshInterval : 3600 ,
1346+ AggregatorMaxFlowsPerPeriod : 100 , // High limit so TopN doesn't interfere
1347+ }
1348+
1349+ ctrl := gomock .NewController (t )
1350+ epForwarder := eventplatformimpl .NewMockEventPlatformForwarder (ctrl )
1351+ epForwarder .EXPECT ().SendEventPlatformEventBlocking (gomock .Any (), gomock .Any ()).Return (nil ).AnyTimes ()
1352+
1353+ logger := logmock .New (t )
1354+ rdnsQuerier := fxutil .Test [rdnsquerier.Component ](t , rdnsquerierfxmock .MockModule ())
1355+
1356+ aggregator := NewFlowAggregator (sender , epForwarder , & conf , "test-hostname" , logger , rdnsQuerier )
1357+ aggregator .TimeNowFunction = func () time.Time {
1358+ return startTime
1359+ }
1360+
1361+ // Step 1: Create a flow that will be aggregated
1362+ flow := & common.Flow {
1363+ Namespace : "test-ns" ,
1364+ FlowType : common .TypeNetFlow9 ,
1365+ ExporterAddr : []byte {127 , 0 , 0 , 1 },
1366+ StartTimestamp : 1234568 ,
1367+ EndTimestamp : 1234569 ,
1368+ Bytes : 100 ,
1369+ Packets : 10 ,
1370+ SrcAddr : []byte {10 , 10 , 10 , 10 },
1371+ DstAddr : []byte {10 , 10 , 10 , 20 },
1372+ IPProtocol : uint32 (6 ),
1373+ SrcPort : 2000 ,
1374+ DstPort : 80 ,
1375+ EtherType : uint32 (0x0800 ),
1376+ }
1377+
1378+ setMockTimeNow (startTime )
1379+ aggregator .flowAcc .add (flow )
1380+
1381+ // Step 2: Tick through flushes until the flow is flushed
1382+ // JitterFlowScheduler schedules with random jitter [0, FlowCollectionDuration)
1383+ // So we need to tick up to the full FlowCollectionDuration to guarantee it's flushed
1384+ var actualFlushTime time.Time
1385+ flushInterval := 10 * time .Second // FlushTickFrequency
1386+
1387+ for i := 0 ; i < 4 ; i ++ { // Tick 4 times (0s, 10s, 20s, 30s)
1388+ currentTime := startTime .Add (time .Duration (i ) * flushInterval )
1389+ setMockTimeNow (currentTime )
1390+
1391+ flushCtx := common.FlushContext {
1392+ FlushTime : currentTime ,
1393+ LastFlushedAt : startTime .Add (time .Duration (i - 1 ) * flushInterval ),
1394+ NumFlushes : 1 ,
1395+ }
1396+
1397+ if i == 0 {
1398+ flushCtx .LastFlushedAt = time.Time {}
1399+ }
1400+
1401+ flushedCount := aggregator .flush (flushCtx )
1402+ if flushedCount > 0 {
1403+ actualFlushTime = currentTime
1404+ assert .Equal (t , 1 , flushedCount , "Should flush exactly 1 flow" )
1405+ break
1406+ }
1407+ }
1408+
1409+ assert .False (t , actualFlushTime .IsZero (), "Flow should have been flushed within FlowCollectionDuration" )
1410+
1411+ // Step 3: Add another flow with the same key (will be aggregated with the first)
1412+ flow2 := * flow
1413+ flow2 .Bytes = 200
1414+ flow2 .Packets = 20
1415+
1416+ aggregator .flowAcc .add (& flow2 )
1417+
1418+ // Step 4: Verify flow is NOT ready before actualFlushTime + FlowCollectionDuration
1419+ // This is the critical test: RefreshFlushTime should add FlowCollectionDuration.
1420+ // It should not flush at t + 10s nor t + 20s
1421+ tick1 := actualFlushTime .Add (10 * time .Second )
1422+ setMockTimeNow (tick1 )
1423+ flushCtx := common.FlushContext {
1424+ FlushTime : tick1 ,
1425+ LastFlushedAt : actualFlushTime ,
1426+ NumFlushes : 1 ,
1427+ }
1428+ flushedCount := aggregator .flush (flushCtx )
1429+ assert .Equal (t , 0 , flushedCount , "Flow should NOT be ready before actualFlushTime + FlowCollectionDuration" )
1430+
1431+ tick2 := actualFlushTime .Add (20 * time .Second )
1432+ setMockTimeNow (tick2 )
1433+ flushCtx = common.FlushContext {
1434+ FlushTime : tick2 ,
1435+ LastFlushedAt : actualFlushTime .Add (10 * time .Second ),
1436+ NumFlushes : 1 ,
1437+ }
1438+ flushedCount = aggregator .flush (flushCtx )
1439+ assert .Equal (t , 0 , flushedCount , "Flow should NOT be ready before actualFlushTime + FlowCollectionDuration" )
1440+
1441+ // Step 5: Verify flow IS ready at actualFlushTime + FlowCollectionDuration
1442+ tick3 := actualFlushTime .Add (30 * time .Second ) // Full FlowCollectionDuration
1443+ setMockTimeNow (tick3 )
1444+ flushCtx = common.FlushContext {
1445+ FlushTime : tick3 ,
1446+ LastFlushedAt : actualFlushTime .Add (20 * time .Second ),
1447+ NumFlushes : 1 ,
1448+ }
1449+ flushedCount = aggregator .flush (flushCtx )
1450+ assert .Equal (t , 1 , flushedCount , "Flow should be ready at actualFlushTime + FlowCollectionDuration" )
1451+
1452+ // Verify TopN metrics were submitted
1453+ sender .AssertCalled (t , "Histogram" , "datadog.netflow.flow_truncation.runtime_ms" , mock .Anything , mock .Anything , mock .Anything )
1454+ sender .AssertCalled (t , "Gauge" , "datadog.netflow.flow_truncation.threshold_value" , float64 (100 ), mock .Anything , mock .Anything )
1455+ })
1456+ }
0 commit comments