diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 8358e447..3263c5ea 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -153,6 +153,9 @@ jobs: - name: filesystem-full expected: "Directory listing for" path: generic/filesystem-full + - name: blinky + expected: "blink (count: 1, state: -)" + path: generic/blinky # Add here more samples steps: - name: Checkout current repository diff --git a/README.md b/README.md index 3c7e270c..5c256690 100644 --- a/README.md +++ b/README.md @@ -20,18 +20,19 @@ Our mission is to make it as easy to develop and securely deploy apps for the bi Ocre supports a range of features depending on the platform. The table below summarizes the current support: -| Feature | Zephyr (native_sim, b_u585i_iot02a) | Linux x86_64 | +| Feature | Zephyr (native_sim, b_u585i_iot02a) | Linux | |------------------------|:-----------------------------------:|:-------------:| | Ocre Runtime | ✅ | ✅ | -| Container Messaging | ✅ | ❌ | +| Container Messaging | ✅ | ✅ | | GPIO | ✅ | ❌ | -| Timers | ✅ | ❌ | +| Timers | ✅ | ✅ | | Sensors | ✅ | ❌ | -| Networking | ❌ | ✅ | +| Networking | ✅ | ✅ | +| Filesystem | ✅ | ✅ | | Interactive Shell | ✅ | ❌ | - **Zephyr**: Full feature set, including hardware integration and shell. -- **Linux x86_64**: Core runtime and networking only; hardware and shell features are not available. +- **Linux**: Core runtime and multiple I/O features; hardware and shell features are not available. --- diff --git a/src/ocre/api/ocre_common.c b/src/ocre/api/ocre_common.c index 6d26d16e..f8362fdd 100644 --- a/src/ocre/api/ocre_common.c +++ b/src/ocre/api/ocre_common.c @@ -7,39 +7,39 @@ #include #include "ocre_core_external.h" -#include -#include -#include -#include -#include #include #include +#include +#include +#include +#include + LOG_MODULE_DECLARE(ocre_cs_component, OCRE_LOG_LEVEL); -#include -#include "../../../../../wasm-micro-runtime/core/iwasm/include/lib_export.h" -#include "bh_log.h" -#include "../ocre_timers/ocre_timer.h" + +#ifdef CONFIG_OCRE_GPIO #include "../ocre_gpio/ocre_gpio.h" +#endif + +#ifdef CONFIG_OCRE_CONTAINER_MESSAGING #include "../ocre_messaging/ocre_messaging.h" -#include "ocre_common.h" -#include -// Place queue buffer in a dedicated section with alignments -#define SIZE_OCRE_EVENT_BUFFER 32 -__attribute__((section(".noinit.ocre_event_queue"), - aligned(8))) char ocre_event_queue_buffer[SIZE_OCRE_EVENT_BUFFER * sizeof(ocre_event_t)]; -char *ocre_event_queue_buffer_ptr = ocre_event_queue_buffer; // Pointer for validation +#endif -K_MSGQ_DEFINE(ocre_event_queue, sizeof(ocre_event_t), SIZE_OCRE_EVENT_BUFFER, 4); -bool ocre_event_queue_initialized = false; -struct k_spinlock ocre_event_queue_lock; +#include "ocre_common.h" typedef struct module_node { ocre_module_context_t ctx; - sys_snode_t node; + core_snode_t node; } module_node_t; -static sys_slist_t module_registry; -static struct k_mutex registry_mutex; +static core_slist_t module_registry; +static core_mutex_t registry_mutex; + +#define SIZE_OCRE_EVENT_BUFFER 32 + +/* Unified event queue implementation */ +core_eventq_t ocre_event_queue; +bool ocre_event_queue_initialized = false; +core_spinlock_t ocre_event_queue_lock; static struct cleanup_handler { ocre_resource_type_t type; @@ -95,22 +95,24 @@ int ocre_get_event(wasm_exec_env_t exec_env, uint32_t type_offset, uint32_t id_o } ocre_event_t event; - k_spinlock_key_t key = k_spin_lock(&ocre_event_queue_lock); - int ret = k_msgq_peek(&ocre_event_queue, &event); + int ret; + + /* Generic event queue implementation for both platforms */ + core_spinlock_key_t key = core_spinlock_lock(&ocre_event_queue_lock); + ret = core_eventq_peek(&ocre_event_queue, &event); if (ret != 0) { - // k_msg_peek returns either 0, or -ENOMSG if empty - k_spin_unlock(&ocre_event_queue_lock, key); + core_spinlock_unlock(&ocre_event_queue_lock, key); return -ENOMSG; } if (event.owner != module_inst) { - k_spin_unlock(&ocre_event_queue_lock, key); + core_spinlock_unlock(&ocre_event_queue_lock, key); return -EPERM; } - ret = k_msgq_get(&ocre_event_queue, &event, K_FOREVER); + ret = core_eventq_get(&ocre_event_queue, &event); if (ret != 0) { - k_spin_unlock(&ocre_event_queue_lock, key); + core_spinlock_unlock(&ocre_event_queue_lock, key); return -ENOENT; } @@ -163,12 +165,12 @@ int ocre_get_event(wasm_exec_env_t exec_env, uint32_t type_offset, uint32_t id_o ================================= */ default: { - k_spin_unlock(&ocre_event_queue_lock, key); + core_spinlock_unlock(&ocre_event_queue_lock, key); LOG_ERR("Invalid event type: %u", event.type); return -EINVAL; } } - k_spin_unlock(&ocre_event_queue_lock, key); + core_spinlock_unlock(&ocre_event_queue_lock, key); return 0; } @@ -178,21 +180,28 @@ int ocre_common_init(void) { LOG_INF("Common system already initialized"); return 0; } - k_mutex_init(®istry_mutex); - sys_slist_init(&module_registry); - if ((uintptr_t)ocre_event_queue_buffer_ptr % 4 != 0) { - LOG_ERR("ocre_event_queue_buffer misaligned: %p", (void *)ocre_event_queue_buffer_ptr); - return -EINVAL; - } - k_msgq_init(&ocre_event_queue, ocre_event_queue_buffer, sizeof(ocre_event_t), 64); + + core_mutex_init(®istry_mutex); + core_slist_init(&module_registry); + + core_eventq_init(&ocre_event_queue, sizeof(ocre_event_t), SIZE_OCRE_EVENT_BUFFER); + + /* Purge any stale events from queue */ ocre_event_t dummy; - while (k_msgq_get(&ocre_event_queue, &dummy, K_NO_WAIT) == 0) { + while (core_eventq_get(&ocre_event_queue, &dummy) == 0) { LOG_INF("Purged stale event from queue"); } + + /* Temporary platform-specific initialization */ +#ifdef __ZEPHYR__ + /* No additional Zephyr-specific initialization needed */ +#else /* POSIX */ + pthread_mutex_init(&ocre_event_queue_lock.mutex, NULL); +#endif + ocre_event_queue_initialized = true; #if EVENT_THREAD_POOL_SIZE > 0 - LOG_INF("ocre_event_queue initialized at %p, size=%d, buffer=%p", (void *)&ocre_event_queue, sizeof(ocre_event_t), - (void *)ocre_event_queue_buffer_ptr); + LOG_INF("ocre_event_queue initialized at %p, size=%d", (void *)&ocre_event_queue, sizeof(ocre_event_t)); for (int i = 0; i < EVENT_THREAD_POOL_SIZE; i++) { event_args[i].index = i; char thread_name[16]; @@ -208,6 +217,7 @@ int ocre_common_init(void) { #endif initialized = true; common_initialized = true; + LOG_INF("OCRE common initialized successfully"); return 0; } @@ -241,7 +251,7 @@ int ocre_register_module(wasm_module_inst_t module_inst) { LOG_ERR("Null module instance"); return -EINVAL; } - module_node_t *node = k_malloc(sizeof(module_node_t)); + module_node_t *node = core_malloc(sizeof(module_node_t)); if (!node) { LOG_ERR("Failed to allocate module node"); return -ENOMEM; @@ -250,16 +260,18 @@ int ocre_register_module(wasm_module_inst_t module_inst) { node->ctx.exec_env = wasm_runtime_create_exec_env(module_inst, OCRE_WASM_STACK_SIZE); if (!node->ctx.exec_env) { LOG_ERR("Failed to create exec env for module %p", (void *)module_inst); - k_free(node); + core_free(node); return -ENOMEM; } node->ctx.in_use = true; - node->ctx.last_activity = k_uptime_get_32(); + node->ctx.last_activity = core_uptime_get(); memset(node->ctx.resource_count, 0, sizeof(node->ctx.resource_count)); memset(node->ctx.dispatchers, 0, sizeof(node->ctx.dispatchers)); - k_mutex_lock(®istry_mutex, K_FOREVER); - sys_slist_append(&module_registry, &node->node); - k_mutex_unlock(®istry_mutex); + + core_mutex_lock(®istry_mutex); + core_slist_append(&module_registry, &node->node); + core_mutex_unlock(®istry_mutex); + LOG_INF("Module registered: %p", (void *)module_inst); return 0; } @@ -269,21 +281,24 @@ void ocre_unregister_module(wasm_module_inst_t module_inst) { LOG_ERR("Null module instance"); return; } - k_mutex_lock(®istry_mutex, K_FOREVER); + + core_mutex_lock(®istry_mutex); module_node_t *node, *tmp; - SYS_SLIST_FOR_EACH_CONTAINER_SAFE(&module_registry, node, tmp, node) { + module_node_t *prev = NULL; + CORE_SLIST_FOR_EACH_CONTAINER_SAFE(&module_registry, node, tmp, node) { if (node->ctx.inst == module_inst) { ocre_cleanup_module_resources(module_inst); if (node->ctx.exec_env) { wasm_runtime_destroy_exec_env(node->ctx.exec_env); } - sys_slist_remove(&module_registry, NULL, &node->node); - k_free(node); + core_slist_remove(&module_registry, prev ? &prev->node : NULL, &node->node); + core_free(node); LOG_INF("Module unregistered: %p", (void *)module_inst); break; } + prev = node; } - k_mutex_unlock(®istry_mutex); + core_mutex_unlock(®istry_mutex); } ocre_module_context_t *ocre_get_module_context(wasm_module_inst_t module_inst) { @@ -291,16 +306,16 @@ ocre_module_context_t *ocre_get_module_context(wasm_module_inst_t module_inst) { LOG_ERR("Null module instance"); return NULL; } - k_mutex_lock(®istry_mutex, K_FOREVER); - module_node_t *node; - SYS_SLIST_FOR_EACH_CONTAINER(&module_registry, node, node) { + core_mutex_lock(®istry_mutex); + for (core_snode_t *current = module_registry.head; current != NULL; current = current->next) { + module_node_t *node = (module_node_t *)((char *)current - offsetof(module_node_t, node)); if (node->ctx.inst == module_inst) { - node->ctx.last_activity = k_uptime_get_32(); - k_mutex_unlock(®istry_mutex); + node->ctx.last_activity = core_uptime_get(); + core_mutex_unlock(®istry_mutex); return &node->ctx; } } - k_mutex_unlock(®istry_mutex); + core_mutex_unlock(®istry_mutex); LOG_ERR("Module context not found for %p", (void *)module_inst); return NULL; } @@ -311,7 +326,9 @@ int ocre_register_dispatcher(wasm_exec_env_t exec_env, ocre_resource_type_t type function_name ? function_name : "null"); return -EINVAL; } + wasm_module_inst_t module_inst = current_module_tls ? *current_module_tls : wasm_runtime_get_module_inst(exec_env); + if (!module_inst) { LOG_ERR("No module instance for event type %d", type); return -EINVAL; @@ -327,9 +344,9 @@ int ocre_register_dispatcher(wasm_exec_env_t exec_env, ocre_resource_type_t type LOG_ERR("Function %s not found in module %p", function_name, (void *)module_inst); return -EINVAL; } - k_mutex_lock(®istry_mutex, K_FOREVER); + core_mutex_lock(®istry_mutex); ctx->dispatchers[type] = func; - k_mutex_unlock(®istry_mutex); + core_mutex_unlock(®istry_mutex); LOG_INF("Registered dispatcher for type %d: %s", type, function_name); return 0; } @@ -342,9 +359,9 @@ uint32_t ocre_get_resource_count(wasm_module_inst_t module_inst, ocre_resource_t void ocre_increment_resource_count(wasm_module_inst_t module_inst, ocre_resource_type_t type) { ocre_module_context_t *ctx = ocre_get_module_context(module_inst); if (ctx && type < OCRE_RESOURCE_TYPE_COUNT) { - k_mutex_lock(®istry_mutex, K_FOREVER); + core_mutex_lock(®istry_mutex); ctx->resource_count[type]++; - k_mutex_unlock(®istry_mutex); + core_mutex_unlock(®istry_mutex); LOG_INF("Incremented resource count: type=%d, count=%d", type, ctx->resource_count[type]); } } @@ -352,9 +369,9 @@ void ocre_increment_resource_count(wasm_module_inst_t module_inst, ocre_resource void ocre_decrement_resource_count(wasm_module_inst_t module_inst, ocre_resource_type_t type) { ocre_module_context_t *ctx = ocre_get_module_context(module_inst); if (ctx && type < OCRE_RESOURCE_TYPE_COUNT && ctx->resource_count[type] > 0) { - k_mutex_lock(®istry_mutex, K_FOREVER); + core_mutex_lock(®istry_mutex); ctx->resource_count[type]--; - k_mutex_unlock(®istry_mutex); + core_mutex_unlock(®istry_mutex); LOG_INF("Decremented resource count: type=%d, count=%d", type, ctx->resource_count[type]); } } diff --git a/src/ocre/api/ocre_common.h b/src/ocre/api/ocre_common.h index 6f0c6671..95be03a9 100644 --- a/src/ocre/api/ocre_common.h +++ b/src/ocre/api/ocre_common.h @@ -8,9 +8,10 @@ #ifndef OCRE_COMMON_H #define OCRE_COMMON_H -#include -#include #include +#include +#include +#include "ocre_core_external.h" #include "../ocre_messaging/ocre_messaging.h" #define OCRE_EVENT_THREAD_STACK_SIZE 2048 @@ -18,17 +19,15 @@ #define OCRE_WASM_STACK_SIZE 16384 #define EVENT_THREAD_POOL_SIZE 0 - extern bool common_initialized; extern bool ocre_event_queue_initialized; extern __thread wasm_module_inst_t *current_module_tls; - - -extern struct k_msgq ocre_event_queue; // Defined in ocre_common.c -extern bool ocre_event_queue_initialized; // Defined in ocre_common.c -extern struct k_spinlock ocre_event_queue_lock; // Defined in ocre_common.c extern char *ocre_event_queue_buffer_ptr; // Defined in ocre_common.c +/* External declarations for unified event queue */ +extern core_eventq_t ocre_event_queue; // Defined in ocre_common.c +extern core_spinlock_t ocre_event_queue_lock; // Defined in ocre_common.c + /** * @brief Enumeration of OCRE resource types. @@ -205,4 +204,4 @@ int ocre_get_event(wasm_exec_env_t exec_env, uint32_t type_offset, uint32_t id_o void ocre_common_shutdown(void); -#endif /* OCRE_COMMON_H */ \ No newline at end of file +#endif /* OCRE_COMMON_H */ diff --git a/src/ocre/components/container_supervisor/cs_sm_impl.c b/src/ocre/components/container_supervisor/cs_sm_impl.c index 8b96f4a6..c86a4fdc 100644 --- a/src/ocre/components/container_supervisor/cs_sm_impl.c +++ b/src/ocre/components/container_supervisor/cs_sm_impl.c @@ -128,10 +128,9 @@ static void container_thread_entry(void *args) { #endif // Run the WASM main function bool success = wasm_application_execute_main(module_inst, 0, NULL); - // Update container status - container->container_runtime_status = success ? CONTAINER_STATUS_STOPPED : CONTAINER_STATUS_ERROR; - + if (container->container_runtime_status != CONTAINER_STATUS_STOPPED) + container->container_runtime_status = success ? CONTAINER_STATUS_STOPPED : CONTAINER_STATUS_ERROR; // Cleanup sequence core_mutex_lock(&container->lock); { @@ -302,7 +301,7 @@ ocre_container_status_t CS_create_container(ocre_container_t *container) { if (container->container_runtime_status != CONTAINER_STATUS_UNKNOWN && container->container_runtime_status != CONTAINER_STATUS_DESTROYED) { LOG_ERR("Cannot create container again container with ID: %d, already exists", curr_container_ID); - return RUNTIME_STATUS_ERROR; + return CONTAINER_STATUS_ERROR; } if (!validate_container_memory(container)) { diff --git a/src/ocre/container_messaging_posix/messaging.c b/src/ocre/container_messaging_posix/messaging.c deleted file mode 100644 index c3d0ba24..00000000 --- a/src/ocre/container_messaging_posix/messaging.c +++ /dev/null @@ -1,280 +0,0 @@ -/** - * @copyright Copyright © contributors to Project Ocre, - * which has been established as Project Ocre a Series of LF Projects, LLC - * - * SPDX-License-Identifier: Apache-2.0 - */ - -#include -#include -#include -#include "messaging.h" -#include "ocre/utils/utils.h" - -#define MAX_MSG_SIZE 4096 - -#include -#include -#include -#include -#include -#include -#include -#include - -#define QUEUE_SIZE 10 -#define STACK_SIZE 1024 -#define PRIORITY 5 -#define WASM_STACK_SIZE (8 * 1024) - -static mqd_t ocre_msg_queue; -char mq_name[32]; -static pthread_t subscriber_thread_data; -static struct mq_attr mq_attr = { - .mq_flags = 0, - .mq_maxmsg = QUEUE_SIZE, - .mq_msgsize = MAX_MSG_SIZE, - .mq_curmsgs = 0 -}; - -#define CONFIG_MESSAGING_MAX_SUBSCRIPTIONS 10 - -void cleanup_messaging() { - mq_close(ocre_msg_queue); - mq_unlink(mq_name); - exit(0); // Terminate the process after cleanup -} - - -// Structure to hold the subscription information -typedef struct { - char *topic; - wasm_function_inst_t handler; - wasm_module_inst_t module_inst; -} ocre_subscription_t; - -static ocre_subscription_t subscriptions[CONFIG_MESSAGING_MAX_SUBSCRIPTIONS]; -static int subscription_count = 0; -static bool msg_system_is_init = false; - -static uint32_t allocate_wasm_memory(wasm_module_inst_t module_inst, const void *src, size_t size) { - void *native_addr = NULL; - uint64_t wasm_ptr = wasm_runtime_module_malloc(module_inst, size, &native_addr); - if (!wasm_ptr || !native_addr) { - LOG_ERR("Failed to allocate memory in WASM"); - return 0; - } - if (src) { - memcpy(native_addr, src, size); - } - return (uint32_t)wasm_ptr; -} - -static void free_wasm_message(wasm_module_inst_t module_inst, uint32_t *ptr_array, uint16_t count) { - for (uint16_t i = 0; i < count; i++) { - if (ptr_array[i]) { - wasm_runtime_module_free(module_inst, ptr_array[i]); - } - } -} - -void* subscriber_thread(void *arg) { - wasm_runtime_init_thread_env(); - uint8_t buffer[MAX_MSG_SIZE]; - while (1) { - ssize_t msg_size = mq_receive(ocre_msg_queue, (char *)buffer, sizeof(buffer), NULL); - if (msg_size >= (ssize_t)sizeof(ocre_msg_t)) { - LOG_INF("Got a message from another container"); - ocre_msg_t *msg = (ocre_msg_t *)buffer; - char *topic = (char *)(buffer + msg->topic); - char *content_type = (char *)(buffer + msg->content_type); - void *payload = (void *)(buffer + msg->payload); - - for (int i = 0; i < subscription_count; i++) { - if (strcmp(subscriptions[i].topic, topic) == 0) { - wasm_module_inst_t module_inst = subscriptions[i].module_inst; - if (!module_inst) { - LOG_ERR("Invalid module instance"); - continue; - } - - // Create exec_env for this thread/call - wasm_exec_env_t exec_env = wasm_runtime_create_exec_env(module_inst, WASM_STACK_SIZE); - if (!exec_env) { - LOG_ERR("Failed to create exec_env"); - continue; - } - - uint32_t topic_offset = - (uint32_t)wasm_runtime_module_dup_data(module_inst, topic, strlen(topic) + 1); - if (topic_offset == 0) { - LOG_ERR("Failed to allocate memory for topic in WASM"); - continue; - } - uint32_t content_offset = (uint32_t)wasm_runtime_module_dup_data(module_inst, content_type, strlen(content_type) + 1); - if (content_offset == 0) { - LOG_ERR("Failed to allocate memory for content_type in WASM"); - wasm_runtime_module_free(module_inst, topic_offset); - continue; - } - uint32_t payload_offset = - (uint32_t)wasm_runtime_module_dup_data(module_inst, payload, msg->payload_len); - if (payload_offset == 0) { - LOG_ERR("Failed to allocate memory for payload in WASM"); - wasm_runtime_module_free(module_inst, topic_offset); - wasm_runtime_module_free(module_inst, content_offset); - continue; - } - - uint32_t args[5] = {msg->mid, topic_offset, content_offset, payload_offset, msg->payload_len}; - if (!wasm_runtime_call_wasm(exec_env, subscriptions[i].handler, 5, args)) { - const char *error = wasm_runtime_get_exception(module_inst); - LOG_ERR("Failed to call WASM function: %s", error ? error : "Unknown error"); - } else { - LOG_INF("Function executed successfully"); - } - - // Free memory and exec_env - wasm_runtime_module_free(module_inst, topic_offset); - wasm_runtime_module_free(module_inst, content_offset); - wasm_runtime_module_free(module_inst, payload_offset); - wasm_runtime_destroy_exec_env(exec_env); - } - } - } else { - core_sleep_ms(1000); - } - } - printf("do I get here?\n"); - wasm_runtime_destroy_thread_env(); - return NULL; -} - -void ocre_msg_system_init() { - if (msg_system_is_init) { - LOG_WRN("Messaging System is already initialized"); - return; - } - - // POSIX: Create message queue and thread - snprintf(mq_name, sizeof(mq_name), "/ocre_msgq_%d", getpid()); - ocre_msg_queue = mq_open(mq_name, O_CREAT | O_RDWR, 0644, &mq_attr); - if (ocre_msg_queue == (mqd_t)-1) { - perror("Failed to create message queue"); - return; - } - if (pthread_create(&subscriber_thread_data, NULL, subscriber_thread, NULL) != 0) { - perror("Failed to create subscriber thread"); - mq_close(ocre_msg_queue); - mq_unlink(mq_name); // Clean up if thread creation fails - return; - } - msg_system_is_init = true; -} - -int ocre_publish_message(wasm_exec_env_t exec_env, char *topic, char *content_type, void *payload, int payload_len) { - LOG_INF("---------------_ocre_publish_message: topic: %s [%zu], content_type: %s, payload_len: %u ", topic, - strlen(topic), content_type, payload_len); - - if (!msg_system_is_init) { - LOG_ERR("Messaging system not initialized"); - return -1; - } - - if (!topic || (topic[0] == '\0')) { - LOG_ERR("topic is NULL or empty, please check function parameters!"); - return -1; - } - - if (!content_type || (content_type[0] == '\0')) { - LOG_ERR("content_type is NULL or empty, please check function parameters!"); - return -1; - } - - if (!payload || payload_len == 0) { - LOG_ERR("payload is NULL or payload_len is 0, please check function parameters!"); - return -1; - } - - static uint64_t message_id = 0; - - size_t topic_len = strlen(topic) + 1; - size_t content_type_len = strlen(content_type) + 1; - size_t total_size = sizeof(ocre_msg_t) + topic_len + content_type_len + payload_len; - - if (total_size > MAX_MSG_SIZE) { - LOG_ERR("Message too large for queue: %zu > %d", total_size, MAX_MSG_SIZE); - return -1; - } - - uint8_t *buffer = malloc(total_size); - if (!buffer) { - LOG_ERR("Failed to allocate message buffer"); - return -1; - } - - ocre_msg_t *msg = (ocre_msg_t *)buffer; - msg->mid = message_id++; - msg->topic = sizeof(ocre_msg_t); - msg->content_type = msg->topic + topic_len; - msg->payload = msg->content_type + content_type_len; - msg->payload_len = payload_len; - - memcpy(buffer + msg->topic, topic, topic_len); - memcpy(buffer + msg->content_type, content_type, content_type_len); - memcpy(buffer + msg->payload, payload, payload_len); - - if (mq_send(ocre_msg_queue, (const char *)buffer, total_size, 0) != 0) { - perror("Message queue is full or error occurred"); - free(buffer); - return -1; - } - free(buffer); - - return 0; -} - -int ocre_subscribe_message(wasm_exec_env_t exec_env, char *topic, char *handler_name) { - LOG_INF("---------------_ocre_subscribe_message---------------"); - - if (!msg_system_is_init) { - LOG_ERR("Messaging system not initialized"); - return -1; - } - - signal(SIGINT, cleanup_messaging); - signal(SIGTERM, cleanup_messaging); - - if (subscription_count < CONFIG_MESSAGING_MAX_SUBSCRIPTIONS) { - - if (!topic || (topic[0] == '\0')) { - LOG_ERR("topic is NULL or empty, please check function parameters!"); - return -1; - } - - if (!handler_name || (handler_name[0] == '\0')) { - LOG_ERR("handler_name is NULL or empty, please check function parameters!"); - return -1; - } - - wasm_module_inst_t current_module_inst = wasm_runtime_get_module_inst(exec_env); - wasm_function_inst_t handler_function = wasm_runtime_lookup_function(current_module_inst, handler_name); - if (!handler_function) { - LOG_ERR("Failed to find %s in WASM module", handler_name); - return -1; - } - - subscriptions[subscription_count].topic = topic; - subscriptions[subscription_count].handler = handler_function; - subscriptions[subscription_count].module_inst = current_module_inst; - - LOG_INF("WASM messaging callback function set successfully"); - - subscription_count++; - } else { - LOG_ERR("Maximum subscriptions reached, could not subscribe to topic"); - return -1; - } - - return 0; -} diff --git a/src/ocre/container_messaging_posix/messaging.h b/src/ocre/container_messaging_posix/messaging.h deleted file mode 100644 index 2bf77b2c..00000000 --- a/src/ocre/container_messaging_posix/messaging.h +++ /dev/null @@ -1,56 +0,0 @@ - -/** - * @copyright Copyright © contributors to Project Ocre, - * which has been established as Project Ocre a Series of LF Projects, LLC - * - * SPDX-License-Identifier: Apache-2.0 - */ - -#ifndef CONTAINER_MESSAGING_POSIX_H -#define CONTAINER_MESSAGING_POSIX_H -#include "wasm_export.h" - -/** - * @typedef ocre_msg - * - * Structure of ocre messages - * - */ -typedef struct ocre_msg { - // message id - increments on each message - uint64_t mid; - // url of the request - uint32_t topic; // offset from start of buffer - // payload format (MIME type??) - uint32_t content_type; // offset from start of buffer - // payload of the request - uint32_t payload; // offset from start of buffer - // length in bytes of the payload - int32_t payload_len; -} ocre_msg_t; - -/** - * @brief Initialize OCRE Messaging System. - * - */ -void ocre_msg_system_init(); - -/** - * @brief Publish a message to the specified target. - * - * @param topic the name of the topic on which to publish the message - * @param content_type the content type of the message; it is recommended to use a MIME type - * @param payload a buffer containing the message contents - * @param payload_len the length of the payload buffer - */ -int ocre_publish_message(wasm_exec_env_t exec_env, char *topic, char *content_type, void *payload, int payload_len); - -/** - * @brief Subscribe to messages on the specified topic. - * - * @param topic the name of the topic on which to subscribe - * @param handler_name name of callback function that will be called when a message is received on this topic - */ -int ocre_subscribe_message(wasm_exec_env_t exec_env, char *topic, char *handler_name); - -#endif /* CONTAINER_MESSAGING_POSIX_H */ diff --git a/src/ocre/ocre_gpio/ocre_gpio.c b/src/ocre/ocre_gpio/ocre_gpio.c index f15a90a0..63004048 100644 --- a/src/ocre/ocre_gpio/ocre_gpio.c +++ b/src/ocre/ocre_gpio/ocre_gpio.c @@ -266,10 +266,10 @@ void ocre_gpio_cleanup_container(wasm_module_inst_t module_inst) { gpio_pins[i].in_use = 0; gpio_pins[i].owner = NULL; ocre_decrement_resource_count(module_inst, OCRE_RESOURCE_TYPE_GPIO); - LOG_INF("Cleaned up GPIO pin %d", i); + LOG_DBG("Cleaned up GPIO pin %d", i); } } - LOG_INF("Cleaned up GPIO resources for module %p", (void *)module_inst); + LOG_DBG("Cleaned up GPIO resources for module %p", (void *)module_inst); } void ocre_gpio_set_dispatcher(wasm_exec_env_t exec_env) { @@ -370,14 +370,14 @@ static void gpio_callback_handler(const struct device *port, struct gpio_callbac event.data.gpio_event.port = gpio_pins[i].port_idx; event.data.gpio_event.state = (uint32_t)state; event.owner = gpio_pins[i].owner; - k_spinlock_key_t key = k_spin_lock(&ocre_event_queue_lock); - if (k_msgq_put(&ocre_event_queue, &event, K_NO_WAIT) != 0) { + core_spinlock_key_t key = core_spinlock_lock(&ocre_event_queue_lock); + if (core_eventq_put(&ocre_event_queue, &event) != 0) { LOG_ERR("Failed to queue GPIO event for pin %d", i); } else { LOG_INF("Queued GPIO event for pin %d (port=%d, pin=%d), state=%d", i, gpio_pins[i].port_idx, gpio_pins[i].pin_number, state); } - k_spin_unlock(&ocre_event_queue_lock, key); + core_spinlock_unlock(&ocre_event_queue_lock, key); } } } @@ -688,4 +688,4 @@ int ocre_gpio_wasm_unregister_callback_by_name(wasm_exec_env_t exec_env, const c LOG_INF("Unregistering callback by name: %s, global_pin=%d", name, global_pin); return ocre_gpio_unregister_callback(global_pin); -} \ No newline at end of file +} diff --git a/src/ocre/ocre_messaging/ocre_messaging.c b/src/ocre/ocre_messaging/ocre_messaging.c index c97da479..29cfe5c2 100644 --- a/src/ocre/ocre_messaging/ocre_messaging.c +++ b/src/ocre/ocre_messaging/ocre_messaging.c @@ -6,55 +6,150 @@ */ #include +#include "ocre_core_external.h" #include #include -#include -#include -#include #include LOG_MODULE_DECLARE(ocre_cs_component, OCRE_LOG_LEVEL); -#define MESSAGING_MAX_SUBSCRIPTIONS CONFIG_MESSAGING_MAX_SUBSCRIPTIONS +#ifndef CONFIG_MESSAGING_MAX_SUBSCRIPTIONS +#define CONFIG_MESSAGING_MAX_SUBSCRIPTIONS 10 +#endif +#define OCRE_MAX_TOPIC_LEN 64 -// Structure to hold the subscription information +/* Messaging subscription structure */ typedef struct { - void *topic; // Topic pointer + char topic[OCRE_MAX_TOPIC_LEN]; wasm_module_inst_t module_inst; bool is_active; -} messaging_subscription_t; +} ocre_messaging_subscription_t; typedef struct { - messaging_subscription_t info[MESSAGING_MAX_SUBSCRIPTIONS]; - uint16_t subscriptions_number; -} messaging_subscription_list; + ocre_messaging_subscription_t subscriptions[CONFIG_MESSAGING_MAX_SUBSCRIPTIONS]; + uint16_t subscription_count; + core_mutex_t mutex; +} ocre_messaging_system_t; -static messaging_subscription_list subscription_list = {0}; +static ocre_messaging_system_t messaging_system = {0}; static bool messaging_system_initialized = false; +/* Initialize messaging system */ int ocre_messaging_init(void) { if (messaging_system_initialized) { LOG_INF("Messaging system already initialized"); - return -1; + return 0; } + if (!common_initialized && ocre_common_init() != 0) { LOG_ERR("Failed to initialize common subsystem"); - return -2; + return -EAGAIN; } - - memset(&subscription_list, 0, sizeof(messaging_subscription_list)); + + memset(&messaging_system, 0, sizeof(ocre_messaging_system_t)); + + core_mutex_init(&messaging_system.mutex); + ocre_register_cleanup_handler(OCRE_RESOURCE_TYPE_MESSAGING, ocre_messaging_cleanup_container); messaging_system_initialized = true; - LOG_INF("Messaging system initialized"); return 0; } -int ocre_messaging_publish(wasm_exec_env_t exec_env, void *topic, void *content_type, void *payload, int payload_len) { +/* Cleanup messaging resources for a module */ +void ocre_messaging_cleanup_container(wasm_module_inst_t module_inst) { + if (!messaging_system_initialized || !module_inst) { + return; + } + + core_mutex_lock(&messaging_system.mutex); + + for (int i = 0; i < CONFIG_MESSAGING_MAX_SUBSCRIPTIONS; i++) { + if (messaging_system.subscriptions[i].is_active && + messaging_system.subscriptions[i].module_inst == module_inst) { + messaging_system.subscriptions[i].is_active = false; + messaging_system.subscriptions[i].module_inst = NULL; + messaging_system.subscriptions[i].topic[0] = '\0'; + messaging_system.subscription_count--; + ocre_decrement_resource_count(module_inst, OCRE_RESOURCE_TYPE_MESSAGING); + LOG_DBG("Cleaned up subscription %d for module %p", i, (void *)module_inst); + } + } + + core_mutex_unlock(&messaging_system.mutex); + + LOG_DBG("Cleaned up messaging resources for module %p", (void *)module_inst); +} + +/* Subscribe to a topic */ +int ocre_messaging_subscribe(wasm_exec_env_t exec_env, void *topic) { if (!messaging_system_initialized) { - LOG_ERR("Messaging system not initialized"); + if (ocre_messaging_init() != 0) { + LOG_ERR("Failed to initialize messaging system"); + return -EINVAL; + } + } + + if (!topic || ((char *)topic)[0] == '\0') { + LOG_ERR("Topic is NULL or empty"); + return -EINVAL; + } + + wasm_module_inst_t module_inst = wasm_runtime_get_module_inst(exec_env); + if (!module_inst) { + LOG_ERR("No module instance for exec_env"); + return -EINVAL; + } + + ocre_module_context_t *ctx = ocre_get_module_context(module_inst); + if (!ctx) { + LOG_ERR("Module context not found for module instance %p", (void *)module_inst); return -EINVAL; } + + core_mutex_lock(&messaging_system.mutex); + + // Check if already subscribed + for (int i = 0; i < CONFIG_MESSAGING_MAX_SUBSCRIPTIONS; i++) { + if (messaging_system.subscriptions[i].is_active && + messaging_system.subscriptions[i].module_inst == module_inst && + strcmp(messaging_system.subscriptions[i].topic, (char *)topic) == 0) { + LOG_INF("Already subscribed to topic: %s", (char *)topic); + core_mutex_unlock(&messaging_system.mutex); + return 0; + } + } + + // Find a free slot + for (int i = 0; i < CONFIG_MESSAGING_MAX_SUBSCRIPTIONS; i++) { + if (!messaging_system.subscriptions[i].is_active) { + strncpy(messaging_system.subscriptions[i].topic, (char *)topic, OCRE_MAX_TOPIC_LEN - 1); + messaging_system.subscriptions[i].topic[OCRE_MAX_TOPIC_LEN - 1] = '\0'; + messaging_system.subscriptions[i].module_inst = module_inst; + messaging_system.subscriptions[i].is_active = true; + messaging_system.subscription_count++; + ocre_increment_resource_count(module_inst, OCRE_RESOURCE_TYPE_MESSAGING); + LOG_INF("Subscribed to topic: %s, module: %p", (char *)topic, (void *)module_inst); + core_mutex_unlock(&messaging_system.mutex); + return 0; + } + } + + core_mutex_unlock(&messaging_system.mutex); + + LOG_ERR("No free subscription slots available"); + return -ENOMEM; +} + +/* Publish a message */ +int ocre_messaging_publish(wasm_exec_env_t exec_env, void *topic, void *content_type, void *payload, int payload_len) { + if (!messaging_system_initialized) { + if (ocre_messaging_init() != 0) { + LOG_ERR("Failed to initialize messaging system"); + return -EINVAL; + } + } + if (!topic || ((char *)topic)[0] == '\0') { LOG_ERR("Topic is NULL or empty"); return -EINVAL; @@ -67,52 +162,52 @@ int ocre_messaging_publish(wasm_exec_env_t exec_env, void *topic, void *content_ LOG_ERR("Payload is NULL or payload_len is invalid"); return -EINVAL; } - - if ((uintptr_t)ocre_event_queue_buffer_ptr % 4 != 0) { - LOG_ERR("ocre_event_queue_buffer misaligned: %p", (void *)ocre_event_queue_buffer_ptr); - return -EPIPE; - } - wasm_module_inst_t module_inst = wasm_runtime_get_module_inst(exec_env); - if (!module_inst) { + + wasm_module_inst_t publisher_module = wasm_runtime_get_module_inst(exec_env); + if (!publisher_module) { LOG_ERR("No module instance for exec_env"); return -EINVAL; } + static uint32_t message_id = 0; - bool posted = false; - - // Iterate through subscriptions to find matching topics - for (int i = 0; i < MESSAGING_MAX_SUBSCRIPTIONS; i++) { - if (!subscription_list.info[i].is_active) { + bool message_sent = false; + + core_mutex_lock(&messaging_system.mutex); + + // Find matching subscriptions + for (int i = 0; i < CONFIG_MESSAGING_MAX_SUBSCRIPTIONS; i++) { + if (!messaging_system.subscriptions[i].is_active) { continue; } - - char *subscribed_topic = (char *)subscription_list.info[i].topic; + + // Check if the published topic matches the subscription (prefix match) + const char *subscribed_topic = messaging_system.subscriptions[i].topic; size_t subscribed_len = strlen(subscribed_topic); - + if (strncmp(subscribed_topic, (char *)topic, subscribed_len) != 0) { continue; // No prefix match } - - wasm_module_inst_t target_module = subscription_list.info[i].module_inst; + + wasm_module_inst_t target_module = messaging_system.subscriptions[i].module_inst; if (!target_module) { LOG_ERR("Invalid module instance for subscription %d", i); continue; } - - // Allocate WASM memory for topic, content_type, and payload - uint32_t topic_offset = - (uint32_t)wasm_runtime_module_dup_data(target_module, (char *)topic, strlen((char *)topic) + 1); + + // Allocate WASM memory for the target module + uint32_t topic_offset = (uint32_t)wasm_runtime_module_dup_data(target_module, (char *)topic, strlen((char *)topic) + 1); if (topic_offset == 0) { LOG_ERR("Failed to allocate WASM memory for topic"); continue; } - uint32_t content_offset = (uint32_t)wasm_runtime_module_dup_data(target_module, (char *)content_type, - strlen((char *)content_type) + 1); + + uint32_t content_offset = (uint32_t)wasm_runtime_module_dup_data(target_module, (char *)content_type, strlen((char *)content_type) + 1); if (content_offset == 0) { LOG_ERR("Failed to allocate WASM memory for content_type"); wasm_runtime_module_free(target_module, topic_offset); continue; } + uint32_t payload_offset = (uint32_t)wasm_runtime_module_dup_data(target_module, payload, payload_len); if (payload_offset == 0) { LOG_ERR("Failed to allocate WASM memory for payload"); @@ -120,7 +215,8 @@ int ocre_messaging_publish(wasm_exec_env_t exec_env, void *topic, void *content_ wasm_runtime_module_free(target_module, content_offset); continue; } - + + // Create and queue the messaging event ocre_event_t event; event.type = OCRE_RESOURCE_TYPE_MESSAGING; event.data.messaging_event.message_id = message_id; @@ -132,100 +228,38 @@ int ocre_messaging_publish(wasm_exec_env_t exec_env, void *topic, void *content_ event.data.messaging_event.payload_offset = payload_offset; event.data.messaging_event.payload_len = (uint32_t)payload_len; event.owner = target_module; + + LOG_DBG("Creating messaging event: ID=%d, topic=%s, content_type=%s, payload_len=%d for module %p", + message_id, (char *)topic, (char *)content_type, payload_len, (void *)target_module); - k_spinlock_key_t key = k_spin_lock(&ocre_event_queue_lock); - if (k_msgq_put(&ocre_event_queue, &event, K_NO_WAIT) != 0) { + core_spinlock_key_t key = core_spinlock_lock(&ocre_event_queue_lock); + if (core_eventq_put(&ocre_event_queue, &event) != 0) { LOG_ERR("Failed to queue messaging event for message ID %d", message_id); wasm_runtime_module_free(target_module, topic_offset); wasm_runtime_module_free(target_module, content_offset); wasm_runtime_module_free(target_module, payload_offset); } else { - posted = true; - LOG_DBG("Queued messaging event: ID=%d, topic=%s, content_type=%s, payload_len=%d for module %p", - message_id, (char *)topic, (char *)content_type, payload_len, (void *)target_module); + message_sent = true; + LOG_DBG("Queued messaging event for message ID %d", message_id); } - k_spin_unlock(&ocre_event_queue_lock, key); + core_spinlock_unlock(&ocre_event_queue_lock, key); } - if (posted) { - LOG_DBG("Published message: ID=%d, topic=%s, content_type=%s, payload_len=%d", message_id, (char *)topic, - (char *)content_type, payload_len); + + core_mutex_unlock(&messaging_system.mutex); + + if (message_sent) { + LOG_DBG("Published message: ID=%d, topic=%s, content_type=%s, payload_len=%d", + message_id, (char *)topic, (char *)content_type, payload_len); message_id++; return 0; } else { - LOG_ERR("No matching subscriptions found for topic %s", (char *)topic); + LOG_WRN("No matching subscriptions found for topic %s", (char *)topic); return -ENOENT; } } -int ocre_messaging_subscribe(wasm_exec_env_t exec_env, void *topic) { - if (!messaging_system_initialized) { - LOG_ERR("Messaging system not initialized"); - return -EINVAL; - } - if (subscription_list.subscriptions_number >= MESSAGING_MAX_SUBSCRIPTIONS) { - LOG_ERR("Maximum subscriptions reached"); - return -ENOMEM; - } - if (!topic || ((char *)topic)[0] == '\0') { - LOG_ERR("Topic is NULL or empty"); - return -EINVAL; - } - wasm_module_inst_t module_inst = wasm_runtime_get_module_inst(exec_env); - if (!module_inst) { - LOG_ERR("No module instance for exec_env"); - return -EINVAL; - } - ocre_module_context_t *ctx = ocre_get_module_context(module_inst); - if (!ctx || !ctx->exec_env) { - LOG_ERR("Execution environment not found for module instance %p", (void *)module_inst); - return -EINVAL; - } - - // Find a free slot for subscription - for (int i = 0; i < MESSAGING_MAX_SUBSCRIPTIONS; i++) { - if (!subscription_list.info[i].is_active) { - size_t topic_len = strlen((char *)topic) + 1; - subscription_list.info[i].topic = k_malloc(topic_len); - if (!subscription_list.info[i].topic) { - LOG_ERR("Failed to allocate memory for topic"); - return -ENOMEM; - } - - strcpy((char *)subscription_list.info[i].topic, (char *)topic); - subscription_list.info[i].module_inst = module_inst; - subscription_list.info[i].is_active = true; - subscription_list.subscriptions_number++; - ocre_increment_resource_count(module_inst, OCRE_RESOURCE_TYPE_MESSAGING); - LOG_INF("Subscribed to topic: %s, current module_inst: %p", (char *)topic, (void *)module_inst); - return 0; - } - } - LOG_ERR("No free subscription slots available"); - return -ENOMEM; -} - -void ocre_messaging_cleanup_container(wasm_module_inst_t module_inst) { - if (!messaging_system_initialized || !module_inst) { - LOG_ERR("Messaging system not initialized or invalid module %p", (void *)module_inst); - return; - } - // Clean up subscriptions - for (int i = 0; i < MESSAGING_MAX_SUBSCRIPTIONS; i++) { - if (subscription_list.info[i].is_active && subscription_list.info[i].module_inst == module_inst) { - k_free(subscription_list.info[i].topic); - subscription_list.info[i].topic = NULL; - subscription_list.info[i].module_inst = NULL; - subscription_list.info[i].is_active = false; - subscription_list.subscriptions_number--; - ocre_decrement_resource_count(module_inst, OCRE_RESOURCE_TYPE_MESSAGING); - LOG_INF("Cleaned up subscription %d for module %p", i, (void *)module_inst); - } - } - LOG_INF("Cleaned up messaging resources for module %p", (void *)module_inst); -} - -int ocre_messaging_free_module_event_data(wasm_exec_env_t exec_env, uint32_t topic_offset, uint32_t content_offset, - uint32_t payload_offset) { +/* Free module event data */ +int ocre_messaging_free_module_event_data(wasm_exec_env_t exec_env, uint32_t topic_offset, uint32_t content_offset, uint32_t payload_offset) { wasm_module_inst_t module_inst = wasm_runtime_get_module_inst(exec_env); if (!module_inst) { LOG_ERR("Cannot find module_inst for free event data"); diff --git a/src/ocre/ocre_messaging/ocre_messaging.h b/src/ocre/ocre_messaging/ocre_messaging.h index bdbf3ea9..d46eb56e 100644 --- a/src/ocre/ocre_messaging/ocre_messaging.h +++ b/src/ocre/ocre_messaging/ocre_messaging.h @@ -9,9 +9,7 @@ #define OCRE_MESSAGING_H #include -#include -#include -#include +#include "ocre_core_external.h" #include #define MESSAGING_QUEUE_SIZE 100 @@ -77,4 +75,4 @@ void ocre_messaging_cleanup_container(wasm_module_inst_t module_inst); int ocre_messaging_free_module_event_data(wasm_exec_env_t exec_env, uint32_t topic_offset, uint32_t content_offset, uint32_t payload_offset); -#endif /* OCRE_MESSAGING_H */ \ No newline at end of file +#endif /* OCRE_MESSAGING_H */ diff --git a/src/ocre/ocre_timers/ocre_timer.c b/src/ocre/ocre_timers/ocre_timer.c index d53e145e..02a67d91 100644 --- a/src/ocre/ocre_timers/ocre_timer.c +++ b/src/ocre/ocre_timers/ocre_timer.c @@ -6,39 +6,38 @@ */ #include -#include #include +#include #include -#include -#include -#include - -LOG_MODULE_DECLARE(ocre_cs_component, OCRE_LOG_LEVEL); - #include #include #include +#include -// Compact timer structure +LOG_MODULE_DECLARE(ocre_cs_component, OCRE_LOG_LEVEL); + +/* Unified timer structure using core_timer API */ typedef struct { uint32_t in_use: 1; uint32_t id: 8; // Up to 256 timers uint32_t interval: 16; // Up to 65s intervals uint32_t periodic: 1; - struct k_timer timer; + uint32_t running: 1; // Track if timer is currently running + uint32_t start_time; // Start time for remaining time calculations + core_timer_t timer; // Unified core timer wasm_module_inst_t owner; -} ocre_timer; +} ocre_timer_internal; -#ifndef CONFIG_MAX_TIMER +#ifndef CONFIG_MAX_TIMERS #define CONFIG_MAX_TIMERS 5 #endif // Static data -static ocre_timer timers[CONFIG_MAX_TIMERS]; +static ocre_timer_internal timers[CONFIG_MAX_TIMERS]; static bool timer_system_initialized = false; -static void timer_callback_wrapper(struct k_timer *timer); +static void unified_timer_callback(void *user_data); void ocre_timer_init(void) { if (timer_system_initialized) { @@ -63,7 +62,7 @@ int ocre_timer_create(wasm_exec_env_t exec_env, int id) { return -EINVAL; } - ocre_timer *timer = &timers[id - 1]; + ocre_timer_internal *timer = &timers[id - 1]; if (timer->in_use) { LOG_ERR("Timer ID %d already in use", id); return -EBUSY; @@ -72,7 +71,14 @@ int ocre_timer_create(wasm_exec_env_t exec_env, int id) { timer->id = id; timer->owner = module; timer->in_use = 1; - k_timer_init(&timer->timer, timer_callback_wrapper, NULL); + + // Initialize unified core timer + if (core_timer_init(&timer->timer, unified_timer_callback, timer) != 0) { + LOG_ERR("Failed to initialize core timer %d", id); + timer->in_use = 0; + return -EINVAL; + } + ocre_increment_resource_count(module, OCRE_RESOURCE_TYPE_TIMER); LOG_INF("Created timer %d for module %p", id, (void *)module); return 0; @@ -85,13 +91,17 @@ int ocre_timer_delete(wasm_exec_env_t exec_env, ocre_timer_t id) { return -EINVAL; } - ocre_timer *timer = &timers[id - 1]; + ocre_timer_internal *timer = &timers[id - 1]; if (!timer->in_use || timer->owner != module) { LOG_ERR("Timer ID %d not in use or not owned by module %p", id, (void *)module); return -EINVAL; } - k_timer_stop(&timer->timer); + + // Stop unified core timer + core_timer_stop(&timer->timer); + timer->in_use = 0; + timer->running = 0; timer->owner = NULL; ocre_decrement_resource_count(module, OCRE_RESOURCE_TYPE_TIMER); LOG_INF("Deleted timer %d", id); @@ -105,7 +115,7 @@ int ocre_timer_start(wasm_exec_env_t exec_env, ocre_timer_t id, int interval, in return -EINVAL; } - ocre_timer *timer = &timers[id - 1]; + ocre_timer_internal *timer = &timers[id - 1]; if (!timer->in_use || timer->owner != module) { LOG_ERR("Timer ID %d not in use or not owned by module %p", id, (void *)module); return -EINVAL; @@ -118,9 +128,17 @@ int ocre_timer_start(wasm_exec_env_t exec_env, ocre_timer_t id, int interval, in timer->interval = interval; timer->periodic = is_periodic; - k_timeout_t duration = K_MSEC(interval); - k_timeout_t period = is_periodic ? duration : K_NO_WAIT; - k_timer_start(&timer->timer, duration, period); + timer->start_time = core_uptime_get(); + timer->running = 1; + + // Start unified core timer + int period_ms = is_periodic ? interval : 0; + if (core_timer_start(&timer->timer, interval, period_ms) != 0) { + LOG_ERR("Failed to start core timer %d", id); + timer->running = 0; + return -EINVAL; + } + LOG_INF("Started timer %d with interval %dms, periodic=%d", id, interval, is_periodic); return 0; } @@ -132,12 +150,16 @@ int ocre_timer_stop(wasm_exec_env_t exec_env, ocre_timer_t id) { return -EINVAL; } - ocre_timer *timer = &timers[id - 1]; + ocre_timer_internal *timer = &timers[id - 1]; if (!timer->in_use || timer->owner != module) { LOG_ERR("Timer ID %d not in use or not owned by module %p", id, (void *)module); return -EINVAL; } - k_timer_stop(&timer->timer); + + // Stop unified core timer + core_timer_stop(&timer->timer); + timer->running = 0; + LOG_INF("Stopped timer %d", id); return 0; } @@ -149,12 +171,25 @@ int ocre_timer_get_remaining(wasm_exec_env_t exec_env, ocre_timer_t id) { return -EINVAL; } - ocre_timer *timer = &timers[id - 1]; + ocre_timer_internal *timer = &timers[id - 1]; if (!timer->in_use || timer->owner != module) { LOG_ERR("Timer ID %d not in use or not owned by module %p", id, (void *)module); return -EINVAL; } - int remaining = k_ticks_to_ms_floor32(k_timer_remaining_ticks(&timer->timer)); + + int remaining; + if (!timer->running) { + remaining = 0; + } else { + uint32_t current_time = core_uptime_get(); + uint32_t elapsed = current_time - timer->start_time; + if (elapsed >= timer->interval) { + remaining = 0; // Timer should have expired + } else { + remaining = timer->interval - elapsed; + } + } + LOG_INF("Timer %d remaining time: %dms", id, remaining); return remaining; } @@ -167,48 +202,54 @@ void ocre_timer_cleanup_container(wasm_module_inst_t module_inst) { for (int i = 0; i < CONFIG_MAX_TIMERS; i++) { if (timers[i].in_use && timers[i].owner == module_inst) { - k_timer_stop(&timers[i].timer); + // Stop unified core timer + core_timer_stop(&timers[i].timer); timers[i].in_use = 0; + timers[i].running = 0; timers[i].owner = NULL; ocre_decrement_resource_count(module_inst, OCRE_RESOURCE_TYPE_TIMER); - LOG_INF("Cleaned up timer %d for module %p", i + 1, (void *)module_inst); + LOG_DBG("Cleaned up timer %d for module %p", i + 1, (void *)module_inst); } } - LOG_INF("Cleaned up timer resources for module %p", (void *)module_inst); + LOG_DBG("Cleaned up timer resources for module %p", (void *)module_inst); } -static void timer_callback_wrapper(struct k_timer *timer) { +/* Unified timer callback using core_timer API */ +static void unified_timer_callback(void *user_data) { if (!timer_system_initialized || !common_initialized || !ocre_event_queue_initialized) { LOG_ERR("Timer, common, or event queue not initialized, skipping callback"); return; } - if (!timer) { - LOG_ERR("Null timer pointer in callback"); + + ocre_timer_internal *timer = (ocre_timer_internal *)user_data; + if (!timer || !timer->in_use || !timer->owner) { + LOG_ERR("Invalid timer in callback: %p", (void *)timer); return; } - if ((uintptr_t)ocre_event_queue_buffer_ptr % 4 != 0) { - LOG_ERR("ocre_event_queue_buffer misaligned: %p", (void *)ocre_event_queue_buffer_ptr); - return; - } - LOG_DBG("Timer callback for timer %p", (void *)timer); - LOG_DBG("ocre_event_queue at %p, buffer at %p", (void *)&ocre_event_queue, (void *)ocre_event_queue_buffer_ptr); - for (int i = 0; i < CONFIG_MAX_TIMERS; i++) { - if (timers[i].in_use && &timers[i].timer == timer && timers[i].owner) { - ocre_event_t event; - event.type = OCRE_RESOURCE_TYPE_TIMER; - event.data.timer_event.timer_id = timers[i].id; - event.owner = timers[i].owner; - - LOG_DBG("Creating timer event: type=%d, id=%d, for owner %p", event.type, event.data.timer_event.timer_id, - (void *)timers[i].owner); - LOG_DBG("Event address: %p, Queue buffer: %p", (void *)&event, (void *)ocre_event_queue_buffer_ptr); - k_spinlock_key_t key = k_spin_lock(&ocre_event_queue_lock); - if (k_msgq_put(&ocre_event_queue, &event, K_NO_WAIT) != 0) { - LOG_ERR("Failed to queue timer event for timer %d", timers[i].id); - } else { - LOG_DBG("Queued timer event for timer %d", timers[i].id); - } - k_spin_unlock(&ocre_event_queue_lock, key); - } - } + + LOG_DBG("Timer callback for timer %d", timer->id); + + // For non-periodic timers, mark as not running + if (!timer->periodic) { + timer->running = 0; + } else { + // For periodic timers, update start time for next cycle + timer->start_time = core_uptime_get(); + } + + // Create and queue timer event + ocre_event_t event; + event.type = OCRE_RESOURCE_TYPE_TIMER; + event.data.timer_event.timer_id = timer->id; + event.owner = timer->owner; + + LOG_DBG("Creating timer event: type=%d, id=%d, for owner %p", event.type, timer->id, (void *)timer->owner); + + core_spinlock_key_t key = core_spinlock_lock(&ocre_event_queue_lock); + if (core_eventq_put(&ocre_event_queue, &event) != 0) { + LOG_ERR("Failed to queue timer event for timer %d", timer->id); + } else { + LOG_DBG("Queued timer event for timer %d", timer->id); + } + core_spinlock_unlock(&ocre_event_queue_lock, key); } diff --git a/src/shared/platform/ocre_core_external.h b/src/shared/platform/ocre_core_external.h index c38b8c27..69375219 100644 --- a/src/shared/platform/ocre_core_external.h +++ b/src/shared/platform/ocre_core_external.h @@ -223,4 +223,71 @@ int core_fileclose(void *handle); */ int core_construct_filepath(char *path, size_t len, char *name); +/** + * @brief Get system uptime in milliseconds. + * + * @return System uptime in milliseconds. + */ +uint32_t core_uptime_get(void); + +/** + * @brief Lock a spinlock and return the interrupt key. + * + * @param lock Pointer to the spinlock structure. + * @return Interrupt key to be used with unlock. + */ +core_spinlock_key_t core_spinlock_lock(core_spinlock_t *lock); + +/** + * @brief Unlock a spinlock using the interrupt key. + * + * @param lock Pointer to the spinlock structure. + * @param key Interrupt key returned from lock operation. + */ +void core_spinlock_unlock(core_spinlock_t *lock, core_spinlock_key_t key); + +/** + * @brief Initialize an event queue with specified item size and capacity. + * + * @param eventq Pointer to the event queue structure to initialize. + * @param item_size Size of each item in bytes. + * @param max_items Maximum number of items the queue can hold. + * @return 0 on success, negative error code on failure. + */ +int core_eventq_init(core_eventq_t *eventq, size_t item_size, size_t max_items); + +/** + * @brief Peek at the next item in the queue without removing it. + * + * @param eventq Pointer to the event queue. + * @param event Pointer to buffer where the peeked item will be copied. + * @return 0 on success, -ENOMSG if queue is empty. + */ +int core_eventq_peek(core_eventq_t *eventq, void *event); + +/** + * @brief Get and remove the next item from the queue. + * + * @param eventq Pointer to the event queue. + * @param event Pointer to buffer where the retrieved item will be copied. + * @return 0 on success, -ENOENT if queue is empty. + */ +int core_eventq_get(core_eventq_t *eventq, void *event); + +/** + * @brief Put an item into the queue. + * + * @param eventq Pointer to the event queue. + * @param event Pointer to the item to be added to the queue. + * @return 0 on success, -ENOMEM if queue is full. + */ +int core_eventq_put(core_eventq_t *eventq, const void *event); + +/** + * @brief Destroy an event queue and free its resources. + * + * @param eventq Pointer to the event queue to destroy. + */ +void core_eventq_destroy(core_eventq_t *eventq); + #endif /* OCRE_CORE_EXTERNAL_H */ diff --git a/src/shared/platform/posix/core_eventq.c b/src/shared/platform/posix/core_eventq.c new file mode 100644 index 00000000..1146a371 --- /dev/null +++ b/src/shared/platform/posix/core_eventq.c @@ -0,0 +1,74 @@ +/** + * @copyright Copyright © contributors to Project Ocre, + * which has been established as Project Ocre a Series of LF Projects, LLC + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "ocre_core_external.h" +#include +#include + +int core_eventq_init(core_eventq_t *eventq, size_t item_size, size_t max_items) { + eventq->buffer = core_malloc(item_size * max_items); + if (!eventq->buffer) { + return -ENOMEM; + } + eventq->item_size = item_size; + eventq->max_items = max_items; + eventq->count = 0; + eventq->head = 0; + eventq->tail = 0; + pthread_mutex_init(&eventq->mutex, NULL); + pthread_cond_init(&eventq->cond, NULL); + return 0; +} + +int core_eventq_peek(core_eventq_t *eventq, void *event) { + pthread_mutex_lock(&eventq->mutex); + if (eventq->count == 0) { + pthread_mutex_unlock(&eventq->mutex); + return -ENOMSG; + } + memcpy(event, (char *)eventq->buffer + (eventq->head * eventq->item_size), eventq->item_size); + pthread_mutex_unlock(&eventq->mutex); + return 0; +} + +int core_eventq_get(core_eventq_t *eventq, void *event) { + pthread_mutex_lock(&eventq->mutex); + if (eventq->count == 0) { + pthread_mutex_unlock(&eventq->mutex); + return -ENOENT; + } + memcpy(event, (char *)eventq->buffer + (eventq->head * eventq->item_size), eventq->item_size); + eventq->head = (eventq->head + 1) % eventq->max_items; + eventq->count--; + pthread_mutex_unlock(&eventq->mutex); + return 0; +} + +int core_eventq_put(core_eventq_t *eventq, const void *event) { + pthread_mutex_lock(&eventq->mutex); + if (eventq->count >= eventq->max_items) { + pthread_mutex_unlock(&eventq->mutex); + return -ENOMEM; + } + memcpy((char *)eventq->buffer + (eventq->tail * eventq->item_size), event, eventq->item_size); + eventq->tail = (eventq->tail + 1) % eventq->max_items; + eventq->count++; + pthread_cond_signal(&eventq->cond); + pthread_mutex_unlock(&eventq->mutex); + return 0; +} + +void core_eventq_destroy(core_eventq_t *eventq) { + pthread_mutex_destroy(&eventq->mutex); + pthread_cond_destroy(&eventq->cond); + if (eventq->buffer) { + core_free(eventq->buffer); + eventq->buffer = NULL; + } +} + + diff --git a/src/shared/platform/posix/core_internal.h b/src/shared/platform/posix/core_internal.h index 2a4387ed..c3f8f47b 100644 --- a/src/shared/platform/posix/core_internal.h +++ b/src/shared/platform/posix/core_internal.h @@ -10,17 +10,21 @@ #include #include +#include #include #include #include +#include +#include #include // Config macros -//#define CONFIG_OCRE_CONTAINER_MESSAGING /*!< Enable container messaging support */ +#define CONFIG_OCRE_CONTAINER_MESSAGING /*!< Enable container messaging support */ #define CONFIG_OCRE_NETWORKING /*!< Enable networking support */ #define CONFIG_OCRE_CONTAINER_FILESYSTEM #define CONFIG_OCRE_CONTAINER_WAMR_TERMINATION +#define CONFIG_OCRE_TIMER // Base paths for the application #define OCRE_BASE_PATH "./ocre_data" /*!< Base directory for Ocre resources */ @@ -41,25 +45,66 @@ #define LOG_MODULE_DECLARE(name, level) +/* + * @brief Log level priority definitions (highest to lowest) + */ +#define APP_LOG_LEVEL_ERR 1 +#define APP_LOG_LEVEL_WRN 2 +#define APP_LOG_LEVEL_INF 3 +#define APP_LOG_LEVEL_DBG 4 + +/* + * @brief Determine the current log level based on CONFIG defines + * Priority: CONFIG_LOG_LVL_ERR > CONFIG_LOG_LVL_WRN > CONFIG_LOG_LVL_INF > CONFIG_LOG_LVL_DBG + * If none specified, default to INFO level + */ +#if defined(CONFIG_LOG_LVL_ERR) + #define APP_CURRENT_LOG_LEVEL APP_LOG_LEVEL_ERR +#elif defined(CONFIG_LOG_LVL_WRN) + #define APP_CURRENT_LOG_LEVEL APP_LOG_LEVEL_WRN +#elif defined(CONFIG_LOG_LVL_INF) + #define APP_CURRENT_LOG_LEVEL APP_LOG_LEVEL_INF +#elif defined(CONFIG_LOG_LVL_DBG) + #define APP_CURRENT_LOG_LEVEL APP_LOG_LEVEL_DBG +#else + #define APP_CURRENT_LOG_LEVEL APP_LOG_LEVEL_INF /* Default to INFO level */ +#endif + /** - * @brief Log a debug message. + * @brief Log an error message (always shown if ERR level or higher). */ -#define LOG_DBG(fmt, ...) printf("[DEBUG] " fmt "\n", ##__VA_ARGS__) +#if APP_CURRENT_LOG_LEVEL >= APP_LOG_LEVEL_ERR + #define LOG_ERR(fmt, ...) printf("[ERROR] " fmt "\n", ##__VA_ARGS__) +#else + #define LOG_ERR(fmt, ...) do { } while(0) +#endif /** - * @brief Log an error message. + * @brief Log a warning message (shown if WRN level or higher). */ -#define LOG_ERR(fmt, ...) printf("[ERROR] " fmt "\n", ##__VA_ARGS__) +#if APP_CURRENT_LOG_LEVEL >= APP_LOG_LEVEL_WRN + #define LOG_WRN(fmt, ...) printf("[WARNING] " fmt "\n", ##__VA_ARGS__) +#else + #define LOG_WRN(fmt, ...) do { } while(0) +#endif /** - * @brief Log a warning message. + * @brief Log an informational message (shown if INF level or higher). */ -#define LOG_WRN(fmt, ...) printf("[WARNING] " fmt "\n", ##__VA_ARGS__) +#if APP_CURRENT_LOG_LEVEL >= APP_LOG_LEVEL_INF + #define LOG_INF(fmt, ...) printf("[INFO] " fmt "\n", ##__VA_ARGS__) +#else + #define LOG_INF(fmt, ...) do { } while(0) +#endif /** - * @brief Log an informational message. + * @brief Log a debug message (shown only if DBG level). */ -#define LOG_INF(fmt, ...) printf("[INFO] " fmt "\n", ##__VA_ARGS__) +#if APP_CURRENT_LOG_LEVEL >= APP_LOG_LEVEL_DBG + #define LOG_DBG(fmt, ...) printf("[DEBUG] " fmt "\n", ##__VA_ARGS__) +#else + #define LOG_DBG(fmt, ...) do { } while(0) +#endif // Constants @@ -156,4 +201,87 @@ struct core_timer { void *user_data; /*!< User data for the callback */ }; +/* Generic singly-linked list iteration macros */ +#define CONTAINER_OF(ptr, type, member) \ + ((type *)((char *)(ptr) - offsetof(type, member))) + +#define CORE_SLIST_FOR_EACH_CONTAINER_SAFE(list, var, tmp, member) \ + for (var = (list)->head ? CONTAINER_OF((list)->head, __typeof__(*var), member) : NULL, \ + tmp = var ? (var->member.next ? CONTAINER_OF(var->member.next, __typeof__(*var), member) : NULL) : NULL; \ + var; \ + var = tmp, tmp = tmp ? (tmp->member.next ? CONTAINER_OF(tmp->member.next, __typeof__(*var), member) : NULL) : NULL) + +#define CORE_SLIST_FOR_EACH_CONTAINER(list, var, member) \ + for (var = (list)->head ? CONTAINER_OF((list)->head, __typeof__(*var), member) : NULL; \ + var; \ + var = var->member.next ? CONTAINER_OF(var->member.next, __typeof__(*var), member) : NULL) + +/** + * @brief Structure representing a node in a singly-linked list. + */ +typedef struct core_snode { + struct core_snode *next; /*!< Pointer to the next node in the list */ +} core_snode_t; + +/** + * @brief Structure representing a singly-linked list for POSIX platform. + */ +typedef struct { + core_snode_t *head; /*!< Pointer to the first node in the list */ + core_snode_t *tail; /*!< Pointer to the last node in the list */ +} core_slist_t; + +/** + * @brief Initialize a singly-linked list. + * + * @param list Pointer to the list to initialize. + */ +void core_slist_init(core_slist_t *list); + +/** + * @brief Append a node to the end of a singly-linked list. + * + * @param list Pointer to the list to append to. + * @param node Pointer to the node to append. + */ +void core_slist_append(core_slist_t *list, core_snode_t *node); + +/** + * @brief Remove a node from a singly-linked list. + * + * @param list Pointer to the list to remove from. + * @param prev Pointer to the previous node (or NULL if removing head). + * @param node Pointer to the node to remove. + */ +void core_slist_remove(core_slist_t *list, core_snode_t *prev, core_snode_t *node); + +/** + * @brief Spinlock type for POSIX platform (simulated using mutex). + */ +typedef struct { + pthread_mutex_t mutex; /*!< POSIX mutex for spinlock simulation */ +} core_spinlock_t; + +/** + * @brief Spinlock key type for POSIX platform. + */ +typedef int core_spinlock_key_t; + +/** + * @brief Generic event queue structure for POSIX platform. + * + * A thread-safe circular buffer implementation that can store + * any type of data items with configurable size and capacity. + */ +typedef struct { + void *buffer; /*!< Dynamically allocated buffer for queue items */ + size_t item_size; /*!< Size of each individual item in bytes */ + size_t max_items; /*!< Maximum number of items the queue can hold */ + size_t count; /*!< Current number of items in the queue */ + size_t head; /*!< Index of the next item to be read */ + size_t tail; /*!< Index where the next item will be written */ + pthread_mutex_t mutex; /*!< Mutex for thread-safe access */ + pthread_cond_t cond; /*!< Condition variable for signaling */ +} core_eventq_t; + #endif /* OCRE_CORE_INTERNAL_H */ diff --git a/src/shared/platform/posix/core_misc.c b/src/shared/platform/posix/core_misc.c index f556c5a9..069c1735 100644 --- a/src/shared/platform/posix/core_misc.c +++ b/src/shared/platform/posix/core_misc.c @@ -38,3 +38,19 @@ void core_sleep_ms(int milliseconds) void core_yield(void) { sched_yield(); } + +uint32_t core_uptime_get(void) { + struct timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + return (uint32_t)(ts.tv_sec * 1000 + ts.tv_nsec / 1000000); +} + +core_spinlock_key_t core_spinlock_lock(core_spinlock_t *lock) { + pthread_mutex_lock(&lock->mutex); + return 0; +} + +void core_spinlock_unlock(core_spinlock_t *lock, core_spinlock_key_t key) { + (void)key; + pthread_mutex_unlock(&lock->mutex); +} diff --git a/src/shared/platform/posix/core_slist.c b/src/shared/platform/posix/core_slist.c new file mode 100644 index 00000000..1def4404 --- /dev/null +++ b/src/shared/platform/posix/core_slist.c @@ -0,0 +1,34 @@ +/** + * @copyright Copyright © contributors to Project Ocre, + * which has been established as Project Ocre a Series of LF Projects, LLC + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "ocre_core_external.h" + +void core_slist_init(core_slist_t *list) { + list->head = NULL; + list->tail = NULL; +} + +void core_slist_append(core_slist_t *list, core_snode_t *node) { + node->next = NULL; + if (list->tail) { + list->tail->next = node; + } else { + list->head = node; + } + list->tail = node; +} + +void core_slist_remove(core_slist_t *list, core_snode_t *prev, core_snode_t *node) { + if (prev) { + prev->next = node->next; + } else { + list->head = node->next; + } + if (list->tail == node) { + list->tail = prev; + } +} diff --git a/src/shared/platform/posix/ocre_internal.cmake b/src/shared/platform/posix/ocre_internal.cmake index fa6cfac7..03ec062b 100644 --- a/src/shared/platform/posix/ocre_internal.cmake +++ b/src/shared/platform/posix/ocre_internal.cmake @@ -77,8 +77,13 @@ set(lib_sources ${OCRE_ROOT_DIR}/src/shared/platform/posix/core_misc.c ${OCRE_ROOT_DIR}/src/shared/platform/posix/core_memory.c ${OCRE_ROOT_DIR}/src/shared/platform/posix/core_timer.c + ${OCRE_ROOT_DIR}/src/shared/platform/posix/core_slist.c + ${OCRE_ROOT_DIR}/src/shared/platform/posix/core_eventq.c # APIs ${OCRE_ROOT_DIR}/src/ocre/api/ocre_api.c + ${OCRE_ROOT_DIR}/src/ocre/api/ocre_common.c + ${OCRE_ROOT_DIR}/src/ocre/ocre_timers/ocre_timer.c + ${OCRE_ROOT_DIR}/src/ocre/ocre_messaging/ocre_messaging.c # Utils ${OCRE_ROOT_DIR}/src/ocre/utils/strlcat.c ) diff --git a/src/shared/platform/zephyr/core_eventq.c b/src/shared/platform/zephyr/core_eventq.c new file mode 100644 index 00000000..7ec83f7a --- /dev/null +++ b/src/shared/platform/zephyr/core_eventq.c @@ -0,0 +1,64 @@ +/** + * @copyright Copyright © contributors to Project Ocre, + * which has been established as Project Ocre a Series of LF Projects, LLC + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "ocre_core_external.h" +#include +#include + +int core_eventq_init(core_eventq_t *eventq, size_t item_size, size_t max_items) { + eventq->buffer = core_malloc(item_size * max_items); + if (!eventq->buffer) { + return -ENOMEM; + } + eventq->item_size = item_size; + eventq->max_items = max_items; + + k_msgq_init(&eventq->msgq, (char *)eventq->buffer, item_size, max_items); + return 0; +} + +int core_eventq_peek(core_eventq_t *eventq, void *event) { + int ret = k_msgq_peek(&eventq->msgq, event); + if (ret == 0) { + return 0; + } else if (ret == -ENOMSG) { + return -ENOMSG; + } else { + return ret; + } +} + +int core_eventq_get(core_eventq_t *eventq, void *event) { + int ret = k_msgq_get(&eventq->msgq, event, K_NO_WAIT); + if (ret == 0) { + return 0; + } else if (ret == -ENOMSG) { + return -ENOENT; + } else { + return ret; + } +} + +int core_eventq_put(core_eventq_t *eventq, const void *event) { + int ret = k_msgq_put(&eventq->msgq, event, K_NO_WAIT); + if (ret == 0) { + return 0; + } else if (ret == -ENOMSG) { + return -ENOMEM; + } else { + return ret; + } +} + +void core_eventq_destroy(core_eventq_t *eventq) { + if (eventq->buffer) { + core_free(eventq->buffer); + eventq->buffer = NULL; + } +} + + diff --git a/src/shared/platform/zephyr/core_internal.h b/src/shared/platform/zephyr/core_internal.h index 87e0cddb..26e0f971 100644 --- a/src/shared/platform/zephyr/core_internal.h +++ b/src/shared/platform/zephyr/core_internal.h @@ -120,4 +120,90 @@ struct core_timer { void *user_data; /*!< User data for the callback */ }; +/* Generic singly-linked list iteration macros */ +#define CORE_SLIST_FOR_EACH_CONTAINER_SAFE SYS_SLIST_FOR_EACH_CONTAINER_SAFE +#define CORE_SLIST_FOR_EACH_CONTAINER SYS_SLIST_FOR_EACH_CONTAINER + +/** + * @brief Structure representing a node in a singly-linked list. + */ +#define core_snode_t sys_snode_t + +/** + * @brief Structure representing a singly-linked list for POSIX platform. + */ +#define core_slist_t sys_slist_t + +/** + * @brief Initialize a singly-linked list. + * + * @param list Pointer to the list to initialize. + */ +#define core_slist_init sys_slist_init + +/** + * @brief Append a node to the end of a singly-linked list. + * + * @param list Pointer to the list to append to. + * @param node Pointer to the node to append. + */ +#define core_slist_append sys_slist_append + +/** + * @brief Remove a node from a singly-linked list. + * + * @param list Pointer to the list to remove from. + * @param prev Pointer to the previous node (or NULL if removing head). + * @param node Pointer to the node to remove. + */ +#define core_slist_remove sys_slist_remove + +/* Zephyr-specific macros */ + +/** + * @brief Get system uptime in milliseconds. + * + * @return System uptime in milliseconds. + */ +#define core_uptime_get k_uptime_get_32 + +/** + * @brief Lock a spinlock and return the interrupt key. + * + * @param lock Pointer to the spinlock structure. + * @return Interrupt key to be used with unlock. + */ +#define core_spinlock_lock k_spin_lock + +/** + * @brief Unlock a spinlock using the interrupt key. + * + * @param lock Pointer to the spinlock structure. + * @param key Interrupt key returned from lock operation. + */ +#define core_spinlock_unlock k_spin_unlock + +/** + * @brief Spinlock type for Zephyr platform. + */ +typedef struct k_spinlock core_spinlock_t; + +/** + * @brief Spinlock key type for Zephyr platform. + */ +typedef k_spinlock_key_t core_spinlock_key_t; + +/** + * @brief Generic event queue structure for Zephyr platform. + * + * A thread-safe message queue implementation using Zephyr's k_msgq + * that can store any type of data items with configurable size and capacity. + */ +typedef struct { + void *buffer; /*!< Dynamically allocated buffer for queue items */ + size_t item_size; /*!< Size of each individual item in bytes */ + size_t max_items; /*!< Maximum number of items the queue can hold */ + struct k_msgq msgq; /*!< Zephyr message queue */ +} core_eventq_t; + #endif diff --git a/src/shared/platform/zephyr/ocre_internal.cmake b/src/shared/platform/zephyr/ocre_internal.cmake index f79bdff7..38fbcd16 100644 --- a/src/shared/platform/zephyr/ocre_internal.cmake +++ b/src/shared/platform/zephyr/ocre_internal.cmake @@ -78,6 +78,7 @@ set(lib_sources ${OCRE_ROOT_DIR}/src/shared/platform/zephyr/core_thread.c ${OCRE_ROOT_DIR}/src/shared/platform/zephyr/core_mutex.c ${OCRE_ROOT_DIR}/src/shared/platform/zephyr/core_mq.c + ${OCRE_ROOT_DIR}/src/shared/platform/zephyr/core_eventq.c ${OCRE_ROOT_DIR}/src/shared/platform/zephyr/core_misc.c ${OCRE_ROOT_DIR}/src/shared/platform/zephyr/core_memory.c ${OCRE_ROOT_DIR}/src/shared/platform/zephyr/core_timer.c