diff --git a/aw_transform/flood.py b/aw_transform/flood.py
index f1a5a88..4b27265 100644
--- a/aw_transform/flood.py
+++ b/aw_transform/flood.py
@@ -8,23 +8,107 @@
 logger = logging.getLogger(__name__)
 
 
+def _flood(e1: Event, e2: Event):
+    """Floods the larger event over the smaller event, in-place"""
+    # Prioritize flooding from the longer event
+    # NOTE: Perhaps better to flood from the former event?
+    e2_end = e2.timestamp + e2.duration
+    if e1.duration >= e2.duration:
+        if e1.data == e2.data:
+            # Extend e1 to the end of e2
+            # Set duration of e2 to zero (mark to delete)
+            e1.duration = e2_end - e1.timestamp
+            e2.timestamp = e2_end
+            e2.duration = timedelta(0)
+        else:
+            # Extend e1 to the start of e2
+            e1.duration = e2.timestamp - e1.timestamp
+    else:
+        if e1.data == e2.data:
+            # Extend e2 to the start of e1, discard e1
+            e2.timestamp = e1.timestamp
+            e2.duration = e2_end - e2.timestamp
+            e1.duration = timedelta(0)
+        else:
+            # Extend e2 backwards to end of e1
+            e2.timestamp = e1.timestamp + e1.duration
+            e2.duration = e2_end - e2.timestamp
+
+
+def _flood_first(e1: Event, e2: Event):
+    """Floods the larger event over the smaller event, in-place"""
+    # Prioritize flooding from the longer event
+    # NOTE: Perhaps better to flood from the former event?
+    e2_end = e2.timestamp + e2.duration
+    if e1.duration >= e2.duration:
+        if e1.data == e2.data:
+            # Extend e1 to the end of e2
+            # Set duration of e2 to zero (mark to delete)
+            e1.duration = e2_end - e1.timestamp
+            e2.timestamp = e2_end
+            e2.duration = timedelta(0)
+        else:
+            # Extend e1 to the start of e2
+            e1.duration = e2.timestamp - e1.timestamp
+    else:
+        if e1.data == e2.data:
+            # Extend e2 to the start of e1, discard e1
+            e2.timestamp = e1.timestamp
+            e2.duration = e2_end - e2.timestamp
+            e1.duration = timedelta(0)
+        else:
+            # Extend e2 backwards to end of e1
+            e2.timestamp = e1.timestamp + e1.duration
+            e2.duration = e2_end - e2.timestamp
+
+
+def _trim(e1: Event, e2: Event):
+    """Trims the part of a smaller event covered by a larger event, in-place"""
+    e1_end = e1.timestamp + e1.duration
+    e2_end = e2.timestamp + e2.duration
+
+    if e1.duration > e2.duration:
+        # Trim e2 to remove overlap
+        e2.timestamp = e1_end
+        e2.duration = e2_end - e1_end
+    else:
+        # Trim e1 to remove overlap
+        e1.duration = e2.timestamp - e1.timestamp
+
+
 def flood(events: List[Event], pulsetime: float = 5) -> List[Event]:
     """
-    Takes a list of events and "floods" any empty space between events by extending one of the surrounding events to cover the empty space.
+    Floods event to the nearest neighbouring event if within the specified ``pulsetime``.
+
+    Takes a list of ``events`` and "floods" empty space between events smaller than ``pulsetime``, by extending one of the surrounding events to cover the empty space.
+
+    Also merges events if they have the same data and are within the pulsetime
+
+    Originally written in aw-research: https://github.com/ActivityWatch/aw-analysis/blob/7da1f2cd8552f866f643501de633d74cdecab168/aw_analysis/flood.py
+
+    Also implemented in aw-server-rust: https://github.com/ActivityWatch/aw-server-rust/blob/master/aw-transform/src/flood.rs
+
+    # Example
+
+    ```ignore
+    pulsetime: 1 second (one space)
+    input:  [a] [a]  [b][b]  [b][c]
+    output: [a    ][b    ]  [b][c]
+    ```
 
     For more details on flooding, see this issue:
     - https://github.com/ActivityWatch/activitywatch/issues/124
+
+    NOTE: This algorithm has a lot of smaller details that need to be
+    carefully considered by anyone wishing to edit it, see:
+    - https://github.com/ActivityWatch/aw-core/pull/73
     """
-    # Originally written in aw-research: https://github.com/ActivityWatch/aw-analysis/blob/7da1f2cd8552f866f643501de633d74cdecab168/aw_analysis/flood.py
-    # NOTE: This algorithm has a lot of smaller details that need to be
-    # carefully considered by anyone wishing to edit it, see:
-    # - https://github.com/ActivityWatch/aw-core/pull/73
 
     events = deepcopy(events)
-    events = sorted(events, key=lambda e: e.timestamp)
+    events = sorted(events, key=lambda e: (e.timestamp, e.duration))
 
     # If negative gaps are smaller than this, prune them to become zero
-    negative_gap_trim_thres = timedelta(seconds=0.1)
+    gap_trim_thres = timedelta(seconds=0.1)
 
     warned_about_negative_gap_safe = False
     warned_about_negative_gap_unsafe = False
@@ -44,45 +128,26 @@ def flood(events: List[Event], pulsetime: float = 5) -> List[Event]:
                 e2.timestamp, e2.duration = end, timedelta(0)
                 if not warned_about_negative_gap_safe:
                     logger.warning(
-                        "Gap was of negative duration but could be safely merged ({}s). This message will only show once per batch.".format(
-                            gap.total_seconds()
-                        )
+                        f"Gap was negative but could be safely merged ({gap.total_seconds()}s). Will only warn once per batch."  # {e1.data}
                     )
+                    logger.debug(f"{e1.data}")
                     warned_about_negative_gap_safe = True
-            elif gap < -negative_gap_trim_thres and not warned_about_negative_gap_unsafe:
+            elif gap < -gap_trim_thres:
                 # Events with negative gap but differing data cannot be merged safely
-                logger.warning(
-                    "Gap was of negative duration and could NOT be safely merged ({}s). This warning will only show once per batch.".format(
-                        gap.total_seconds()
+                # We still need to get rid of the gap however, so we will trim the smaller event.
+                # TODO: This might be a bad idea, could lead to a huge chunk of non-AFK time getting whacked, or vice versa.
+                _trim(e1, e2)
+
+                if not warned_about_negative_gap_unsafe:
+                    logger.warning(
+                        f"Gap was negative and could NOT be safely merged ({gap.total_seconds()}s). Will only warn once per batch."
                     )
-                )
-                warned_about_negative_gap_unsafe = True
-                # logger.warning("Event 1 (id {}): {} {}".format(e1.id, e1.timestamp, e1.duration))
-                # logger.warning("Event 2 (id {}): {} {}".format(e2.id, e2.timestamp, e2.duration))
-            elif -negative_gap_trim_thres < gap <= timedelta(seconds=pulsetime):
-                e2_end = e2.timestamp + e2.duration
-
-                # Prioritize flooding from the longer event
-                if e1.duration >= e2.duration:
-                    if e1.data == e2.data:
-                        # Extend e1 to the end of e2
-                        # Set duration of e2 to zero (mark to delete)
-                        e1.duration = e2_end - e1.timestamp
-                        e2.timestamp = e2_end
-                        e2.duration = timedelta(0)
-                    else:
-                        # Extend e1 to the start of e2
-                        e1.duration = e2.timestamp - e1.timestamp
-                else:
-                    if e1.data == e2.data:
-                        # Extend e2 to the start of e1, discard e1
-                        e2.timestamp = e1.timestamp
-                        e2.duration = e2_end - e2.timestamp
-                        e1.duration = timedelta(0)
-                    else:
-                        # Extend e2 backwards to end of e1
-                        e2.timestamp = e1.timestamp + e1.duration
-                        e2.duration = e2_end - e2.timestamp
+                    logger.debug(f"{e1.data} != {e2.data}")
+                    warned_about_negative_gap_unsafe = True
+                    # logger.warning("Event 1 (id {}): {} {}".format(e1.id, e1.timestamp, e1.duration))
+                    # logger.warning("Event 2 (id {}): {} {}".format(e2.id, e2.timestamp, e2.duration))
+            elif -gap_trim_thres < gap <= timedelta(seconds=pulsetime):
+                _flood(e1, e2)
 
     # Filter out remaining zero-duration events
     events = [e for e in events if e.duration > timedelta(0)]
diff --git a/tests/test_flood.py b/tests/test_flood.py
index e0d9ebf..224fe8a 100644
--- a/tests/test_flood.py
+++ b/tests/test_flood.py
@@ -67,7 +67,7 @@ def test_flood_negative_gap_differing_data():
         Event(timestamp=now, duration=100, data={"b": 1}),
     ]
     flooded = flood(events)
-    assert flooded == events
+    assert flooded == [events[1]]
 
 
 def test_flood_negative_small_gap_differing_data():
@@ -78,3 +78,32 @@ def test_flood_negative_small_gap_differing_data():
     flooded = flood(events)
     duration = sum((e.duration for e in flooded), timedelta(0))
     assert duration == timedelta(seconds=100 + 99.99)
+
+
+def test_flood_idempotent():
+    events = [
+        # slight overlap, same data
+        Event(timestamp=now, duration=10, data={"a": 0}),
+        Event(timestamp=now + 9 * td1s, duration=5, data={"a": 0}),
+        # different data, no overlap
+        Event(timestamp=now + 15 * td1s, duration=5, data={"b": 0}),
+    ]
+    flood_first = flood(events, pulsetime=0)
+    flooded = flood_first
+    for i in range(2):
+        flooded = flood(flooded, pulsetime=0)
+    assert flood_first == flooded
+
+    assert sum((e.duration for e in flooded), timedelta(0)) == 19 * td1s
+
+
+def test_flood_unsafe_gap():
+    events = [
+        # slight overlap, different data
+        Event(timestamp=now, duration=10, data={"a": 0}),
+        Event(timestamp=now + 9 * td1s, duration=5, data={"b": 0}),
+    ]
+    flooded = flood(events, pulsetime=0)
+
+    # The total duration should not exceed the range duration
+    assert sum((e.duration for e in flooded), timedelta(0)) == 14 * td1s