-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathseven-segment.py
More file actions
executable file
·156 lines (136 loc) · 4.17 KB
/
seven-segment.py
File metadata and controls
executable file
·156 lines (136 loc) · 4.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
#!/usr/bin/env python
from time import sleep
from flask import Flask, abort, request
from uuid import getnode as get_mac
from nmeaserver import server, formatter
import re
import os
import logging
import time
print("*** If you are seeing many lines with 'fake_rpi.RPi' someone forget " \
"to switch back to RPi dependencies (still on the fake_rpi module) ***")
logging.basicConfig(level=logging.INFO, format='[%(levelname)s] %(name)s - %(message)s')
logger = logging.getLogger(__name__)
try:
import RPi.GPIO as GPIO
#from fake_rpi.RPi import GPIO as GPIO
except RuntimeError:
logger.error("""Error importing RPi.GPIO! This is probably because you
need superuser privileges. You can achieve this by using
'sudo' to run your script""")
app = Flask(__name__)
nmeaserver = server.NMEAServer('', 4000, True)
pulse_length = 0.1
clear_sleep = 0.3
gpio_list = [12,16,18,22,32,36,38,40] #where 40 is reset
valid_input = '1,2,3,4,#,.'
segment_map = {
'0': [12,16,18,22,32,36],
'1': [16,18],
'2': [12,16,22,32,38],
'3': [12,16,18,22,38],
'4': [16,18,36,38],
'5': [12,18,22,36,38],
'6': [12,18,22,32,36,38],
'7': [12,16,18],
'8': [12,16,18,22,32,36,38],
'9': [12,16,18,36,38],
'A': [12,16,18,32,36,38],
'B': [18,22,32,36,38],
'C': [12,22,32,36],
'D': [16,18,22,32,38],
'E': [12,16,18,22,38],
'F': [12,32,36,38],
'.': [40],
' ': [40],
'#': [40],
}
active = ' '
def enable(hex, duration):
global active
segments = segment_map.get(hex)
if segments != None:
if hex != active:
active = hex
GPIO.output(segments, GPIO.HIGH)
sleep(duration)
GPIO.output(segments, GPIO.LOW)
else:
logging.info("Unknown character '%s'", hex)
@app.route('/clear')
def clear():
enable('#', 1) #pulse_length)
sleep(clear_sleep)
return "cleared"
@app.route('/activate/<hex>')
def hello(hex):
hex = hex.upper()
matchObj = re.match( r'[1-4#\.]', hex)
if matchObj:
clear()
enable(segment, pulse_length)
return "Activated %s" % hex
else:
return "invalid input. Only %s are accepted" % valid_input
@app.route('/')
def index():
global active
mac = ':'.join(("%012X" % get_mac())[i:i+2] for i in range(0, 12, 2))
return '{mac:"%s",active:"%s"}' % (mac,active)
@app.route('/ping')
def ping():
return 'pong'
@app.route('/shutdown', methods=['POST','GET'])
def shutdown():
func = request.environ.get('werkzeug.server.shutdown')
if func is None:
raise RuntimeError('Not running with the Werkzeug Server')
func()
os.system('sudo shutdown -r now')
return 'Server shutting down...'
def bootup():
sleepSec = 0.3
clear()
enable('3', pulse_length)
sleep(sleepSec)
clear()
enable('2', pulse_length)
sleep(sleepSec)
clear()
enable('1', pulse_length)
sleep(sleepSec)
clear()
@nmeaserver.message('RXSSC')
def nmea_enable(context, message):
global active
try:
logger.debug("Received RXSSC message to activate {}".format(message['data'][1]))
segment = message['data'][1]
matchObj = re.match( r'[1-9#\.]', segment)
if matchObj:
clear()
enable(segment, pulse_length)
return nmea_status(context, message)
else:
return formatter.format("RXERR,Only %s are accepted" % valid_input, True)
except BaseException as err:
return formatter.format("RXERR,Invalid message. Example of a valid message: $RXSSC,A,1*39", True)
@nmeaserver.message('RXSTA')
def nmea_status(context, message):
global active
return formatter.format("RXSSS,A,%s,12000" % active, True)
@nmeaserver.response_stream()
def status_stream(context, wfile):
while context['stream']:
try:
wfile.write(nmea_status(context,"") + "\r\n")
wfile.flush
time.sleep(5)
except BaseException:
logger.exception("Caught exception: ")
if __name__ == '__main__':
GPIO.setmode(GPIO.BOARD)
GPIO.setup(gpio_list, GPIO.OUT)
bootup()
nmeaserver.start()
app.run(debug=True, use_reloader=False, host='0.0.0.0')