Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added adf_core_python/cli/__init__.py
Empty file.
53 changes: 53 additions & 0 deletions adf_core_python/cli/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import os

import click

NAME_PLACEHOLDER = "team_name"


@click.command()
@click.option(
"--name", prompt="Your agent team name", help="The name of your agent team"
)
def cli(name: str) -> None:
# load template dir and create a new agent team
click.echo(f"Creating a new agent team with name: {name}")
# 自身がいるディレクトリを取得
template_dir = os.path.join(os.path.abspath(os.path.dirname(__file__)), "template")
# コマンドラインのカレントディレクトリを取得
current_dir = os.getcwd()

_copy_template(
template_dir,
current_dir,
name,
)


def _copy_template(
src: str,
dest: str,
name: str,
) -> None:
if os.path.isdir(src):
if not os.path.exists(dest):
os.makedirs(dest)
for item in os.listdir(src):
s = os.path.join(src, item)
d = os.path.join(
dest,
item.replace(NAME_PLACEHOLDER, name),
)
_copy_template(s, d, name)
else:
with open(src, "r") as f:
content = f.read()
with open(dest, "w") as f:
f.write(content.replace(NAME_PLACEHOLDER, name))
new_dest = dest.replace(NAME_PLACEHOLDER, name)
if new_dest != dest:
os.rename(dest, new_dest)


if __name__ == "__main__":
cli()
3 changes: 3 additions & 0 deletions adf_core_python/cli/template/config/development.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"test": "test"
}
35 changes: 35 additions & 0 deletions adf_core_python/cli/template/config/launcher.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
kernel:
host: localhost
port: 27931

team:
name: team_name

adf:
launcher:
precompute: 0
debug:
flag: 0
agent:
moduleconfig:
filename: config/module.yaml

develop:
flag: 1
filename: config/development.json

team:
platoon:
ambulance:
count: 100
fire:
count: 100
police:
count: 100
office:
ambulance:
count: -1
fire:
count: -1
police:
count: -1
113 changes: 113 additions & 0 deletions adf_core_python/cli/template/config/module.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
## DefaultTacticsAmbulanceTeam
DefaultTacticsAmbulanceTeam:
HumanDetector: src.team_name.module.complex.sample_human_detector.SampleHumanDetector
Search: src.team_name.module.complex.sample_search.SampleSearch
ExtendActionTransport: adf_core_python.implement.action.default_extend_action_transport.DefaultExtendActionTransport
ExtendActionMove: adf_core_python.implement.action.default_extend_action_move.DefaultExtendActionMove
CommandExecutorAmbulance: adf_core_python.implement.centralized.DefaultCommandExecutorAmbulance
CommandExecutorScout: adf_core_python.implement.centralized.DefaultCommandExecutorScout

# ## DefaultTacticsFireBrigade
DefaultTacticsFireBrigade:
HumanDetector: src.team_name.module.complex.sample_human_detector.SampleHumanDetector
Search: src.team_name.module.complex.sample_search.SampleSearch
ExtendActionRescue: adf_core_python.implement.action.default_extend_action_rescue.DefaultExtendActionRescue
ExtendActionMove: adf_core_python.implement.action.default_extend_action_move.DefaultExtendActionMove
CommandExecutorFire: adf_core_python.implement.centralized.DefaultCommandExecutorFire
CommandExecutorScout: adf_core_python.implement.centralized.DefaultCommandExecutorScout

# ## DefaultTacticsPoliceForce
DefaultTacticsPoliceForce:
RoadDetector: src.team_name.module.complex.sample_road_detector.SampleRoadDetector
Search: src.team_name.module.complex.sample_search.SampleSearch
ExtendActionClear: adf_core_python.implement.action.default_extend_action_clear.DefaultExtendActionClear
ExtendActionMove: adf_core_python.implement.action.default_extend_action_move.DefaultExtendActionMove
CommandExecutorPolice: adf_core_python.implement.centralized.DefaultCommandExecutorPolice
CommandExecutorScout: adf_core_python.implement.centralized.DefaultCommandExecutorScoutPolice

# ## DefaultTacticsAmbulanceCentre
# DefaultTacticsAmbulanceCentre:
# TargetAllocator: sample_team.module.complex.SampleAmbulanceTargetAllocator
# CommandPicker: adf_core_python.implement.centralized.DefaultCommandPickerAmbulance

# ## DefaultTacticsFireStation
# DefaultTacticsFireStation:
# TargetAllocator: sample_team.module.complex.SampleFireTargetAllocator
# CommandPicker: adf_core_python.implement.centralized.DefaultCommandPickerFire

# ## DefaultTacticsPoliceOffice
# DefaultTacticsPoliceOffice:
# TargetAllocator: sample_team.module.complex.SamplePoliceTargetAllocator
# CommandPicker: adf_core_python.implement.centralized.DefaultCommandPickerPolice

## SampleSearch
SampleSearch:
PathPlanning: adf_core_python.implement.module.algorithm.a_star_path_planning.AStarPathPlanning
Clustering: adf_core_python.implement.module.algorithm.k_means_clustering.KMeansClustering

# ## SampleBuildDetector
# SampleBuildingDetector:
# Clustering: adf_core_python.implement.module.algorithm.KMeansClustering

# ## SampleRoadDetector
# SampleRoadDetector:
# Clustering: adf_core_python.implement.module.algorithm.KMeansClustering
# PathPlanning: adf_core_python.implement.module.algorithm.DijkstraPathPlanning

# ## SampleHumanDetector
# SampleHumanDetector:
# Clustering: adf_core_python.implement.module.algorithm.KMeansClustering

# ## DefaultExtendActionClear
# DefaultExtendActionClear:
# PathPlanning: adf_core_python.implement.module.algorithm.DijkstraPathPlanning

# ## DefaultExtendActionFireFighting
# DefaultExtendActionFireFighting:
# PathPlanning: adf_core_python.implement.module.algorithm.DijkstraPathPlanning

# ## DefaultExtendActionRescue
# DefaultExtendActionRescue:
# PathPlanning: adf_core_python.implement.module.algorithm.DijkstraPathPlanning

# ## DefaultExtendActionMove
# DefaultExtendActionMove:
# PathPlanning: adf_core_python.implement.module.algorithm.DijkstraPathPlanning

# ## DefaultExtendActionTransport
# DefaultExtendActionTransport:
# PathPlanning: adf_core_python.implement.module.algorithm.DijkstraPathPlanning
# ## DefaultCommandExecutorAmbulance
# DefaultCommandExecutorAmbulance:
# PathPlanning: adf_core_python.implement.module.algorithm.DijkstraPathPlanning
# ExtendActionTransport: adf_core_python.implement.action.DefaultExtendActionTransport
# ExtendActionMove: adf_core_python.implement.action.DefaultExtendActionMove

# ## DefaultCommandExecutorFire
# DefaultCommandExecutorFire:
# PathPlanning: adf_core_python.implement.module.algorithm.DijkstraPathPlanning
# EtxActionFireRescue: adf_core_python.implement.action.DefaultExtendActionRescue
# EtxActionFireFighting: adf_core_python.implement.action.DefaultExtendActionFireFighting
# ExtendActionMove: adf_core_python.implement.action.DefaultExtendActionMove

# ## DefaultCommandExecutorPolice
# DefaultCommandExecutorPolice:
# PathPlanning: adf_core_python.implement.module.algorithm.DijkstraPathPlanning
# ExtendActionClear: adf_core_python.implement.action.DefaultExtendActionClear
# ExtendActionMove: adf_core_python.implement.action.DefaultExtendActionMove

# ## DefaultCommandExecutorScout
# DefaultCommandExecutorScout:
# PathPlanning: adf_core_python.implement.module.algorithm.DijkstraPathPlanning

# ## DefaultCommandExecutorScoutPolice
# DefaultCommandExecutorScoutPolice:
# PathPlanning: adf_core_python.implement.module.algorithm.DijkstraPathPlanning
# ExtendActionClear: adf_core_python.implement.action.DefaultExtendActionClear

# ## MessageManager
MessageManager:
PlatoonChannelSubscriber: adf_core_python.implement.module.communication.default_channel_subscriber.DefaultChannelSubscriber
CenterChannelSubscriber: adf_core_python.implement.module.communication.default_channel_subscriber.DefaultChannelSubscriber
PlatoonMessageCoordinator: adf_core_python.implement.module.communication.default_message_coordinator.DefaultMessageCoordinator
CenterMessageCoordinator: adf_core_python.implement.module.communication.default_message_coordinator.DefaultMessageCoordinator
5 changes: 5 additions & 0 deletions adf_core_python/cli/template/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from adf_core_python.launcher import Launcher

if __name__ == "__main__":
launcher = Launcher("./config/launcher.yaml")
launcher.launch()
Empty file.
Empty file.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
from typing import Optional, cast

from rcrs_core.connection.URN import Entity as EntityURN
from rcrs_core.entities.civilian import Civilian
from rcrs_core.entities.entity import Entity
from rcrs_core.entities.human import Human
from rcrs_core.worldmodel.entityID import EntityID

from adf_core_python.core.agent.develop.develop_data import DevelopData
from adf_core_python.core.agent.info.agent_info import AgentInfo
from adf_core_python.core.agent.info.scenario_info import ScenarioInfo
from adf_core_python.core.agent.info.world_info import WorldInfo
from adf_core_python.core.agent.module.module_manager import ModuleManager
from adf_core_python.core.component.module.algorithm.clustering import Clustering
from adf_core_python.core.component.module.complex.human_detector import HumanDetector
from adf_core_python.core.logger.logger import get_agent_logger


class SampleHumanDetector(HumanDetector):
def __init__(
self,
agent_info: AgentInfo,
world_info: WorldInfo,
scenario_info: ScenarioInfo,
module_manager: ModuleManager,
develop_data: DevelopData,
) -> None:
super().__init__(
agent_info, world_info, scenario_info, module_manager, develop_data
)
self._clustering: Clustering = cast(
Clustering,
module_manager.get_module(
"DefaultHumanDetector.Clustering",
"adf_core_python.implement.module.algorithm.k_means_clustering.KMeansClustering",
),
)
self.register_sub_module(self._clustering)

self._result: Optional[EntityID] = None
self._logger = get_agent_logger(
f"{self.__class__.__module__}.{self.__class__.__qualname__}",
self._agent_info,
)

def calculate(self) -> HumanDetector:
transport_human: Optional[Human] = self._agent_info.some_one_on_board()
if transport_human is not None:
self._result = transport_human.get_id()
return self

if self._result is not None:
if not self._is_valid_human(self._result):
self._result = None

if self._result is None:
self._result = self._select_target()

return self

def _select_target(self) -> Optional[EntityID]:
if self._result is not None and self._is_valid_human(self._result):
return self._result

cluster_index: int = self._clustering.get_cluster_index(
self._agent_info.get_entity_id()
)
cluster_entities: list[Entity] = self._clustering.get_cluster_entities(
cluster_index
)

cluster_valid_human_entities: list[Entity] = [
entity
for entity in cluster_entities
if self._is_valid_human(entity.get_id()) and isinstance(entity, Civilian)
]
if len(cluster_valid_human_entities) != 0:
nearest_human_entity = cluster_valid_human_entities[0]
nearest_distance = self._world_info.get_distance(
self._agent_info.get_entity_id(),
nearest_human_entity.get_id(),
)
for entity in cluster_valid_human_entities:
distance = self._world_info.get_distance(
self._agent_info.get_entity_id(),
entity.get_id(),
)
if distance < nearest_distance:
nearest_distance = distance
nearest_human_entity = entity
return nearest_human_entity.get_id()

world_valid_human_entities: list[Entity] = [
entity
for entity in self._world_info.get_entities_of_types([Civilian])
if self._is_valid_human(entity.get_id())
]
if len(world_valid_human_entities) != 0:
nearest_human_entity = world_valid_human_entities[0]
nearest_distance = self._world_info.get_distance(
self._agent_info.get_entity_id(),
nearest_human_entity.get_id(),
)
for entity in world_valid_human_entities:
distance = self._world_info.get_distance(
self._agent_info.get_entity_id(),
entity.get_id(),
)
if distance < nearest_distance:
nearest_distance = distance
nearest_human_entity = entity
return nearest_human_entity.get_id()

return None

def _is_valid_human(self, target_entity_id: EntityID) -> bool:
target: Optional[Entity] = self._world_info.get_entity(target_entity_id)
if target is None:
return False
if not isinstance(target, Human):
return False
hp: Optional[int] = target.get_hp()
if hp is None or hp <= 0:
return False
buriedness: Optional[int] = target.get_buriedness()
if buriedness is None:
return False
myself = self._agent_info.get_myself()
if myself.get_urn() == EntityURN.FIRE_BRIGADE and buriedness == 0:
return False
if myself.get_urn() == EntityURN.AMBULANCE_TEAM and buriedness > 0:
return False
damage: Optional[int] = target.get_damage()
if damage is None or damage == 0:
return False
position_entity_id: Optional[EntityID] = target.get_position()
if position_entity_id is None:
return False
position: Optional[Entity] = self._world_info.get_entity(position_entity_id)
if position is None:
return False
urn: EntityURN = position.get_urn()
if urn == EntityURN.REFUGE or urn == EntityURN.AMBULANCE_TEAM:
return False

return True

def get_target_entity_id(self) -> Optional[EntityID]:
return self._result
Loading