diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..751d2a9 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +*.pyc +*~ +.idea diff --git a/README.md b/README.md index a3ab42f..0df6234 100644 --- a/README.md +++ b/README.md @@ -7,8 +7,9 @@ The algorithms are Python implementations of the "classical" algorithms, as desc [An Online Algorithm for Segmenting Time Series][keogh], including: - the sliding window algorithm; -- the top-down algorithm; and -- the bottom-up algorithm. +- the top-down algorithm; +- the bottom-up algorithm; +- mSWAB algorithm, defined at *"Described in Van Laerhoven, Kristof, and Bernt Schiele."An Empirical Study of Time Series Approximation Algorithms for Wearable Accelerometers." (2009)."* The code is *not* optimized for performance in any way, but I've found it useful for experimenting and data exploration. diff --git a/example.py b/example.py index 8a6da58..71638b8 100644 --- a/example.py +++ b/example.py @@ -41,7 +41,11 @@ def draw_segments(segments): draw_plot(data,"Top-down with regression") draw_segments(segments) - +#swab with regression +figure() +segments = segment.m_swab(data, fit.regression, fit.sumsquared_error, max_error) +draw_plot(data, "mSwab with regression") +draw_segments(segments) #sliding window with simple interpolation figure() @@ -61,6 +65,12 @@ def draw_segments(segments): draw_plot(data,"Top-down with simple interpolation") draw_segments(segments) +#swab with simple interpolation +figure() +segments = segment.m_swab(data, fit.interpolate, fit.sumsquared_error, max_error) +draw_plot(data, "mSwab with simple interpolation") +draw_segments(segments) + show() diff --git a/segment.py b/segment.py index 65cc66b..ac46d02 100644 --- a/segment.py +++ b/segment.py @@ -1,3 +1,4 @@ +import numpy as np def slidingwindowsegment(sequence, create_segment, compute_error, max_error, seq_range=None): """ @@ -53,18 +54,19 @@ def bottomupsegment(sequence, create_segment, compute_error, max_error): while min(mergecosts) < max_error: idx = mergecosts.index(min(mergecosts)) - segments[idx] = mergesegments[idx] - del segments[idx+1] + + new_seg = create_segment(sequence, (segments[idx][0], segments[idx + 1][2])) + segments[idx] = new_seg + del segments[idx + 1] if idx > 0: - mergesegments[idx-1] = create_segment(sequence,(segments[idx-1][0],segments[idx][2])) - mergecosts[idx-1] = compute_error(sequence,mergesegments[idx-1]) + merge_seg = create_segment(sequence, (segments[idx - 1][0], segments[idx][2])) + mergecosts[idx - 1] = compute_error(sequence, merge_seg) - if idx+1 < len(mergecosts): - mergesegments[idx+1] = create_segment(sequence,(segments[idx][0],segments[idx+1][2])) - mergecosts[idx+1] = compute_error(sequence,mergesegments[idx]) + if idx + 1 < len(mergecosts): + merge_seg = create_segment(sequence, (segments[idx][0], segments[idx + 1][2])) + mergecosts[idx] = compute_error(sequence, merge_seg) - del mergesegments[idx] del mergecosts[idx] return segments @@ -111,3 +113,67 @@ def topdownsegment(sequence, create_segment, compute_error, max_error, seq_range rightsegs = topdownsegment(sequence, create_segment, compute_error, max_error, (bestidx,seq_range[1])) return leftsegs + rightsegs + + +def m_swab(sequence, create_segment, compute_error, max_error, buffer_size=80): + """ + Described in Van Laerhoven, Kristof, and Bernt Schiele. + "An Empirical Study of Time Series Approximation Algorithms for Wearable Accelerometers." (2009). + + Return a list of line segments that approximate the sequence. + + The list is computed using the bottom-up technique. + + Parameters + ---------- + sequence : sequence to segment + create_segment : a function of two arguments (sequence, sequence range) that returns a line segment that approximates the sequence data in the specified range + compute_error: a function of two argments (sequence, segment) that returns the error from fitting the specified line segment to the sequence data + max_error: the maximum allowable line segment fitting error + + """ + + segs = [] + win_left = 0 + win_right = buffer_size-1 + + seq_range = (0, len(sequence)-1) + + while True: #while new data: + swabbuf = sequence[win_left:win_right] + # Bottom-Up segmentation of buffer + T = bottomupsegment(swabbuf, create_segment, compute_error, max_error) + + # add left-most segment from BU: + segs.append(T[0]) + n = len(segs) + + # merge last segments if slope is equal + if n > 2: + last_slope = (segs[-1][3] - segs[-1][1]) // (segs[-1][2] - segs[-1][0]) + b_last_slope = (segs[-2][3] - segs[-2][1]) // (segs[-2][2] - segs[-2][0]) + if last_slope == b_last_slope: + # merge segments + new_segs = segs[:-2] + merged = create_segment(sequence, (segs[-2][0], segs[-1][2])) + new_segs.append(merged) + segs = new_segs + n -= 1 + + # shift left of buffer window: + win_left += segs[-1][0] + # shift right of buffer window: + if win_right < seq_range[1]: + i = win_right + 1 + # get sign of the slope + s = np.sign(sequence[i] - sequence[i-1]) + while np.sign(sequence[i] - sequence[i-1]) == s: + i += 1 + if i >= seq_range[1]: + i -= 1 + break + win_right = i + else: + # all done, flush buffer segments: + segs += T[1:] + return segs \ No newline at end of file