-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathintent_viewer.py
More file actions
111 lines (95 loc) · 3.54 KB
/
intent_viewer.py
File metadata and controls
111 lines (95 loc) · 3.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import os
import sys
import csv
from operator import itemgetter
from src.file_util import get_files
# Parsed command-line arguments. Stays None on import; the ``__main__`` guard
# at the bottom of the file assigns the argparse result here before calling
# main(), which reads ARGS.timing, ARGS.input and ARGS.output.
ARGS = None
def _find_voice_server(path, sth, eth):
    """Return the server whose 'up' rows in [sth, eth] have the largest summed size."""
    totals = dict()
    # newline='' is the csv module's documented way to open files it parses.
    with open(path, 'r', newline='') as f:
        for row in csv.DictReader(f):
            if row['flag'] != 'up':
                continue
            when = float(row['time'])
            if when < sth:
                continue
            if when > eth:
                # Stops at the first 'up' row past the window end; presumably
                # rows are ordered by time -- TODO confirm against the data.
                break
            point = float(row['point'])
            server = round(point)
            # The nearest integer of `point` is used as the server key and the
            # signed remainder as the row's size.
            totals[server] = totals.get(server, 0) + (point - server)
    if not totals:
        # max() on an empty dict would raise the same exception type with an
        # unhelpful message; say what is actually missing.
        raise ValueError(
            "no 'up' rows with time in [{}, {}] in {}".format(sth, eth, path))
    return max(totals.items(), key=itemgetter(1))[0]


def _find_intent(path, voice_server, sth, mth, eth):
    """Return the time of the first unseen 'down' size after mth, else mth."""
    seen_sizes = set()
    with open(path, 'r', newline='') as f:
        for row in csv.DictReader(f):
            if row['flag'] != 'down':
                continue
            point = float(row['point'])
            # Rows for other servers are skipped before the window checks, so
            # only a row of `voice_server` past `eth` ends the scan.
            if round(point) != voice_server:
                continue
            when = float(row['time'])
            if when < sth:
                continue
            if when > eth:
                break
            size = abs(point - voice_server)
            if when <= mth:
                # Sizes seen up to and including mth form the baseline.
                seen_sizes.add(size)
                continue
            if size not in seen_sizes:
                return when
    return mth


def get_voice_intent(path, sth=0, mth=0, eth=float('inf')):
    """Locate the voice server and the intent time in one waterfall CSV.

    The file must have ``flag``, ``time`` and ``point`` columns. It is read
    twice:

    1. Among ``flag == 'up'`` rows with ``sth <= time <= eth``, sizes
       (``point - round(point)``) are summed per ``round(point)``; the key
       with the largest sum is the voice server.
    2. Among ``flag == 'down'`` rows of that server in the same window, sizes
       seen at ``time <= mth`` are collected; the time of the first later row
       whose size is not in that collection is the intent.

    Args:
        path: Path of the CSV file to read.
        sth: Start of the time window; earlier rows are ignored.
        mth: Split time between baseline rows and candidate rows; also the
            value returned as intent when no candidate qualifies.
        eth: End of the time window; scanning stops at the first matching
            row beyond it.

    Returns:
        A ``(voice_server, intent)`` tuple.

    Raises:
        ValueError: If no 'up' row falls inside the window, or a ``time`` /
            ``point`` field is not a number.
        KeyError: If a required column is missing.
        OSError: If the file cannot be opened.
    """
    voice_server = _find_voice_server(path, sth, eth)
    return voice_server, _find_intent(path, voice_server, sth, mth, eth)
def get_wav_timing(path):
    """Read wav timing records from the CSV file at ``path``.

    Each row must provide ``device``, ``command``, ``commandStart``,
    ``commandEnd`` and ``serviceStart`` columns.

    Returns:
        A dict keyed by ``"<device>-<command>"`` whose values are dicts with
        the float-converted ``commandStart``, ``commandEnd`` and
        ``serviceStart`` of that row. If a key occurs more than once, the
        last row wins.
    """
    timing_fields = ('commandStart', 'commandEnd', 'serviceStart')
    with open(path, 'r') as handle:
        return {
            '-'.join((record['device'], record['command'])):
                {name: float(record[name]) for name in timing_fields}
            for record in csv.DictReader(handle)
        }
def main():
    """Compute the intent time for every waterfall CSV and write a summary.

    Reads the module-level ``ARGS`` (``timing``, ``input``, ``output``):

    * every ``*_wav.csv`` file returned by ``get_files(ARGS.timing, ...)`` is
      merged into one timing table via ``get_wav_timing``;
    * every ``*.csv`` file returned by ``get_files(ARGS.input, ...)`` is
      expected to be named ``<device>-<command>.csv``; its timing entry
      supplies the window passed to ``get_voice_intent``;
    * one ``device, command, intent`` row per successfully processed file is
      written to ``ARGS.output``.

    A file whose analysis fails is reported on stdout and skipped, so the
    output never carries a value left over from a previous file.

    Raises:
        KeyError: If an input file has no entry in the timing table.
        ValueError: If an input file name does not split into exactly
            ``device`` and ``command`` on ``'-'``.
    """
    wav_timing = dict()
    for path in get_files(ARGS.timing, ext='_wav.csv'):
        wav_timing.update(get_wav_timing(path))

    # `with` closes the output even if a later step raises; newline='' is the
    # csv module's documented requirement for files handed to csv.writer.
    with open(ARGS.output, 'w', newline='') as of:
        writer = csv.writer(of)
        writer.writerow(['device', 'command', 'intent'])
        for path in get_files(ARGS.input, ext='.csv'):
            # File name without directory and final extension, independent of
            # the platform's path separator.
            device_command = os.path.splitext(os.path.basename(path))[0]
            device, command = device_command.split('-')
            timing = wav_timing[device_command]
            try:
                _, intent = get_voice_intent(
                    path,
                    float(timing['commandStart']),
                    float(timing['commandEnd']),
                    float(timing['serviceStart']))
            except Exception as exc:
                # Best effort per file: report and move on, but do not write a
                # row, since `intent` would be stale or undefined here.
                print('Error', path, exc)
                continue
            writer.writerow([device, command, intent])
if __name__ == '__main__':
    # Script entry point: build the CLI, publish the parsed arguments through
    # the module-level ARGS, then run main().
    import argparse

    parser = argparse.ArgumentParser()
    # All three options are required strings; only flags and help text differ.
    for short_flag, long_flag, description in (
            ('-i', '--input', 'Input waterfall data directory'),
            ('-t', '--timing', 'Input wav timing data directory'),
            ('-o', '--output', 'Output file'),
    ):
        parser.add_argument(short_flag, long_flag,
                            type=str,
                            required=True,
                            help=description)
    ARGS = parser.parse_args()
    main()