From 76766e9f960c238a686ecdd2e73aa61da156da4f Mon Sep 17 00:00:00 2001 From: jparisu Date: Thu, 29 Jun 2023 12:51:30 +0200 Subject: [PATCH] Try to use numpy with ctypes to pass data from python to C++ (not working) Signed-off-by: jparisu --- .../amlip_cpp/types/GenericDataType.hpp | 10 + .../amlip_bytes_demo/computing_node.py | 122 ++++++++++++ .../amlip_bytes_demo/datatypes.py | 63 ++++++ .../amlip_bytes_demo/main_node.py | 183 ++++++++++++++++++ 4 files changed, 378 insertions(+) create mode 100644 amlip_demo_nodes/amlip_bytes_demo/computing_node.py create mode 100644 amlip_demo_nodes/amlip_bytes_demo/datatypes.py create mode 100644 amlip_demo_nodes/amlip_bytes_demo/main_node.py diff --git a/amlip_cpp/include/amlip_cpp/types/GenericDataType.hpp b/amlip_cpp/include/amlip_cpp/types/GenericDataType.hpp index b763589b..d6f78753 100644 --- a/amlip_cpp/include/amlip_cpp/types/GenericDataType.hpp +++ b/amlip_cpp/include/amlip_cpp/types/GenericDataType.hpp @@ -126,11 +126,21 @@ class GenericDataType : public InterfaceDataType */ AMLIP_CPP_DllAPI void* data() const; + AMLIP_CPP_DllAPI void set_data(void* data) + { + data_ = data; + } + /*! * @brief Return value of attribute \c data__size_ */ AMLIP_CPP_DllAPI uint32_t data_size() const; + AMLIP_CPP_DllAPI void set_data_size(uint32_t size) + { + data_size_ = size; + } + /*! * @brief This function returns the name of this specific data type */ diff --git a/amlip_demo_nodes/amlip_bytes_demo/computing_node.py b/amlip_demo_nodes/amlip_bytes_demo/computing_node.py new file mode 100644 index 00000000..88183a1b --- /dev/null +++ b/amlip_demo_nodes/amlip_bytes_demo/computing_node.py @@ -0,0 +1,122 @@ +# Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima). +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import signal

import numpy as np

from amlip_swig import GenericDataType

from amlip_py.node.AsyncComputingNode import AsyncComputingNode

import datatypes


DESCRIPTION = """Script to execute an Asynchronous Computing Node. (Close with C^)"""
USAGE = ('python3 computing_node.py '
         '[-d ] [-n ]')


def parse_options():
    """
    Parse command-line arguments.

    :return: The arguments parsed.
    """
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        add_help=True,
        description=(DESCRIPTION),
        usage=(USAGE)
    )
    parser.add_argument(
        '-d',
        '--domain',
        type=int,
        default=0,
        help='DDS Domain ID.'
    )
    parser.add_argument(
        '-n',
        '--name',
        type=str,
        default='AML-ComputingNode',
        help='Node name.'
    )

    return parser.parse_args()


def main():
    """Run an Asynchronous Computing Node until SIGINT is received."""
    # Parse arguments
    args = parse_options()

    domain = args.domain
    node_name = args.name

    # Prepare callback to call when new job received
    def process_job(job, task_id, client_id):

        print(
            f'Computing Node {node_name} preparing solution for task <{task_id}> '
            f'from client <{client_id}>.\n')

        # Compute statistics over the text content of the job
        solution = datatypes.FileStatistics(job.to_string())
        print(
            f' Solution : <{solution}>.',
            end='\n\n')

        # Serialize the solution object to raw bytes.
        # Fix(review): the original draft also built a numpy view of these
        # bytes and constructed a GenericDataType(solution_numpy, ...) that
        # was immediately overwritten below — that dead (and crashing, since
        # tobytes() returned None at the time) experiment is removed.
        solution_bytes = solution.tobytes()

        # TODO: pass the raw bytes directly once GenericDataType supports it;
        # str(bytes) includes the b'...' wrapper and is only a stopgap.
        solution_gdt = GenericDataType(str(solution_bytes))

        return solution_gdt

    # Create node
    node = AsyncComputingNode(
        node_name,
        callback=process_job,
        domain=domain)
    print(
        f'Computing Node {node.id()} ready.',
        end='\n\n')

    # Run
    node.run()
    print(
        f'Node {node.id()} already processing jobs. Waiting SIGINT (C^)...',
        end='\n\n')

    # Wait for signal to stop
    def handler(signum, frame):
        print(
            ' Signal received, stopping.',
            end='\n\n')
    signal.signal(signal.SIGINT, handler)
    signal.pause()

    # Stopping node
    node.stop()

    # Closing
    print(
        f'Computing Node {node.id()} closing.',
        end='\n\n')


# Call main in program execution
if __name__ == '__main__':
    main()

# ---------------------------------------------------------------------------
# (patch metadata preserved from the original format-patch artifact)
# diff --git a/amlip_demo_nodes/amlip_bytes_demo/datatypes.py
#          b/amlip_demo_nodes/amlip_bytes_demo/datatypes.py
# new file mode 100644  index 00000000..efac56f1
# --- /dev/null
# +++ b/amlip_demo_nodes/amlip_bytes_demo/datatypes.py
# @@ -0,0 +1,63 @@
# ---------------------------------------------------------------------------
# Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# NOTE(review): these two imports are currently unused in this module; kept
# untouched because other demo files may import them from here.
from amlip_py.types.JobSolutionDataType import JobSolutionDataType
from amlip_py.types.JobDataType import JobDataType
import pickle


class Book():
    """Wrapper over a text file whose raw content is sent as a job."""

    def __init__(
            self,
            filename):
        # Path of the file to read when serializing
        self.filename = filename

    def tobytes(self) -> bytearray:
        """Read the whole file and return its content as a mutable bytearray.

        A bytearray (not bytes) is returned on purpose: the sender shares it
        with ctypes via from_buffer(), which requires a writable buffer.
        """
        with open(self.filename, 'rb') as file:
            return bytearray(file.read())


def get_keys_with_max_values(dictionary, n):
    """Return the n (key, value) pairs of *dictionary* with the highest values.

    NOTE: despite the name, (key, value) pairs are returned — not bare keys —
    and FileStatistics.__str__ relies on that to display the counts.
    """
    sorted_items = sorted(dictionary.items(), key=lambda x: x[1], reverse=True)
    return sorted_items[:n]


class FileStatistics():
    """Simple statistics (lines, size, word frequencies) computed from a text."""

    def __init__(
            self,
            txt: str):

        # Count number of lines
        self.num_lines = len(txt.split('\n'))

        # Count number of chars
        self.size_bytes = len(txt)

        # Count appearance of each word (case-insensitive)
        self.word_counts = {}
        for word in txt.lower().split():
            self.word_counts[word] = self.word_counts.get(word, 0) + 1

    def tobytes(self) -> bytes:
        """Serialize this object with pickle.

        Fix: the original called pickle.dumps(self) but dropped the result,
        returning None despite the ``-> bytes`` annotation.
        """
        return pickle.dumps(self)

    def __str__(self) -> str:
        st = 'FileStatistics{'
        st += f'lines:{self.num_lines}'
        st += f' ; bytes:{self.size_bytes} ; '
        st += str(get_keys_with_max_values(self.word_counts, 3))
        # Fix: close the brace opened above so the representation is balanced.
        st += '}'
        return st

# ---------------------------------------------------------------------------
# (patch metadata preserved from the original format-patch artifact)
# diff --git a/amlip_demo_nodes/amlip_bytes_demo/main_node.py
#          b/amlip_demo_nodes/amlip_bytes_demo/main_node.py
# new file mode 100644  index 00000000..2a257a03
# --- /dev/null
# +++ b/amlip_demo_nodes/amlip_bytes_demo/main_node.py
# @@ -0,0 +1,183 @@
# ---------------------------------------------------------------------------
# Copyright 2023 Proyectos y Sistemas de Mantenimiento SL (eProsima).
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import signal
import pickle
import numpy as np
import ctypes

from py_utils.debugging.debug_utils import debug_variable_introspection
from py_utils.logging.log_utils import logging
from py_utils.wait.IntWaitHandler import IntWaitHandler
from py_utils.wait.WaitHandler import AwakeReason

from amlip_swig import GenericDataType

from amlip_py.node.AsyncMainNode import AsyncMainNode

import datatypes


DESCRIPTION = """Script to execute an Asynchronous Main Node"""
USAGE = ('python3 main_node.py '
         '[-d ] [-n ] [ ...]')


def parse_options():
    """
    Parse command-line arguments.

    :return: The arguments parsed.
    """
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
        add_help=True,
        description=(DESCRIPTION),
        usage=(USAGE)
    )
    parser.add_argument(
        '-d',
        '--domain',
        type=int,
        default=0,
        help='DDS Domain ID.'
    )
    parser.add_argument(
        '-n',
        '--name',
        type=str,
        default='AML-MainNode',
        help='Node name.'
    )
    parser.add_argument(
        'args',
        nargs='*',
        default=[
            'resources/test.txt',
            'resources/caperucita.txt',
            'resources/lorem_ipsum.txt',
            'resources/lazarillo_de_tormes.txt',
            'resources/don_quijote_de_la_mancha.txt',
        ],
        help='File name to read and send job.'
    )

    return parser.parse_args()


def main():
    """Send one job per input file and wait for every solution (or SIGINT)."""
    # Parse arguments
    args = parse_options()

    domain = args.domain
    node_name = args.name
    jobs = args.args
    n_jobs = len(jobs)
    tasks = {}

    # Fix(review): GenericDataType.set_data() stores only a raw pointer, so
    # the Python-side buffers must outlive the request — otherwise the C++
    # side reads freed memory (a likely cause of the "not working" state).
    # Keep a reference to every sent buffer for the process lifetime.
    sent_buffers = []

    # Create Wait Handler so process waits for all solutions
    waiter = IntWaitHandler(True)

    # Prepare callback to call when solution received
    def process_solution_received(solution, task_id, server_id):
        filename = tasks[task_id]

        # Fix: the original printed a literal '(unknown)' placeholder and left
        # `filename` unused; report the actual file the solution belongs to.
        print(f' === RECEIVED SOLUTION for {filename}')
        print(f'{solution}')
        # debug_variable_introspection(solution, debug_level=logging.INFO)

        # Reinterpret the received raw pointer as data_size() bytes.
        # Fix: the original built this ctypes view, then ignored it and called
        # np.frombuffer(solution.data(), dtype=bytes), which is invalid.
        # NOTE(review): assumes solution.data().get_buffer() yields the
        # payload address — TODO confirm against the SWIG binding.
        buffer_type = ctypes.c_ubyte * solution.data_size()
        payload = buffer_type.from_address(int(solution.data().get_buffer()))
        # SECURITY NOTE: pickle.loads on data received from the network is
        # unsafe with untrusted peers; acceptable only in this local demo.
        statistics = pickle.loads(bytes(payload))

        print(
            f'Main Node {node_name} received solution for task <{task_id}>:<{filename}> '
            f'from server <{server_id}> :\n'
            f' <{statistics}>.',
            end='\n\n')
        waiter.increase()

    # Create Node
    node = AsyncMainNode(
        node_name,
        callback=process_solution_received,
        domain=domain)
    print(
        f'Main Node {node.id()} ready.',
        end='\n\n')

    # Iterate while arguments do not run out
    while jobs:

        # If arguments given, pop first one currently available
        filename = jobs.pop(0)

        # Read the file and wrap its bytes in a GenericDataType
        job_book = datatypes.Book(filename)
        job_bytes = job_book.tobytes()

        job_gdt = GenericDataType()
        job_gdt.set_data_size(len(job_bytes))

        # bytearray is writable, so from_buffer() shares (does not copy) it
        shared = (ctypes.c_ubyte * len(job_bytes)).from_buffer(job_bytes)
        job_gdt.set_data(ctypes.cast(shared, ctypes.c_void_p))

        # Keep both the bytearray and its ctypes view alive (see note above)
        sent_buffers.append((job_bytes, shared))

        task_id = node.request_job_solution(job_gdt)
        print(
            f'Main Node {node.id()} sent task <{filename}> with task id <{task_id}>.',
            end='\n\n')
        tasks[task_id] = filename

    # Waiting for solutions
    print(
        f'Main Node {node.id()} waiting for solutions or SIGINT (C^).',
        end='\n\n')

    # If signal arrive, disable waiter
    def handler(signum, frame):
        print(
            ' Signal received, stopping.',
            end='\n\n')
        waiter.disable()
    signal.signal(signal.SIGINT, handler)

    reason = waiter.wait_greater_equal(n_jobs)

    # Closing
    if reason == AwakeReason.disabled:
        print(
            f'Main Node {node.id()} closed with '
            f'{n_jobs-waiter.get_value()} solutions remaining. Closing.',
            end='\n\n')

    elif reason == AwakeReason.condition_met:
        print(
            f'Main Node {node.id()} received all solutions. Closing.',
            end='\n\n')


# Call main in program execution
if __name__ == '__main__':
    main()