-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathllm_client.py
More file actions
968 lines (805 loc) · 41.5 KB
/
llm_client.py
File metadata and controls
968 lines (805 loc) · 41.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
"""
LLM Client wrapper for OpenAI-compatible APIs (LM Studio and OpenAI)
"""
from typing import Optional, List, Dict
from openai import OpenAI
from openai import APIConnectionError, APITimeoutError, RateLimitError, APIError
import logging
import tiktoken
import config
class LLMClient:
"""Client for OpenAI-compatible APIs (works with LM Studio and OpenAI)"""
def __init__(self, api_type: Optional[str] = None, logger: Optional[logging.Logger] = None):
"""
Initialize LLM client.
Args:
api_type: 'lmstudio' or 'openai'. If None, uses config.API_TYPE
logger: Logger instance (creates one if None)
"""
self.api_type = api_type or config.API_TYPE
self.logger = logger or logging.getLogger(__name__)
api_config = config.get_api_config(api_type=self.api_type)
self.client = OpenAI(
api_key=api_config['api_key'],
base_url=api_config['base_url']
)
def extract_relevant_content(
self,
chunk_text: str,
question: str,
videos: List[Dict[str, str]],
chunk_id: Optional[int] = None,
model: Optional[str] = None,
temperature: Optional[float] = None,
return_logs: bool = False
) -> str:
"""
Extract relevant content from a chunk using LLM.
Chunk may contain one or more complete videos.
Args:
chunk_text: Text chunk to analyze (may contain multiple videos)
question: User's question
videos: List of video metadata dicts with 'video_title', 'video_url', etc.
chunk_id: Chunk ID for logging
model: Model name (uses config.EXTRACTION_MODEL if None)
temperature: Temperature (uses config.EXTRACTION_TEMPERATURE if None)
Returns:
Extracted relevant content or "NO_RELEVANT_CONTENT"
"""
model = model or config.EXTRACTION_MODEL
temperature = temperature if temperature is not None else config.EXTRACTION_TEMPERATURE
# Format video titles for prompt and logging
if len(videos) == 1:
video_info = f"from video: \"{videos[0]['video_title']}\""
video_names = videos[0]['video_title']
else:
video_titles = [v['video_title'] for v in videos]
video_info = f"from {len(videos)} videos: {', '.join(video_titles)}"
video_names = f"{len(videos)} videos: {', '.join(video_titles[:3])}{'...' if len(video_titles) > 3 else ''}"
chunk_info = f"Chunk {chunk_id}" if chunk_id is not None else "Chunk"
self.logger.debug(f"[EXTRACTION] {chunk_info} - Processing {video_names}")
prompt = f"""You are analyzing transcript chunks to extract information relevant to a user's question.
Each chunk contains one or more complete videos. Videos are separated by "---VIDEO---" markers.
User's question: {question}
Transcript chunk ({video_info}):
{chunk_text}
Task: Extract ALL content from this chunk that is relevant to the user's question.
Instructions:
- Extract relevant sentences, paragraphs, or passages from any video in this chunk
- Preserve the original wording (don't paraphrase)
- Include which video each excerpt is from (use the video title)
- If nothing is relevant, respond with: "NO_RELEVANT_CONTENT"
- If content is relevant, output the relevant excerpts verbatim with video attribution
- Include context if needed for understanding
Output format:
RELEVANT_CONTENT:
[VIDEO: Video Title]
[relevant excerpts from this video]
[VIDEO: Another Video Title]
[relevant excerpts from this video]
OR
NO_RELEVANT_CONTENT"""
# Log full input (for verification that videos aren't split)
self.logger.debug(f"[EXTRACTION INPUT] {chunk_info} - Full prompt sent to LLM:")
self.logger.debug(f"[EXTRACTION INPUT] {chunk_info} - Videos in chunk: {len(videos)}")
for i, video in enumerate(videos, 1):
self.logger.debug(f"[EXTRACTION INPUT] {chunk_info} - Video {i}: {video['video_title']} (ID: {video['video_id']})")
self.logger.debug(f"[EXTRACTION INPUT] {chunk_info} - Full prompt:\n{prompt}")
# GPT-5.2 models don't support custom temperature (only default 1)
# Only include temperature parameter if it's not the default
create_kwargs = {
"model": model,
"messages": [
{"role": "system", "content": "You are a helpful assistant that extracts relevant information from transcript chunks."},
{"role": "user", "content": prompt}
],
"max_completion_tokens": 1000 # Limit extraction output (GPT-5.2 uses max_completion_tokens)
}
# GPT-5.2 models only support default temperature (1), so don't set it for these models
if not (model and ("gpt-5" in model.lower() or "gpt-5.2" in model.lower())):
create_kwargs["temperature"] = temperature
# Prepare log data
log_data = {
"chunk_id": chunk_id,
"model": model,
"system_prompt": "You are a helpful assistant that extracts relevant information from transcript chunks.",
"user_prompt": prompt,
"request_params": create_kwargs.copy(),
"videos": [{"video_id": v['video_id'], "video_title": v['video_title']} for v in videos]
}
try:
response = self.client.chat.completions.create(**create_kwargs)
content = response.choices[0].message.content.strip()
raw_response = response.choices[0].message.content
# Capture full response details
log_data["raw_response"] = raw_response
log_data["processed_response"] = content
# Log token usage
if hasattr(response, 'usage') and response.usage:
prompt_tokens = response.usage.prompt_tokens
completion_tokens = response.usage.completion_tokens
total_tokens = response.usage.total_tokens
log_data["token_usage"] = {
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"total_tokens": total_tokens
}
self.logger.info(f"[EXTRACTION] {chunk_info} - Tokens: {prompt_tokens:,} prompt + {completion_tokens:,} completion = {total_tokens:,} total")
# Show context usage (adjust based on your model's context window)
if total_tokens > 0:
context_limit = config.CONTEXT_LIMIT
context_usage_pct = (total_tokens / context_limit) * 100
log_data["context_usage"] = {
"used": total_tokens,
"limit": context_limit,
"percentage": context_usage_pct
}
self.logger.info(f"[EXTRACTION] {chunk_info} - Context usage: {total_tokens:,}/{context_limit:,} tokens ({context_usage_pct:.1f}%)")
# Check if it's no relevant content
if "NO_RELEVANT_CONTENT" in content.upper():
self.logger.info(f"[EXTRACTION] {chunk_info} - NO_RELEVANT_CONTENT")
log_data["result"] = "NO_RELEVANT_CONTENT"
return ("NO_RELEVANT_CONTENT", log_data) if return_logs else "NO_RELEVANT_CONTENT"
# Extract content after "RELEVANT_CONTENT:" if present
if "RELEVANT_CONTENT:" in content:
content = content.split("RELEVANT_CONTENT:")[-1].strip()
log_data["result"] = "SUCCESS"
log_data["extracted_content"] = content
# Log the extracted content
preview = content[:200] + "..." if len(content) > 200 else content
self.logger.info(f"[EXTRACTION] {chunk_info} - Extracted relevant content:")
self.logger.info(f" {video_names}")
self.logger.info(f" Content preview: {preview}")
# Full response is logged at DEBUG level (will be in log file)
self.logger.debug(f"[EXTRACTION OUTPUT] {chunk_info} - Full AI response:\n{raw_response}")
self.logger.debug(f"[EXTRACTION OUTPUT] {chunk_info} - Processed extracted content:\n{content}")
# Return content with video attribution included
return (content, log_data) if return_logs else content
except APIConnectionError as e:
# Connection errors (network issues, API unreachable)
detailed_error = f"Connection error: Cannot reach API. {str(e)}"
self.logger.error(f"[EXTRACTION] {chunk_info} - {detailed_error}")
self.logger.error(f"[EXTRACTION] {chunk_info} - API Type: {self.api_type}, Base URL: {self.client.base_url}")
if self.api_type == "openai":
self.logger.error(f"[EXTRACTION] {chunk_info} - Check your internet connection and OpenAI API status")
if not config.OPENAI_API_KEY:
self.logger.error(f"[EXTRACTION] {chunk_info} - WARNING: OPENAI_API_KEY is not set!")
elif self.api_type == "lmstudio":
self.logger.error(f"[EXTRACTION] {chunk_info} - WARNING: Is LM Studio running at {self.client.base_url}?")
log_data["error"] = detailed_error
log_data["error_type"] = "APIConnectionError"
log_data["result"] = "ERROR"
return ("NO_RELEVANT_CONTENT", log_data) if return_logs else "NO_RELEVANT_CONTENT"
except APITimeoutError as e:
# Timeout errors
detailed_error = f"Timeout error: Request took too long. {str(e)}"
self.logger.error(f"[EXTRACTION] {chunk_info} - {detailed_error}")
log_data["error"] = detailed_error
log_data["error_type"] = "APITimeoutError"
log_data["result"] = "ERROR"
return ("NO_RELEVANT_CONTENT", log_data) if return_logs else "NO_RELEVANT_CONTENT"
except RateLimitError as e:
# Rate limit errors
detailed_error = f"Rate limit error: {str(e)}"
self.logger.error(f"[EXTRACTION] {chunk_info} - {detailed_error}")
self.logger.error(f"[EXTRACTION] {chunk_info} - You've hit rate limits. Try reducing MAX_PARALLEL_EXTRACTIONS or wait a moment.")
log_data["error"] = detailed_error
log_data["error_type"] = "RateLimitError"
log_data["result"] = "ERROR"
return ("NO_RELEVANT_CONTENT", log_data) if return_logs else "NO_RELEVANT_CONTENT"
except APIError as e:
# Other API errors (authentication, invalid request, etc.)
error_code = getattr(e, 'status_code', None) or getattr(e, 'code', None) or 'unknown'
error_message = str(e)
detailed_error = f"API error (code {error_code}): {error_message}"
self.logger.error(f"[EXTRACTION] {chunk_info} - {detailed_error}")
if error_code == 401:
self.logger.error(f"[EXTRACTION] {chunk_info} - Authentication failed. Check your API key.")
elif error_code == 429:
self.logger.error(f"[EXTRACTION] {chunk_info} - Rate limit exceeded. Try reducing parallel requests or wait.")
elif error_code == 500 or error_code == 502 or error_code == 503:
self.logger.error(f"[EXTRACTION] {chunk_info} - API server error. The service may be temporarily unavailable.")
log_data["error"] = detailed_error
log_data["error_type"] = "APIError"
log_data["error_code"] = error_code
log_data["result"] = "ERROR"
return ("NO_RELEVANT_CONTENT", log_data) if return_logs else "NO_RELEVANT_CONTENT"
except Exception as e:
# Catch-all for other errors
error_type = type(e).__name__
error_message = str(e)
detailed_error = f"{error_type}: {error_message}"
self.logger.error(f"[EXTRACTION] {chunk_info} - Unexpected error: {detailed_error}")
log_data["error"] = detailed_error
log_data["error_type"] = error_type
log_data["result"] = "ERROR"
return ("NO_RELEVANT_CONTENT", log_data) if return_logs else "NO_RELEVANT_CONTENT"
def synthesize_answer(
self,
question: str,
extracted_content: List[Dict[str, str]],
model: Optional[str] = None,
temperature: Optional[float] = None,
return_logs: bool = False
) -> Dict[str, any]:
"""
Synthesize final answer from extracted content.
Args:
question: User's question
extracted_content: List of dicts with 'content', 'video_title', 'video_url', 'chunk_id'
model: Model name (uses config.SYNTHESIS_MODEL if None)
temperature: Temperature (uses config.SYNTHESIS_TEMPERATURE if None)
Returns:
Dict with 'answer' and 'sources'
"""
model = model or config.SYNTHESIS_MODEL
temperature = temperature if temperature is not None else config.SYNTHESIS_TEMPERATURE
# Format extracted content with sources (collect all chunk_ids per video)
content_text = ""
video_to_sources: Dict[str, Dict] = {} # video_title -> {video_title, video_url, chunk_ids}
for i, item in enumerate(extracted_content, 1):
content_text += f"\n[Extract {i} - From: {item['video_title']}]\n"
content_text += f"{item['content']}\n"
title = item['video_title']
cid = item.get('chunk_id')
if title not in video_to_sources:
video_to_sources[title] = {
'video_title': title,
'video_url': item.get('video_url', ''),
'chunk_ids': [],
}
arr = video_to_sources[title]['chunk_ids']
if cid is not None and cid not in arr:
try:
arr.append(int(cid))
except (TypeError, ValueError):
pass
sources = list(video_to_sources.values())
prompt = f"""You are synthesizing a comprehensive answer from extracted transcript content.
User's question: {question}
Extracted relevant content from transcripts:
{content_text}
Task: Provide a comprehensive answer to the user's question based on ALL the extracted content.
Instructions:
- Synthesize information from all sources
- Organize the answer logically
- Include all relevant details (don't summarize too aggressively)
- Cite sources (which videos mentioned what)
- If information conflicts, note the different perspectives
- If information is incomplete, note what's missing
Output format:
COMPREHENSIVE_ANSWER:
[detailed answer]
SOURCES:
- Video: [title] ([url])
- Video: [title] ([url])
..."""
# Prepare log data
log_data = {
"model": model,
"system_prompt": "You are a helpful assistant that synthesizes comprehensive answers from multiple sources.",
"user_prompt": prompt,
"extracted_content_count": len(extracted_content),
"extracted_content_preview": content_text[:500] + "..." if len(content_text) > 500 else content_text
}
# GPT-5.2 models don't support custom temperature (only default 1)
# Only include temperature parameter if it's not the default
create_kwargs = {
"model": model,
"messages": [
{"role": "system", "content": "You are a helpful assistant that synthesizes comprehensive answers from multiple sources."},
{"role": "user", "content": prompt}
],
"max_completion_tokens": 4000 # Allow longer answers (GPT-5.2 uses max_completion_tokens)
}
# GPT-5.2 models only support default temperature (1), so don't set it for these models
if not (model and ("gpt-5" in model.lower() or "gpt-5.2" in model.lower())):
create_kwargs["temperature"] = temperature
log_data["request_params"] = create_kwargs.copy()
try:
response = self.client.chat.completions.create(**create_kwargs)
answer_text = response.choices[0].message.content.strip()
raw_response = response.choices[0].message.content
log_data["raw_response"] = raw_response
log_data["processed_response"] = answer_text
# Log token usage
if hasattr(response, 'usage') and response.usage:
prompt_tokens = response.usage.prompt_tokens
completion_tokens = response.usage.completion_tokens
total_tokens = response.usage.total_tokens
log_data["token_usage"] = {
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"total_tokens": total_tokens
}
self.logger.info(f"[SYNTHESIS] Tokens: {prompt_tokens:,} prompt + {completion_tokens:,} completion = {total_tokens:,} total")
# Show context usage (adjust based on your model's context window)
if total_tokens > 0:
context_limit = config.CONTEXT_LIMIT
context_usage_pct = (total_tokens / context_limit) * 100
log_data["context_usage"] = {
"used": total_tokens,
"limit": context_limit,
"percentage": context_usage_pct
}
self.logger.info(f"[SYNTHESIS] Context usage: {total_tokens:,}/{context_limit:,} tokens ({context_usage_pct:.1f}%)")
if context_usage_pct > 100:
self.logger.warning(f"[SYNTHESIS] WARNING: Context usage exceeds limit! This may cause errors.")
# Extract answer and sources if formatted
if "COMPREHENSIVE_ANSWER:" in answer_text:
parts = answer_text.split("SOURCES:")
answer = parts[0].split("COMPREHENSIVE_ANSWER:")[-1].strip()
sources_text = parts[1].strip() if len(parts) > 1 else ""
else:
answer = answer_text
sources_text = ""
log_data["result"] = "SUCCESS"
log_data["final_answer"] = answer
# Log the synthesized answer
self.logger.info(f"[SYNTHESIS] Synthesized answer from {len(extracted_content)} extracts")
answer_preview = answer[:300] + "..." if len(answer) > 300 else answer
self.logger.info(f"[SYNTHESIS] Answer preview:\n{answer_preview}")
# Full response is logged at DEBUG level (will be in log file)
self.logger.debug(f"[SYNTHESIS OUTPUT] Full AI response:\n{answer_text}")
self.logger.debug(f"[SYNTHESIS OUTPUT] Processed answer:\n{answer}")
result = {
'answer': answer,
'sources': sources, # Use the sources we already collected
'raw_response': answer_text
}
if return_logs:
result['synthesis_log'] = log_data
return result
except Exception as e:
self.logger.error(f"[SYNTHESIS] Error: {e}")
log_data["error"] = str(e)
log_data["result"] = "ERROR"
result = {
'answer': f"Error synthesizing answer: {e}",
'sources': sources,
'raw_response': ""
}
if return_logs:
result['synthesis_log'] = log_data
return result
def generate_rag_queries(
self,
user_question: str,
max_queries: Optional[int] = None,
chat_context: Optional[str] = None,
model: Optional[str] = None,
) -> List[str]:
"""
Generate 1–N RAG search queries from the user's question (and optional context).
Used for Fast RAG: AI infers what to search for; RAG is searched with all queries.
Args:
user_question: User's question
max_queries: Max number of queries (default config.MAX_RAG_QUERIES)
chat_context: Optional recent chat context
model: Model name (uses config.SYNTHESIS_MODEL if None)
Returns:
List of 1–N query strings. Fallback to [user_question] if parse fails or empty.
"""
import json
import re
model = model or config.SYNTHESIS_MODEL
max_queries = max_queries if max_queries is not None else config.MAX_RAG_QUERIES
max_queries = max(1, min(max_queries, 20))
context_block = ""
if chat_context and chat_context.strip():
context_block = f"\n\nRECENT CONVERSATION:\n{chat_context.strip()}"
prompt = f"""You are helping to search a RAG system over video transcripts. Given the user's question, generate 1–{max_queries} short search queries to run against the RAG. Each query should target different aspects, synonyms, sub-questions, or related concepts. Be concise; each query is a short search string (a few words to a short phrase).
{context_block}
USER'S QUESTION: {user_question}
TASK: Generate 1–{max_queries} search queries. Output ONLY a JSON array of strings, one query per element. Example:
["query one", "query two", "synonym or related concept"]
Do not include any other text, markdown, or explanation."""
try:
r = self.client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": "You output only valid JSON arrays of search query strings."},
{"role": "user", "content": prompt},
],
max_completion_tokens=500,
)
raw = (r.choices[0].message.content or "").strip()
# Try JSON array first
m = re.search(r'\[[\s\S]*?\]', raw)
if m:
arr = json.loads(m.group())
if isinstance(arr, list):
out = [str(x).strip() for x in arr if x and str(x).strip()]
if out:
return out[:max_queries]
# Fallback: one query per line
lines = [ln.strip() for ln in raw.splitlines() if ln.strip() and not ln.strip().startswith(("#", "["))]
if lines:
return lines[:max_queries]
except Exception as e:
self.logger.warning(f"[RAG QUERIES] Parse or API error: {e}")
return [user_question]
def decide_if_search_needed(
self,
question: str,
chat_history: List[Dict],
existing_extracted_content: List[Dict],
model: Optional[str] = None,
return_logs: bool = False
) -> Dict:
"""
Decide if a new transcript search is needed to answer the question.
Args:
question: User's current question
chat_history: List of previous messages with 'role' and 'content'
existing_extracted_content: Previously extracted content from transcripts
model: Model to use for decision
return_logs: Whether to return detailed logs
Returns:
Dict with 'needs_search' (bool), 'reasoning' (str), 'search_query' (str if needed)
"""
model = model or config.SYNTHESIS_MODEL
# Format chat history
history_text = ""
for msg in chat_history[-10:]: # Last 10 messages
role = "User" if msg.get('role') == 'user' else "Assistant"
history_text += f"{role}: {msg.get('content', '')}\n\n"
# Format existing extracted content
content_text = ""
if existing_extracted_content:
for i, item in enumerate(existing_extracted_content[:20], 1): # Limit to 20 items
content_text += f"[Extract {i} - From: {item.get('video_title', 'Unknown')}]\n"
content_text += f"{item.get('content', '')[:500]}...\n\n"
prompt = f"""You are an AI assistant deciding whether you need to search through video transcripts to answer a user's question.
CURRENT USER QUESTION: {question}
PREVIOUS CONVERSATION:
{history_text if history_text else "No previous conversation."}
PREVIOUSLY EXTRACTED CONTENT FROM TRANSCRIPTS:
{content_text if content_text else "No previously extracted content available."}
TASK: Decide if you need to search the transcripts for more information to answer this question.
Consider:
1. Do you have enough context from the conversation and previously extracted content?
2. Is the user asking about something new that wasn't covered before?
3. Is the user explicitly asking you to search or look for more information?
4. Would more information from the transcripts help provide a better answer?
IMPORTANT: If the user explicitly says things like "search", "look for", "find more", "check the transcripts", always say YES to searching.
Respond in this EXACT format:
DECISION: YES or NO
REASONING: [Brief explanation of why you do or don't need to search]
SEARCH_QUERY: [If YES, provide an optimized search query to find the needed information. If NO, leave empty]"""
log_data = {
"type": "decision",
"model": model,
"question": question,
"chat_history_count": len(chat_history),
"existing_content_count": len(existing_extracted_content),
"prompt": prompt
}
create_kwargs = {
"model": model,
"messages": [
{"role": "system", "content": "You are a helpful assistant that decides whether to search for more information."},
{"role": "user", "content": prompt}
],
"max_completion_tokens": 500
}
try:
response = self.client.chat.completions.create(**create_kwargs)
response_text = response.choices[0].message.content.strip()
log_data["raw_response"] = response_text
# Parse response
needs_search = "DECISION: YES" in response_text.upper()
reasoning = ""
if "REASONING:" in response_text:
reasoning = response_text.split("REASONING:")[-1].split("SEARCH_QUERY:")[0].strip()
search_query = ""
if needs_search and "SEARCH_QUERY:" in response_text:
search_query = response_text.split("SEARCH_QUERY:")[-1].strip()
result = {
"needs_search": needs_search,
"reasoning": reasoning,
"search_query": search_query if needs_search else ""
}
log_data["result"] = result
self.logger.info(f"[DECISION] Needs search: {needs_search}")
self.logger.info(f"[DECISION] Reasoning: {reasoning}")
if search_query:
self.logger.info(f"[DECISION] Search query: {search_query}")
if return_logs:
result["decision_log"] = log_data
return result
except Exception as e:
self.logger.error(f"[DECISION] Error: {e}")
log_data["error"] = str(e)
# Default to searching on error
result = {
"needs_search": True,
"reasoning": f"Error in decision: {e}. Defaulting to search.",
"search_query": question
}
if return_logs:
result["decision_log"] = log_data
return result
def answer_from_context(
self,
question: str,
chat_history: List[Dict],
existing_extracted_content: List[Dict],
model: Optional[str] = None,
return_logs: bool = False
) -> Dict:
"""
Answer a question using only existing context (no new search).
Args:
question: User's current question
chat_history: Previous conversation
existing_extracted_content: Previously extracted transcript content
model: Model to use
return_logs: Whether to return logs
Returns:
Dict with 'answer', 'sources', 'from_context' flag
"""
model = model or config.SYNTHESIS_MODEL
# Format chat history
history_text = ""
for msg in chat_history[-10:]:
role = "User" if msg.get('role') == 'user' else "Assistant"
history_text += f"{role}: {msg.get('content', '')}\n\n"
# Format existing content; collect chunk_ids per video (same shape as synthesize)
content_text = ""
video_to_sources: Dict[str, Dict] = {}
for item in existing_extracted_content:
content_text += f"[From: {item.get('video_title', 'Unknown')}]\n"
content_text += f"{item.get('content', '')}\n\n"
title = item.get('video_title', 'Unknown')
cid = item.get('chunk_id')
if title not in video_to_sources:
video_to_sources[title] = {'video_title': title, 'video_url': item.get('video_url', ''), 'chunk_ids': []}
arr = video_to_sources[title]['chunk_ids']
if cid is not None and cid not in arr:
try:
arr.append(int(cid))
except (TypeError, ValueError):
pass
sources = list(video_to_sources.values())
prompt = f"""You are answering a follow-up question using existing conversation context and previously extracted transcript content.
CURRENT QUESTION: {question}
PREVIOUS CONVERSATION:
{history_text}
PREVIOUSLY EXTRACTED CONTENT FROM TRANSCRIPTS:
{content_text if content_text else "No extracted content available."}
TASK: Answer the user's question based on the conversation context and extracted content above.
Instructions:
- Use the information from the conversation and extracted content
- Be helpful and thorough
- If you're not certain about something based on the available context, say so
- Reference which sources you're drawing from when relevant"""
log_data = {
"type": "context_answer",
"model": model,
"question": question,
"prompt": prompt,
"sources_count": len(sources)
}
create_kwargs = {
"model": model,
"messages": [
{"role": "system", "content": "You are a helpful assistant answering questions based on conversation context."},
{"role": "user", "content": prompt}
],
"max_completion_tokens": 2000
}
try:
response = self.client.chat.completions.create(**create_kwargs)
answer = response.choices[0].message.content.strip()
# Get token usage from response if available
if hasattr(response, 'usage') and response.usage:
prompt_tokens = response.usage.prompt_tokens
completion_tokens = response.usage.completion_tokens
total_tokens = response.usage.total_tokens
else:
# Fallback: estimate tokens
try:
encoding = tiktoken.encoding_for_model(model)
except (KeyError, ValueError):
encoding = tiktoken.get_encoding("cl100k_base")
prompt_tokens = len(encoding.encode(prompt))
completion_tokens = len(encoding.encode(answer))
total_tokens = prompt_tokens + completion_tokens
# Calculate context usage percentage
context_limit = config.SYNTHESIS_CONTEXT_LIMIT
context_usage_pct = (total_tokens / context_limit) * 100 if context_limit > 0 else 0
log_data["raw_response"] = answer
log_data["result"] = "SUCCESS"
log_data["token_usage"] = {
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"total_tokens": total_tokens
}
log_data["context_usage"] = {
"used": total_tokens,
"limit": context_limit,
"percentage": context_usage_pct
}
self.logger.info(f"[CONTEXT_ANSWER] Tokens: {prompt_tokens:,} prompt + {completion_tokens:,} completion = {total_tokens:,} total ({context_usage_pct:.1f}% of {context_limit:,})")
result = {
'answer': answer,
'sources': sources,
'from_context': True,
'raw_response': answer,
'context_usage_pct': context_usage_pct,
'total_tokens': total_tokens
}
if return_logs:
result['context_answer_log'] = log_data
return result
except Exception as e:
self.logger.error(f"[CONTEXT_ANSWER] Error: {e}")
log_data["error"] = str(e)
log_data["result"] = "ERROR"
result = {
'answer': f"Error answering from context: {e}",
'sources': sources,
'from_context': True,
'raw_response': ""
}
if return_logs:
result['context_answer_log'] = log_data
return result
def consolidate_extracted_content(
self,
existing_extracted_content: List[Dict],
model: Optional[str] = None,
return_logs: bool = False
) -> Dict:
"""
Consolidate and summarize extracted content to reduce token usage.
Args:
existing_extracted_content: Previously extracted transcript content to consolidate
model: Model to use for consolidation
return_logs: Whether to return logs
Returns:
Dict with 'consolidated_content' (List[Dict]) and optional 'consolidation_log'
"""
model = model or config.SYNTHESIS_MODEL
# Format existing content for summarization
content_by_video = {}
for item in existing_extracted_content:
video_title = item.get('video_title', 'Unknown')
video_url = item.get('video_url', '')
chunk_id = item.get('chunk_id', '')
if video_title not in content_by_video:
content_by_video[video_title] = {
'video_title': video_title,
'video_url': video_url,
'content_parts': [],
'chunk_ids': []
}
content_by_video[video_title]['content_parts'].append(item.get('content', ''))
if chunk_id and chunk_id not in content_by_video[video_title]['chunk_ids']:
content_by_video[video_title]['chunk_ids'].append(chunk_id)
# Create consolidation prompt
content_text = ""
for video_title, video_data in content_by_video.items():
content_text += f"\n[Video: {video_title}]\n"
content_text += "\n".join(video_data['content_parts'])
content_text += "\n\n"
prompt = f"""You are consolidating previously extracted transcript content to make it more concise while preserving all important information.
EXTRACTED CONTENT TO CONSOLIDATE:
{content_text}
TASK: Create a comprehensive, consolidated summary of this content.
- Preserve all key information, facts, and details
- Combine related information from the same video
- Maintain references to source videos
- Make it more concise while keeping everything important
- Structure it clearly by topic/theme when possible
Format your response as:
CONSOLIDATED_CONTENT:
[Video: Video Title]
[Summary of all content from this video, combining related information]
[Video: Another Video Title]
[Summary of all content from this video]
..."""
log_data = {
"type": "consolidation",
"model": model,
"input_items": len(existing_extracted_content),
"input_videos": len(content_by_video)
}
create_kwargs = {
"model": model,
"messages": [
{"role": "system", "content": "You are an expert at consolidating and summarizing information while preserving all important details."},
{"role": "user", "content": prompt}
],
"max_completion_tokens": 4000,
"temperature": 0.3
}
try:
response = self.client.chat.completions.create(**create_kwargs)
consolidated_text = response.choices[0].message.content.strip()
# Parse consolidated content back into structured format
consolidated_content = []
current_video = None
current_content = []
current_url = ""
current_chunk_ids = []
for line in consolidated_text.split('\n'):
if line.startswith('[Video:') and ']' in line:
if current_video and current_content:
consolidated_content.append({
'video_title': current_video,
'video_url': current_url,
'content': '\n'.join(current_content).strip(),
'chunk_id': ','.join(current_chunk_ids) if current_chunk_ids else 'consolidated'
})
video_part = line.split('[Video:')[1].split(']')[0].strip()
current_video = video_part
if video_part in content_by_video:
current_url = content_by_video[video_part].get('video_url', '')
current_chunk_ids = content_by_video[video_part].get('chunk_ids', [])
current_content = []
elif current_video and line.strip() and not line.startswith('CONSOLIDATED_CONTENT:'):
current_content.append(line)
if current_video and current_content:
consolidated_content.append({
'video_title': current_video,
'video_url': current_url,
'content': '\n'.join(current_content).strip(),
'chunk_id': ','.join(current_chunk_ids) if current_chunk_ids else 'consolidated'
})
if not consolidated_content:
all_videos = list(set(item.get('video_title', 'Unknown') for item in existing_extracted_content))
all_urls = {}
for item in existing_extracted_content:
title = item.get('video_title', 'Unknown')
if title not in all_urls:
all_urls[title] = item.get('video_url', '')
consolidated_content.append({
'video_title': ', '.join(all_videos[:5]) + ('...' if len(all_videos) > 5 else ''),
'video_url': all_urls.get(all_videos[0], '') if all_videos else '',
'content': consolidated_text,
'chunk_id': 'consolidated'
})
if hasattr(response, 'usage') and response.usage:
prompt_tokens = response.usage.prompt_tokens
completion_tokens = response.usage.completion_tokens
total_tokens = response.usage.total_tokens
else:
try:
encoding = tiktoken.encoding_for_model(model)
except (KeyError, ValueError):
encoding = tiktoken.get_encoding("cl100k_base")
prompt_tokens = len(encoding.encode(prompt))
completion_tokens = len(encoding.encode(consolidated_text))
total_tokens = prompt_tokens + completion_tokens
log_data["raw_response"] = consolidated_text
log_data["result"] = "SUCCESS"
log_data["output_items"] = len(consolidated_content)
log_data["token_usage"] = {
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"total_tokens": total_tokens
}
self.logger.info(f"[CONSOLIDATION] Consolidated {len(existing_extracted_content)} items from {len(content_by_video)} videos into {len(consolidated_content)} items")
result = {
'consolidated_content': consolidated_content,
'original_count': len(existing_extracted_content),
'consolidated_count': len(consolidated_content)
}
if return_logs:
result['consolidation_log'] = log_data
return result
except Exception as e:
self.logger.error(f"[CONSOLIDATION] Error: {e}")
log_data["error"] = str(e)
log_data["result"] = "ERROR"
result = {
'consolidated_content': existing_extracted_content,
'original_count': len(existing_extracted_content),
'consolidated_count': len(existing_extracted_content),
'error': str(e)
}
if return_logs:
result['consolidation_log'] = log_data
return result