@@ -357,27 +357,33 @@ impl DenoPeriodicReader {
357
357
. with_temporality ( PushMetricExporter :: temporality ( & exporter) )
358
358
. build ( ) ;
359
359
360
- let collect_and_export = || async {
361
- let mut resource_metrics =
362
- opentelemetry_sdk:: metrics:: data:: ResourceMetrics {
363
- resource : Default :: default ( ) ,
364
- scope_metrics : Default :: default ( ) ,
365
- } ;
366
- let callbacks = {
367
- let mut callbacks = OTEL_PRE_COLLECT_CALLBACKS . lock ( ) . unwrap ( ) ;
368
- std:: mem:: take ( & mut * callbacks)
369
- } ;
370
- let mut futures = JoinSet :: new ( ) ;
371
- for callback in callbacks {
372
- let ( tx, rx) = oneshot:: channel ( ) ;
373
- if let Ok ( ( ) ) = callback. send ( tx) {
374
- futures. spawn ( rx) ;
360
+ let collect_and_export = |collect_observed : bool | {
361
+ let inner = & inner;
362
+ let exporter = & exporter;
363
+ async move {
364
+ let mut resource_metrics =
365
+ opentelemetry_sdk:: metrics:: data:: ResourceMetrics {
366
+ resource : Default :: default ( ) ,
367
+ scope_metrics : Default :: default ( ) ,
368
+ } ;
369
+ if collect_observed {
370
+ let callbacks = {
371
+ let mut callbacks = OTEL_PRE_COLLECT_CALLBACKS . lock ( ) . unwrap ( ) ;
372
+ std:: mem:: take ( & mut * callbacks)
373
+ } ;
374
+ let mut futures = JoinSet :: new ( ) ;
375
+ for callback in callbacks {
376
+ let ( tx, rx) = oneshot:: channel ( ) ;
377
+ if let Ok ( ( ) ) = callback. send ( tx) {
378
+ futures. spawn ( rx) ;
379
+ }
380
+ }
381
+ while futures. join_next ( ) . await . is_some ( ) { }
375
382
}
383
+ inner. collect ( & mut resource_metrics) ?;
384
+ exporter. export ( & mut resource_metrics) . await ?;
385
+ Ok ( ( ) )
376
386
}
377
- while let Some ( _) = futures. join_next ( ) . await { }
378
- inner. collect ( & mut resource_metrics) ?;
379
- exporter. export ( & mut resource_metrics) . await ?;
380
- Ok ( ( ) )
381
387
} ;
382
388
383
389
let mut ticker = tokio:: time:: interval ( interval) ;
@@ -403,7 +409,7 @@ impl DenoPeriodicReader {
403
409
name: "DenoPeriodicReader.ExportTriggered" ,
404
410
message = "Export message received." ,
405
411
) ;
406
- if let Err ( err) = collect_and_export ( ) . await {
412
+ if let Err ( err) = collect_and_export ( true ) . await {
407
413
otel_error ! (
408
414
name: "DenoPeriodicReader.ExportFailed" ,
409
415
message = "Failed to export metrics" ,
@@ -415,7 +421,7 @@ impl DenoPeriodicReader {
415
421
name: "DenoPeriodicReader.ForceFlushCalled" ,
416
422
message = "Flush message received." ,
417
423
) ;
418
- let res = collect_and_export ( ) . await ;
424
+ let res = collect_and_export ( false ) . await ;
419
425
if let Err ( send_error) = sender. send ( res) {
420
426
otel_debug ! (
421
427
name: "DenoPeriodicReader.Flush.SendResultError" ,
@@ -429,7 +435,7 @@ impl DenoPeriodicReader {
429
435
name: "DenoPeriodicReader.ShutdownCalled" ,
430
436
message = "Shutdown message received" ,
431
437
) ;
432
- let res = collect_and_export ( ) . await ;
438
+ let res = collect_and_export ( false ) . await ;
433
439
let _ = exporter. shutdown ( ) ;
434
440
if let Err ( send_error) = sender. send ( res) {
435
441
otel_debug ! (
@@ -1409,7 +1415,7 @@ fn op_otel_metric_record0(
1409
1415
Instrument :: UpDownCounter ( counter) => counter. add ( value, attributes) ,
1410
1416
Instrument :: Gauge ( gauge) => gauge. record ( value, attributes) ,
1411
1417
Instrument :: Histogram ( histogram) => histogram. record ( value, attributes) ,
1412
- _ => return ,
1418
+ _ => { }
1413
1419
}
1414
1420
}
1415
1421
@@ -1448,10 +1454,11 @@ fn op_otel_metric_record1(
1448
1454
Instrument :: UpDownCounter ( counter) => counter. add ( value, attributes) ,
1449
1455
Instrument :: Gauge ( gauge) => gauge. record ( value, attributes) ,
1450
1456
Instrument :: Histogram ( histogram) => histogram. record ( value, attributes) ,
1451
- _ => return ,
1457
+ _ => { }
1452
1458
}
1453
1459
}
1454
1460
1461
+ #[ allow( clippy:: too_many_arguments) ]
1455
1462
#[ op2( fast) ]
1456
1463
fn op_otel_metric_record2 (
1457
1464
state : & mut OpState ,
@@ -1495,10 +1502,11 @@ fn op_otel_metric_record2(
1495
1502
Instrument :: UpDownCounter ( counter) => counter. add ( value, attributes) ,
1496
1503
Instrument :: Gauge ( gauge) => gauge. record ( value, attributes) ,
1497
1504
Instrument :: Histogram ( histogram) => histogram. record ( value, attributes) ,
1498
- _ => return ,
1505
+ _ => { }
1499
1506
}
1500
1507
}
1501
1508
1509
+ #[ allow( clippy:: too_many_arguments) ]
1502
1510
#[ op2( fast) ]
1503
1511
fn op_otel_metric_record3 (
1504
1512
state : & mut OpState ,
@@ -1552,7 +1560,7 @@ fn op_otel_metric_record3(
1552
1560
Instrument :: UpDownCounter ( counter) => counter. add ( value, attributes) ,
1553
1561
Instrument :: Gauge ( gauge) => gauge. record ( value, attributes) ,
1554
1562
Instrument :: Histogram ( histogram) => histogram. record ( value, attributes) ,
1555
- _ => return ,
1563
+ _ => { }
1556
1564
}
1557
1565
}
1558
1566
@@ -1564,12 +1572,9 @@ fn op_otel_metric_observable_record0(
1564
1572
) {
1565
1573
let values = state. try_take :: < MetricAttributes > ( ) ;
1566
1574
let attributes = values. map ( |attr| attr. attributes ) . unwrap_or_default ( ) ;
1567
- match instrument {
1568
- Instrument :: Observable ( data_share) => {
1569
- let mut data = data_share. lock ( ) . unwrap ( ) ;
1570
- data. insert ( attributes, value) ;
1571
- }
1572
- _ => return ,
1575
+ if let Instrument :: Observable ( data_share) = instrument {
1576
+ let mut data = data_share. lock ( ) . unwrap ( ) ;
1577
+ data. insert ( attributes, value) ;
1573
1578
}
1574
1579
}
1575
1580
@@ -1599,15 +1604,13 @@ fn op_otel_metric_observable_record1(
1599
1604
if let Some ( kv1) = attr1 {
1600
1605
attributes. push ( kv1) ;
1601
1606
}
1602
- match & * instrument {
1603
- Instrument :: Observable ( data_share) => {
1604
- let mut data = data_share. lock ( ) . unwrap ( ) ;
1605
- data. insert ( attributes, value) ;
1606
- }
1607
- _ => return ,
1607
+ if let Instrument :: Observable ( data_share) = & * instrument {
1608
+ let mut data = data_share. lock ( ) . unwrap ( ) ;
1609
+ data. insert ( attributes, value) ;
1608
1610
}
1609
1611
}
1610
1612
1613
+ #[ allow( clippy:: too_many_arguments) ]
1611
1614
#[ op2( fast) ]
1612
1615
fn op_otel_metric_observable_record2 (
1613
1616
state : & mut OpState ,
@@ -1640,15 +1643,13 @@ fn op_otel_metric_observable_record2(
1640
1643
if let Some ( kv2) = attr2 {
1641
1644
attributes. push ( kv2) ;
1642
1645
}
1643
- match & * instrument {
1644
- Instrument :: Observable ( data_share) => {
1645
- let mut data = data_share. lock ( ) . unwrap ( ) ;
1646
- data. insert ( attributes, value) ;
1647
- }
1648
- _ => return ,
1646
+ if let Instrument :: Observable ( data_share) = & * instrument {
1647
+ let mut data = data_share. lock ( ) . unwrap ( ) ;
1648
+ data. insert ( attributes, value) ;
1649
1649
}
1650
1650
}
1651
1651
1652
+ #[ allow( clippy:: too_many_arguments) ]
1652
1653
#[ op2( fast) ]
1653
1654
fn op_otel_metric_observable_record3 (
1654
1655
state : & mut OpState ,
@@ -1687,15 +1688,13 @@ fn op_otel_metric_observable_record3(
1687
1688
if let Some ( kv3) = attr3 {
1688
1689
attributes. push ( kv3) ;
1689
1690
}
1690
- match & * instrument {
1691
- Instrument :: Observable ( data_share) => {
1692
- let mut data = data_share. lock ( ) . unwrap ( ) ;
1693
- data. insert ( attributes, value) ;
1694
- }
1695
- _ => return ,
1691
+ if let Instrument :: Observable ( data_share) = & * instrument {
1692
+ let mut data = data_share. lock ( ) . unwrap ( ) ;
1693
+ data. insert ( attributes, value) ;
1696
1694
}
1697
1695
}
1698
1696
1697
+ #[ allow( clippy:: too_many_arguments) ]
1699
1698
#[ op2( fast) ]
1700
1699
fn op_otel_metric_attribute3 < ' s > (
1701
1700
scope : & mut v8:: HandleScope < ' s > ,
0 commit comments