Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added cognix/__init__.py
Empty file.
Empty file added cognix/agents/__init__.py
Empty file.
Empty file.
Empty file.
Empty file added cognix/backend/__init__.py
Empty file.
18 changes: 18 additions & 0 deletions cognix/backend/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from abc import ABC, abstractmethod

class BaseBackend(ABC):
    """Abstract interface that every LLM backend implementation must satisfy."""

    @abstractmethod
    def generate(self, **kwargs) -> str:
        """Return one complete response string.

        The accepted keyword arguments are backend-specific.
        """
        ...

    @abstractmethod
    def generate_stream(self, **kwargs):
        """Yield the response incrementally, one text chunk at a time.

        The accepted keyword arguments are backend-specific.
        """
        ...
67 changes: 67 additions & 0 deletions cognix/backend/ollama_backend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import requests
from cognix.backend.base import BaseBackend
import json

from cognix.utils.const.ENDPOINTS.llm import OLLAMA_API_URL_BASE, OLLAMA_API_URL_ENDPOINT

class OllamaBackend(BaseBackend):
    """Backend that talks to a local Ollama server over its HTTP API."""

    def __init__(self, model: str = None, api_url: str = OLLAMA_API_URL_BASE):
        # The model may be omitted here and supplied per call instead.
        self.model = model
        self.api_url = api_url

    def _build_payload(self, kwargs: dict) -> dict:
        """Resolve the model name and merge it with the remaining kwargs.

        Raises ValueError when no model was given at init or per call.
        """
        model = kwargs.pop("model", self.model)
        if not model:
            raise ValueError("Model name must be specified either at init or as a parameter.")
        return {"model": model, **kwargs}

    def generate(self, **kwargs) -> str:
        """Return the full response text for the given generation parameters.

        Streams internally (Ollama emits one JSON object per line) but returns
        the concatenated, stripped text. Raises RuntimeError on HTTP failure.
        """
        url = self.api_url + OLLAMA_API_URL_ENDPOINT
        payload = self._build_payload(kwargs)
        # Force stream=True to handle streaming internally but return a string.
        payload["stream"] = True
        try:
            with requests.post(url, json=payload, stream=True) as response:
                response.raise_for_status()
                full_output = ""
                for line in response.iter_lines():
                    if line:
                        chunk = json.loads(line.decode("utf-8"))
                        full_output += chunk.get("response", "")
                        if chunk.get("done"):
                            break
                return full_output.strip()
        except requests.exceptions.RequestException as e:
            # Chain the original exception for debuggability.
            raise RuntimeError(f"Ollama API request failed: {e}") from e

    def generate_stream(self, **kwargs):
        """Yield response chunks as the server produces them.

        Raises RuntimeError on HTTP failure.
        """
        # Bug fix: previously hard-coded "/api/generate" here, bypassing the
        # OLLAMA_API_URL_ENDPOINT constant used by generate().
        url = self.api_url + OLLAMA_API_URL_ENDPOINT
        payload = self._build_payload(kwargs)
        # Default to streaming unless the caller explicitly disabled it.
        payload.setdefault("stream", True)
        try:
            with requests.post(url, json=payload, stream=payload["stream"]) as response:
                response.raise_for_status()
                for line in response.iter_lines():
                    if line:
                        chunk = json.loads(line.decode("utf-8"))
                        if "response" in chunk:
                            yield chunk["response"]
                        if chunk.get("done"):
                            break
        except requests.exceptions.RequestException as e:
            raise RuntimeError(f"Ollama streaming API request failed: {e}") from e
Empty file added cognix/conversation/__init__.py
Empty file.
21 changes: 21 additions & 0 deletions cognix/conversation/manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from cognix.conversation.memory_management.memory_strategy import MemoryStrategy

class ConversationManager:
    """Thin facade over a MemoryStrategy that records and renders a chat."""

    def __init__(self, memory: MemoryStrategy, system_prompt=None):
        self.memory = memory
        self.system_prompt = system_prompt

    def add_user_message(self, content: str):
        """Record one user turn."""
        self.memory.add_message("user", content)

    def add_assistant_message(self, content: str):
        """Record one assistant turn."""
        self.memory.add_message("assistant", content)

    def get_prompt(self) -> str:
        """Render the conversation as a single prompt string."""
        return self.memory.get_prompt()

    def reset_conversation(self):
        """Clear the underlying memory."""
        self.memory.reset()

    def reset(self):
        """Alias for reset_conversation().

        Bug fix: COGNIXSession.reset() calls conv_manager.reset(), which
        previously did not exist and raised AttributeError.
        """
        self.reset_conversation()

    def get_history(self):
        """Return raw message history when the memory supports it.

        Bug fix: COGNIXSession.get_history() calls this method, which
        previously did not exist. BufferMemory exposes get_history();
        SummaryMemory does not, so fall back to the generic state export.
        """
        getter = getattr(self.memory, "get_history", None)
        return getter() if callable(getter) else self.memory.get_state()

    def export_state(self):
        """Export the memory's persistable state."""
        return self.memory.get_state()
Empty file.
61 changes: 61 additions & 0 deletions cognix/conversation/memory_management/buffer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from cognix.conversation.memory_management.memory_strategy import MemoryStrategy

from cognix.utils.const.config import BUFFER_CONFIG

class BufferMemory(MemoryStrategy):
    """Sliding-window memory: keeps the last `max_history` turns verbatim."""

    def __init__(self, system_prompt: str = None, max_history: int = 20):
        self.system_prompt = system_prompt
        self.max_history = max_history
        self.history = []
        if system_prompt:
            self.history.append({"role": "system", "content": system_prompt})

    def add_message(self, role: str, content: str):
        """Append one chat turn; only 'user' and 'assistant' are accepted."""
        if role != "user" and role != "assistant":
            raise ValueError("Role must be 'user' or 'assistant'")
        self.history.append({"role": role, "content": content})
        self._trim_history()

    def add_user_message(self, content: str):
        """Convenience wrapper for a user turn."""
        self.add_message("user", content)

    def add_assistant_message(self, content: str):
        """Convenience wrapper for an assistant turn."""
        self.add_message("assistant", content)

    def get_prompt(self) -> str:
        """Render history as tagged lines ending with an open [CHATBOT] tag.

        The stored system message (if any) is not included in the rendering.
        """
        tagged = [
            ("[USER] " if entry["role"] == "user" else "[CHATBOT] ") + entry["content"]
            for entry in self.history
            if entry["role"] != "system"
        ]
        return "\n".join(tagged).strip() + "\n[CHATBOT] "

    def reset(self):
        """Clear all turns, re-seeding the system message if one was set."""
        self.history = (
            [{"role": "system", "content": self.system_prompt}]
            if self.system_prompt
            else []
        )

    def get_state(self) -> dict:
        """Export persistable state: the full message history."""
        return {"history": self.history}

    def get_history(self):
        """Return the raw message list (including any system message)."""
        return self.history

    def _trim_history(self):
        """Drop the oldest turns beyond max_history, keeping the system message."""
        if self.history and self.history[0]["role"] == "system":
            head, tail = self.history[:1], self.history[1:]
        else:
            head, tail = [], self.history
        self.history = head + tail[-self.max_history:]


def buffer_memory_factory(config=None) -> BufferMemory:
    """
    Factory function to create a BufferMemory instance.

    :param config: Configuration dictionary for BufferMemory; falls back to
        BUFFER_CONFIG when omitted or None.
    :return: An instance of BufferMemory.
    """
    # Bug fix: the default was the mutable module-level dict BUFFER_CONFIG,
    # which made the `config is None` check dead code and risked accidental
    # shared-state mutation. Use None as the sentinel and resolve at call time.
    if config is None:
        config = BUFFER_CONFIG
    return BufferMemory(**config)
18 changes: 18 additions & 0 deletions cognix/conversation/memory_management/memory_strategy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from abc import ABC, abstractmethod

class MemoryStrategy(ABC):
    """Contract for conversation-memory implementations."""

    @abstractmethod
    def add_message(self, role: str, content: str):
        """Record one chat turn for the given role."""
        ...

    @abstractmethod
    def get_prompt(self) -> str:
        """Render the stored conversation as a single prompt string."""
        ...

    @abstractmethod
    def reset(self):
        """Discard all stored conversation state."""
        ...

    @abstractmethod
    def get_state(self) -> dict:
        """Export the memory's persistable state."""
        ...
88 changes: 88 additions & 0 deletions cognix/conversation/memory_management/summary.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
from cognix.utils.const.config import SUMMARY_CONFIG


from cognix.conversation.memory_management.memory_strategy import MemoryStrategy
import threading

class SummaryMemory(MemoryStrategy):
    """Memory that compresses older turns into an LLM-written rolling summary.

    Keeps at most `max_recent` verbatim messages; on overflow, the recent
    turns are folded into `self.summary` by a background thread and the buffer
    is reduced to the latest user message.
    """

    def __init__(self, summarizer=None, max_recent: int = 5):
        # summarizer: callable(prompt: str) -> str; defaults to the built-in
        # Ollama-backed summarizer below.
        if summarizer is None:
            self.summarizer = self._summarizer
        else:
            self.summarizer = summarizer
        self.summary = ""
        self.recent_history = []
        self.max_recent = max_recent

    def add_message(self, role: str, content: str):
        """Record a message and prune if the recent buffer is over capacity."""
        self.recent_history.append({"role": role, "content": content})
        self._prune_recent_entries()

    def _prune_recent_entries(self):
        """Fold overflowing recent turns into the summary (asynchronously)."""
        if len(self.recent_history) > self.max_recent:
            self._update_summary_async()
            last_user = self._last_user_message()
            # Bug fix: the previous fallback was "" (a bare string), which was
            # stored in recent_history and later crashed _format_new_lines
            # (strings have no "role" key). Use an empty buffer instead.
            self.recent_history = [last_user] if last_user else []

    def _last_user_message(self):
        """Return the most recent user message dict, or None if there is none."""
        return next(
            (msg for msg in reversed(self.recent_history) if msg["role"] == "user"),
            None,
        )

    def _format_new_lines(self) -> str:
        """Render the recent buffer as [USER]/[CHATBOT]-tagged paragraphs."""
        return "\n\n".join(
            f"[USER] {m['content']}" if m["role"] == "user" else f"[CHATBOT] {m['content']}"
            for m in self.recent_history
        )

    def _update_summary(self):
        """Synchronously re-summarize the current summary plus recent turns."""
        new_lines = self._format_new_lines()
        prompt = f"""
Can you please summarize the following conversation and provide a brief overview of the main points discussed?
I want only the summary of the conversation, not the conversation itself.
{self.summary.strip()}
{new_lines}"""
        self.summary = self.summarizer(prompt).strip()

    def _update_summary_async(self):
        """Re-summarize on a background thread so chat latency is unaffected."""
        # Bug fix: snapshot the conversation text on the calling thread. The
        # caller resets recent_history right after this returns, so reading it
        # inside the worker thread raced with that reset and could summarize
        # an already-cleared buffer.
        new_lines = self._format_new_lines()
        previous_summary = self.summary.strip()

        def summarization_task():
            prompt = f"""
Can you please summarize the following conversation and provide a brief overview of the main points discussed?
I want only the summary of the conversation, not the conversation itself. Do not include the [USER] and [CHATBOT] tags.
{previous_summary}
{new_lines}"""
            # Single attribute rebind; safe to publish from the worker thread.
            self.summary = self.summarizer(prompt).strip()

        thread = threading.Thread(target=summarization_task, daemon=True)
        thread.start()

    def _summarizer(self, prompt: str) -> str:
        # TODO(cognix): replace with a dedicated summarizer agent; this is a
        # temporary default that shells out to the local Ollama backend.
        from cognix.backend.ollama_backend import OllamaBackend
        backend = OllamaBackend(model="llama2")
        return backend.generate(prompt=prompt)

    def get_prompt(self) -> str:
        """Build the model prompt: summary, recent turns, then an open tag."""
        parts = []
        if self.summary:
            parts.append(f"[SUMMARY] {self.summary.strip()}")
        if self.recent_history:
            parts.append(self._format_new_lines().strip())
        parts.append("\n[CHATBOT] ")
        return "\n\n".join(parts)

    def reset(self):
        """Discard both the summary and the recent buffer."""
        self.summary = ""
        self.recent_history = []

    def get_state(self):
        """Export persistable state (summary only; recent turns are not saved)."""
        return {"summary": self.summary}


def summary_memory_factory(config=None) -> SummaryMemory:
    """
    Factory function to create a SummaryMemory instance.

    :param config: Configuration dictionary for SummaryMemory; falls back to
        SUMMARY_CONFIG when omitted or None.
    :return: An instance of SummaryMemory.
    """
    # Bug fix: the default was the mutable module-level dict SUMMARY_CONFIG,
    # which made the `config is None` check dead code and risked accidental
    # shared-state mutation. Use None as the sentinel and resolve at call time.
    if config is None:
        config = SUMMARY_CONFIG
    return SummaryMemory(**config)
45 changes: 45 additions & 0 deletions cognix/session.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from cognix.backend.ollama_backend import OllamaBackend

from cognix.conversation.memory_management.buffer import BufferMemory
from cognix.conversation.memory_management.summary import SummaryMemory
from cognix.conversation.manager import ConversationManager

from cognix.utils.const.PROMPTS.conversation import BASE_CONVESATION_PROMPT
from cognix.utils.const.registry import MemoryRegistry

class COGNIXSession:
    """High-level chat session wiring an Ollama backend to conversation memory."""

    def __init__(self, model="llama2", system_prompt=BASE_CONVESATION_PROMPT, memory_type="summary", memory_config=None):
        self.backend = OllamaBackend(model=model)

        # Use the registry to resolve the memory implementation by name.
        memory = MemoryRegistry.get(memory_type, memory_config)

        self.conv_manager = ConversationManager(memory=memory, system_prompt=system_prompt)

    def chat(self, user_input: str, **kwargs) -> str:
        """Send one user message and return the complete assistant reply."""
        self.conv_manager.add_user_message(user_input)
        prompt = self.conv_manager.get_prompt()
        response = self.backend.generate(prompt=prompt, **kwargs)
        self.conv_manager.add_assistant_message(response)
        return response

    def chat_stream(self, user_input: str, **kwargs):
        """Send one user message and yield the assistant reply chunk by chunk."""
        self.conv_manager.add_user_message(user_input)
        prompt = self.conv_manager.get_prompt()
        full_response = ""
        for chunk in self.backend.generate_stream(prompt=prompt, **kwargs):
            full_response += chunk
            yield chunk
        # Record the assembled reply only once the stream has finished.
        self.conv_manager.add_assistant_message(full_response)

    def reset(self):
        """Clear the conversation memory.

        Bug fix: ConversationManager exposes reset_conversation(), not
        reset(); the old call raised AttributeError.
        """
        self.conv_manager.reset_conversation()

    def set_system_prompt(self, prompt: str):
        """Replace the system prompt and start a fresh conversation."""
        self.conv_manager.system_prompt = prompt
        self.reset()

    def get_history(self):
        """Return the raw history when the memory supports it, else its state.

        Bug fix: ConversationManager has no get_history(); delegate to the
        memory strategy instead (BufferMemory exposes get_history();
        SummaryMemory only exposes get_state()).
        """
        memory = self.conv_manager.memory
        getter = getattr(memory, "get_history", None)
        return getter() if callable(getter) else memory.get_state()
Empty file added cognix/utils/__init__.py
Empty file.
2 changes: 2 additions & 0 deletions cognix/utils/const/ENDPOINTS/llm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Base URL of the local Ollama server (the default install listens on 11434).
OLLAMA_API_URL_BASE = "http://localhost:11434"
# Path of Ollama's text-generation endpoint, appended to the base URL.
OLLAMA_API_URL_ENDPOINT = "/api/generate"
9 changes: 9 additions & 0 deletions cognix/utils/const/PROMPTS/conversation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@


# Default system prompt instructing the model to answer as a single assistant
# voice without role prefixes.
# NOTE(review): "CONVESATION" is a typo for "CONVERSATION"; the name is kept
# as-is because other modules (e.g. cognix/session.py) import it under this
# spelling.
BASE_CONVESATION_PROMPT = (
    "The following is a friendly, detailed, and informative conversation between a human and an AI assistant. "
    "The AI only speaks for itself and never includes the human's part or pretends to be both sides. "
    "Each AI reply is a continuation of the conversation without any prefixes like 'AI:' or 'Human:'. "
    "The AI remains talkative, helpful, and provides specific, context-aware answers. "
    "If the AI does not know something, it clearly states that it does not know.\n\n"
)
9 changes: 9 additions & 0 deletions cognix/utils/const/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@


# Default constructor kwargs for SummaryMemory (see summary_memory_factory).
SUMMARY_CONFIG = {
    "max_recent": 5,
}

# Default constructor kwargs for BufferMemory (see buffer_memory_factory).
BUFFER_CONFIG = {
    "max_history": 20,
}
38 changes: 38 additions & 0 deletions cognix/utils/const/registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# constants/registry.py

from cognix.conversation.memory_management.summary import summary_memory_factory
from cognix.conversation.memory_management.buffer import buffer_memory_factory



class MemoryRegistry:
    """Name -> factory registry for memory-strategy implementations."""

    _registry = {}

    @classmethod
    def register(cls, name):
        """Return a decorator that registers the decorated factory under `name`."""
        def wrapper(factory_fn):
            cls._registry[name] = factory_fn
            return factory_fn
        return wrapper

    @classmethod
    def get(cls, name, config):
        """Instantiate the memory registered under `name`, passing `config`."""
        try:
            factory_fn = cls._registry[name]
        except KeyError:
            raise ValueError(f"Memory type '{name}' not found.") from None
        return factory_fn(config)

    @classmethod
    def list(cls):
        """Return the names of all registered memory types."""
        return [*cls._registry]



# Legacy plain-dict registry kept for callers that bypass MemoryRegistry.
# Bug fix: the old lambdas referenced SummaryMemory/BufferMemory directly —
# neither class is imported in this module, so calling them raised NameError —
# and passed a `config=` keyword that neither constructor accepts. Delegate to
# the imported factory functions, which also handle config=None correctly.
MEMORY_REGISTRY = {
    "summary": lambda config: summary_memory_factory(config),
    "buffer": lambda config: buffer_memory_factory(config),
}
MemoryRegistry.register("summary")(summary_memory_factory)
MemoryRegistry.register("buffer")(buffer_memory_factory)
Empty file added test_project_cognix/__init__.py
Empty file.
Loading
Loading