-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcommon.py
More file actions
657 lines (562 loc) · 20.4 KB
/
common.py
File metadata and controls
657 lines (562 loc) · 20.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
"""
Common utilities for IEC 104 driver.
This module provides core functionality for the IEC 104 driver including:
- Configuration loading from TOML files
- Logging setup with rotation
- Signal data structures and storage
- Client state management
- CSV signal configuration loading
"""
import logging
from logging.handlers import RotatingFileHandler
import sys
from fnmatch import fnmatch
from dataclasses import dataclass, field
from types import SimpleNamespace
from datetime import datetime, timezone
import time
import tomllib
import queue
from typing import Optional, Any, Callable, Iterable
import socket
from threading import Lock, Event, RLock
import const
import csv
_UTC = timezone.utc
def utcnow() -> datetime:
return datetime.now(_UTC)
# ---- Driver Configuration Loading ----
@dataclass
class Conf:
"""Configuration dataclass for the IEC 104 driver.
Holds all configuration parameters loaded from TOML file.
"""
nw_port: int
nw_max_client: int
nw_bind_ip: str
nw_allow_ip: list[str]
prot_ca: int
prot_t3: int
prot_k: int
prot_w: int
prot_strict_coa: bool
max_rx_buf: int
sim_sc: str
sg_addr: str
log_file_lvl: str
log_console_lvl: str
log_name: str
log_fname: str
log_backup: int
log_size: int
log_i_frame_stats_every: int # I-frame statistics logging interval (every N sent)
history_file: str
def load_config(path: str = "config.toml") -> Conf:
"""Load configuration from TOML file.
Args:
path: Path to the TOML configuration file.
Returns:
Conf object populated with configuration values.
Raises:
FileNotFoundError: If configuration file doesn't exist.
tomllib.TOMLDecodeError: If TOML parsing fails.
Example:
>>> conf = load_config("my_config.toml")
>>> print(conf.nw_port)
2404
"""
with open(path, 'rb') as f:
data = tomllib.load(f)
return Conf(
nw_port=data['nw']['port'],
nw_max_client=data['nw']['max_clients'],
nw_bind_ip=data['nw']['bind_ip'],
nw_allow_ip=data['nw']['allow_ip'],
prot_ca=data['prot']['ca'],
prot_t3=data['prot']['t3'],
prot_k=data['prot']['k'],
prot_w=data['prot']['w'],
prot_strict_coa=data['prot'].get('strict_coa', True),
max_rx_buf=data['prot']['max_rx_buf'],
sim_sc=data['sim']['sc'],
sg_addr=data['sg']['addr'],
log_file_lvl=data['log']['file_lvl'],
log_console_lvl=data['log']['console_lvl'],
log_fname=data['log']['fname'],
log_name=data['log']['name'],
log_backup=data['log']['backup'],
log_size=data['log']['size'],
log_i_frame_stats_every=data['log'].get('i_frame_stats_every', 1000),
history_file=data.get('client', {}).get('history_file', ''),
)
# ---- Logging Setup ----
def setup_logging(conf: Conf) -> logging.Logger:
"""Configure logger with format "timestamp\tname\tlevel\tmessage".
Configures both file (with rotation) and console logging handlers.
Uses RotatingFileHandler to manage log file size and backup count.
Args:
conf: Configuration object with logging parameters:
- log_name: Logger name (appears in each record)
- log_file_lvl: File logging level (DEBUG, INFO, etc.)
- log_console_lvl: Console logging level
- log_fname: Path to log file
- log_size: Maximum log file size in MB
- log_backup: Number of backup files to keep
Returns:
Configured logger instance.
Note:
Time format: YYYY-MM-DD HH:MM:SS.mmm
Example: 2024-01-15 14:30:25.123
"""
logger = logging.getLogger(conf.log_name)
logger.setLevel(logging.DEBUG)
formatter = logging.Formatter(
'%(asctime)s.%(msecs)03d\t%(name)s\t%(levelname)s\t%(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# File handler with rotation
file_lvl = getattr(logging, conf.log_file_lvl.upper(), logging.DEBUG)
rotate_handler = RotatingFileHandler(
conf.log_fname,
maxBytes=conf.log_size * 1024 * 1024,
backupCount=conf.log_backup,
encoding='utf-8'
)
rotate_handler.setFormatter(formatter)
rotate_handler.setLevel(file_lvl)
logger.addHandler(rotate_handler)
# Console handler
console_lvl = getattr(logging, conf.log_console_lvl.upper(), logging.CRITICAL)
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
console_handler.setLevel(console_lvl)
logger.addHandler(console_handler)
return logger
# ---- Signal Configuration ----
@dataclass
class SignalConf:
"""Signal configuration dataclass.
Stores all information about a single signal point including its
IEC 104 addressing, value, quality, and change threshold.
Attributes:
id: Internal signal identifier.
ioa: Information Object Address (IEC 104 address).
asdu: ASDU type identifier.
name: Human-readable signal name.
dsc: Optional description.
val: Current signal value.
ts: Timestamp of last value update.
q: Quality byte (0=good, 1=invalid, etc.).
threshold: Minimum change to trigger update (None = always trigger).
"""
id: int
ioa: int
asdu: int
name: str
dsc: str = ''
threshold: Optional[float] = None
conv: str = ''
val: Any = 0.0
ts: datetime = field(default_factory=utcnow)
q: int = 0x00
iv: bool = False
@dataclass
class IecEvent:
"""IEC 104 event dataclass for queuing and transmission.
Attributes:
id: Internal signal identifier.
ioa: Information Object Address.
asdu: ASDU type identifier.
val: Current signal value.
ts: Timestamp of the event.
q: Quality byte.
iv: Invalid time flag (CP56Time2a IV bit).
cot: Cause of transmission (default 3 = spontaneous).
"""
id: int
ioa: int
asdu: int
val: Any
ts: datetime = field(default_factory=utcnow)
q: int = 0
iv: bool = False
cot: int = 3
def create_data_storage():
"""Create a thread-safe data storage for signals.
Implements a centralized storage for all signal points with:
- Indexes by ID, IOA, and name for fast lookup
- Threshold-based change detection
- Subscriber notification on value changes
- Thread-safe operations using locks
Returns:
SimpleNamespace with methods:
add_signal(id, ioa, asdu, name, val, threshold): Add a new signal.
update_val(val, id=None, ioa=None, q=0, ts=None): Update signal value.
get_all(): Get all signals as dict.
get_signal(id=None, ioa=None): Get specific signal.
get_signal_by_name(pattern): Get signals matching name pattern.
subscribe(client_id, queue): Subscribe to updates.
unsubscribe(client_id): Remove subscription.
get_all_for_gi(): Generator for general interrogation responses.
Example:
>>> storage = create_data_storage()
>>> storage.add_signal(1, 45, 36, "pressure", 0.0, 0.1)
>>> storage.update_val(100.5, ioa=45)
"""
_signals = {}
_ioa_idx = {}
_name_idx = {}
_lock = Lock()
_subs = {}
def get_all_for_gi():
"""Generate events for all signals for general interrogation.
Yields:
IecEvent for each signal with COT=20 (interrogation response).
"""
with _lock:
snapshot = list(_signals.values())
for sg in snapshot:
yield IecEvent(
id=sg.id,
ioa=sg.ioa,
asdu=sg.asdu,
val=sg.val,
ts=sg.ts,
q=sg.q,
iv=sg.iv,
cot=20
)
def add_signal(id, ioa, asdu, name, threshold):
"""Add a new signal to storage.
Args:
id: Internal signal identifier.
ioa: Information Object Address.
asdu: ASDU type identifier.
name: Human-readable signal name.
val: Initial value.
threshold: Minimum change to trigger update.
Raises:
ValueError: If IOA already exists.
"""
with _lock:
_signals[id] = SignalConf(
id=id,
ioa=ioa,
asdu=asdu,
name=name,
ts=utcnow(),
threshold=threshold
)
if ioa in _ioa_idx:
raise ValueError(f'IOA {ioa} already exists')
_ioa_idx[ioa] = id
_name_idx[name.lower()] = id
def update_val(val, *, id=None, ioa=None, q=0, iv=False, ts=None):
"""Update signal value and notify subscribers if threshold exceeded.
Args:
val: New value.
id: Internal signal ID (mutually exclusive with ioa).
ioa: IOA (mutually exclusive with id).
q: Quality byte.
iv: Invalid time flag (CP56Time2a IV bit).
ts: Optional timestamp (defaults to now).
Returns:
bool: True if value was updated, False if unchanged or invalid.
Raises:
ValueError: If both id and ioa are specified or neither is.
"""
if (id is not None) == (ioa is not None):
raise ValueError('Cannot specify both id and ioa simultaneously')
with _lock:
if ioa is not None:
id = _ioa_idx.get(ioa)
if id is None:
return False
sg = _signals.get(id)
if not sg:
return False
q_change = (q != sg.q) or (iv != sg.iv)
val_change = False
if sg.threshold is not None:
try:
if abs(val - sg.val) >= sg.threshold:
val_change = True
except (TypeError, ValueError):
val_change = (val != sg.val)
else:
val_change = (val != sg.val)
if not val_change and not q_change:
return False
sg.ts = ts or utcnow()
sg.val = val
sg.q = q
sg.iv = iv
event = IecEvent(id=id, ioa=sg.ioa, asdu=sg.asdu, val=sg.val, ts=sg.ts, q=sg.q, iv=sg.iv)
if sg.asdu >= 45:
return True
targets = list(_subs.values())
for sub in targets:
sub.put_nowait(event)
return True
def get_signal(id: int | None = None, ioa: int | None = None):
"""Get a signal by ID or IOA.
Args:
id: Internal signal ID (mutually exclusive with ioa).
ioa: IOA (mutually exclusive with id).
Returns:
dict: Dictionary with single signal entry, or empty dict if not found.
Raises:
ValueError: If both id and ioa are specified or neither is.
"""
if (id is not None) == (ioa is not None):
raise ValueError('Cannot specify both id and ioa simultaneously')
with _lock:
if id is None:
id = _ioa_idx.get(ioa)
if id is None or id not in _signals:
return {}
return {id: _signals[id]}
def get_signal_by_name(name_patt: str):
"""Get signals matching name pattern (fnmatch glob).
Args:
name_patt: Name pattern to match (e.g. "pressure*", "KP_1_ZDV_?.*").
Returns:
dict: Dictionary of matching signals by ID.
"""
patt = name_patt.lower()
with _lock:
return {id: _signals[id] for name, id in _name_idx.items() if fnmatch(name, patt)}
def get_all():
"""Get all signals.
Returns:
dict: Copy of all signals by ID.
"""
with _lock:
return dict(_signals)
def subscribe(client_id, queue):
"""Subscribe a client to value updates.
Args:
client_id: Unique client identifier (e.g., address tuple).
queue: Queue to receive IecEvent updates.
"""
with _lock:
_subs[client_id] = queue
def unsubscribe(client_id):
"""Unsubscribe a client from value updates.
Args:
client_id: Client identifier to remove.
"""
with _lock:
if client_id in _subs:
del _subs[client_id]
return SimpleNamespace(
add_signal=add_signal,
update_val=update_val,
get_all=get_all,
get_signal_by_name=get_signal_by_name,
subscribe=subscribe,
unsubscribe=unsubscribe,
get_all_for_gi=get_all_for_gi,
get_signal=get_signal
)
def load_signal(add_signal: Callable, ca: int, fname: str = 'iec_addr.csv') -> None:
"""Load signal configuration from CSV file.
Reads signal definitions from a tab-separated CSV file and adds them
to the signal storage. Filters signals by Common Address (ca).
Args:
add_signal: Function to add a signal (from create_data_storage).
ca: Common Address to filter signals.
fname: Path to CSV configuration file (default: 'signals.csv').
Expected CSV columns:
ca, id, ioa, asdu, name, val, threshold
Example:
>>> load_signal(storage.add_signal, 1, "signals.csv")
"""
with open(fname, mode='r', encoding='utf-8') as f:
reader = csv.DictReader(f, delimiter='\t')
for row in reader:
if ca != int(row['ca']):
continue
asdu = int(row['asdu'])
if len(row['threshold']) > 0:
threshold = float(row['threshold'])
else:
threshold = None
add_signal(
int(row['id']),
int(row['ioa']),
asdu,
row['name'],
threshold
)
def get_val_by_asdu(type_asdu: int, val: str) -> int | float | str :
"""Convert string value to appropriate type based on ASDU.
Args:
type_asdu: ASDU type identifier.
val: String representation of the value.
Returns:
int for integer ASDUs, float for floating-point ASDUs.
Example:
>>> get_val_by_asdu(36, "100.5") # Float ASDU
100.5
>>> get_val_by_asdu(30, "1") # Integer ASDU
1
"""
val = val.strip().replace(',', '.')
if type_asdu in const.INT_ASDU:
return int(val)
elif type_asdu in const.FLOAT_ASDU:
return float(val)
else:
return val
def print_signals(sg_dict: dict) -> None:
"""Print signals in a formatted table.
Args:
sg_dict: Dictionary of signals (ID -> SignalConf) to print.
Example:
>>> storage = create_data_storage()
>>> storage.add_signal(1, 45, 36, "pressure", 0.0, 0.1)
>>> print_signals(storage.get_all())
"""
if len(sg_dict) == 0:
return
header = f"{'ID':<8} | {'IOA':<8} | {'TYPE':<6} | {'Name':<35} | {'Value':<8} | {'Q':<4} | {'IV':<3} | {'Timestamp':<23} | {'Threshold'}"
separator = "-" * len(header)
print('\n' + separator)
print(header)
print(separator)
for row in sorted(sg_dict.keys()):
sg = sg_dict[row]
ts = sg.ts.strftime('%Y-%m-%d %H:%M:%S.%f')[:23] if sg.ts else ''
iv = 'x' if sg.iv else ''
print(f'{row:<8} | {sg.ioa:<8} | {sg.asdu:<6} | {sg.name:<35} | {sg.val:<8} | {sg.q:<4} | {iv:<3} | {ts:<23} | {sg.threshold}')
print(separator)
# ---- Client State Management ----
@dataclass
class ClientState:
"""State object for a connected client.
Maintains all protocol state, synchronization primitives, and
communication resources for a single client connection.
Attributes:
ca: Common Address for this client.
rec_sq: Receive sequence number (N(S) expected).
send_sq: Send sequence number (V(S)).
startdt_confirmed: Whether STARTDT has been confirmed.
stop_event: Event to signal thread termination.
addr: Client address tuple (ip, port).
conn: Connected socket.
log: Logger for this client.
conf: Configuration reference.
last_rec: Last receive time (monotonic).
last_send: Last send time (monotonic).
seq_lock: RLock for sequence number protection.
sock_lock: Lock for socket write serialization.
out_que: Outbound message queue.
on_command: Callback for command processing.
on_gi: Callback for general interrogation.
rec_count_since_send: Count of received I-frames without S-frame.
last_ack_nr: Last acknowledged N(R) from client.
sent_obj: Count of sent ASDU objects (for statistics).
rx_buf: Receive buffer for frame assembly.
"""
ca: int = 0
rec_sq: int = 0
send_sq: int = 0
startdt_confirmed: bool = False
stop_event: Event = field(default_factory=Event)
addr: Optional[tuple] = None
conn: Optional[socket.socket] = None
log: Optional[logging.Logger] = None
conf: Optional['Conf'] = None
# Time for interval calculations - use monotonic to avoid system clock drift
last_rec: float = field(default_factory=time.monotonic)
last_send: float = field(default_factory=time.monotonic)
# Protects protocol sequences (send_sq/rec_sq/ack) and socket writes
seq_lock: RLock = field(default_factory=RLock)
# Serializes TCP socket writes between read/write threads
sock_lock: Lock = field(default_factory=Lock)
out_que: Optional[queue.Queue] = None
on_command: Optional[Callable[[Any, int], None]] = None
on_gi: Optional[Callable[[], Iterable[IecEvent]]] = None
rec_count_since_send: int = 0 # Received I-frames without S-frame (for w-window)
last_ack_nr: int = 0 # Last N(R) from client - acknowledged our I-frames (for k-window)
sent_obj: int = 0 # Counter for sent ASDU objects (for statistics)
rx_buf: bytearray = field(default_factory=bytearray)
session_name: str = ''
on_data: Optional[Callable] = None
def create_client_storage():
"""Create thread-safe storage for client states.
Implements a centralized storage for all connected clients with
thread-safe operations for adding, removing, and listing clients.
Returns:
SimpleNamespace with methods:
get_clients(): Get copy of all clients.
add_client(state): Add a client.
remove_client(addr): Remove a client by address.
close_all(): Close all client connections.
Example:
>>> storage = create_client_storage()
>>> storage.add_client(client_state)
>>> clients = storage.get_clients()
>>> storage.close_all()
"""
_clients = {}
_lock = Lock()
def get_clients():
with _lock:
return _clients.copy()
def add_client(state):
with _lock:
_clients[state.addr] = state
def remove_client(addr):
with _lock:
_clients.pop(addr, None)
def close_all():
with _lock:
for addr, state in list(_clients.items()):
try:
state.conn.close()
state.stop_event.set()
except Exception:
if state.log:
state.log.error(f'Error stopping {addr}')
del _clients[addr]
return SimpleNamespace(
get_clients=get_clients,
add_client=add_client,
remove_client=remove_client,
close_all=close_all
)
def load_connections(path: str = 'config.toml') -> list:
"""Load [[conn]] sections from config TOML file.
Returns:
List of SimpleNamespace(name, ip, port, ca, auto_start, auto_gi).
"""
with open(path, 'rb') as f:
data = tomllib.load(f)
return [
SimpleNamespace(
name=c['name'], ip=c['ip'],
port=c.get('port', 2404), ca=c.get('ca', 1),
auto_start=c.get('auto_start', True),
auto_gi=c.get('auto_gi', True),
)
for c in data.get('conn', [])
]
def load_bus_config(path: str = 'config.toml') -> list:
"""Load [[bus]] sections from config TOML file.
Returns:
List of SimpleNamespace(name, type, host, port, ioa_filter).
"""
with open(path, 'rb') as f:
data = tomllib.load(f)
return [
SimpleNamespace(
name=c.get('name', ''),
type=c['type'],
host=c.get('host', '127.0.0.1'),
port=c.get('port', 9000),
ioa_filter=c.get('ioa_filter'),
)
for c in data.get('bus', [])
]