-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathencryption.py
More file actions
121 lines (90 loc) · 3.84 KB
/
encryption.py
File metadata and controls
121 lines (90 loc) · 3.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
import base64
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
from meshtastic.protobuf import mesh_pb2
from firefly_logging import configure_logging, get_logger, make_log_print
# Runs at import time. Presumably initialises the project's logging setup;
# see firefly_logging.configure_logging to confirm.
configure_logging()
logger = get_logger("firefly.crypto")
# Rebinds the module-level name `print`, so every print(...) call in this
# module goes through whatever make_log_print(logger) returns instead of the
# builtin (presumably a logger-backed printer; confirm in firefly_logging).
print = make_log_print(logger)
# First 15 bytes of the 16-byte key built by expand_short_psk(); the one-byte
# PSK value is appended as the final byte.
MESHTASTIC_ONE_BYTE_PSK_PREFIX = bytes.fromhex("d4f1bb3a20290759f0bcffabcf4e69")
def expand_short_psk(key: str) -> str:
    """Expand a one-byte PSK into its full base64-encoded key.

    A key that is strict base64 for exactly one byte in the range 0x01-0x07 is
    replaced by base64(MESHTASTIC_ONE_BYTE_PSK_PREFIX + that byte). Any other
    input, including text that is not strict base64, is returned unchanged.

    Args:
        key: Base64-encoded key, possibly a one-byte shorthand.

    Returns:
        The expanded base64 key, or ``key`` itself when no expansion applies.
    """
    try:
        decoded = base64.b64decode(key.encode("ascii"), validate=True)
    except Exception:
        # Not ASCII / not strict base64: hand the key back untouched.
        return key

    is_short_index = len(decoded) == 1 and 0x01 <= decoded[0] <= 0x07
    if is_short_index:
        full_key = MESHTASTIC_ONE_BYTE_PSK_PREFIX + decoded
        return base64.b64encode(full_key).decode("ascii")
    return key
def decrypt_packet(mp: mesh_pb2.MeshPacket, key: str, *, silent: bool = False) -> mesh_pb2.Data | None:
    """
    Decrypt the encrypted message payload and return the decoded Data object.

    Args:
        mp: The MeshPacket with encrypted payload. Its ``id`` and ``from``
            fields form the AES-CTR nonce.
        key: Base64-encoded encryption key. One-byte shorthand keys are
            expanded, and URL-safe characters ("-", "_") are accepted in the
            same way generate_hash() accepts them.
        silent: When True, decryption failures are not reported.

    Returns:
        A decoded mesh_pb2.Data object or None on failure.
    """
    key = expand_short_psk(key)

    try:
        # Map the URL-safe alphabet onto the standard one before decoding.
        # A non-validating b64decode() discards "-" and "_" instead of
        # rejecting them, which would silently produce the wrong key bytes.
        standard_key = key.replace("-", "+").replace("_", "/")
        key_bytes = base64.b64decode(standard_key.encode("ascii"))

        # Build the 16-byte nonce from message ID and sender.
        nonce_packet_id = mp.id.to_bytes(8, "little")
        # `from` is a Python keyword, so the field must be read via getattr().
        nonce_from_node = getattr(mp, "from").to_bytes(8, "little")
        nonce = nonce_packet_id + nonce_from_node

        # Decrypt the encrypted payload.
        cipher = Cipher(algorithms.AES(key_bytes), modes.CTR(nonce), backend=default_backend())
        decryptor = cipher.decryptor()
        decrypted_bytes = decryptor.update(mp.encrypted) + decryptor.finalize()
    except Exception as e:
        if not silent:
            print(f"Failed to decrypt: {e}")
        return None

    # Parse the decrypted bytes into a Data object. Parse failures return
    # None without being reported, regardless of `silent`.
    try:
        data = mesh_pb2.Data()
        data.ParseFromString(decrypted_bytes)
    except Exception:
        return None
    return data
def encrypt_packet(channel: str, key: str, mp: mesh_pb2.MeshPacket, encoded_message: mesh_pb2.Data) -> bytes | None:
    """
    Encrypt an encoded message and return the ciphertext.

    Side effect: ``mp.channel`` is set to generate_hash(channel, key).

    Args:
        channel: Channel name or ID.
        key: Base64-encoded encryption key. One-byte shorthand keys are
            expanded, and URL-safe characters ("-", "_") are accepted in the
            same way generate_hash() accepts them.
        mp: MeshPacket used for ID and from fields (nonce).
        encoded_message: Data object to encrypt.

    Returns:
        The encrypted message bytes or None on failure.
    """
    key = expand_short_psk(key)

    try:
        mp.channel = generate_hash(channel, key)

        # Map the URL-safe alphabet onto the standard one before decoding, so
        # the AES key comes from the same bytes generate_hash() hashed. A
        # non-validating b64decode() would otherwise discard "-" and "_".
        standard_key = key.replace("-", "+").replace("_", "/")
        key_bytes = base64.b64decode(standard_key.encode("ascii"))

        # 16-byte nonce: packet id followed by sender node number.
        nonce_packet_id = mp.id.to_bytes(8, "little")
        # `from` is a Python keyword, so the field must be read via getattr().
        nonce_from_node = getattr(mp, "from").to_bytes(8, "little")
        nonce = nonce_packet_id + nonce_from_node

        cipher = Cipher(algorithms.AES(key_bytes), modes.CTR(nonce), backend=default_backend())
        encryptor = cipher.encryptor()
        return encryptor.update(encoded_message.SerializeToString()) + encryptor.finalize()
    except Exception as e:
        print(f"Failed to encrypt: {e}")
        return None
def xor_hash(data: bytes) -> int:
    """Fold every byte of ``data`` together with XOR.

    Args:
        data: Bytes to hash.

    Returns:
        The XOR of all byte values; 0 for empty input.
    """
    checksum = 0
    for octet in data:
        checksum = checksum ^ octet
    return checksum
def generate_hash(name: str, key: str) -> int:
    """Generate the channel number by hashing the channel name and PSK.

    Args:
        name: Channel name.
        key: Base64-encoded PSK; one-byte shorthand keys are expanded and
            URL-safe characters ("-", "_") are accepted.

    Returns:
        xor_hash of the UTF-8 channel name XOR-ed with xor_hash of the key bytes.
    """
    full_key = expand_short_psk(key)
    # Translate URL-safe base64 characters to the standard alphabet.
    standard_b64 = full_key.translate(str.maketrans("-_", "+/"))
    psk_bytes = base64.b64decode(standard_b64.encode("utf-8"))
    return xor_hash(bytes(name, "utf-8")) ^ xor_hash(psk_bytes)