Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions amlip_cpp/include/amlip_cpp/types/GenericDataType.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,21 @@ class GenericDataType : public InterfaceDataType
*/
AMLIP_CPP_DllAPI void* data() const;

AMLIP_CPP_DllAPI void set_data(void* data)
{
data_ = data;
}

/*!
* @brief Return value of attribute \c data__size_
*/
AMLIP_CPP_DllAPI uint32_t data_size() const;

AMLIP_CPP_DllAPI void set_data_size(uint32_t size)
{
data_size_ = size;
}

/*!
* @brief This function returns the name of this specific data type
*/
Expand Down
122 changes: 122 additions & 0 deletions amlip_demo_nodes/amlip_bytes_demo/computing_node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
# Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import signal
import numpy as np

from amlip_swig import GenericDataType

from amlip_py.node.AsyncComputingNode import AsyncComputingNode

import datatypes


DESCRIPTION = """Script to execute an Asynchronous Computing Node. (Close with C^)"""
USAGE = ('python3 computing_node.py '
'[-d <domain>] [-n <name>]')


def parse_options():
"""
Parse arguments.

:return: The arguments parsed.
"""
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
add_help=True,
description=(DESCRIPTION),
usage=(USAGE)
)
parser.add_argument(
'-d',
'--domain',
type=int,
default=0,
help='DDS Domain ID.'
)
parser.add_argument(
'-n',
'--name',
type=str,
default='AML-ComputingNode',
help='Node name.'
)

return parser.parse_args()


def main():
# Parse arguments
args = parse_options()

domain = args.domain
node_name = args.name

# Prepare callback to call when new job received
def process_job(job, task_id, client_id):

print(
f'Computing Node {node_name} preparing solution for task <{task_id}> '
f'from client <{client_id}>.\n')
solution = datatypes.FileStatistics(job.to_string())
print(
f' Solution : <{solution}>.',
end='\n\n')

solution_bytes = solution.tobytes()
solution_numpy = np.frombuffer(solution_bytes)
solution_gdt = GenericDataType(solution_numpy, len(solution_numpy))
# TODO

solution_gdt = GenericDataType(str(solution_bytes))

return solution_gdt

# Create node
node = AsyncComputingNode(
node_name,
callback=process_job,
domain=domain)
print(
f'Computing Node {node.id()} ready.',
end='\n\n')

# Run
node.run()
print(
f'Node {node.id()} already processing jobs. Waiting SIGINT (C^)...',
end='\n\n')

# Wait for signal to stop
def handler(signum, frame):
print(
' Signal received, stopping.',
end='\n\n')
signal.signal(signal.SIGINT, handler)
signal.pause()

# Stopping node
node.stop()

# Closing
print(
f'Computing Node {node.id()} closing.',
end='\n\n')


# Call main in program execution
if __name__ == '__main__':
main()
63 changes: 63 additions & 0 deletions amlip_demo_nodes/amlip_bytes_demo/datatypes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from amlip_py.types.JobSolutionDataType import JobSolutionDataType
from amlip_py.types.JobDataType import JobDataType
import pickle


class Book():

def __init__(
self,
filename):
self.filename = filename

def tobytes(self) -> bytearray:
with open(self.filename, 'rb') as file:
return bytearray(file.read())


def get_keys_with_max_values(dictionary, n):
sorted_items = sorted(dictionary.items(), key=lambda x: x[1], reverse=True)
max_keys = [item for item in sorted_items[:n]]
return max_keys


class FileStatistics():

def __init__(
self,
txt: str):

# Count number of lines
self.num_lines = len(txt.split('\n'))

# Count number of chars
self.size_bytes = len(txt)

# Count appearance of each work
self.word_counts = {}
for word in txt.lower().split():
self.word_counts[word] = self.word_counts.get(word, 0) + 1

def tobytes(self) -> bytes:
pickle.dumps(self)

def __str__(self) -> str:
st = 'FileStatistics{'
st += f'lines:{self.num_lines}'
st += f' ; bytes:{self.size_bytes} ; '
st += str(get_keys_with_max_values(self.word_counts, 3))
return st
183 changes: 183 additions & 0 deletions amlip_demo_nodes/amlip_bytes_demo/main_node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
# Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import signal
import pickle
import numpy as np
import ctypes

from py_utils.debugging.debug_utils import debug_variable_introspection
from py_utils.logging.log_utils import logging
from py_utils.wait.IntWaitHandler import IntWaitHandler
from py_utils.wait.WaitHandler import AwakeReason

from amlip_swig import GenericDataType

from amlip_py.node.AsyncMainNode import AsyncMainNode

import datatypes


DESCRIPTION = """Script to execute an Asynchronous Main Node"""
USAGE = ('python3 main_node.py '
'[-d <domain>] [-n <name>] [<file1> <file2> ...]')


def parse_options():
"""
Parse arguments.

:return: The arguments parsed.
"""
parser = argparse.ArgumentParser(
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
add_help=True,
description=(DESCRIPTION),
usage=(USAGE)
)
parser.add_argument(
'-d',
'--domain',
type=int,
default=0,
help='DDS Domain ID.'
)
parser.add_argument(
'-n',
'--name',
type=str,
default='AML-MainNode',
help='Node name.'
)
parser.add_argument(
'args',
nargs='*',
default=[
'resources/test.txt',
'resources/caperucita.txt',
'resources/lorem_ipsum.txt',
'resources/lazarillo_de_tormes.txt',
'resources/don_quijote_de_la_mancha.txt',
],
help='File name to read and send job.'
)

return parser.parse_args()


def main():

# Parse arguments
args = parse_options()

domain = args.domain
node_name = args.name
jobs = args.args
n_jobs = len(jobs)
tasks = {}

# Create Wait Handler so process waits for all solutions
waiter = IntWaitHandler(True)

# Prepare callback to call when solution received
def process_solution_received(solution, task_id, server_id):
filename = tasks[task_id]

print (f' === RECEIVED SOLUTION for {filename}')
print (f'{solution}')
# debug_variable_introspection(solution, debug_level=logging.INFO)
# debug_variable_introspection(generic_type_view(solution), debug_level=logging.INFO)

buffer = ctypes.c_ubyte * solution.data_size()
buffer = buffer.from_address(int(solution.data().get_buffer()))
solution_numpy = np.frombuffer(solution.data(), dtype=bytes)
statistics = pickle.loads(solution_numpy)

print(
f'Main Node {node_name} received solution for task <{task_id}>:<{filename}> '
f'from server <{server_id}> :\n'
f' <{statistics}>.',
end='\n\n')
waiter.increase()

# Create Node
node = AsyncMainNode(
node_name,
callback=process_solution_received,
domain=domain)
print(
f'Main Node {node.id()} ready.',
end='\n\n')

# Iterate while arguments do not run out
while jobs:

# If arguments given, pop first one currently available
filename = jobs.pop(0)

# Send task and awaits for the solution
job_book = datatypes.Book(filename)
job_bytes = job_book.tobytes()
# debug_variable_introspection(job_bytes, debug_level=logging.INFO)
job_numpy = np.frombuffer(job_bytes, dtype=np.uint8, count=len(job_bytes))
# job_numpy = np.frombuffer(job_bytes, count=len(job_bytes))
# job_gdt = GenericDataType(job_numpy.flatten().tolist(), len(job_numpy))

job_gdt = GenericDataType()
job_gdt.set_data_size(len(job_bytes))

buffer = ctypes.c_ubyte * len(job_bytes)
buffer = buffer.from_buffer(job_bytes)
buffer_void = ctypes.cast(buffer, ctypes.c_void_p)

job_gdt.set_data(buffer_void)

task_id = node.request_job_solution(job_gdt)
print(
f'Main Node {node.id()} sent task <{filename}> with task id <{task_id}>.',
end='\n\n')
tasks[task_id] = filename

# Waiting for solutions
print(
f'Main Node {node.id()} waiting for solutions or SIGINT (C^).',
end='\n\n')

# If signal arrive, disable waiter
def handler(signum, frame):
print(
' Signal received, stopping.',
end='\n\n')
waiter.disable()
signal.signal(signal.SIGINT, handler)

reason = waiter.wait_greater_equal(n_jobs)

# Closing
if reason == AwakeReason.disabled:
print(
f'Main Node {node.id()} closed with '
f'{n_jobs-waiter.get_value()} solutions remaining. Closing.',
end='\n\n')

elif reason == AwakeReason.condition_met:
print(
f'Main Node {node.id()} received all solutions. Closing.',
end='\n\n')


# Call main in program execution
if __name__ == '__main__':
main()