Skip to content

Commit

Permalink
Added additional metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
HolyPrapor committed Feb 2, 2022
1 parent a423dde commit b2b6704
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 10 deletions.
10 changes: 5 additions & 5 deletions Vostok.Hercules.Consumers/BatchesStreamConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ public BatchesStreamConsumer([NotNull] BatchesStreamConsumerSettings<T> settings
var bufferPool = new BufferPool(settings.MaxPooledBufferSize, settings.MaxPooledBuffersPerBucket);
client = new StreamApiRequestSender(settings.StreamApiCluster, log /*.WithErrorsTransformedToWarns()*/, bufferPool, settings.StreamApiClientAdditionalSetup);

var metricContext = settings.MetricContext ?? new DevNullMetricContext();
eventsMetric = metricContext.CreateIntegerGauge("events", "type", new IntegerGaugeConfig {ResetOnScrape = true});
iterationMetric = metricContext.CreateSummary("iteration", "type", new SummaryConfig {Quantiles = new[] {0.5, 0.75, 1}});
metricContext.CreateFuncGauge("events", "type").For("remaining").SetValueProvider(() => CountStreamRemainingEvents());
metricContext.CreateFuncGauge("buffer", "type").For("rented_reader").SetValueProvider(() => BufferPool.Rented);
var instanceMetricContext = settings.InstanceMetricContext ?? new DevNullMetricContext();
eventsMetric = instanceMetricContext.CreateIntegerGauge("events", "type", new IntegerGaugeConfig {ResetOnScrape = true});
iterationMetric = instanceMetricContext.CreateSummary("iteration", "type", new SummaryConfig {Quantiles = new[] {0.5, 0.75, 1}});
instanceMetricContext.CreateFuncGauge("events", "type").For("remaining").SetValueProvider(() => CountStreamRemainingEvents());
instanceMetricContext.CreateFuncGauge("buffer", "type").For("rented_reader").SetValueProvider(() => BufferPool.Rented);
}

public async Task RunAsync(CancellationToken cancellationToken)
Expand Down
5 changes: 4 additions & 1 deletion Vostok.Hercules.Consumers/BatchesStreamConsumerSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,10 @@ public BatchesStreamConsumerSettings(
public ClusterClientSetup StreamApiClientAdditionalSetup { get; set; }

[CanBeNull]
public IMetricContext MetricContext { get; set; }
public IMetricContext ApplicationMetricContext { get; set; }

[CanBeNull]
public IMetricContext InstanceMetricContext { get; set; }

[CanBeNull]
public ITracer Tracer { get; set; }
Expand Down
19 changes: 19 additions & 0 deletions Vostok.Hercules.Consumers/Helpers/HistogramHelper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
using Vostok.Metrics.Primitives.Timer;

namespace Vostok.Hercules.Consumers.Helpers
{
internal static class HistogramHelper
{
public static HistogramBuckets CreateDefaultBuckets()
{
var upperBounds = new double[30];

upperBounds[0] = 0;
upperBounds[1] = 1;
for (var i = 2; i < upperBounds.Length; i++)
upperBounds[i] = upperBounds[i - 1] * 2;

return new HistogramBuckets(upperBounds);
}
}
}
15 changes: 11 additions & 4 deletions Vostok.Hercules.Consumers/WindowedStreamConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ public class WindowedStreamConsumer<T, TKey> : BatchesStreamConsumer<T>
private readonly ILog log;
private readonly ITracer tracer;
private readonly IMetricGroup1<IIntegerGauge> stateMetric;
private readonly ITimer eventLagMetric;
private readonly Dictionary<TKey, Windows<T, TKey>> windows;

private volatile StreamCoordinates leftCoordinates;

public WindowedStreamConsumer([NotNull] WindowedStreamConsumerSettings<T, TKey> settings, [CanBeNull] ILog log)
Expand All @@ -39,6 +39,12 @@ public WindowedStreamConsumer([NotNull] WindowedStreamConsumerSettings<T, TKey>

windows = new Dictionary<TKey, Windows<T, TKey>>();

var applicationMetricContext = settings.ApplicationMetricContext ?? new DevNullMetricContext();
eventLagMetric = applicationMetricContext.CreateHistogram("eventLag", new HistogramConfig {Buckets = HistogramHelper.CreateDefaultBuckets(), Unit = WellKnownUnits.Milliseconds});

var instanceMetricContext = settings.InstanceMetricContext ?? new DevNullMetricContext();
stateMetric = instanceMetricContext.CreateIntegerGauge("state", "type");

var settingsOnBatchEnd = settings.OnBatchEnd;
settings.OnBatchEnd = c =>
{
Expand All @@ -60,9 +66,6 @@ public WindowedStreamConsumer([NotNull] WindowedStreamConsumerSettings<T, TKey>
Restart(c).GetAwaiter().GetResult();
settingsOnRestart?.Invoke(c);
};

var metricContext = settings.MetricContext ?? new DevNullMetricContext();
stateMetric = metricContext.CreateIntegerGauge("state", "type");
}

private async Task Restart(StreamCoordinates rightCoordinates)
Expand Down Expand Up @@ -147,6 +150,10 @@ private async Task<long> RestartPartition(int partition, int partitionsCount, lo
private void AddEvent(T @event, StreamCoordinates queryCoordinates)
{
var key = settings.KeyProvider(@event);

var lag = DateTime.Now - settings.TimestampProvider(@event);
eventLagMetric.Report(lag);

if (!windows.ContainsKey(key))
windows[key] = new Windows<T, TKey>(key, settings);
if (!windows[key].AddEvent(@event, queryCoordinates) && !restart)
Expand Down

0 comments on commit b2b6704

Please sign in to comment.