Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
*.pyc
*~
.idea
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ The algorithms are Python implementations of the "classical" algorithms, as desc
[An Online Algorithm for Segmenting Time Series][keogh], including:

- the sliding window algorithm;
- the top-down algorithm; and
- the bottom-up algorithm.
- the top-down algorithm;
- the bottom-up algorithm;
- mSWAB algorithm, defined at *"Described in Van Laerhoven, Kristof, and Bernt Schiele."An Empirical Study of Time Series Approximation Algorithms for Wearable Accelerometers." (2009)."*

The code is *not* optimized for performance in any way, but I've found it useful for
experimenting and data exploration.
Expand Down
12 changes: 11 additions & 1 deletion example.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ def draw_segments(segments):
draw_plot(data,"Top-down with regression")
draw_segments(segments)


#swab with regression
figure()
segments = segment.m_swab(data, fit.regression, fit.sumsquared_error, max_error)
draw_plot(data, "mSwab with regression")
draw_segments(segments)

#sliding window with simple interpolation
figure()
Expand All @@ -61,6 +65,12 @@ def draw_segments(segments):
draw_plot(data,"Top-down with simple interpolation")
draw_segments(segments)

#swab with simple interpolation
figure()
segments = segment.m_swab(data, fit.interpolate, fit.sumsquared_error, max_error)
draw_plot(data, "mSwab with simple interpolation")
draw_segments(segments)


show()

82 changes: 74 additions & 8 deletions segment.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import numpy as np

def slidingwindowsegment(sequence, create_segment, compute_error, max_error, seq_range=None):
"""
Expand Down Expand Up @@ -53,18 +54,19 @@ def bottomupsegment(sequence, create_segment, compute_error, max_error):

while min(mergecosts) < max_error:
idx = mergecosts.index(min(mergecosts))
segments[idx] = mergesegments[idx]
del segments[idx+1]

new_seg = create_segment(sequence, (segments[idx][0], segments[idx + 1][2]))
segments[idx] = new_seg
del segments[idx + 1]

if idx > 0:
mergesegments[idx-1] = create_segment(sequence,(segments[idx-1][0],segments[idx][2]))
mergecosts[idx-1] = compute_error(sequence,mergesegments[idx-1])
merge_seg = create_segment(sequence, (segments[idx - 1][0], segments[idx][2]))
mergecosts[idx - 1] = compute_error(sequence, merge_seg)

if idx+1 < len(mergecosts):
mergesegments[idx+1] = create_segment(sequence,(segments[idx][0],segments[idx+1][2]))
mergecosts[idx+1] = compute_error(sequence,mergesegments[idx])
if idx + 1 < len(mergecosts):
merge_seg = create_segment(sequence, (segments[idx][0], segments[idx + 1][2]))
mergecosts[idx] = compute_error(sequence, merge_seg)

del mergesegments[idx]
del mergecosts[idx]

return segments
Expand Down Expand Up @@ -111,3 +113,67 @@ def topdownsegment(sequence, create_segment, compute_error, max_error, seq_range
rightsegs = topdownsegment(sequence, create_segment, compute_error, max_error, (bestidx,seq_range[1]))

return leftsegs + rightsegs


def m_swab(sequence, create_segment, compute_error, max_error, buffer_size=80):
"""
Described in Van Laerhoven, Kristof, and Bernt Schiele.
"An Empirical Study of Time Series Approximation Algorithms for Wearable Accelerometers." (2009).

Return a list of line segments that approximate the sequence.

The list is computed using the bottom-up technique.

Parameters
----------
sequence : sequence to segment
create_segment : a function of two arguments (sequence, sequence range) that returns a line segment that approximates the sequence data in the specified range
compute_error: a function of two argments (sequence, segment) that returns the error from fitting the specified line segment to the sequence data
max_error: the maximum allowable line segment fitting error

"""

segs = []
win_left = 0
win_right = buffer_size-1

seq_range = (0, len(sequence)-1)

while True: #while new data:
swabbuf = sequence[win_left:win_right]
# Bottom-Up segmentation of buffer
T = bottomupsegment(swabbuf, create_segment, compute_error, max_error)

# add left-most segment from BU:
segs.append(T[0])
n = len(segs)

# merge last segments if slope is equal
if n > 2:
last_slope = (segs[-1][3] - segs[-1][1]) // (segs[-1][2] - segs[-1][0])
b_last_slope = (segs[-2][3] - segs[-2][1]) // (segs[-2][2] - segs[-2][0])
if last_slope == b_last_slope:
# merge segments
new_segs = segs[:-2]
merged = create_segment(sequence, (segs[-2][0], segs[-1][2]))
new_segs.append(merged)
segs = new_segs
n -= 1

# shift left of buffer window:
win_left += segs[-1][0]
# shift right of buffer window:
if win_right < seq_range[1]:
i = win_right + 1
# get sign of the slope
s = np.sign(sequence[i] - sequence[i-1])
while np.sign(sequence[i] - sequence[i-1]) == s:
i += 1
if i >= seq_range[1]:
i -= 1
break
win_right = i
else:
# all done, flush buffer segments:
segs += T[1:]
return segs