diff --git a/aw_transform/__init__.py b/aw_transform/__init__.py
index 1297e42..0382e0f 100644
--- a/aw_transform/__init__.py
+++ b/aw_transform/__init__.py
@@ -1,20 +1,20 @@
+from .chunk_events_by_key import chunk_events_by_key
+from .classify import Rule, categorize, tag
 from .filter_keyvals import filter_keyvals, filter_keyvals_regex
 from .filter_period_intersect import filter_period_intersect, period_union, union
+from .flood import flood
 from .heartbeats import heartbeat_merge, heartbeat_reduce
 from .merge_events_by_keys import merge_events_by_keys
-from .chunk_events_by_key import chunk_events_by_key
+from .simplify import simplify_string
 from .sort_by import (
-    sort_by_timestamp,
-    sort_by_duration,
-    sum_durations,
     concat,
     limit_events,
+    sort_by_duration,
+    sort_by_timestamp,
+    sum_durations,
 )
 from .split_url_events import split_url_events
-from .simplify import simplify_string
-from .flood import flood
-from .classify import categorize, tag, Rule
-from .union_no_overlap import union_no_overlap
+from .union_no_overlap import union_no_overlap, union_stack
 
 __all__ = [
     "flood",
@@ -26,6 +26,7 @@
     "filter_period_intersect",
     "union",
     "union_no_overlap",
+    "union_stack",
     "concat",
     "sum_durations",
     "sort_by_timestamp",
diff --git a/aw_transform/sort_by.py b/aw_transform/sort_by.py
index 5f7b691..25b7c74 100644
--- a/aw_transform/sort_by.py
+++ b/aw_transform/sort_by.py
@@ -1,6 +1,7 @@
 import logging
 from datetime import timedelta
 from typing import List
+
 from aw_core.models import Event
 
 logger = logging.getLogger(__name__)
diff --git a/aw_transform/union_no_overlap.py b/aw_transform/union_no_overlap.py
index b1e4723..ab32f80 100644
--- a/aw_transform/union_no_overlap.py
+++ b/aw_transform/union_no_overlap.py
@@ -3,12 +3,11 @@
 """
 
 from copy import deepcopy
-from typing import List, Tuple, Optional
 from datetime import datetime, timedelta, timezone
-
-from timeslot import Timeslot
+from typing import List, Optional, Tuple
 
 from aw_core import Event
+from timeslot import Timeslot
 
 
 def _split_event(e: Event, dt: datetime) -> Tuple[Event, Optional[Event]]:
@@ -83,3 +82,85 @@ def union_no_overlap(events1: List[Event], events2: List[Event]) -> List[Event]:
     events_union += events1[e1_i:]
     events_union += events2[e2_i:]
     return events_union
+
+
+def _event_segment(e: Event, start: datetime, end: datetime) -> Event:
+    """Returns a copy of the event `e` restricted to the period from `start` to `end`."""
+    segment = deepcopy(e)
+    segment.timestamp = start
+    segment.duration = end - start
+    return segment
+
+
+def _emit_until(
+    stack: List[Event],
+    result: List[Event],
+    current_time: datetime,
+    until: Optional[datetime],
+) -> datetime:
+    """Appends the segments that are active between `current_time` and `until` to `result`.
+
+    The event on top of the `stack` is the active one. Events that end on or before `until` are popped from the `stack`.
+    If `until` is None, the whole stack is drained.
+
+    Returns the time up to which `result` has been filled in.
+    """
+    while stack:
+        top = stack[-1]
+        top_end = top.timestamp + top.duration
+        if until is not None and top_end > until:
+            # The top event is still active at `until`: emit the part before it and keep the event on the stack
+            if until > current_time:
+                result.append(_event_segment(top, current_time, until))
+                current_time = until
+            break
+        # The top event has ended: emit whatever part of it hasn't been covered yet, then drop it
+        stack.pop()
+        if top_end > current_time:
+            result.append(_event_segment(top, current_time, top_end))
+            current_time = top_end
+    return current_time
+
+
+# TODO: rename
+def union_stack(events: List[Event]) -> List[Event]:
+    """Takes a list of events, where some may overlap, and returns a new list of events with no overlap.
+
+    The strategy to resolve overlaps is to walk through the events sorted by their starting timestamp,
+    adding/popping them from a stack depending on whether they haven't ended by the time of the next event start.
+
+    The event with the most recent starting time will always have precedence, but if a later event ends before a previous event,
+    it will fall back to creating a following event with data of the previous (still active) event.
+
+    The input list is not modified, events with zero duration are dropped, and an empty input gives an empty result.
+
+    Example:
+      events | 11111       1 |
+             |  222     222  |
+             |   3       3   |
+      result | 12321    2321 |
+    """
+    if not events:
+        return []
+
+    events = sorted(deepcopy(events), key=lambda e: e.timestamp)
+    events_stack: List[Event] = []
+    result: List[Event] = []
+
+    # The current time is the time at which we have perfect information about past events, but no information about future events
+    # Events that end on or before the `current_time` are considered to be finished, and should not remain in the stack.
+    current_time = events[0].timestamp
+
+    for e in events:
+        # Before adding the event, emit what was active up to its start and pop all events that have ended
+        current_time = _emit_until(events_stack, result, current_time, e.timestamp)
+
+        # Nothing is active in a gap between events, so the result continues at the start of `e`
+        current_time = e.timestamp
+
+        # Add the new event to the stack, it has precedence until it ends or a later event starts
+        events_stack.append(e)
+
+    # No more events will start, so emit what remains of the events still in the stack
+    _emit_until(events_stack, result, current_time, None)
+    return result
diff --git a/tests/test_transforms.py b/tests/test_transforms.py
index 90c57eb..2415445 100644
--- a/tests/test_transforms.py
+++ b/tests/test_transforms.py
@@ -1,5 +1,6 @@
 from pprint import pprint
 from datetime import datetime, timedelta, timezone
+from typing import List
 
 from aw_core.models import Event
 from aw_transform import (
@@ -16,11 +17,13 @@
     simplify_string,
     union,
     union_no_overlap,
+    union_stack,
     categorize,
     tag,
     Rule,
 )
 from aw_transform.filter_period_intersect import _intersecting_eventpairs
+from timeslot import Timeslot
 
 
 def test_simplify_string():
@@ -449,3 +452,51 @@ def test_union_no_overlap():
     dur = sum((e.duration for e in events_union), timedelta(0))
     assert dur == timedelta(hours=5, minutes=0)
     assert sorted(events_union, key=lambda e: e.timestamp)
+
+
+def _has_overlap(events: List[Event]) -> bool:
+    """Returns true if the events have overlap"""
+    for i, e1 in enumerate(events):
+        for e2 in events[i + 1 :]:
+            e1_p = Timeslot(e1.timestamp, e1.timestamp + e1.duration)
+            e2_p = Timeslot(e2.timestamp, e2.timestamp + e2.duration)
+            if e1_p.intersects(e2_p):
+                return True
+    return False
+
+
+def test_union_stack():
+    # In this case, we should get a result where first 1 is active, then 2, then 1 again, and finally 3
+    events = [
+        Event(
+            timestamp=datetime(2023, 1, 1, 0, 0),
+            duration=timedelta(hours=3),
+            data={"a": 1},
+        ),
+        Event(
+            timestamp=datetime(2023, 1, 1, 1, 0),
+            duration=timedelta(hours=1),
+            data={"a": 2},
+        ),
+        Event(
+            timestamp=datetime(2023, 1, 1, 3, 0),
+            duration=timedelta(hours=1),
+            data={"a": 3},
+        ),
+    ]
+
+    events_union = union_stack(events)
+    assert len(events_union) == 4
+    assert events_union[0].data["a"] == 1
+    assert events_union[1].data["a"] == 2
+    assert events_union[2].data["a"] == 1
+    assert events_union[3].data["a"] == 3
+
+    # each of the resulting events should cover exactly one hour
+    assert all(e.duration == timedelta(hours=1) for e in events_union)
+
+    # check for overlap
+    assert not _has_overlap(events_union)
+
+    # an empty input should give an empty result
+    assert union_stack([]) == []