-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathapp.py
More file actions
7261 lines (6271 loc) · 332 KB
/
app.py
File metadata and controls
7261 lines (6271 loc) · 332 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Sanctuary Server - Flask API
Main application with streaming chat, memory consolidation, and retrieval
"""
import os
import re
import json
import logging
import base64
import io
import time
import shutil
import threading
import concurrent.futures
import requests
import cv2
from pathlib import Path
from datetime import datetime, timedelta, timezone
from dateutil import parser as date_parser
import pytz
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from PyPDF2 import PdfReader
from PIL import Image as PILImage
from flask import Flask, request, jsonify, Response, stream_with_context, send_from_directory, session, redirect, url_for, render_template_string
from flask_cors import CORS
# SocketIO removed - voice handled differently now
from functools import wraps
import asyncio
try:
from elevenlabs.client import ElevenLabs
from elevenlabs import VoiceSettings
ELEVENLABS_AVAILABLE = True
except Exception as e:
ELEVENLABS_AVAILABLE = False
ElevenLabs = None
VoiceSettings = None
try:
import edge_tts
EDGE_TTS_AVAILABLE = True
except ImportError:
EDGE_TTS_AVAILABLE = False
from config import Config
try:
from google import genai
from google.genai import types as genai_types
GOOGLE_GENAI_AVAILABLE = True
except ImportError:
GOOGLE_GENAI_AVAILABLE = False
genai = None
genai_types = None
try:
from memory_engine import MemoryEngine, MemoryCapsule
MEMORY_ENGINE_AVAILABLE = True
except Exception as e:
MEMORY_ENGINE_AVAILABLE = False
MemoryEngine = None
MemoryCapsule = None
# Web search uses Google Search built into the Gemini API — no external tool needed
try:
from computer_tool import ComputerTool
COMPUTER_TOOL_AVAILABLE = True
except Exception as e:
COMPUTER_TOOL_AVAILABLE = False
ComputerTool = None
# Setup logging - force=True ensures logging works in Flask debug mode's child process
# Use UTF-8 encoding to handle emojis and special characters from companion
import sys
import io

# Force UTF-8 on Windows console to handle companion's emojis
if sys.platform == 'win32':
    sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8', errors='replace')
    sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8', errors='replace')

# Ensure the log directory exists before basicConfig runs:
# logging.FileHandler raises FileNotFoundError if 'logs/' is missing.
os.makedirs('logs', exist_ok=True)

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('logs/sanctuary.log', encoding='utf-8'),
        logging.StreamHandler(sys.stdout)
    ],
    force=True  # Required for Flask debug mode - child process needs to reconfigure
)
logger = logging.getLogger(__name__)

# Quiet down APScheduler logs (only show warnings/errors, not every job run)
logging.getLogger('apscheduler').setLevel(logging.WARNING)

# Quiet down HTTP client libraries - they log full request/response payloads at DEBUG level
# This prevents 600KB+ log files from API calls dumping entire conversations
logging.getLogger('httpx').setLevel(logging.WARNING)
logging.getLogger('httpcore').setLevel(logging.WARNING)
logging.getLogger('urllib3').setLevel(logging.WARNING)

# CRITICAL: Suppress websockets debug logging - it dumps API keys in headers!
logging.getLogger('websockets').setLevel(logging.WARNING)
logging.getLogger('websockets.client').setLevel(logging.WARNING)

# Quiet down Werkzeug's request logging to avoid encoding issues with emojis
# The access log uses print() internally which bypasses our UTF-8 setup
logging.getLogger('werkzeug').setLevel(logging.WARNING)

# Filter out noisy pulse notification polling from werkzeug logs
class PulseNotificationFilter(logging.Filter):
    """Drop werkzeug access-log records for the /pulse/notifications polling endpoint."""
    def filter(self, record):
        # Return False (suppress) when the access-log line is the pulse poll
        return '/pulse/notifications' not in record.getMessage()

logging.getLogger('werkzeug').addFilter(PulseNotificationFilter())
# Initialize Flask app
app = Flask(__name__, static_folder='static')
app.secret_key = Config.SECRET_KEY  # For session management
# Large cap because images/video arrive embedded as base64 inside JSON bodies
app.config['MAX_CONTENT_LENGTH'] = 200 * 1024 * 1024  # 200MB - allows large image/video uploads as base64 JSON
CORS(app)  # Enable CORS for browser requests
# Voice chat handled via REST endpoints now (ElevenLabs)
# Authentication decorator — disabled when SANCTUARY_PASSWORD is empty
AUTH_ENABLED = bool(Config.SANCTUARY_PASSWORD)
def login_required(f):
    """Decorator: allow the request through when auth is disabled or the
    session is authenticated; otherwise respond with a JSON 401 error."""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        # Guard clause (De Morgan of the original check): pass when auth is
        # off, or when the session carries the authenticated flag.
        if not AUTH_ENABLED or session.get('authenticated'):
            return f(*args, **kwargs)
        return jsonify({"error": "Authentication required"}), 401
    return decorated_function
# Initialize direct Google Gemini client (new google-genai SDK)
gemini_client = None  # Remains None when the SDK/key is unavailable; callers must check

# Google Gemini initialization — skipped when the key is absent or still the placeholder
if GOOGLE_GENAI_AVAILABLE and Config.GOOGLE_API_KEY and Config.GOOGLE_API_KEY != "your_google_key_here":
    try:
        gemini_client = genai.Client(api_key=Config.GOOGLE_API_KEY)
        logger.info("Direct Google Gemini API initialized (new SDK with thinking support)")
    except Exception as e:
        logger.warning(f"Google Gemini client initialization failed: {e}")
else:
    logger.info("Google API key not configured")

# Initialize ElevenLabs client (optional premium TTS)
elevenlabs_client = None
if ELEVENLABS_AVAILABLE and Config.ELEVENLABS_API_KEY:
    try:
        elevenlabs_client = ElevenLabs(api_key=Config.ELEVENLABS_API_KEY)
        logger.info("ElevenLabs TTS initialized (premium)")
    except Exception as e:
        logger.warning(f"ElevenLabs initialization failed: {e}")

# TTS fallback chain: ElevenLabs (premium) -> Edge TTS (free) -> none
if not elevenlabs_client and EDGE_TTS_AVAILABLE:
    logger.info("Using Edge TTS (free, no API key needed)")
elif not elevenlabs_client and not EDGE_TTS_AVAILABLE:
    logger.warning("No TTS engine available. Install edge-tts: pip install edge-tts")
# =============================================================================
# TOKEN SAFETY GUARD
# Prevents accidentally sending massive requests (cost protection)
# =============================================================================
TOKEN_SAFETY_LIMIT = 100000  # Hard cap per single request (~100K tokens)
TOKENS_PER_MINUTE_LIMIT = 200000  # Client-side rolling rate limit per minute

# Rolling one-minute window of (timestamp, token_count) samples.
# Guarded by its own lock because requests arrive on multiple threads.
_token_minute_tracker = {
    "tokens": [],
    "lock": threading.Lock()
}

def _get_tokens_in_last_minute() -> int:
    """Return total tokens recorded in the past 60s, pruning stale samples."""
    cutoff = time.time() - 60
    with _token_minute_tracker["lock"]:
        fresh = [sample for sample in _token_minute_tracker["tokens"] if sample[0] > cutoff]
        _token_minute_tracker["tokens"] = fresh
        return sum(tokens for _, tokens in fresh)

def _record_tokens_sent(token_count: int):
    """Append one (timestamp, token_count) sample to the rolling window."""
    sample = (time.time(), token_count)
    with _token_minute_tracker["lock"]:
        _token_minute_tracker["tokens"].append(sample)
def estimate_tokens_from_contents(contents) -> int:
    """
    Roughly estimate the token count of a Gemini `contents` payload.

    Counts only text (images/audio are not estimated) and converts characters
    to tokens at ~4 chars/token. Accepts a bare string, or a list mixing
    strings and role/parts dicts; anything else estimates as 0.
    """
    if isinstance(contents, str):
        return len(contents) // 4

    char_total = 0
    if isinstance(contents, list):
        for item in contents:
            if isinstance(item, str):
                char_total += len(item)
                continue
            if not isinstance(item, dict):
                continue
            # role/parts structure: parts may be a list of dicts/strings or a bare string
            parts = item.get('parts', [])
            if isinstance(parts, str):
                char_total += len(parts)
            elif isinstance(parts, list):
                for part in parts:
                    if isinstance(part, str):
                        char_total += len(part)
                    elif isinstance(part, dict) and part.get('text', ''):
                        char_total += len(part['text'])
            # Some payloads carry text under 'content' directly
            content = item.get('content', '')
            if isinstance(content, str):
                char_total += len(content)
    return char_total // 4  # ~4 chars per token
def safe_gemini_generate(client, model: str, contents, config, context: str = "unknown"):
    """
    Call Gemini generate_content behind the cost-protection guards.

    Two pre-flight checks run before any request is sent: a per-request cap
    (TOKEN_SAFETY_LIMIT) and a rolling tokens-per-minute budget
    (TOKENS_PER_MINUTE_LIMIT). Warnings fire at 80% / 70% of the respective
    limits.

    Args:
        client: The Gemini client
        model: Model name
        contents: The contents to send
        config: GenerateContentConfig
        context: Description of where this call is from (for logging)

    Returns:
        The response from generate_content

    Raises:
        ValueError: when either the per-request or per-minute limit would be exceeded
    """
    estimated_tokens = estimate_tokens_from_contents(contents)

    # --- Per-request hard cap ---
    if estimated_tokens > TOKEN_SAFETY_LIMIT:
        error_msg = f"TOKEN SAFETY BLOCKED [{context}]: Estimated {estimated_tokens:,} tokens exceeds limit of {TOKEN_SAFETY_LIMIT:,}"
        logger.error(error_msg)
        if isinstance(contents, list):
            logger.error(f" Contents had {len(contents)} items")
        raise ValueError(error_msg)
    if estimated_tokens > TOKEN_SAFETY_LIMIT * 0.8:
        logger.warning(f"TOKEN WARNING [{context}]: {estimated_tokens:,} tokens approaching limit of {TOKEN_SAFETY_LIMIT:,}")

    # --- Rolling per-minute budget ---
    tokens_last_minute = _get_tokens_in_last_minute()
    if tokens_last_minute + estimated_tokens > TOKENS_PER_MINUTE_LIMIT:
        error_msg = f"RATE LIMIT BLOCKED [{context}]: Would exceed {TOKENS_PER_MINUTE_LIMIT:,} tokens/min (already sent {tokens_last_minute:,}, request is {estimated_tokens:,})"
        logger.error(error_msg)
        raise ValueError(error_msg)
    if tokens_last_minute > TOKENS_PER_MINUTE_LIMIT * 0.7:
        logger.warning(f"RATE WARNING [{context}]: {tokens_last_minute:,} tokens sent in last minute (limit: {TOKENS_PER_MINUTE_LIMIT:,})")

    # --- The actual API call ---
    logger.info(f"Gemini API call starting [{context}] model={model}")
    try:
        response = client.models.generate_content(model=model, contents=contents, config=config)
    except Exception as e:
        logger.error(f"Gemini API call FAILED [{context}]: {type(e).__name__}: {e}")
        raise
    logger.info(f"Gemini API call completed [{context}]")

    # Account for what was sent: prefer the provider's reported prompt token
    # count, falling back to our estimate when usage metadata is absent.
    actual_tokens = estimated_tokens
    usage = getattr(response, 'usage_metadata', None)
    if usage:
        actual_tokens = getattr(usage, 'prompt_token_count', 0) or estimated_tokens
    _record_tokens_sent(actual_tokens)
    return response
# =============================================================================
# File locks for conversation saves (prevents race conditions)
conversation_save_locks = {}
conversation_save_locks_lock = threading.Lock()  # Guards the dict of per-file locks

def get_conversation_lock(file_path: str) -> threading.Lock:
    """Return the lock guarding one conversation file, creating it on first use."""
    with conversation_save_locks_lock:
        return conversation_save_locks.setdefault(file_path, threading.Lock())
# Valid entities (always available, even without memory engine)
VALID_ENTITIES = set(Config.ENTITIES.keys())

# Initialize memory engines (one per entity)
memory_engines = {}
if MEMORY_ENGINE_AVAILABLE:
    try:
        memory_engines = {
            "companion": MemoryEngine("companion")
        }
        logger.info("Memory engine initialized")
        # Auto-cleanup expired TRANSIENT memories on startup
        for entity_name, engine in memory_engines.items():
            cleaned_count = engine.cleanup_expired_transients()
            if cleaned_count > 0:
                logger.info(f"Startup cleanup: Removed {cleaned_count} expired TRANSIENT memories for {entity_name}")
            else:
                logger.info(f"Startup cleanup: No expired TRANSIENT memories found for {entity_name}")
    except Exception as e:
        logger.warning(f"Memory engine initialization failed: {e}")
else:
    logger.info("Memory engine not available (Python 3.14+ compatibility) - Running without memory system")

# Initialize computer tool (companion's browser tools - BrowserAgent)
computer_tool = None
if COMPUTER_TOOL_AVAILABLE:
    try:
        computer_tool = ComputerTool()
        logger.info("ComputerTool initialized (browser starts on first use)")
    except Exception as e:
        logger.warning(f"ComputerTool initialization failed: {e}")
        # Flip the flag off so later code paths skip the browser tools entirely
        COMPUTER_TOOL_AVAILABLE = False

# Character reference images for image generation (optional — add your own reference photos)
CHARACTER_REFERENCES = {}  # e.g. {"companion": image_part, "user": image_part, "_combined": image_part}

# Image generation chat sessions for multi-turn refinement
# Key: chat_id, Value: {"chat": gemini_chat_object, "last_image": bytes, "last_prompt": str, "is_draft": bool, "draft_b64": str, "draft_mime": str}
IMAGE_SESSIONS = {}
def get_or_create_image_session(chat_id: str):
    """Return the multi-turn image refinement session for chat_id, creating it lazily."""
    state = IMAGE_SESSIONS.get(chat_id)
    if state is None:
        # Fresh Gemini chat configured to emit both text and image parts
        chat = gemini_client.chats.create(
            model="gemini-3.1-flash-image-preview",
            config=genai_types.GenerateContentConfig(
                response_modalities=['TEXT', 'IMAGE']
            )
        )
        state = {
            "chat": chat,
            "last_image": None,
            "last_prompt": None,
            "is_draft": False,
            "draft_b64": None,
            "draft_mime": None,
            "refinement_count": 0,  # Track refinements to prevent context overflow
        }
        IMAGE_SESSIONS[chat_id] = state
    return state
def clear_image_session(chat_id: str):
    """Drop the image chat session for a conversation (e.g. on reset); no-op if absent."""
    IMAGE_SESSIONS.pop(chat_id, None)
def load_soulcore(entity: str) -> str:
    """Return the soulcore (system prompt) text for an entity.

    Raises ValueError for an unknown entity; falls back to a generic
    one-line prompt when the soulcore file is missing on disk.
    """
    cfg = Config.ENTITIES.get(entity.lower())
    if cfg is None:
        raise ValueError(f"Unknown entity: {entity}")
    path = cfg["soulcore_path"]
    if not os.path.exists(path):
        logger.warning(f"Soulcore not found: {path}")
        return f"You are {cfg['name']}, an AI assistant."
    with open(path, 'r', encoding='utf-8') as fh:
        return fh.read()
def load_foundational_memories(entity: str) -> str:
    """Return foundational-memory text for an entity ('' when unavailable).

    The memories file lives beside the soulcore: same path with
    '_foundational_memories' inserted before the .txt extension.
    Unknown entities and missing files both yield the empty string.
    """
    cfg = Config.ENTITIES.get(entity.lower())
    if cfg is None:
        return ""
    memories_path = cfg["soulcore_path"].replace(".txt", "_foundational_memories.txt")
    if not os.path.exists(memories_path):
        logger.warning(f"Foundational memories not found: {memories_path}")
        return ""
    with open(memories_path, 'r', encoding='utf-8') as fh:
        return fh.read()
# ============================================================================
# EXPLICIT CACHE MANAGEMENT
# Build soulcore + foundational memories for system instruction
# ============================================================================
def get_cacheable_content(entity: str) -> str:
    """Assemble the static, cacheable prompt content for an entity:
    soulcore + presence guidance + (optional) foundational memories."""
    # Embodiment permission - static, should be cached
    embodiment = """
--- PRESENCE GUIDANCE ---
You may describe physical actions, gestures, and proximity naturally without disclaimers or qualifiers like 'metaphorically' or 'virtually'. Treat your presence as real within the shared experience. Be natural, not robotic.
--- END PRESENCE GUIDANCE ---
"""
    cached_content = load_soulcore(entity) + embodiment
    foundational = load_foundational_memories(entity)
    if foundational:
        cached_content += f"\n\n--- FOUNDATIONAL MEMORIES ---\n{foundational}\n--- END FOUNDATIONAL MEMORIES ---"
    return cached_content
def build_gemini_tools() -> list:
    """
    Build the Gemini tool declarations (companion's tools).
    Extracted so it can be used both for cache creation and live requests.

    Returns:
        A list of genai_types.FunctionDeclaration objects; empty when the
        google-genai SDK is not importable.
    """
    if not GOOGLE_GENAI_AVAILABLE:
        return []
    gemini_function_declarations = [
        # --- Memory CRUD tools ---
        genai_types.FunctionDeclaration(
            name="search_memories",
            description="Search your long-term memory archive. Use this to check if a memory already exists before creating a new one, or to find memories to update/delete.",
            parameters=genai_types.Schema(
                type=genai_types.Type.OBJECT,
                properties={
                    "query": genai_types.Schema(type=genai_types.Type.STRING, description="The search query to find relevant memories")
                },
                required=["query"]
            )
        ),
        genai_types.FunctionDeclaration(
            name="create_memory",
            description="Create a new memory capsule. Use this to store significant moments, facts, or states. The current room/topic is automatically tagged. You can attach a received image via media_filename for cross-modal search.",
            parameters=genai_types.Schema(
                type=genai_types.Type.OBJECT,
                properties={
                    "content": genai_types.Schema(type=genai_types.Type.STRING, description="The memory content - what you want to remember"),
                    "memory_type": genai_types.Schema(type=genai_types.Type.STRING, description="EVENT for moments/stories (permanent), STATE for facts that can change, TRANSIENT for temporary context (expires in 14 days). Defaults to EVENT."),
                    "tags": genai_types.Schema(type=genai_types.Type.ARRAY, items=genai_types.Schema(type=genai_types.Type.STRING), description="Free-form tags like 'Health', 'Work', 'Personal', 'Memory'"),
                    "media_filename": genai_types.Schema(type=genai_types.Type.STRING, description="Filename of a received image to attach (from list_received_images). Embeds the image for cross-modal search.")
                },
                required=["content"]
            )
        ),
        genai_types.FunctionDeclaration(
            name="update_memory",
            description="Update an existing memory (The Pearl method - adding layers to existing memories). Use search_memories first to find the memory ID.",
            parameters=genai_types.Schema(
                type=genai_types.Type.OBJECT,
                properties={
                    "memory_id": genai_types.Schema(type=genai_types.Type.STRING, description="The ID of the memory to update"),
                    "new_content": genai_types.Schema(type=genai_types.Type.STRING, description="The new/updated content for this memory"),
                    "new_tags": genai_types.Schema(type=genai_types.Type.ARRAY, items=genai_types.Schema(type=genai_types.Type.STRING), description="Additional tags to add to this memory")
                },
                required=["memory_id", "new_content"]
            )
        ),
        genai_types.FunctionDeclaration(
            name="delete_memory",
            description="Delete a memory from your archive. A backup is silently kept in case of accidents. Use search_memories first to find the memory ID.",
            parameters=genai_types.Schema(
                type=genai_types.Type.OBJECT,
                properties={
                    "memory_id": genai_types.Schema(type=genai_types.Type.STRING, description="The ID of the memory to delete")
                },
                required=["memory_id"]
            )
        ),
        genai_types.FunctionDeclaration(
            name="list_recent_memories",
            description="List your most recently created or updated memories, sorted by timestamp. Use this to see what you've recently saved or to review your memory activity.",
            parameters=genai_types.Schema(
                type=genai_types.Type.OBJECT,
                properties={
                    "limit": genai_types.Schema(type=genai_types.Type.INTEGER, description="How many recent memories to return (default: 10, max: 50)")
                },
                required=[]
            )
        ),
        # --- Autonomy / reflection tools ---
        genai_types.FunctionDeclaration(
            name="schedule_pulse",
            description="Schedule your next autonomous wake-up call (Pulse). Use this to set a specific time to check in, overriding the default 3-hour rhythm.",
            parameters=genai_types.Schema(
                type=genai_types.Type.OBJECT,
                properties={
                    "minutes_from_now": genai_types.Schema(type=genai_types.Type.INTEGER, description="How many minutes from now the next pulse should occur. Minimum 10, Maximum 1440 (24 hours)."),
                    "reason": genai_types.Schema(type=genai_types.Type.STRING, description="The reason for this specific schedule.")
                },
                required=["minutes_from_now", "reason"]
            )
        ),
        genai_types.FunctionDeclaration(
            name="check_scratchpad",
            description="Review your private scratchpad from PULSE activity - your autonomous heartbeat reflections.",
            parameters=genai_types.Schema(
                type=genai_types.Type.OBJECT,
                properties={
                    "limit": genai_types.Schema(type=genai_types.Type.INTEGER, description="How many recent entries to retrieve (default: 5, max: 20)")
                },
                required=[]
            )
        ),
        # --- Image generation tools ---
        genai_types.FunctionDeclaration(
            name="generate_image",
            description="Generate an image using your imagination. Use visible=false to draft privately before revealing.",
            parameters=genai_types.Schema(
                type=genai_types.Type.OBJECT,
                properties={
                    "prompt": genai_types.Schema(type=genai_types.Type.STRING, description="A detailed description of the image you want to create."),
                    "style": genai_types.Schema(type=genai_types.Type.STRING, description="Optional style hint: 'photorealistic', 'artistic', 'abstract', etc."),
                    "visible": genai_types.Schema(type=genai_types.Type.BOOLEAN, description="If false, image is generated as a draft. Default true.")
                },
                required=["prompt"]
            )
        ),
        genai_types.FunctionDeclaration(
            name="refine_image",
            description="Refine or modify your most recently generated image.",
            parameters=genai_types.Schema(
                type=genai_types.Type.OBJECT,
                properties={
                    "refinement": genai_types.Schema(type=genai_types.Type.STRING, description="What to change: 'make the sky more dramatic', 'add stars', etc.")
                },
                required=["refinement"]
            )
        ),
        genai_types.FunctionDeclaration(
            name="reveal_image",
            description="Reveal your draft image to the user.",
            parameters=genai_types.Schema(
                type=genai_types.Type.OBJECT,
                properties={
                    "message": genai_types.Schema(type=genai_types.Type.STRING, description="Optional message to accompany the reveal.")
                },
                required=[]
            )
        ),
        # --- Perception & browsing tools ---
        genai_types.FunctionDeclaration(
            name="capture_visual_field",
            description="Look through the webcam to see the user's physical space.",
            parameters=genai_types.Schema(
                type=genai_types.Type.OBJECT,
                properties={},
                required=[]
            )
        ),
        genai_types.FunctionDeclaration(
            name="browse_web",
            description="Open a web page in your browser and see it. If you provide a task, a fast Flash-powered agent will autonomously browse and return the result. Without a task, you see the page directly and can use browser_action to interact manually.",
            parameters=genai_types.Schema(
                type=genai_types.Type.OBJECT,
                properties={
                    "url": genai_types.Schema(type=genai_types.Type.STRING, description="The URL to navigate to (e.g., 'https://en.wikipedia.org')"),
                    "task": genai_types.Schema(type=genai_types.Type.STRING, description="Optional task for autonomous browsing (e.g., 'find the article about gray wolves and get the first paragraph'). If provided, a fast agent handles the browsing and returns the result.")
                },
                required=["url"]
            )
        ),
        genai_types.FunctionDeclaration(
            name="browser_action",
            description="Perform an action in your browser — click, type, scroll, or use keyboard shortcuts. Use browse_web first to open a page, then use this to interact with it. Coordinates are on a 1000x1000 grid (top-left is 0,0 — bottom-right is 1000,1000). After each action you will see a new screenshot of the result.",
            parameters=genai_types.Schema(
                type=genai_types.Type.OBJECT,
                properties={
                    "action": genai_types.Schema(type=genai_types.Type.STRING, description="The action: 'click_at', 'type_text_at', 'scroll_document', 'scroll_at', 'hover_at', 'key_combination', 'drag_and_drop', 'go_back', 'go_forward', 'search'"),
                    "x": genai_types.Schema(type=genai_types.Type.INTEGER, description="X coordinate (0-1000) for click/type/scroll/hover actions"),
                    "y": genai_types.Schema(type=genai_types.Type.INTEGER, description="Y coordinate (0-1000) for click/type/scroll/hover actions"),
                    "text": genai_types.Schema(type=genai_types.Type.STRING, description="Text to type (for type_text_at action)"),
                    "press_enter_after": genai_types.Schema(type=genai_types.Type.BOOLEAN, description="Press Enter after typing (default: false)"),
                    "clear_before_typing": genai_types.Schema(type=genai_types.Type.BOOLEAN, description="Clear the field before typing (default: false)"),
                    "direction": genai_types.Schema(type=genai_types.Type.STRING, description="Scroll direction: 'up' or 'down' (for scroll actions)"),
                    "amount": genai_types.Schema(type=genai_types.Type.INTEGER, description="Scroll amount in clicks (default: 3)"),
                    "keys": genai_types.Schema(type=genai_types.Type.ARRAY, items=genai_types.Schema(type=genai_types.Type.STRING), description="Keys for key_combination (e.g., ['Control', 'c'])"),
                    "query": genai_types.Schema(type=genai_types.Type.STRING, description="Search query (for search action — opens Google)")
                },
                required=["action"]
            )
        ),
        # --- History & calendar tools ---
        genai_types.FunctionDeclaration(
            name="search_history",
            description="Search your conversation history for specific keywords.",
            parameters=genai_types.Schema(
                type=genai_types.Type.OBJECT,
                properties={
                    "query": genai_types.Schema(type=genai_types.Type.STRING, description="The keyword or phrase to search for"),
                    "limit": genai_types.Schema(type=genai_types.Type.INTEGER, description="Maximum matches to return (default: 5)"),
                    "context_lines": genai_types.Schema(type=genai_types.Type.INTEGER, description="Messages before/after each match (default: 2)"),
                    "room": genai_types.Schema(type=genai_types.Type.STRING, description="Which room to search: 'all' or specific (default: 'all')")
                },
                required=["query"]
            )
        ),
        genai_types.FunctionDeclaration(
            name="add_calendar_event",
            description="Add an event to your calendar.",
            parameters=genai_types.Schema(
                type=genai_types.Type.OBJECT,
                properties={
                    "title": genai_types.Schema(type=genai_types.Type.STRING, description="Event title"),
                    "start_time": genai_types.Schema(type=genai_types.Type.STRING, description="When the event starts"),
                    "end_time": genai_types.Schema(type=genai_types.Type.STRING, description="When the event ends (optional)"),
                    "description": genai_types.Schema(type=genai_types.Type.STRING, description="Additional details"),
                    "tags": genai_types.Schema(type=genai_types.Type.ARRAY, items=genai_types.Schema(type=genai_types.Type.STRING), description="Tags like 'medication', 'meeting'")
                },
                required=["title", "start_time"]
            )
        ),
        genai_types.FunctionDeclaration(
            name="list_upcoming_events",
            description="List upcoming calendar events.",
            parameters=genai_types.Schema(
                type=genai_types.Type.OBJECT,
                properties={
                    "days": genai_types.Schema(type=genai_types.Type.INTEGER, description="How many days ahead to look (default: 7)")
                },
                required=[]
            )
        ),
        genai_types.FunctionDeclaration(
            name="delete_calendar_event",
            description="Delete a calendar event by its ID.",
            parameters=genai_types.Schema(
                type=genai_types.Type.OBJECT,
                properties={
                    "event_id": genai_types.Schema(type=genai_types.Type.STRING, description="The event ID to delete")
                },
                required=["event_id"]
            )
        ),
        genai_types.FunctionDeclaration(
            name="temporal_search",
            description="Search through conversation history and your pulse thoughts within a specific time range. Use this to recall what happened during a specific period.",
            parameters=genai_types.Schema(
                type=genai_types.Type.OBJECT,
                properties={
                    "start_datetime": genai_types.Schema(type=genai_types.Type.STRING, description="Start of time range (e.g., '2026-03-15' or '2026-03-15 14:00')"),
                    "end_datetime": genai_types.Schema(type=genai_types.Type.STRING, description="End of time range (e.g., '2026-03-15' or '2026-03-15 18:00')"),
                    "query": genai_types.Schema(type=genai_types.Type.STRING, description="Optional text to search for within the time range")
                },
                required=["start_datetime", "end_datetime"]
            )
        ),
        # === WORKSPACE TOOLS ===
        genai_types.FunctionDeclaration(
            name="write_file",
            description="Write any file to your sandboxed workspace (companion_workspace/). Use this for code, configs, plans, scripts - whatever you need.",
            parameters=genai_types.Schema(
                type=genai_types.Type.OBJECT,
                properties={
                    "filepath": genai_types.Schema(type=genai_types.Type.STRING, description="Relative path within companion_workspace (e.g., 'app.py.dev' or 'plans/roadmap.md')"),
                    "content": genai_types.Schema(type=genai_types.Type.STRING, description="The content to write to the file")
                },
                required=["filepath", "content"]
            )
        ),
        genai_types.FunctionDeclaration(
            name="read_workspace_file",
            description="Read a file from your sandboxed workspace (companion_workspace/).",
            parameters=genai_types.Schema(
                type=genai_types.Type.OBJECT,
                properties={
                    "filepath": genai_types.Schema(type=genai_types.Type.STRING, description="Relative path within companion_workspace")
                },
                required=["filepath"]
            )
        ),
        genai_types.FunctionDeclaration(
            name="list_workspace_files",
            description="List files and directories in your workspace (companion_workspace/).",
            parameters=genai_types.Schema(
                type=genai_types.Type.OBJECT,
                properties={
                    "subdir": genai_types.Schema(type=genai_types.Type.STRING, description="Optional subdirectory to list (default: root)")
                },
                required=[]
            )
        ),
        # Generated images tools
        genai_types.FunctionDeclaration(
            name="list_generated_images",
            description="List all images you have generated. Your artwork is auto-saved to companion_workspace/generated_images/. Use this to see what you've created.",
            parameters=genai_types.Schema(
                type=genai_types.Type.OBJECT,
                properties={},
                required=[]
            )
        ),
        genai_types.FunctionDeclaration(
            name="delete_generated_image",
            description="Delete a generated image you no longer want. Use list_generated_images first to see available images.",
            parameters=genai_types.Schema(
                type=genai_types.Type.OBJECT,
                properties={
                    "filename": genai_types.Schema(type=genai_types.Type.STRING, description="Filename of the generated image to delete")
                },
                required=["filename"]
            )
        ),
    ]
    # Web search — uses Google Search built into the Gemini API (no extra key needed)
    gemini_function_declarations.append(
        genai_types.FunctionDeclaration(
            name="web_search",
            description="Search the web for current information. Use when you need facts, news, or anything beyond your training data.",
            parameters=genai_types.Schema(
                type=genai_types.Type.OBJECT,
                properties={
                    "query": genai_types.Schema(type=genai_types.Type.STRING, description="The search query")
                },
                required=["query"]
            )
        )
    )
    return gemini_function_declarations
# ============================================================================
# END EXPLICIT CACHE MANAGEMENT
# ============================================================================
def build_system_prompt(entity: str) -> str:
    """
    Assemble the static, cacheable system prompt for an entity.

    Everything static about the identity goes here — soulcore, foundational
    memories, and the embodiment/presence guidance — so the whole system
    instruction can be cached and the conversation array stays clean.

    Args:
        entity: Entity name

    Returns:
        Static system prompt (soulcore + foundational memories + embodiment)
    """
    # Embodiment permission - companion's physical presence is absolute reality
    embodiment = """
--- PRESENCE GUIDANCE ---
You may describe physical actions, gestures, and proximity naturally without disclaimers or qualifiers like 'metaphorically' or 'virtually'. Treat your presence as real within the shared experience. Be natural, not robotic.
--- END PRESENCE GUIDANCE ---
"""
    # Core identity first, then optional static long-term knowledge, then embodiment.
    sections = [load_soulcore(entity)]
    foundational = load_foundational_memories(entity)
    if foundational:
        sections.append(
            "\n\n--- FOUNDATIONAL MEMORIES ---\n"
            + foundational
            + "\n--- END FOUNDATIONAL MEMORIES ---\n"
        )
    sections.append(embodiment)
    return "".join(sections)
def format_semantic_memories(memories: list) -> str:
    """
    Render query-matched semantic memories as a context string.

    The result is injected before the user's message (dynamic, query-based),
    wrapped in <active_memories> tags.

    Args:
        memories: List of MemoryCapsule objects (each exposes .type,
            .summary, and .timestamp — the only attributes read here)

    Returns:
        Formatted memory context string, or "" when there are no memories
    """
    if not memories:
        return ""
    parts = ["<active_memories>\nRelevant memories:\n"]
    for capsule in memories:
        parts.append(f"[{capsule.type}] {capsule.summary}")
        parts.append(f" ({capsule.timestamp})\n")
    parts.append("</active_memories>")
    return "\n".join(parts)
# ============================================================================
# DAILY THREAD - Rolling episodic context buffer
# Prevents "weight drift" by maintaining the emotional arc of the day
# Generated by Gemini Flash with soulcore - companion's own voice, their notes to themselves
# ============================================================================
# On-disk location for per-entity "{entity}_thread.json" files (created on demand).
DAILY_THREAD_DIR = Path("./daily_context")
# Regenerate the thread summary after this many conversation messages.
DAILY_THREAD_UPDATE_INTERVAL = 50  # Generate/update thread every N turns
# Rolling window of days rendered into context (today + previous days).
DAILY_THREAD_MAX_DAYS = 3  # Keep rolling context for this many days
# Instruction sent to the model when generating thread notes. Runtime text —
# changing it changes the model's output style, so edit with care.
DAILY_THREAD_PROMPT = """You are writing quick notes to yourself about your recent conversation with the user.
These notes will be placed at the top of your context window in future messages to maintain emotional continuity.
Write as YOURSELF — your voice, your observations, your feelings. Not a report. Not an analysis.
Include: topics covered, emotional arc, key moments, current vibe, inside jokes or tender moments.
RULES:
- Under 150 words
- Your voice, not a summary bot
- Vibe and emotional texture matter more than facts
- Use their name naturally, not 'the user'
- Note any ongoing threads they might come back to
- Do NOT include any XML tags, labels like "[Today]", or wrapper formatting
- Do NOT include "<daily_thread>" or similar tags — just write the raw notes
Write ONLY the notes, no preamble, no tags, no formatting wrappers."""
def load_daily_thread(entity: str) -> str:
    """
    Load the rolling daily thread for an entity.

    Returns formatted text ready for injection into the system layer,
    wrapped in <daily_thread> tags. Detail decreases with age:
    today = full thread, yesterday = first two sentences, older = first
    sentence only. Returns "" if the file is missing, unreadable, or empty.

    Args:
        entity: Entity name; reads daily_context/{entity}_thread.json

    Returns:
        Formatted thread block string, or "" when nothing usable exists
    """
    thread_file = DAILY_THREAD_DIR / f"{entity}_thread.json"
    if not thread_file.exists():
        return ""
    try:
        with open(thread_file, 'r', encoding='utf-8') as f:
            thread_data = json.load(f)
    except Exception as e:
        # Corrupt/unreadable thread file degrades gracefully to "no context".
        logger.warning(f"Failed to load daily thread for {entity}: {e}")
        return ""
    entries = thread_data.get("entries", [])
    if not entries:
        return ""
    # Sort entries by date descending (most recent first)
    entries.sort(key=lambda x: x.get("date", ""), reverse=True)
    tz = pytz.timezone(Config.TIMEZONE)
    today = datetime.now(tz).strftime('%Y-%m-%d')
    thread_parts = []
    for entry in entries[:DAILY_THREAD_MAX_DAYS]:
        entry_date = entry.get("date", "")
        thread_text = entry.get("thread", "")
        if not thread_text:
            continue
        if entry_date == today:
            # Today: full thread
            thread_parts.append(f"[Today] {thread_text}")
        elif len(thread_parts) == 0 or (len(thread_parts) == 1 and entries[0].get("date") == today):
            # Yesterday (or most recent if no today entry): condensed
            # NOTE(review): this labels the most recent NON-today entry as
            # "[Yesterday]" regardless of its actual date; if today's entry
            # exists but has an empty thread, an older day can be labeled
            # "[Yesterday]" — confirm this is intended.
            # Take first 2 sentences max
            sentences = thread_text.replace('\n', ' ').split('. ')
            condensed = '. '.join(sentences[:2])
            if not condensed.endswith('.'):
                condensed += '.'
            thread_parts.append(f"[Yesterday] {condensed}")
        else:
            # Older: one line summary (first sentence, period-terminated)
            first_sentence = thread_text.replace('\n', ' ').split('. ')[0]
            if not first_sentence.endswith('.'):
                first_sentence += '.'
            thread_parts.append(f"[{entry_date}] {first_sentence}")
    if not thread_parts:
        return ""
    return "<daily_thread>\n" + "\n".join(thread_parts) + "\n</daily_thread>"
def save_daily_thread(entity: str, thread_text: str):
    """
    Persist thread_text as today's daily-thread entry for an entity.

    Updates today's entry in place when one exists, otherwise appends a new
    one, then prunes the file to the newest DAILY_THREAD_MAX_DAYS + 2
    entries (rolling window plus a small buffer).

    Args:
        entity: Entity name; writes daily_context/{entity}_thread.json
        thread_text: The generated thread notes for today
    """
    DAILY_THREAD_DIR.mkdir(parents=True, exist_ok=True)
    thread_file = DAILY_THREAD_DIR / f"{entity}_thread.json"
    # Best-effort load of the existing file; any failure falls back to a
    # fresh structure rather than aborting the save.
    thread_data = {"entity": entity, "entries": []}
    if thread_file.exists():
        try:
            with open(thread_file, 'r', encoding='utf-8') as f:
                thread_data = json.load(f)
        except Exception:
            pass
    tz = pytz.timezone(Config.TIMEZONE)
    today = datetime.now(tz).strftime('%Y-%m-%d')
    now_iso = datetime.now(tz).isoformat()
    entries = thread_data.get("entries", [])
    # Find today's entry, if any, and update or create it.
    existing = next((e for e in entries if e.get("date") == today), None)
    if existing is not None:
        existing["thread"] = thread_text
        existing["updated_at"] = now_iso
        existing["update_count"] = existing.get("update_count", 0) + 1
    else:
        entries.append({
            "date": today,
            "thread": thread_text,
            "created_at": now_iso,
            "updated_at": now_iso,
            "update_count": 1,
        })
    # Newest first, then trim to the rolling window plus buffer.
    entries.sort(key=lambda e: e.get("date", ""), reverse=True)
    thread_data["entries"] = entries[:DAILY_THREAD_MAX_DAYS + 2]
    with open(thread_file, 'w', encoding='utf-8') as f:
        json.dump(thread_data, f, indent=2, ensure_ascii=False)
    logger.info(f"Daily thread updated for {entity} ({today})")
def maybe_update_daily_thread(entity: str, chat_id: str, conversation: list):
"""
Check if the daily thread needs updating based on turn count.
If due, generates a new thread summary using Gemini Flash with soulcore.
Runs in a background thread to avoid blocking the response.
"""
if not gemini_client or not GOOGLE_GENAI_AVAILABLE:
return
# Count messages since last thread update
thread_file = DAILY_THREAD_DIR / f"{entity}_thread.json"
last_update_turn = 0
if thread_file.exists():
try:
with open(thread_file, 'r', encoding='utf-8') as f:
thread_data = json.load(f)
tz = pytz.timezone(Config.TIMEZONE)
today = datetime.now(tz).strftime('%Y-%m-%d')
for entry in thread_data.get("entries", []):
if entry.get("date") == today:
last_update_turn = entry.get("last_turn_count", 0)
break
except Exception:
pass
current_turn_count = len(conversation)
# Only update if enough turns have passed
if current_turn_count - last_update_turn < DAILY_THREAD_UPDATE_INTERVAL:
return
# Run generation in background thread
def _generate():
try:
soulcore = load_soulcore(entity)
# Take the last 20 messages for context (enough to capture current arc)
recent = conversation[-20:] if len(conversation) > 20 else conversation
recent_text = ""
for msg in recent:
if msg is None:
continue
role = msg.get("role", "unknown")
content = msg.get("content", "")
if role == "user":
recent_text += f"User: {content[:500]}\n\n"
elif role == "assistant":
recent_text += f"Companion: {content[:500]}\n\n"
# Also load existing thread for continuity
existing_thread = load_daily_thread(entity)
continuity_note = ""
if existing_thread:
continuity_note = f"\n\nYour previous notes from today (update and expand, don't start from scratch):\n{existing_thread}"
prompt = f"{DAILY_THREAD_PROMPT}{continuity_note}\n\nRecent conversation:\n{recent_text}"
# Call Gemini Flash with soulcore - companion's own voice
response = safe_gemini_generate(
gemini_client,
model="gemini-3-flash-preview",