Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions aw_transform/__init__.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
from .chunk_events_by_key import chunk_events_by_key
from .classify import Rule, categorize, tag
from .filter_keyvals import filter_keyvals, filter_keyvals_regex
from .filter_period_intersect import filter_period_intersect, period_union, union
from .flood import flood
from .heartbeats import heartbeat_merge, heartbeat_reduce
from .merge_events_by_keys import merge_events_by_keys
from .chunk_events_by_key import chunk_events_by_key
from .simplify import simplify_string
from .sort_by import (
sort_by_timestamp,
sort_by_duration,
sum_durations,
concat,
limit_events,
sort_by_duration,
sort_by_timestamp,
sum_durations,
)
from .split_url_events import split_url_events
from .simplify import simplify_string
from .flood import flood
from .classify import categorize, tag, Rule
from .union_no_overlap import union_no_overlap
from .union_no_overlap import union_no_overlap, union_stack

__all__ = [
"flood",
Expand All @@ -26,6 +26,7 @@
"filter_period_intersect",
"union",
"union_no_overlap",
"union_stack",
"concat",
"sum_durations",
"sort_by_timestamp",
Expand Down
1 change: 1 addition & 0 deletions aw_transform/sort_by.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
from datetime import timedelta
from typing import List

from aw_core.models import Event

logger = logging.getLogger(__name__)
Expand Down
53 changes: 50 additions & 3 deletions aw_transform/union_no_overlap.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@
"""

from copy import deepcopy
from typing import List, Tuple, Optional
from datetime import datetime, timedelta, timezone

from timeslot import Timeslot
from typing import List, Optional, Tuple

from aw_core import Event
from timeslot import Timeslot


def _split_event(e: Event, dt: datetime) -> Tuple[Event, Optional[Event]]:
Expand Down Expand Up @@ -83,3 +82,51 @@ def union_no_overlap(events1: List[Event], events2: List[Event]) -> List[Event]:
events_union += events1[e1_i:]
events_union += events2[e2_i:]
return events_union


# TODO: rename
def union_stack(events: List[Event]) -> List[Event]:
"""Takes a list of events, where some may overlap, and returns a new list of events with no overlap.

The strategy to resolve overlaps is to walk through the events sorted by their starting timestamp and then iterate over the events,
adding/popping them from a stack depending on wether they haven't ended by the time of the next event start.

The event with the most recent starting time will always have precendence, but if a later event ends before a previous event,
it will fall back to creating a following event with data of the previous (still active) event.

Example:
events | 11111 1 |
| 222 222 |
| 3 3 |
result | 12321 2321 |
"""
events = deepcopy(events)
events.sort(key=lambda e: e.timestamp)
events_stack = []
result = []

# The current time is the time at which we have perfect information about past events, but no information about future events
# Events that end on or before the `current_time` are considered to be finished, and should not remain in the stack.
current_time = events[0].timestamp

for e in events:
# Before adding the event, we need to pop all events that have ended
# Pop events from the stack that have ended, and add them to the union
while (
events_stack
and events_stack[-1].timestamp + events_stack[-1].duration <= current_time
):
popped = events_stack.pop()
#
result.append(popped)

# If we have an active event in the stack, we need to create a new event
# with the data of the active event
if events_stack:
new_e = deepcopy(events_stack[-1])
new_e.duration = e.timestamp - current_time
result.append(new_e)

# Add the new event to the stack

return result
46 changes: 46 additions & 0 deletions tests/test_transforms.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from pprint import pprint
from datetime import datetime, timedelta, timezone
from typing import List

from aw_core.models import Event
from aw_transform import (
Expand All @@ -16,11 +17,13 @@
simplify_string,
union,
union_no_overlap,
union_stack,
categorize,
tag,
Rule,
)
from aw_transform.filter_period_intersect import _intersecting_eventpairs
from timeslot import Timeslot


def test_simplify_string():
Expand Down Expand Up @@ -449,3 +452,46 @@ def test_union_no_overlap():
dur = sum((e.duration for e in events_union), timedelta(0))
assert dur == timedelta(hours=5, minutes=0)
assert sorted(events_union, key=lambda e: e.timestamp)


def _has_overlap(events: List[Event]) -> bool:
"""Returns true if the events have overlap"""
for i, e1 in enumerate(events):
for e2 in events[i + 1 :]:
e1_p = Timeslot(e1.timestamp, e1.timestamp + e1.duration)
e2_p = Timeslot(e2.timestamp, e2.timestamp + e2.duration)
if e1_p.intersects(e2_p):
return True
return False


def test_union_stack():
# In this case, we should get a result where first 1 is active, then 2, then 3, then 1 again
events = [
Event(
timestamp=datetime(2023, 1, 1, 0, 0),
duration=timedelta(hours=3),
data={"a": 1},
),
Event(
timestamp=datetime(2023, 1, 1, 1, 0),
duration=timedelta(hours=1),
data={"a": 2},
),
Event(
timestamp=datetime(2023, 1, 1, 3, 0),
duration=timedelta(hours=1),
data={"a": 3},
),
]

events_union = union_stack(events)
assert len(events_union) == 4
assert events_union[0].data["a"] == 1
assert events_union[1].data["a"] == 2
assert events_union[2].data["a"] == 3
assert events_union[3].data["a"] == 1

# check for overlap
assert not _has_overlap(events_union)