diff --git a/omnn/rt/storage/CMakeLists.txt b/omnn/rt/storage/CMakeLists.txt new file mode 100644 index 0000000000..36b8185352 --- /dev/null +++ b/omnn/rt/storage/CMakeLists.txt @@ -0,0 +1,25 @@ +cmake_minimum_required(VERSION 3.15) + +# Find required packages +find_package(hiredis REQUIRED) +find_package(Boost REQUIRED COMPONENTS unit_test_framework) + +# Add Redis cache library +add_library(redis_cache + RedisCache.cpp + RedisCache.h +) + +target_link_libraries(redis_cache + PUBLIC + hiredis::hiredis + cache_base +) + +target_include_directories(redis_cache + PUBLIC + ${CMAKE_CURRENT_SOURCE_DIR} +) + +# Add tests subdirectory +add_subdirectory(tests) diff --git a/omnn/rt/storage/RedisCache.cpp b/omnn/rt/storage/RedisCache.cpp new file mode 100644 index 0000000000..66f5b88d19 --- /dev/null +++ b/omnn/rt/storage/RedisCache.cpp @@ -0,0 +1,121 @@ +#include "RedisCache.h" +#include +#include +#include + +namespace omnn { +namespace rt { +namespace storage { + +RedisCache::RedisCache(const std::string& host, int port, int timeout_ms, + int retry_count, int retry_delay_ms) + : host_(host), port_(port), timeout_ms_(timeout_ms), + retry_count_(retry_count), retry_delay_ms_(retry_delay_ms), + context_(nullptr) { + if (!Connect()) { + throw std::runtime_error("Failed to connect to Redis server"); + } +} + +RedisCache::~RedisCache() { + Disconnect(); +} + +bool RedisCache::Connect() { + Disconnect(); // Ensure any existing connection is closed + + struct timeval timeout = { + .tv_sec = timeout_ms_ / 1000, + .tv_usec = (timeout_ms_ % 1000) * 1000 + }; + + context_ = redisConnectWithTimeout(host_.c_str(), port_, timeout); + + if (context_ == nullptr || context_->err) { + if (context_) { + redisFree(context_); + context_ = nullptr; + } + return false; + } + return true; +} + +void RedisCache::Disconnect() { + if (context_) { + redisFree(context_); + context_ = nullptr; + } +} + +bool RedisCache::RetryOperation(const std::function& operation) { + for (int i = 0; i < retry_count_; ++i) { + if (operation()) { + return true; + } + + if (!IsConnected() && !Connect()) { + std::this_thread::sleep_for(std::chrono::milliseconds(retry_delay_ms_)); + continue; + } + + std::this_thread::sleep_for(std::chrono::milliseconds(retry_delay_ms_)); + } + return false; +} + +bool RedisCache::Set(const std::string& key, const std::string& value) { + return RetryOperation([this, &key, &value]() { + if (!IsConnected()) return false; + + redisReply* reply = (redisReply*)redisCommand(context_, "SET %s %s", + key.c_str(), value.c_str()); + if (!reply) return false; + + bool success = (reply->type == REDIS_REPLY_STATUS && + strcasecmp(reply->str, "OK") == 0); + freeReplyObject(reply); + return success; + }); +} + +bool RedisCache::Get(const std::string& key, std::string& value) { + return RetryOperation([this, &key, &value]() { + if (!IsConnected()) return false; + + redisReply* reply = (redisReply*)redisCommand(context_, "GET %s", + key.c_str()); + if (!reply) return false; + + bool success = false; + if (reply->type == REDIS_REPLY_STRING) { + value = std::string(reply->str, reply->len); + success = true; + } + + freeReplyObject(reply); + return success; + }); +} + +bool RedisCache::Clear() { + return RetryOperation([this]() { + if (!IsConnected()) return false; + + redisReply* reply = (redisReply*)redisCommand(context_, "FLUSHDB"); + if (!reply) return false; + + bool success = (reply->type == REDIS_REPLY_STATUS && + strcasecmp(reply->str, "OK") == 0); + freeReplyObject(reply); + return success; + }); +} + +bool RedisCache::IsConnected() const { + return context_ && !context_->err; +} + +} // namespace storage +} // namespace rt +} // namespace omnn diff --git a/omnn/rt/storage/RedisCache.h b/omnn/rt/storage/RedisCache.h new file mode 100644 index 0000000000..e8c658e374 --- /dev/null +++ b/omnn/rt/storage/RedisCache.h @@ -0,0 +1,38 @@ +#pragma once + +#include +#include +#include +#include "CacheBase.h" + +namespace omnn { +namespace rt { +namespace storage { + +class RedisCache : public CacheBase { +public: + RedisCache(const std::string& host = "localhost", int port = 6379, + int timeout_ms = 1000, int retry_count = 5, int retry_delay_ms = 1000); + ~RedisCache(); + + bool Set(const std::string& key, const std::string& value) override; + bool Get(const std::string& key, std::string& value) override; + bool Clear() override; + bool IsConnected() const; + +private: + bool Connect(); + void Disconnect(); + bool RetryOperation(const std::function& operation); + + std::string host_; + int port_; + int timeout_ms_; + int retry_count_; + int retry_delay_ms_; + redisContext* context_; +}; + +} // namespace storage +} // namespace rt +} // namespace omnn diff --git a/omnn/rt/storage/tests/CMakeLists.txt b/omnn/rt/storage/tests/CMakeLists.txt new file mode 100644 index 0000000000..b405dab180 --- /dev/null +++ b/omnn/rt/storage/tests/CMakeLists.txt @@ -0,0 +1,18 @@ +cmake_minimum_required(VERSION 3.15) + +# Redis cache test +add_executable(redis_cache_test redis_cache_test.cpp) + +target_link_libraries(redis_cache_test + PRIVATE + redis_cache + Boost::unit_test_framework +) + +add_test(NAME redis_cache_test COMMAND redis_cache_test) + +# Set test properties +set_tests_properties(redis_cache_test PROPERTIES + ENVIRONMENT "OPENMIND_TEST_REDIS_RETRY_COUNT=5;OPENMIND_TEST_REDIS_RETRY_DELAY=1000" + TIMEOUT 60 +) diff --git a/omnn/rt/storage/tests/redis_cache_test.cpp b/omnn/rt/storage/tests/redis_cache_test.cpp new file mode 100644 index 0000000000..45ba99ece6 --- /dev/null +++ b/omnn/rt/storage/tests/redis_cache_test.cpp @@ -0,0 +1,59 @@ +#define BOOST_TEST_MODULE redis_cache_test +#include +#include "../RedisCache.h" +#include +#include + +using namespace omnn::rt::storage; + +struct RedisTestConfig { + RedisTestConfig() { + // Get retry configuration from environment + const char* retry_count = std::getenv("OPENMIND_TEST_REDIS_RETRY_COUNT"); + const char* retry_delay = std::getenv("OPENMIND_TEST_REDIS_RETRY_DELAY"); + + retry_count_ = retry_count ? std::stoi(retry_count) : 5; + retry_delay_ms_ = retry_delay ? std::stoi(retry_delay) : 1000; + } + + int retry_count_; + int retry_delay_ms_; +}; + +BOOST_FIXTURE_TEST_SUITE(redis_cache_tests, RedisTestConfig) + +BOOST_AUTO_TEST_CASE(test_basic_operations) { + RedisCache cache("localhost", 6379, 1000, retry_count_, retry_delay_ms_); + + // Test Set operation + BOOST_CHECK(cache.Set("test_key", "test_value")); + + // Test Get operation + std::string value; + BOOST_CHECK(cache.Get("test_key", value)); + BOOST_CHECK_EQUAL(value, "test_value"); + + // Test Clear operation + BOOST_CHECK(cache.Clear()); + BOOST_CHECK(!cache.Get("test_key", value)); +} + +BOOST_AUTO_TEST_CASE(test_nonexistent_key) { + RedisCache cache("localhost", 6379, 1000, retry_count_, retry_delay_ms_); + + std::string value; + BOOST_CHECK(!cache.Get("nonexistent_key", value)); +} + +BOOST_AUTO_TEST_CASE(test_connection_retry) { + // Test with invalid host to trigger retry mechanism + RedisCache cache("invalid_host", 6379, 100, 2, 100); + + // Operations should fail after retries + BOOST_CHECK(!cache.Set("test_key", "test_value")); + + std::string value; + BOOST_CHECK(!cache.Get("test_key", value)); +} + +BOOST_AUTO_TEST_SUITE_END()