-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathnon_obsession_exec.py
More file actions
122 lines (93 loc) · 3.57 KB
/
non_obsession_exec.py
File metadata and controls
122 lines (93 loc) · 3.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
non_obsession_exec
"""
import os
import pickle
import pandas as pd
from API_calls import non_obsession_info
from queue import Queue
from threading import Thread
class DownloadWorker1(Thread):
def __init__(self, queue):
Thread.__init__(self)
self.queue = queue
def run(self):
while True:
func, start, end, dump_dir = self.queue.get()
pump_and_dump(func, start, end, dump_dir)
self.queue.task_done()
def run_thread(func, timeframe, dump_dir):
"""func - the API call to run;
timeframe - needs to be a tuple of start, end;
dump_dir - where to temp store the data
"""
queue = Queue()
for x in range(8):
worker = DownloadWorker1(queue)
worker.daemon = True
worker.start()
for start, end in timeframe:
queue.put((func, start, end, dump_dir))
queue.join()
def pump_and_dump(func, start, end, dump_dir):
"""makes a KEEN API call, and then saves the file to the given dump_dir
"""
data = func(start, end)
dump_dir = dump_dir
ref = start[:16] + '--' + end[:16] + '--' + 'name?'
file = dump_dir +'/' + ref + '.pickle'
with open(file, 'wb') as f:
pickle.dump(data, f)
def read_data(dump_dir):
"""used to collect the files that are dumped by the threading
"""
os.chdir(dump_dir)
file_list = os.listdir()
file_list = [i for i in file_list if 'name?' in i]
storage = []
for file in file_list:
with open(file, 'rb') as f:
x1 = pickle.load(f)
df = pd.DataFrame(x1)
storage.append(df)
os.remove(file)
return pd.concat(storage)
def main(pv_data, time_data, timeframe, dump_dir, num):
"""
timeframe needs to cover the timerange of pv_data and time_data
"""
timeframe = timeframe[::int(len(timeframe) / 8)]
run_thread(non_obsession_info, timeframe, dump_dir)
df = read_data(dump_dir)
df['dupe'] = df['article.id'].duplicated()
dfz = df[df['dupe'] == False]
del dfz['dupe']
del dfz['result']
df_pv = pv_data[pv_data['article.obsessions'] == '']
df_pv = df_pv.groupby('article.id').sum().reset_index()
dfz = dfz.merge(df_pv, on='article.id')
df_t = df_t = time_data[time_data['article.obsessions'] == '']
df_t = df_t.groupby('article.id').sum().reset_index()
dfz = dfz.merge(df_t, on='article.id')
dfz.columns = ['article.id', 'article.headline.content',
'article.content.words.count', 'article.topic', 'page views', 'time (s)']
dfz = dfz[['article.id', 'article.headline.content', 'article.topic',
'article.content.words.count', 'page views', 'time (s)']]
storage = {}
dfzz = dfz[dfz['page views'] >= num]
thresh_string = ('article count >= ' + str(num) + 'pvs')
for topic in set(dfzz['article.topic']):
dft = dfzz[dfzz['article.topic'] == topic]
storage.setdefault(thresh_string, []).append(dft['article.id'].nunique())
storage.setdefault('total time (h)', []).append(
int(dft['time (s)'].sum() / 3600))
storage.setdefault('page views', []).append(dft['page views'].sum())
storage.setdefault('topic', []).append(topic)
df = pd.DataFrame(storage)
cumul_string = 'avg cumulative time per article (h)'
df[cumul_string] = df['total time (h)'] / df[thresh_string]
df[cumul_string] = df[cumul_string].apply(lambda x: int(x))
df = df.sort_values(cumul_string, ascending=False)
return dfz, df