-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathanalysis_abnormal_detection_loop.py
More file actions
118 lines (90 loc) · 3.14 KB
/
analysis_abnormal_detection_loop.py
File metadata and controls
118 lines (90 loc) · 3.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
"""
Anomaly Detection (ad) Using hp filter and mad test
liubenyuan <liubenyuan@gmail.com>
"""
import configparser
import os
from datetime import datetime

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from scipy import sparse, stats
from scipy.sparse.linalg import spsolve
parser = configparser.ConfigParser()
parser.read('config.ini')
current_dir = os.path.dirname(os.path.realpath(__file__))
stock_symbol = parser.get('analysis','stock_symbol')
base_dir = parser.get('directory','base_dir')
in_dir = parser.get('directory','company_stock_marketprice_baseprice_prefilter')
out_dir = parser.get('directory','company_stock_marketprice_processed')
df= pd.read_csv(current_dir+"/"+base_dir+"/"+in_dir+"/"+in_dir+'_'+stock_symbol+'.csv')
data = pd.DataFrame({'date': df['date'].map(lambda x: datetime.strptime(str(x),'%Y-%m-%d'))})
# Hodrick Prescott filter
def hp_filter(x, lamb=5000):
w = len(x)
b = [[1]*w, [-2]*w, [1]*w]
D = sparse.spdiags(b, [0, 1, 2], w-2, w)
I = sparse.eye(w)
B = (I + lamb*(D.transpose()*D))
return sparse.linalg.dsolve.spsolve(B, x)
def mad(data, axis=None):
return np.mean(np.abs(data - np.mean(data, axis)), axis)
def AnomalyDetection(x, alpha=0.2, lamb=5000):
"""
x : pd.Series
alpha : The level of statistical significance with which to
accept or reject anomalies. (expon distribution)
lamb : penalize parameter for hp filter
return r : Data frame containing the index of anomaly
"""
# calculate residual
xhat = hp_filter(x, lamb=lamb)
resid = x - xhat
# drop NA values
ds = pd.Series(resid)
ds = ds.dropna()
# Remove the seasonal and trend component,
# and the median of the data to create the univariate remainder
md = np.median(x)
data = ds - md
# process data, using median filter
ares = (data - data.median()).abs()
data_sigma = data.mad() + 1e-12
ares = ares/data_sigma
# compute significance
p = 1. - alpha
R = stats.expon.interval(p, loc=ares.mean(), scale=ares.std())
threshold = R[1]
# extract index, np.argwhere(ares > md).ravel()
r_id = ares.index[ares > threshold]
return r_id
# demo
def main():
# fix
np.random.seed(42)
# sample signals
N = 1024 # number of sample points
t = data['date']
y = df['close']
# outliers are assumed to be step/jump events at sampling points
M = 3 # number of outliers
for ii, vv in zip(np.random.rand(M)*N, np.random.randn(M)):
y[int(ii):] += vv
# detect anomaly
r_idx = AnomalyDetection(y, alpha=0.1)
#for x in range(0,len(r_idx)):
# print('Abnormality Detect on '+str(data['date'][r_idx[x]]))
# plot the result
plt.xlabel('Date')
#plt.plot(y, 'b-')
plt.plot(data['date'][r_idx], y[r_idx], 'ro')
plt.twinx()
plt.ylabel("Price")
plt.tick_params(axis="y")
plt.ylabel("Price")
plt.plot(data['date'], df['close'], "b-", linewidth=1)
plt.title(stock_symbol+" Anomaly Graph")
#plt.show()
plt.savefig(current_dir+"/"+base_dir+"/"+out_dir+'/'+stock_symbol+"/"+out_dir+'_anomaly_'+stock_symbol+'.png')
plt.close()
if __name__ == "__main__":
main()