-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathbuilders.py
More file actions
75 lines (69 loc) · 2.85 KB
/
builders.py
File metadata and controls
75 lines (69 loc) · 2.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
from __future__ import annotations
from config import Config
from executor import BaseExecutor
from executor import OnDeviceMpcExecutor
from executor import RawActionExecutor
from robot_io import AirbotActuator
from robot_io import AirbotRealObserver
from robot_io import BaseActuator
from robot_io import BaseObserver
from robot_io import MockStateObserver
from robot_io import NoopActuator
from robot_io import create_airbot_robot
def _build_shared_airbot_robot(cfg: Config):
if cfg.observer.name != "airbot_real":
return None
obs = cfg.observer
ex = cfg.executor
if (
obs.airbot_host != ex.airbot_host
or int(obs.airbot_left_port) != int(ex.airbot_left_port)
or int(obs.airbot_right_port) != int(ex.airbot_right_port)
):
raise ValueError(
"observer/executor airbot endpoint mismatch; MPC runtime requires a shared robot connection"
)
robot = create_airbot_robot(
airbot_host=obs.airbot_host,
left_port=int(obs.airbot_left_port),
right_port=int(obs.airbot_right_port),
)
robot.connect()
return robot
def build_observer(cfg: Config, pending_actions_provider, shared_robot=None) -> BaseObserver:
observer_map = {
"mock": MockStateObserver,
"airbot_real": AirbotRealObserver,
}
observer_cls = observer_map.get(cfg.observer.name)
if observer_cls is None:
raise ValueError(f"Unsupported observer.name={cfg.observer.name!r}")
if observer_cls is AirbotRealObserver:
return observer_cls.from_config(cfg, pending_actions_provider, robot=shared_robot)
return observer_cls.from_config(cfg, pending_actions_provider)
def _build_actuator(cfg: Config, shared_robot=None) -> BaseActuator:
observer_cls = {
"mock": NoopActuator,
"airbot_real": AirbotActuator,
}
observer_cls = observer_cls.get(cfg.observer.name)
if observer_cls is None:
raise ValueError(f"Unsupported observer.name={cfg.observer.name!r}")
if observer_cls is AirbotActuator:
return observer_cls.from_config(cfg, robot=shared_robot)
return observer_cls.from_config(cfg)
def build_executor(cfg: Config, shared_robot=None) -> BaseExecutor:
executor_map = {
"raw_action": RawActionExecutor,
"ondevice_mpc": OnDeviceMpcExecutor,
}
executor_cls = executor_map.get(cfg.executor.name)
if executor_cls is None:
raise ValueError(f"Unsupported executor.name={cfg.executor.name!r}")
actuator = _build_actuator(cfg, shared_robot=shared_robot)
return executor_cls.from_config(cfg, actuator)
def build_runtime_components(cfg: Config, pending_actions_provider):
shared_robot = _build_shared_airbot_robot(cfg)
executor = build_executor(cfg, shared_robot=shared_robot)
observer = build_observer(cfg, pending_actions_provider, shared_robot=shared_robot)
return executor, observer