Skip to content

Commit

Permalink
log windows
Browse files Browse the repository at this point in the history
  • Loading branch information
kunga committed Aug 10, 2022
1 parent 3554a72 commit 5b2e286
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 0 deletions.
3 changes: 3 additions & 0 deletions Vostok.Hercules.Consumers/Helpers/Window.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,8 @@ public bool ExistsForTooLong(bool restartPhase = false)

public void Flush() =>
implementation.Flush(End);

public override string ToString() =>
$"{nameof(FirstEventCoordinates)}: {FirstEventCoordinates}, {nameof(Start)}: {Start}, {nameof(End)}: {End}, {nameof(period)}: {period}, {nameof(lag)}: {lag}, {nameof(lastEventAdded)}: {lastEventAdded}, {nameof(EventsCount)}: {EventsCount}";
}
}
4 changes: 4 additions & 0 deletions Vostok.Hercules.Consumers/Helpers/Windows.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Vostok.Commons.Time;
using Vostok.Hercules.Client.Abstractions.Models;

Expand Down Expand Up @@ -84,5 +85,8 @@ private Window<T, TKey> CreateWindow(T @event, DateTimeOffset timestamp, StreamC
var result = new Window<T, TKey>(settings.CreateWindow(key), coordinates, start, start + period, period, lag);
return result;
}

public override string ToString() =>
$"{nameof(LastEventAdded)}: {LastEventAdded}, {nameof(key)}: {key}, {nameof(minimumAllowedTimestamp)}: {minimumAllowedTimestamp}, {nameof(maximumObservedTimestamp)}: {maximumObservedTimestamp} {nameof(windows)}:\n{string.Join("\n", windows.Select(w => w.ToString()))}";
}
}
5 changes: 5 additions & 0 deletions Vostok.Hercules.Consumers/WindowedStreamConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ private async Task Stop(StreamCoordinates rightCoordinates)
{
await settings.LeftCoordinatesStorage.AdvanceAsync(leftCoordinates).ConfigureAwait(false);
LogCoordinates("Stop", leftCoordinates, rightCoordinates);
LogWindows();
}

private async Task Restart(StreamCoordinates rightCoordinates)
Expand Down Expand Up @@ -135,6 +136,7 @@ private async Task RestartWindows(StreamCoordinates rightCoordinates)
FlushWindows();

LogCoordinates("Restarted", leftCoordinates, rightCoordinates);
LogWindows();
}

private async Task<long> RestartPartition(int partition, int partitionsCount, long start, long end)
Expand Down Expand Up @@ -228,5 +230,8 @@ private void LogCoordinates(string message, StreamCoordinates left, StreamCoordi
LogCoordinates(message + " left", left);
LogCoordinates(message + " right", right);
}

private void LogWindows() =>
log.Info("Windows: {{Windows}}.", windows.ToString());
}
}

0 comments on commit 5b2e286

Please sign in to comment.