-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path: macro_trend_exec.py
More file actions
145 lines (113 loc) · 4.36 KB
/
macro_trend_exec.py
File metadata and controls
145 lines (113 loc) · 4.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Thu Apr 6 11:13:18 2017
Executable for macro_trend function
Returns high level trends, t2 / t1
t1 and t2 MUST be dictionaries of the form:
t2 = {'start':'2017-03-03', 'end':'2017-03-05'}
returns 5 objects:
t1pv - keen API return: page views, articles and obsessions, t1
t2pv - keen API return: page views, articles and obsessions, t2
t1time - keen API return: total time, articles and obsessions, t1
t2time - keen API return: total time, articles and obsessions, t2
tc - sorted DataFrame, merging pvs and time, calculating difference
@author: csaunders
"""
import os
import pickle
import pandas as pd
from standard_imports import time_API
from API_calls import time_spent
from API_calls import articles_obsessions
from queue import Queue
from threading import Thread
def combine_pv_time(df_pv, df_time):
    """Merge page-view and time-spent frames into one obsession-level table.

    Both inputs are totalled per 'article.obsessions' (insertion order
    kept, not alphabetised), joined on that key, relabelled to
    ['obsession', 'page views', 'time'], and returned sorted by page
    views, largest first.
    """
    pv_totals = (df_pv
                 .groupby('article.obsessions', sort=False)
                 .sum()
                 .reset_index())
    time_totals = (df_time
                   .groupby('article.obsessions', sort=False)
                   .sum()
                   .reset_index())
    merged = pv_totals.merge(time_totals, on='article.obsessions')
    merged.columns = ['obsession', 'page views', 'time']
    return merged.sort_values('page views', ascending=False)
class DownloadWorker1(Thread):
    """Daemon worker that drains (func, start, end, dump_dir) jobs from a queue.

    Instances are started as daemon threads by run_thread(), so they die
    with the main thread once the queue has been fully processed.
    """

    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue

    def run(self):
        # Loop forever pulling jobs; the enclosing run_thread() blocks on
        # queue.join() until every job has been marked done.
        while True:
            func, start, end, dump_dir = self.queue.get()
            try:
                pump_and_dump(func, start, end, dump_dir)
            finally:
                # BUGFIX: without this finally, an exception inside
                # pump_and_dump (API error, pickle failure) skipped
                # task_done(), so queue.join() deadlocked forever.
                self.queue.task_done()
def run_thread(func, timeframe, dump_dir, n_workers=8):
    """Fan `func` calls out over a temporary pool of download threads.

    func      -- the API call to run; signature func(start, end)
    timeframe -- iterable of (start, end) tuples, one job per tuple
    dump_dir  -- directory where each result is temporarily pickled
    n_workers -- number of concurrent threads (default 8, matching the
                 original hard-coded pool size)

    BUGFIX: the original spawned 8 immortal daemon threads on every call
    (32 per run of main()) that were never shut down, and a failed job
    skipped task_done() and deadlocked queue.join(). Workers here mark
    every item done in a finally and exit on a None sentinel.
    """
    jobs = Queue()
    for start, end in timeframe:
        jobs.put((func, start, end, dump_dir))

    def _worker():
        # Pull jobs until the sentinel; task_done() in a finally so a
        # failed download cannot hang jobs.join().
        while True:
            item = jobs.get()
            try:
                if item is None:
                    return
                f, s, e, d = item
                pump_and_dump(f, s, e, d)
            finally:
                jobs.task_done()

    threads = [Thread(target=_worker, daemon=True) for _ in range(n_workers)]
    for t in threads:
        t.start()
    jobs.join()          # wait for all real jobs to finish
    for _ in threads:    # then tell every worker to exit
        jobs.put(None)
    jobs.join()
    for t in threads:
        t.join()
def pump_and_dump(func, start, end, dump_dir):
    """Run one Keen API call and pickle the result into dump_dir.

    func      -- API callable taking (start, end)
    start/end -- timestamp strings; their first 16 characters are embedded
                 in the output filename
    dump_dir  -- target directory for the temporary pickle

    The 'name?' marker in the filename is how read_data() later finds
    (and deletes) these temp files, so it must stay in sync with it.
    NOTE(review): '?' is not a legal filename character on Windows —
    fine on the POSIX hosts this appears written for, but worth fixing
    in both functions together.
    """
    data = func(start, end)
    # (removed the original's no-op `dump_dir = dump_dir`)
    ref = start[:16] + '--' + end[:16] + '--' + 'name?'
    path = os.path.join(dump_dir, ref + '.pickle')
    with open(path, 'wb') as f:
        pickle.dump(data, f)
def read_data(dump_dir):
    """Collect, concatenate and delete the pickles left by the download threads.

    Returns a single DataFrame built from every 'name?'-tagged pickle in
    dump_dir (the tag written by pump_and_dump). Each file is removed
    after it is read.

    BUGFIX: the original called os.chdir(dump_dir), permanently changing
    the process-wide working directory as a hidden side effect; absolute
    paths are used here instead. Raises ValueError (from pd.concat) if no
    tagged files exist, matching the original behaviour.
    """
    frames = []
    for name in os.listdir(dump_dir):
        if 'name?' not in name:
            continue
        path = os.path.join(dump_dir, name)
        # the pickles are our own temp files, so pickle.load is safe here
        with open(path, 'rb') as f:
            frames.append(pd.DataFrame(pickle.load(f)))
        os.remove(path)
    return pd.concat(frames)
def main(t1, t2, dump_dir, API_interval=24):
    """Compute high-level obsession trends comparing period t2 against t1.

    t1, t2       -- dicts like {'start': '2017-03-03', 'end': '2017-03-05'}
    dump_dir     -- scratch directory for the threaded API downloads
    API_interval -- hours per API chunk, passed to time_API.custom_time

    Returns five objects:
    t1_pv_data, t2_pv_data     -- page views per article/obsession, sorted
    t1_time_data, t2_time_data -- total time per article/obsession, sorted
    tc                         -- merged DataFrame with t2-vs-t1 %-changes,
                                  sorted by t2 page views descending
    """
    time = time_API()

    def fetch(period):
        # Run both API calls (page views, then time spent) for one period
        # and return the two result frames, each sorted by 'result'.
        # (The original duplicated this block verbatim for t1 and t2.)
        time.start = period['start']
        time.end = period['end']
        time.set_time()
        timeframe = time.custom_time(API_interval)
        run_thread(articles_obsessions, timeframe, dump_dir)
        pv = read_data(dump_dir).sort_values('result', ascending=False)
        run_thread(time_spent, timeframe, dump_dir)
        spent = read_data(dump_dir).sort_values('result', ascending=False)
        return pv, spent

    # t2 first, then t1 — same call order as the original implementation
    t2_pv_data, t2_time_data = fetch(t2)
    t1_pv_data, t1_time_data = fetch(t1)

    # merge per-period totals; left join keeps every t2 obsession even if
    # it did not exist in t1 (its t1 columns become NaN)
    df1_tot = combine_pv_time(t1_pv_data, t1_time_data)
    df2_tot = combine_pv_time(t2_pv_data, t2_time_data)
    tc = df2_tot.merge(df1_tot, on='obsession', how='left')
    tc.columns = ['obsession', 'pv t2', 'time t2', 'pv t1', 'time t1']
    # calculate pv and time changes of t2 / t1 (percent, 1 decimal); then sort
    tc['pv % change'] = round((tc['pv t2'] / tc['pv t1'] - 1) * 100, 1)
    tc['time % change'] = round((tc['time t2'] / tc['time t1'] - 1) * 100, 1)
    tc = tc.sort_values('pv t2', ascending=False)
    return t1_pv_data, t2_pv_data, t1_time_data, t2_time_data, tc