Skip to content

Commit

Permalink
lwcevents: cleanup groups with no data (#1669)
Browse files Browse the repository at this point in the history
Update group by data converter to remove simple aggregate
converters when flushing if they have no data. Otherwise
the set of groups can grow unbounded.
  • Loading branch information
brharrington authored Jun 19, 2024
1 parent 0576122 commit d013537
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ private[events] trait DatapointConverter {

/** Flush the data for a given timestamp. */
def flush(timestamp: Long): Unit

/** Returns true if the converter has no recent data. */
def hasNoData: Boolean
}

private[events] object DatapointConverter {
Expand Down Expand Up @@ -115,6 +118,22 @@ private[events] object DatapointConverter {
}
}

private[events] def addNaN(value: AtomicDouble, amount: Double): Unit = {
if (amount.isNaN)
return

var set = false
while (!set) {
val v = value.get()
if (v.isNaN) {
set = value.compareAndSet(v, amount)
} else {
value.addAndGet(amount)
set = true
}
}
}

case class Params(
id: String,
tags: Map[String, String],
Expand All @@ -127,15 +146,15 @@ private[events] object DatapointConverter {
/** Compute sum for a counter as a rate per second. */
case class Sum(params: Params) extends DatapointConverter {

private val buffer = new StepDouble(0.0, params.clock, params.step)
private val buffer = new StepDouble(Double.NaN, params.clock, params.step)

override def update(event: LwcEvent): Unit = {
update(params.valueMapper(event))
}

override def update(value: Double): Unit = {
if (value.isFinite && value >= 0.0) {
buffer.getCurrent.addAndGet(value)
addNaN(buffer.getCurrent, value)
}
}

Expand All @@ -147,19 +166,24 @@ private[events] object DatapointConverter {
params.consumer(params.id, event)
}
}

override def hasNoData: Boolean = {
val now = params.clock.wallTime()
buffer.getCurrent(now).get().isNaN && buffer.poll(now).isNaN
}
}

/** Compute count of contributing events. */
case class Count(params: Params) extends DatapointConverter {

private val buffer = new StepDouble(0.0, params.clock, params.step)
private val buffer = new StepDouble(Double.NaN, params.clock, params.step)

override def update(event: LwcEvent): Unit = {
buffer.getCurrent.addAndGet(1.0)
update(1.0)
}

override def update(value: Double): Unit = {
buffer.getCurrent.addAndGet(1.0)
addNaN(buffer.getCurrent, 1.0)
}

override def flush(timestamp: Long): Unit = {
Expand All @@ -170,6 +194,11 @@ private[events] object DatapointConverter {
params.consumer(params.id, event)
}
}

override def hasNoData: Boolean = {
val now = params.clock.wallTime()
buffer.getCurrent(now).get().isNaN && buffer.poll(now).isNaN
}
}

/** Compute max value from contributing events. */
Expand All @@ -195,6 +224,11 @@ private[events] object DatapointConverter {
params.consumer(params.id, event)
}
}

override def hasNoData: Boolean = {
val now = params.clock.wallTime()
buffer.getCurrent(now).get().isNaN && buffer.poll(now).isNaN
}
}

/** Compute min value from contributing events. */
Expand Down Expand Up @@ -233,6 +267,11 @@ private[events] object DatapointConverter {
params.consumer(params.id, event)
}
}

override def hasNoData: Boolean = {
val now = params.clock.wallTime()
buffer.getCurrent(now).get().isNaN && buffer.poll(now).isNaN
}
}

/** Compute set of data points, one for each distinct group. */
Expand Down Expand Up @@ -312,7 +351,15 @@ private[events] object DatapointConverter {
}

override def flush(timestamp: Long): Unit = {
groups.values().forEach(_.flush(timestamp))
val it = groups.values().iterator()
while (it.hasNext) {
val converter = it.next()
converter.flush(timestamp)
if (converter.hasNoData)
it.remove()
}
}

override def hasNoData: Boolean = groups.isEmpty
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class LwcEventClientSuite extends FunSuite {
clock.setWallTime(step)
client.process(LwcEvent.HeartbeatLwcEvent(step))
val vs = output.result()
assertEquals(vs.size, 2)
assertEquals(vs.size, 1)
}

test("analytics, basic aggregate extract value") {
Expand Down Expand Up @@ -139,7 +139,7 @@ class LwcEventClientSuite extends FunSuite {
client.process(sampleLwcEvent)
clock.setWallTime(step)
client.process(LwcEvent.HeartbeatLwcEvent(step))
assertEquals(output.result().size, 2)
assertEquals(output.result().size, 1)

// Sync expressions, same set
(2 until 10).foreach { i =>
Expand All @@ -148,7 +148,7 @@ class LwcEventClientSuite extends FunSuite {
client.process(sampleLwcEvent)
clock.setWallTime(step * i)
client.process(LwcEvent.HeartbeatLwcEvent(step * i))
assertEquals(output.result().size, 2)
assertEquals(output.result().size, 1)
}

// Sync expressions, subset
Expand Down

0 comments on commit d013537

Please sign in to comment.