Skip to content

Commit 84717f2

Browse files
committed
Detect out of order messages
* compare the message timestamp with the previous message's timestamp * log if older * new metric for out of order messages * print priority when a message received
1 parent c0451e2 commit 84717f2

File tree

6 files changed

+78
-19
lines changed

6 files changed

+78
-19
lines changed

cmd/cmd_test.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ func TestLatencyCalculationNano(t *testing.T) {
124124
testMsg := utils.MessageBody(100)
125125
utils.UpdatePayload(false, &testMsg)
126126
time.Sleep(1 * time.Microsecond)
127-
latency := utils.CalculateEndToEndLatency(false, &testMsg)
127+
_, latency := utils.CalculateEndToEndLatency(false, &testMsg)
128128
// not very precise but we just care about the order of magnitude
129129
assert.Greater(t, latency, 0.000001)
130130
assert.Less(t, latency, 0.001)
@@ -134,7 +134,7 @@ func TestLatencyCalculationMillis(t *testing.T) {
134134
testMsg := utils.MessageBody(100)
135135
utils.UpdatePayload(true, &testMsg)
136136
time.Sleep(2 * time.Millisecond)
137-
latency := utils.CalculateEndToEndLatency(true, &testMsg)
137+
_, latency := utils.CalculateEndToEndLatency(true, &testMsg)
138138
// not very precise but we just care about the order of magnitude
139139
assert.Greater(t, latency, 0.001)
140140
assert.Less(t, latency, 0.010)
@@ -170,7 +170,7 @@ func BenchmarkLatencyCalculation(b *testing.B) {
170170
utils.UpdatePayload(false, &testMsg)
171171

172172
for i := 0; i < b.N; i++ {
173-
_ = utils.CalculateEndToEndLatency(false, &testMsg)
173+
_, _ = utils.CalculateEndToEndLatency(false, &testMsg)
174174
}
175175
}
176176

@@ -192,7 +192,8 @@ func BenchmarkObservingLatency(b *testing.B) {
192192

193193
for i := 0; i < b.N; i++ {
194194
utils.UpdatePayload(false, &testMsg)
195-
metric.With(prometheus.Labels{"protocol": "foo"}).Observe(utils.CalculateEndToEndLatency(false, &testMsg))
195+
_, latency := utils.CalculateEndToEndLatency(false, &testMsg)
196+
metric.With(prometheus.Labels{"protocol": "foo"}).Observe(latency)
196197
}
197198
}
198199

@@ -211,6 +212,7 @@ func BenchmarkObservingLatencyMillis(b *testing.B) {
211212

212213
for i := 0; i < b.N; i++ {
213214
utils.UpdatePayload(true, &testMsg)
214-
metric.With(prometheus.Labels{"protocol": "foo"}).Observe(utils.CalculateEndToEndLatency(false, &testMsg))
215+
_, latency := utils.CalculateEndToEndLatency(false, &testMsg)
216+
metric.With(prometheus.Labels{"protocol": "foo"}).Observe(latency)
215217
}
216218
}

pkg/amqp10_client/consumer.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ func (c Amqp10Consumer) Start(ctx context.Context, subscribed chan bool) {
8080
m := metrics.EndToEndLatency
8181

8282
log.Info("consumer started", "protocol", "amqp-1.0", "consumerId", c.Id, "terminus", c.Topic)
83+
previousMessageTimeSent := time.Unix(0, 0)
8384

8485
for i := 1; i <= c.Config.ConsumeCount; i++ {
8586
select {
@@ -95,12 +96,22 @@ func (c Amqp10Consumer) Start(ctx context.Context, subscribed chan bool) {
9596

9697
payload := msg.GetData()
9798
priority := strconv.Itoa(int(msg.Header.Priority))
98-
m.With(prometheus.Labels{"protocol": "amqp-1.0"}).Observe(utils.CalculateEndToEndLatency(c.Config.UseMillis, &payload))
99+
timeSent, latency := utils.CalculateEndToEndLatency(c.Config.UseMillis, &payload)
100+
m.With(prometheus.Labels{"protocol": "amqp-1.0"}).Observe(latency)
99101

100-
log.Debug("message received", "protocol", "amqp-1.0", "consumerId", c.Id, "terminus", c.Topic, "size", len(payload))
102+
if timeSent.Before(previousMessageTimeSent) {
103+
metrics.MessagesConsumedOutOfOrder.With(prometheus.Labels{"protocol": "amqp-1.0", "priority": priority}).Inc()
104+
log.Info("Out of order message received. This message was sent before the previous message", "this messsage",timeSent, "previous message", previousMessageTimeSent)
105+
}
106+
previousMessageTimeSent = timeSent
107+
108+
log.Debug("message received", "protocol", "amqp-1.0", "consumerId", c.Id, "terminus", c.Topic, "size", len(payload), "priority", priority, "sent", timeSent)
109+
110+
if c.Config.ConsumerLatency > 0 {
111+
log.Debug("consumer latency", "protocol", "amqp-1.0", "consumerId", c.Id, "latency", c.Config.ConsumerLatency)
112+
time.Sleep(c.Config.ConsumerLatency)
113+
}
101114

102-
log.Debug("consumer latency", "protocol", "amqp-1.0", "consumerId", c.Id, "latency", c.Config.ConsumerLatency)
103-
time.Sleep(c.Config.ConsumerLatency)
104115
err = receiver.AcceptMessage(ctx, msg)
105116
if err != nil {
106117
log.Error("message NOT accepted", "protocol", "amqp-1.0", "consumerId", c.Id, "terminus", c.Topic)

pkg/metrics/metrics.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ var metricsServer *MetricsServer
3030
var (
3131
MessagesPublished *prometheus.CounterVec
3232
MessagesConsumed *prometheus.CounterVec
33+
MessagesConsumedOutOfOrder *prometheus.CounterVec
3334
PublishingLatency *prometheus.SummaryVec
3435
EndToEndLatency *prometheus.HistogramVec
3536
)
@@ -49,6 +50,13 @@ func RegisterMetrics(globalLabels prometheus.Labels) {
4950
ConstLabels: globalLabels,
5051
}, []string{"protocol", "priority"})
5152
}
53+
if MessagesConsumedOutOfOrder == nil {
54+
MessagesConsumedOutOfOrder = promauto.NewCounterVec(prometheus.CounterOpts{
55+
Name: "omq_messages_consumed_out_of_order",
56+
Help: "The number of messages consumed out of order",
57+
ConstLabels: globalLabels,
58+
}, []string{"protocol", "priority"})
59+
}
5260
if PublishingLatency == nil {
5361
PublishingLatency = promauto.NewSummaryVec(prometheus.SummaryOpts{
5462
Name: "omq_publishing_latency_seconds",

pkg/mqtt_client/consumer.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,20 @@ func (c MqttConsumer) Start(ctx context.Context, subscribed chan bool) {
5959

6060
msgsReceived := 0
6161

62+
previousMessageTimeSent := time.Unix(0, 0)
63+
6264
handler := func(client mqtt.Client, msg mqtt.Message) {
6365
metrics.MessagesConsumed.With(prometheus.Labels{"protocol": "mqtt", "priority": ""}).Inc()
6466
payload := msg.Payload()
65-
m.Observe(utils.CalculateEndToEndLatency(c.Config.UseMillis, &payload))
67+
timeSent, latency := utils.CalculateEndToEndLatency(c.Config.UseMillis, &payload)
68+
m.Observe(latency)
69+
70+
if timeSent.Before(previousMessageTimeSent) {
71+
metrics.MessagesConsumedOutOfOrder.With(prometheus.Labels{"protocol": "mqtt"}).Inc()
72+
log.Info("Out of order message received. This message was sent before the previous message", "this messsage", timeSent, "previous message", previousMessageTimeSent)
73+
}
74+
previousMessageTimeSent = timeSent
75+
6676
msgsReceived++
6777
log.Debug("message received", "protocol", "mqtt", "consumerId", c.Id, "topic", c.Topic, "size", len(payload))
6878
}

pkg/stomp_client/consumer.go

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,25 +62,42 @@ func (c StompConsumer) Start(ctx context.Context, subscribed chan bool) {
6262
close(subscribed)
6363

6464
m := metrics.EndToEndLatency.With(prometheus.Labels{"protocol": "stomp"})
65+
6566
log.Info("consumer started", "protocol", "STOMP", "consumerId", c.Id, "destination", c.Topic)
67+
previousMessageTimeSent := time.Unix(0, 0)
68+
6669
for i := 1; i <= c.Config.ConsumeCount; i++ {
6770
select {
6871
case msg := <-sub.C:
6972
if msg.Err != nil {
7073
log.Error("failed to receive a message", "protocol", "STOMP", "consumerId", c.Id, "c.Topic", c.Topic, "error", msg.Err)
7174
return
7275
}
73-
m.Observe(utils.CalculateEndToEndLatency(c.Config.UseMillis, &msg.Body))
74-
log.Debug("message received", "protocol", "stomp", "consumerId", c.Id, "destination", c.Topic, "size", len(msg.Body), "ack required", msg.ShouldAck())
7576

76-
log.Debug("consumer latency", "protocol", "stomp", "consumerId", c.Id, "latency", c.Config.ConsumerLatency)
77-
time.Sleep(c.Config.ConsumerLatency)
77+
timeSent, latency := utils.CalculateEndToEndLatency(c.Config.UseMillis, &msg.Body)
78+
m.Observe(latency)
79+
80+
priority := msg.Header.Get("priority")
81+
82+
if timeSent.Before(previousMessageTimeSent) {
83+
metrics.MessagesConsumedOutOfOrder.With(prometheus.Labels{"protocol": "amqp-1.0", "priority": priority}).Inc()
84+
log.Info("Out of order message received. This message was sent before the previous message", "this messsage", timeSent, "previous message", previousMessageTimeSent)
85+
}
86+
previousMessageTimeSent = timeSent
87+
88+
log.Debug("message received", "protocol", "stomp", "consumerId", c.Id, "destination", c.Topic, "size", len(msg.Body), "ack required", msg.ShouldAck(), "priority", priority, "sent", timeSent)
89+
90+
if c.Config.ConsumerLatency > 0 {
91+
log.Debug("consumer latency", "protocol", "stomp", "consumerId", c.Id, "latency", c.Config.ConsumerLatency)
92+
time.Sleep(c.Config.ConsumerLatency)
93+
}
94+
7895
err = c.Connection.Ack(msg)
7996
if err != nil {
8097
log.Error("message NOT acknowledged", "protocol", "stomp", "consumerId", c.Id, "destination", c.Topic)
8198

8299
}
83-
metrics.MessagesConsumed.With(prometheus.Labels{"protocol": "stomp", "priority": msg.Header.Get("priority")}).Inc()
100+
metrics.MessagesConsumed.With(prometheus.Labels{"protocol": "stomp", "priority": priority}).Inc()
84101
case <-ctx.Done():
85102
c.Stop("time limit reached")
86103
return

pkg/utils/utils.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,33 @@ func UpdatePayload(useMillis bool, payload *[]byte) *[]byte {
2020
return payload
2121
}
2222

23-
func CalculateEndToEndLatency(useMillis bool, payload *[]byte) float64 {
23+
func CalculateEndToEndLatency(useMillis bool, payload *[]byte) (time.Time, float64) {
2424
if len(*payload) < 12 {
2525
// message sent without latency tracking
26-
return 0
26+
return time.Unix(0, 0), 0
2727
}
2828
timeSent := binary.BigEndian.Uint64((*payload)[4:])
2929

3030
if useMillis {
3131
// less precise but necessary when a different process publishes and consumes
3232
now := uint64(time.Now().UnixMilli())
3333
latency := now - timeSent
34-
return (float64(latency) / 1000)
34+
return FormatTimestamp(timeSent), (float64(latency) / 1000)
3535
} else {
3636
// nanoseconds - more precise when the same process publishes and consumes
3737
now := uint64(time.Now().UnixNano())
3838
latency := now - timeSent
39-
return (float64(latency) / 1000000000)
39+
return FormatTimestamp(timeSent), (float64(latency) / 1000000000)
4040
}
4141
}
42+
43+
func FormatTimestamp(timestamp uint64) time.Time {
44+
var t time.Time
45+
// should be updated before the year 2100 ;)
46+
if timestamp < 4102441200000 {
47+
t = time.UnixMilli(int64(timestamp))
48+
} else {
49+
t = time.Unix(0, int64(timestamp))
50+
}
51+
return t
52+
}

0 commit comments

Comments
 (0)