-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcontrol_server.py
More file actions
310 lines (249 loc) · 8.78 KB
/
control_server.py
File metadata and controls
310 lines (249 loc) · 8.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
"""
control_server.py - Command handler for IEC 104 server.
This module implements the command-line interface for controlling:
- IEC 104 server (controller simulator)
Commands are entered in the console after starting the server.
Examples:
# Start server
python main.py --server
> clients # Show connected clients
> set 45 100.5 # Set signal value
> imit_rand 60 10 # Start random simulation
> help set # Detailed help for set command
"""
from threading import Thread
from types import SimpleNamespace
from typing import Callable
import common as cm
import imit as im
def _cmd_exit(ctx, _args):
"""
exit - Stop the server.
Terminates the server, closes all connections, and exits.
Example:
> exit
"""
ctx.log.info('Stopping server')
ctx.stop_thread.set()
return True
def _cmd_clients(ctx, _args):
"""
clients - Show list of connected clients.
Displays information about each connected client:
- IP address and port
- STARTDT status
- Send/receive frame counters
Example:
> clients
('192.168.1.100', 2404) startdt=True send_sq=15 rec_sq=12
"""
for addr, state in ctx.cl.get_clients().items():
print(addr, state)
def _cmd_addr(ctx, args):
"""
addr <signal_name> - Show signal value by name.
Arguments:
signal_name : Signal name from configuration
Example:
> addr pressure_1
IOA:45 VALUE:100.50 QUAL:0 TS:2024-01-15 10:23:45
"""
cm.print_signals(ctx.sg.get_signal_by_name(args[0]))
def _cmd_set(ctx, args):
"""
set <value> <id> [quality] [inv_time] - Set signal by ID.
Arguments:
value : Numeric value (float)
id : Signal ID (database number)
quality : Optional, decimal (default 0)
0=good, 128=invalid, 64=not topical, 16=blocked, 32=substituted
inv_time : Optional, 1=invalid timestamp, 0=valid (default 0)
Example:
> set 100.5 45
> set 100.5 45 128
> set 100.5 45 0 1
"""
q = int(args[2]) if len(args) > 2 else 0
iv = bool(int(args[3])) if len(args) > 3 else False
res = ctx.sg.update_val(float(args[0]), id=int(args[1]), q=q, iv=iv)
if res:
cm.print_signals(ctx.sg.get_signal(int(args[1])))
def _cmd_setioa(ctx, args):
"""
setioa <value> <ioa> - Set signal by IOA.
Arguments:
value : Numeric value (float)
ioa : Information Object Address (1-65535)
Example:
> setioa 100.5 45
OK: IOA 45 = 100.5
Note:
Automatically finds signal by IOA.
"""
res = ctx.sg.update_val(float(args[0]), ioa=int(args[1]))
if res:
cm.print_signals(ctx.sg.get_all())
def _cmd_imit_rand(ctx, args):
"""
imit_rand <cnt_time> <cnt_id> - Start random simulation.
Arguments:
cnt_time : Number of time iterations
cnt_id : Number of signals to simulate
Generates random values for signals in range ID 5..100 with step 8.
Example:
> imit_rand 60 10
Simulation started in background
Note:
Simulation runs in a background thread.
"""
cnt_time, cnt_id = int(args[0]), int(args[1])
def run():
list_id = list(range(5, 100, 8))
print(list_id)
for _, sid, val, q in im.imit_rand(cnt_time=cnt_time, cnt_id=cnt_id, list_id=list_id, sleep_s=im.SIM_SLEEP):
ctx.sg.update_val(val, id=sid, q=q)
ctx.log.info('Simulation finished')
Thread(target=run, daemon=True).start()
print('Simulation started in background')
def _cmd_imit_ladder(ctx, args):
"""
imit_ladder <cnt_step> <time_step> <val_step> <val_min> <val_max> <name_sg> - Start ladder simulation.
Arguments:
cnt_step : Number of steps
time_step : Time between steps (seconds)
val_step : Value increment per step
val_min : Minimum value
val_max : Maximum value
name_sg : Signal name to simulate
Gradually changes signal value within specified range.
Example:
> imit_ladder 100 0.5 1.0 0 100 pressure_1
Simulation started in background for 1 signals
Note:
Signal must be analog (ASDU=36).
"""
cnt_step = int(args[0])
time_step, val_step, val_min, val_max = float(args[1]), float(args[2]), float(args[3]), float(args[4])
signals = ctx.sg.get_signal_by_name(args[5])
list_id = [key for key, sg in signals.items() if sg.asdu == 36]
if not list_id:
print('No matching analog signals (ASDU=36) found')
return
def run():
for _, sid, val, q in im.imit_ladder(
cnt_step=cnt_step,
time_step=time_step,
val_step=val_step,
val_min=val_min,
val_max=val_max,
list_id=list_id,
):
ctx.sg.update_val(val, id=sid, q=q)
ctx.log.info('Simulation finished')
Thread(target=run, daemon=True).start()
print(f'Simulation started in background for {len(list_id)} signals')
def _cmd_set_log_level(ctx, args):
"""
log_level <target> <level> - Set logging level.
Arguments:
target : 'file' or 'console'
level : DEBUG, INFO, WARNING, ERROR, CRITICAL
Example:
> log_level console DEBUG
CONSOLE level changed to DEBUG for all
Note:
Sets the level for all log handlers.
"""
target = args[0].lower()
level_str = args[1].upper()
level_int = getattr(cm.logging, level_str, None)
if level_int is None or target not in ('file', 'console'):
return
logger = ctx.log
for hdl in logger.handlers:
if target == 'file' and isinstance(hdl, cm.logging.FileHandler):
hdl.setLevel(level_str)
print(f"FILE level changed to {level_str}")
elif target == 'console' and type(hdl) is cm.logging.StreamHandler:
hdl.setLevel(level_str)
print(f"CONSOLE level changed to {level_str}")
def _cmd_help(ctx, _args):
"""
help - Show list of available commands.
Displays all commands with argument count indicators.
For detailed help on a specific command, use:
help <command>
Example:
> help
exit
clients
set <arg1> <arg2> ...
...
> help set
"""
print("\n=== Available server commands ===\n")
for name, (n, _) in COMMANDS.items():
print(f" {name}" + (" <arg1> <arg2> ..." if n else ""))
print("\nFor command help: help <command>\n")
COMMANDS = {
"exit": (0, _cmd_exit),
"clients": (0, _cmd_clients),
"addr": (1, _cmd_addr),
"set": (2, _cmd_set),
"setioa": (2, _cmd_setioa),
"imit_rand": (2, _cmd_imit_rand),
"imit_ladder": (6, _cmd_imit_ladder),
"log_level": (2, _cmd_set_log_level),
"help": (0, _cmd_help),
}
def server_handler(stop_thread: Callable, cl: Callable, sg: Callable, log, prompt_id: str = "KP ?"):
"""
Command-line handler for the server.
Runs an infinite loop reading commands from stdin and executing them.
Supports commands from COMMANDS dictionary and help <command>.
Args:
stop_thread: threading.Event to stop the loop
cl: Client storage object (client_storage)
sg: Signal storage object (data_storage)
log: Logger instance
prompt_id: Identifier for the input prompt (default: "KP ?")
"""
ctx = SimpleNamespace(stop_thread=stop_thread, cl=cl, sg=sg, log=log)
prompt = f"Server KP_{prompt_id}> "
while not stop_thread.is_set():
try:
line = input(prompt).strip().lower()
except EOFError:
log.info('Input closed, stopping server')
stop_thread.set()
return
except Exception as e:
log.exception('Input error: %s', e)
continue
if not line:
continue
parts = line.split()
cmd_name, args = parts[0], parts[1:]
if cmd_name == 'help' and args:
cmd_help = args[0]
if cmd_help in COMMANDS:
_, handler = COMMANDS[cmd_help]
print(handler.__doc__ or f"Help for {cmd_help} not found")
else:
print(f"Unknown command: {cmd_help}")
continue
entry = COMMANDS.get(cmd_name)
if entry is None:
log.info('Unknown command: %s', cmd_name)
print('Unknown command. help — list of commands.')
continue
n_args, handler = entry
if len(args) < n_args:
print(f'Expected at least {n_args} args for {cmd_name}, got {len(args)}. help — list of commands.')
continue
try:
if handler(ctx, args):
return
except Exception as e:
log.exception('Error executing command %s: %s', cmd_name, e)
print('Error:', e)