-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathweb.py
More file actions
124 lines (91 loc) · 3.26 KB
/
web.py
File metadata and controls
124 lines (91 loc) · 3.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
from flask import Flask, render_template, request, redirect, url_for
import csv
import json
import cgi
from datetime import datetime
from model import db_session, ObjectChange
# Flask application instance on which the routes in this file are registered.
app = Flask(__name__)
# number of most recent object changes to display on index page
NUM_CHANGES_INDEX = 20
@app.route('/', methods=['GET'])
def index():
    """Render the index page.

    The template receives ``status`` (the result of object_status() for the
    current query-string arguments, possibly None) and ``changes`` (a query
    for the NUM_CHANGES_INDEX most recent ObjectChange rows, newest first).
    """
    lookup_result = object_status(request.args)
    newest_first = ObjectChange.datetime.desc()
    recent_changes = (
        db_session.query(ObjectChange)
        .order_by(newest_first)
        .limit(NUM_CHANGES_INDEX)
    )
    return render_template(
        'index.html',
        status=lookup_result,
        changes=recent_changes,
    )
def object_status(request_args):
    """Return the HTML-escaped JSON state of the requested object, or None.

    Args:
        request_args: mapping of query parameters (anything with ``.get``).
            Must contain non-empty ``object_id``, ``object_type`` and
            ``timestamp`` entries; ``timestamp`` must parse as an integer.

    Returns:
        The result of query_object() serialised with json.dumps and with
        ``&``, ``<`` and ``>`` escaped, or None when a required parameter is
        missing, empty, or the timestamp is not an integer.
    """
    # cgi.escape was removed in Python 3.8; html.escape is its replacement.
    # Fall back to cgi.escape on interpreters that lack the html module.
    try:
        from html import escape
    except ImportError:
        from cgi import escape

    values = {}
    for field in ('object_id', 'object_type', 'timestamp'):
        # Read from the argument that was passed in, not the global request,
        # so the function validates exactly the data it later uses.
        value = request_args.get(field)
        if value is None or value == '':
            return None
        values[field] = value

    try:
        timestamp = int(values['timestamp'])
    except ValueError:
        return None

    result = query_object(values['object_id'], values['object_type'],
                          timestamp)
    # quote=False matches cgi.escape's default: quotes are left untouched.
    return escape(json.dumps(result), quote=False)
def update_dict(existing, changes):
    """Recursively merge ``changes`` into ``existing`` in place.

    For each key in ``changes``: when the new value is a dict it is merged
    key by key into the dict already stored under that key (a fresh dict is
    started if the stored value is missing or not a dict); any other value
    simply overwrites the stored one.

    Nested dicts are never stored by reference, so later merges into
    ``existing`` cannot modify a ``changes`` mapping passed in earlier.

    Args:
        existing: dict that is updated in place.
        changes: dict of updates; left unmodified.

    Returns:
        None.
    """
    for key, value in changes.items():
        if not isinstance(value, dict):
            existing[key] = value
            continue
        target = existing.get(key)
        if not isinstance(target, dict):
            # Start from a new dict instead of aliasing ``value``; aliasing
            # would let a later merge write into the caller's ``changes``.
            target = existing[key] = {}
        update_dict(target, value)
def query_object(object_id, object_type, timestamp):
    """Rebuild an object's state as of ``timestamp`` by replaying its changes.

    Fetches the ``changes`` column of every ObjectChange row matching
    ``object_id`` and ``object_type`` whose datetime is at or before the
    given POSIX timestamp, ordered by datetime ascending, decodes each value
    as JSON and merges it into a single dict with update_dict().

    Returns:
        The merged dict; empty when no row matches.
    """
    cutoff = datetime.fromtimestamp(timestamp)

    query = db_session.query(ObjectChange.changes)
    query = query.filter(ObjectChange.object_id == object_id)
    query = query.filter(ObjectChange.object_type == object_type)
    query = query.filter(ObjectChange.datetime <= cutoff)
    rows = query.order_by(ObjectChange.datetime).all()

    state = {}
    # Each row is a one-element tuple holding the JSON text of one change.
    for (payload,) in rows:
        update_dict(state, json.loads(payload))
    return state
@app.route('/clear_db', methods=['POST'])
def clear_db():
    """Delete all ObjectChange rows, commit, and redirect to the index page."""
    db_session.query(ObjectChange).delete()
    db_session.commit()
    index_url = url_for('index')
    return redirect(index_url)
@app.route('/upload', methods=['POST'])
def handle_upload():
    """Import object changes from an uploaded CSV file.

    The request must carry a file field named ``file``. The first CSV row is
    the header; every following row must provide ``object_id`` (integer),
    ``object_type``, ``timestamp`` (POSIX time, fractions allowed) and
    ``object_changes`` (a JSON object). Backslash is the CSV escape
    character.

    Rows are fully validated before anything is added to the session, so a
    rejected upload leaves no pending rows behind.

    Returns:
        A redirect to the index page once all rows are committed, or the
        rendered ``invalid.html`` page when the file is missing, unnamed,
        empty, or contains any malformed row.
    """
    def parse_timestamp(timestamp):
        # fromtimestamp() reports out-of-range values with OverflowError or
        # OSError; normalise them to ValueError so they count as bad input.
        try:
            return datetime.fromtimestamp(float(timestamp))
        except (OverflowError, OSError):
            raise ValueError('timestamp out of range: %r' % (timestamp,))

    def invalid_csv():
        return render_template('invalid.html')

    def text_lines(stream):
        # The upload stream may yield bytes (binary streams do on Python 3),
        # which csv.reader rejects there; decode those and pass str through.
        for line in stream:
            if not isinstance(line, str):
                line = line.decode('utf-8')
            yield line

    if 'file' not in request.files:
        return invalid_csv()
    f = request.files['file']
    if f.filename == '':
        return invalid_csv()

    pending = []
    try:
        csvreader = csv.reader(text_lines(f), escapechar='\\')
        header = next(csvreader, None)
        if header is None:
            # completely empty file: not even a header row
            return invalid_csv()
        for csvrow in csvreader:
            row = dict(zip(header, csvrow))
            change = ObjectChange()
            change.object_id = int(row['object_id'])
            change.object_type = row['object_type']
            change.datetime = parse_timestamp(row['timestamp'])
            change.changes = row['object_changes']
            # validate json: it must decode, and must be an object because
            # changes are later merged as dicts by update_dict()
            if not isinstance(json.loads(change.changes), dict):
                raise ValueError('object_changes must be a JSON object')
            pending.append(change)
    except (KeyError, ValueError, csv.Error):
        # UnicodeDecodeError is a ValueError, so undecodable uploads land
        # here as well.
        return invalid_csv()

    # Only touch the session once every row has been validated.
    for change in pending:
        db_session.add(change)
    db_session.commit()
    return redirect(url_for('index'))