-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
168 lines (141 loc) · 6.56 KB
/
main.py
File metadata and controls
168 lines (141 loc) · 6.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
#!/usr/bin/env python3
from flask import Flask, render_template, request, redirect, url_for, send_file, jsonify
from flask_socketio import SocketIO
from pypylon import pylon
from PIL import Image
import cv2, threading, time, os, shutil, base64, subprocess
app = Flask(__name__)
socketio = SocketIO(app)
# Initialize the camera
camera = pylon.InstantCamera(pylon.TlFactory.GetInstance().CreateFirstDevice())
camera.Open()
camera.StartGrabbing(pylon.GrabStrategy_LatestImageOnly)
# Global variable to store the latest frame
latest_frame = None
capture_thread = None
stream_running = True
progress = 0
# Global variable to control streaming state
capture_in_progress = False
#
# STREAMING
#
# while stream_running is true, capture frames from the camera and emit them to the client
#
def stream():
global latest_frame, stream_running, capture_in_progress
while stream_running:
if not capture_in_progress and camera.IsGrabbing():
grabResult = camera.RetrieveResult(5000, pylon.TimeoutHandling_ThrowException)
if grabResult.GrabSucceeded():
img = grabResult.Array
# Reduce resolution for streaming
img_resized = cv2.resize(img, (640, 480))
ret, buffer = cv2.imencode('.jpg', img_resized, [int(cv2.IMWRITE_JPEG_QUALITY), 80])
latest_frame = base64.b64encode(buffer).decode('utf-8')
socketio.emit('frame', latest_frame)
grabResult.Release()
time.sleep(0.1)
# Start the frame capture thread
capture_thread = threading.Thread(target=stream)
capture_thread.daemon = True
capture_thread.start()
@app.route('/')
def index():
# Get the current exposure time and push to the client
exposure_time = camera.ExposureTime.GetValue()
return render_template('index.html', exposure_time=int(exposure_time))
@app.route('/set_exposure', methods=['POST'])
def set_exposure():
exposure_time = float(request.form['exposure_time'])
camera.ExposureTime.SetValue(exposure_time)
return redirect(url_for('index'))
def save(num_frames, timestep):
global progress, capture_in_progress
capture_in_progress = True
save_path = '/mnt/usb/captured_images_' + time.strftime('%Y-%m-%d_%H-%M-%S')
os.makedirs(save_path, exist_ok=True)
timestamp = [] # Initialize the timestamp array
# Emit an event to show the progress div
socketio.emit('show_progress')
for i in range(num_frames):
iteration_start_time = time.time()
progress = int( (i + 1) / num_frames * 100)
socketio.emit('progress', {'progress': progress})
time.sleep(timestep)
grabResult = camera.RetrieveResult(5000, pylon.TimeoutHandling_ThrowException)
if grabResult.GrabSucceeded():
current_timestamp = int(time.time() * 1000)
timestamp.append(current_timestamp)
img = grabResult.Array
img_pil = Image.fromarray(img)
img_pil.save(os.path.join(save_path, f'img_{i+1}.tiff'))
grabResult.Release()
iteration_end_time = time.time()
iteration_duration = iteration_end_time - iteration_start_time
sleep_time = max(0, timestep - iteration_duration)
time.sleep(sleep_time)
# Calculate relative timestamps
if timestamp:
first_timestamp = timestamp[0]
relative_timestamps = [ts - first_timestamp for ts in timestamp]
# Save the relative timestamp array to a text file
with open(os.path.join(save_path, 'timestamps.txt'), 'w') as f:
for ts in relative_timestamps:
f.write(f'{ts}\n')
# Emit an event to hide the progress div
socketio.emit('capture_complete')
capture_in_progress = False
#
# USER INTERFACE
#
# Start acquisition
@app.route('/start_acquisition', methods=['POST'])
def start_acquisition():
usb_path = '/mnt/usb'
if not os.path.ismount(usb_path):
return jsonify({'status': 'error', 'message': 'USB is not mounted. Please mount the USB and try again.'}), 500
num_frames = int(request.form['num_frames'])
timestep = float(request.form['timestep'])
capture_thread = threading.Thread(target=save, args=(num_frames, timestep))
capture_thread.start()
return redirect(url_for('index'))
# Below requires the user running the script to have sudo privileges with passwordless sudo enabled
# Unmount USB
@app.route('/umount_usb', methods=['POST'])
def umount_usb():
try:
result = subprocess.run(['sudo', 'umount', '/mnt/usb'], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return jsonify({'status': 'success', 'message': 'Unmounting USB disk.'})
except subprocess.CalledProcessError as e:
return jsonify({'status': 'error', 'message': e.stderr.decode('utf-8') + ' Try restarting the Raspberry Pi.'}), 500
# Mount USB
@app.route('/mount_usb', methods=['POST'])
def mount_usb():
try:
result = subprocess.run(['sudo', 'mount', '/mnt/usb'], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return jsonify({'status': 'success', 'message': 'Mounting USB disk.'})
except subprocess.CalledProcessError as e:
return jsonify({'status': 'error', 'message': 'USB Already Mounted. If you cant record, try restarting the Raspberry Pi.'}), 500
# Reboot the Rasberry Pi
@app.route('/reboot', methods=['POST'])
def reboot():
try:
result = subprocess.run(['sudo', 'reboot', 'now'], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return jsonify({'status': 'success', 'message': 'Rebooting Pi. Refresh this page in a few seconds.'})
except subprocess.CalledProcessError as e:
return jsonify({'status': 'error', 'message': e.stderr.decode('utf-8') + ' Try restarting the Raspberry Pi using the power button.'}), 500
# Delete all files from USB
@app.route('/delete_usb', methods=['POST'])
def delete_usb():
try:
result = subprocess.run(['sudo rm -rf /mnt/usb/captured_images*'], shell=True, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
return jsonify({'status': 'success', 'message': 'All files deleted from USB.'})
except subprocess.CalledProcessError as e:
return jsonify({'status': 'error', 'message': e.stderr.decode('utf-8') + ' Try restarting the Raspberry Pi using the power button.'}), 500
# @app.route('/download_images')
# def download_images():
# return send_file('/mnt/usb/captured_images.zip', as_attachment=True)
if __name__ == '__main__':
# socketio.run(app, host='0.0.0.0', port=5000)
socketio.run(app, host='192.168.10.1', port=5000, allow_unsafe_werkzeug=True)