-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathq02.py
More file actions
151 lines (117 loc) · 3.88 KB
/
q02.py
File metadata and controls
151 lines (117 loc) · 3.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
"""
Smart Bin Encoding & Scheduling
Symbol set (single-character) mapping:
- E : Empty
- H : Half-full
- F : Full
- O : Overflow
Encoding format (compact):
- <STATUS><BIN_ID_IN_BASE36>
- Example: 'F1Z' -> status F, bin id = base36_to_int('1Z')
This module provides:
- encode_bin(status, bin_id) -> str
- decode_message(msg) -> (status, bin_id)
- send_to_central(msg) : decodes and, if needed, schedules collection
We use base36 for compact numeric representation (0-9,A-Z).
"""
from datetime import datetime
import string
# Schedule will be list of dicts: {bin_id: int, status: str, scheduled_at: timestamp}
schedule_map = []
# Symbol mapping (explicit for clarity)
SYMBOLS = {
'E': 'Empty',
'H': 'Half-full',
'F': 'Full',
'O': 'Overflow'
}
def int_to_base36(n) :
"""Convert non-negative integer to base36 string (0-9, A-Z)."""
if n < 0:
raise ValueError("Negative values not supported")
digits = string.digits + string.ascii_uppercase
if n == 0:
return '0'
out = []
while n:
n, rem = divmod(n, 36)
out.append(digits[rem])
return ''.join(reversed(out))
def base36_to_int(s: str):
"""Convert base36 string to integer."""
s = s.strip().upper()
if not s:
raise ValueError('Empty base36 string')
digits = {ch: i for i, ch in enumerate(string.digits + string.ascii_uppercase)}
val = 0
for ch in s:
if ch not in digits:
raise ValueError(f'Invalid base36 char: {ch}')
val = val * 36 + digits[ch]
return val
def encode_bin(status, bin_id):
"""Encode status (one of SYMBOLS keys) and bin_id into compact message.
"""
status = status.upper()
if status not in SYMBOLS:
raise ValueError(f'Unknown status: {status}')
if not isinstance(bin_id, int) or bin_id < 0:
raise ValueError('bin_id must be non-negative integer')
return status + int_to_base36(bin_id)
def decode_message(msg):
"""Decode message into (status, bin_id).
msg must start with one status character then base36 id.
"""
if not msg or len(msg) < 2:
raise ValueError('Invalid message format')
status = msg[0].upper()
if status not in SYMBOLS:
raise ValueError(f'Unknown status symbol: {status}')
bin_part = msg[1:]
bin_id = base36_to_int(bin_part)
return status, bin_id
def should_schedule(status):
"""Policy: schedule when Full or Overflow."""
return status in ('F', 'O')
def create_schedule_entry(bin_id, status):
now = datetime.today()
entry = {'bin_id': bin_id, 'status': SYMBOLS.get(status, status), 'scheduled_at': now}
schedule_map.append(entry)
return entry
def send_to_central(msg):
"""Simulate central system receiving encoded message; decode and schedule if needed."""
try:
status, bin_id = decode_message(msg)
except ValueError as e:
print(f'Receive error: {e}')
return None
print(f'Received: status={SYMBOLS[status]} ({status}), bin_id={bin_id}')
if should_schedule(status):
entry = create_schedule_entry(bin_id, status)
print(f'--> Scheduled collection for bin {bin_id} ({entry["status"]}) at {entry["scheduled_at"]}')
return entry
else:
print('--> No collection needed')
return None
def show_schedule():
if not schedule_map:
print('No scheduled collections')
return
print('\nScheduled Collections:')
for e in schedule_map:
print(f" - Bin {e['bin_id']}: {e['status']} at {e['scheduled_at']}")
if __name__ == '__main__':
# Demo: simulate several bins sending status
demo_bins = [
("E", 0),
("H", 15),
("F", 123),
("O", 9999),
("F", 36),
]
print('Demo transmitting encoded messages:')
for st, bid in demo_bins:
msg = encode_bin(st, bid)
print(f'Outgoing -> {msg}')
send_to_central(msg)
show_schedule()