-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsliplib.py
More file actions
113 lines (95 loc) · 4.03 KB
/
sliplib.py
File metadata and controls
113 lines (95 loc) · 4.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
import re
from threading import Lock
END = b'\xc0'
ESC = b'\xdb'
ESC_END = b'\xdc'
ESC_ESC = b'\xdd'
def encode(msg):
"""encode(msg) -> SLIP-encoded message.
"""
msg = bytes(msg)
return END + msg.replace(ESC, ESC+ESC_ESC).replace(END, ESC+ESC_END) + END
#return msg.replace(ESC, ESC+ESC_ESC).replace(END, ESC+ESC_END) + END
def decode(packet):
"""decode(packet) -> message from SLIP-encoded packet
"""
packet = bytes(packet)
return packet.strip(END).replace(ESC+ESC_END, END).replace(ESC+ESC_ESC, ESC)
def is_valid(packet):
"""is_valid(packet) -> indicates if the packet's contents conform to the SLIP specification.
A packet is valid if:
* It contains no END bytes other than leading and/or trailing END bytes, and
* All ESC bytes are followed by an ESC_END or ESC_ESC byte.
"""
packet = packet.strip(END)
return not (END in packet or
packet.endswith(ESC) or
re.search(ESC+b'[^'+ESC_END+ESC_ESC+b']', packet))
class Driver():
def __init__(self, error='ignore'):
self.error = error
self._recv_buffer = b''
self._messages = []
self._recvlock = Lock()
def send(self, data):
"""send(data). Encode data in a SLIP packet.
"""
return encode(data)
# @property
# def packets(self):
# """packets -> Read and flush the packet buffer.
# """
# with self._sendlock:
# result = b''.join(self._packets)
# self._packets = []
# return result
def receive(self, data):
"""receive(data). Handle received SLIP-encoded data.
Extracts packets from data, and decodes them. The resulting messages
are buffered, and can be retrieved by reading the attribute 'messages'.
"""
# Empty data indicates that the data reception is complete.
# To force a buffer flush, an END byte is added, so that the
# current contents of _recv_buffer will form a complete message.
if not data:
data = END
self._recv_buffer += data
# The following situations can occur:
#
# 1) _recv_buffer empty or contains only END bytes --> no new packets
# 2) _recv_buffer contains non-END bytes --> new packets from data
# Strip leading END bytes from _recv_buffer to avoid empty _packets.
self._recv_buffer = self._recv_buffer.lstrip(END)
if not self._recv_buffer:
return
# The _recv_buffer is split on sequences of one or more END bytes.
# The trailing element from the split operation is a possibly incomplete
# packet; this element is therefore used as the new _recv_buffer.
# If _recv_buffer has one or more trailing END bytes, then the last element,
# and therefore the new _recv_buffer, is an empty bytes object.
packets = re.split(END+b'+', self._recv_buffer)
self._recv_buffer = packets.pop(-1)
# Add the decoded _messages to the message buffer.
with self._recvlock:
if self.error == 'strict':
# With strict error checking, only decode valid packets.
# If any invalid packets are present, a ProtocolError exception is raised,
# indicating the invalid packets.
invalid_packets = []
for p in packets:
if is_valid(p):
self._messages.append(decode(p))
else:
invalid_packets.append(p)
if invalid_packets:
raise ProtocolError(invalid_packets)
else:
# For non-strict error checking, ignore protocol violations.
self._messages.extend(decode(p) for p in packets)
@property
def messages(self):
"""messages -> Read and flush the message buffer"""
with self._recvlock:
result = self._messages
self._messages = []
return result