-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbutt.py
More file actions
123 lines (90 loc) · 3.07 KB
/
butt.py
File metadata and controls
123 lines (90 loc) · 3.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#!/usr/bin/env python3
"""
TODO
"""
import time
import argparse
import signal
import queue
import gpiozero
import skibase
# Pin number handed to gpiozero.Button for the push button.
BUTT_PIN = 26
# Hold time, in milliseconds, at or above which a press counts as a long press.
LONG_PRESS_TIME = 2500
# ============================= Butt ========================================
class Butt(skibase.ThreadModule):
    """Worker thread that watches the push button and reports presses.

    The thread polls a gpiozero button on ``BUTT_PIN`` and posts one of the
    following skibase task constants on the queue given to the constructor:

    * ``TASK_BUTTON_PRESS``  -- the press lasted less than
      ``LONG_PRESS_TIME`` milliseconds.
    * ``TASK_BUTTON_LONG_1`` -- the press lasted at least
      ``LONG_PRESS_TIME`` milliseconds.
    * ``TASK_BUTTON_LONG_2`` -- posted after ``TASK_BUTTON_LONG_1`` once the
      button has been released.
    """

    # Seconds between stop-event checks while blocked waiting on the button.
    _BUTT_POLL_TIMEOUT = 0.500

    # === Thread handling ===
    def __init__(self, main_queue):
        """Initialise the thread.

        main_queue -- object with a ``put()`` method that receives the
                      button tasks (a ``queue.Queue`` in this module's test).
        """
        super().__init__("Butt")
        self._main_queue = main_queue

    # --- Loop ---
    def run(self):
        """Poll the button until a stop is requested, posting press tasks."""
        # NOTE(review): the reviewed copy of this file had lost its
        # indentation.  This assumes TASK_BUTTON_LONG_2 follows every
        # TASK_BUTTON_LONG_1 once the button is released -- confirm.
        #
        # The context manager closes the button when the loop ends, so the
        # GPIO pin is released and the thread can be started again later.
        with gpiozero.Button(BUTT_PIN, pull_up=True) as button:
            while not self._got_stop_event():
                # Bounded wait so the stop event is re-checked regularly.
                button.wait_for_press(timeout=self._BUTT_POLL_TIMEOUT)
                if not button.is_pressed:
                    continue

                t_press = skibase.get_time_millis()
                # Wait slightly longer than the long-press threshold;
                # presumably the extra 10 ms makes a timed-out wait measure
                # as >= LONG_PRESS_TIME.
                button.wait_for_release(timeout=(LONG_PRESS_TIME + 10) / 1000)
                press_time = skibase.get_time_millis() - t_press

                if press_time < LONG_PRESS_TIME:
                    self._main_queue.put(skibase.TASK_BUTTON_PRESS)
                    skibase.log_debug("Button Press")
                    continue

                skibase.log_debug("Button Long press")
                self._main_queue.put(skibase.TASK_BUTTON_LONG_1)
                if not self._wait_for_release(button):
                    # Stop requested while the button was still held.
                    break
                self._main_queue.put(skibase.TASK_BUTTON_LONG_2)

    def _wait_for_release(self, button):
        """Wait until *button* is released.

        Polls instead of blocking indefinitely so that a held (or stuck)
        button cannot prevent the thread from stopping.

        Returns True once the button is released, or False if a stop was
        requested while it was still pressed.
        """
        while button.is_pressed:
            if self._got_stop_event():
                return False
            button.wait_for_release(timeout=self._BUTT_POLL_TIMEOUT)
        return True
# ----------------------------- Handling ------------------------------------
def butt_start(main_queue):
    """Create a Butt thread posting to *main_queue*, start it and return it."""
    worker = Butt(main_queue)
    worker.start()
    return worker
def butt_stop(butt_obj):
    """Stop *butt_obj* and wait for it to finish, if it is still running.

    Does nothing when ``butt_obj.status()`` reports that it is not alive.
    """
    if not butt_obj.status():
        return
    butt_obj.stop()
    butt_obj.join()
# ============================= argparse ====================================
def args_add_butt(parser):
    """Hook for button-specific command-line options; currently adds none.

    Returns the *parser* it was given, unchanged.
    """
    return parser
# ============================= Unittest ====================================
def test():
    """Manual test: run the Butt thread and log every task it posts.

    Parses the logging arguments, installs SIGINT/SIGTERM handling, starts
    the button thread and then logs each received task until a signal
    arrives or the thread is no longer alive.
    """
    # Arguments
    parser = argparse.ArgumentParser(description=__doc__)
    parser = skibase.args_add_log(parser)
    parser = args_add_butt(parser)
    args = parser.parse_args()

    # Parse
    skibase.log_config(args.loglevel.upper(), args.syslog)

    # Signal
    skibase.signal_setup([signal.SIGINT, signal.SIGTERM])

    # Start queue
    main_queue = queue.Queue()

    # Start Butt
    butt_obj = butt_start(main_queue)

    # Loop
    skibase.log_notice("Running butt unittest")
    try:
        while not skibase.signal_counter and butt_obj.status():
            try:
                # Short timeout so signals and thread death are noticed.
                task = main_queue.get(block=True, timeout=0.25)
            except queue.Empty:
                task = None
            if task is not None:
                if task == skibase.TASK_BUTTON_PRESS:
                    skibase.log_notice("butt press")
                # Butt posts TASK_BUTTON_LONG_1 / TASK_BUTTON_LONG_2 for a
                # long press, so those are the tasks to recognise here.
                elif task == skibase.TASK_BUTTON_LONG_1:
                    skibase.log_notice("butt long press (LONG_1)")
                elif task == skibase.TASK_BUTTON_LONG_2:
                    skibase.log_notice("butt long press (LONG_2)")
                else:
                    skibase.log_warning("butt got unknown task")
    finally:
        # Stop Butt even if the loop above raised.
        butt_stop(butt_obj)
    skibase.log_notice("butt unittest ended")
# Run the manual button test when this file is executed as a script.
if __name__ == "__main__":
    test()
#EOF