-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathembeddings.py
More file actions
235 lines (200 loc) · 8.9 KB
/
embeddings.py
File metadata and controls
235 lines (200 loc) · 8.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
from typing import List, Dict, Any, Optional, Tuple
from openai import OpenAI
from config import OPENAI_API_KEY, MODELS, RAG_SETTINGS
import numpy as np
import logging
from debug_utils import debug_step
from openai_api_models import client
import hashlib
from db import get_connection
from utils import timeit, ProgressIndicator
from base64 import b64decode
import struct
import tiktoken
import json
import re
logger = logging.getLogger(__name__)
def get_text_hash(text: str) -> str:
"""Возвращает SHA-256 хеш текста"""
return hashlib.sha256(text.encode('utf-8')).hexdigest()
def semantic_chunking(text: str,
max_tokens: int = 500,
overlap: float = 0.15,
model: str = None) -> List[Tuple[str, Dict]]:
"""
Разбивает текст на семантические чанки с перекрытием
Args:
text: Исходный текст для разбиения
max_tokens: Максимальное количество токенов в чанке
overlap: Процент перекрытия между чанками (0.0-1.0)
model: Модель для подсчета токенов
Returns:
Список кортежей (чанк, метаданные)
"""
if model is None:
model = MODELS['embedding']['name']
# Определяем границы абзацев/разделов
paragraphs = re.split(r'\n\s*\n', text.strip())
chunks = []
current_chunk = []
current_tokens = 0
overlap_tokens = int(max_tokens * overlap)
for para in paragraphs:
para = para.strip()
if not para:
continue
para_tokens = count_tokens(para, model)
# Если абзац слишком большой, разбиваем его дальше
if para_tokens > max_tokens:
sentences = re.split(r'(?<!\w\.\w.)(?<![A-Z][a-z]\.)(?<=\.|\?|\!)\s', para)
for sent in sentences:
sent = sent.strip()
if not sent:
continue
sent_tokens = count_tokens(sent, model)
if current_tokens + sent_tokens > max_tokens:
if current_chunk:
chunk_text = '\n\n'.join(current_chunk)
chunks.append((chunk_text, {
'type': 'paragraph',
'tokens': current_tokens,
'is_complete': True
}))
# Сохраняем конец текущего чанка для перекрытия
overlap_part = '\n\n'.join(current_chunk[-overlap_tokens:]) if overlap_tokens else ''
current_chunk = [overlap_part] if overlap_part else []
current_tokens = count_tokens(overlap_part, model)
current_chunk.append(sent)
current_tokens += sent_tokens
else:
if current_tokens + para_tokens > max_tokens:
if current_chunk:
chunk_text = '\n\n'.join(current_chunk)
chunks.append((chunk_text, {
'type': 'paragraph',
'tokens': current_tokens,
'is_complete': True
}))
# Сохраняем конец текущего чанка для перекрытия
overlap_part = '\n\n'.join(current_chunk[-overlap_tokens:]) if overlap_tokens else ''
current_chunk = [overlap_part] if overlap_part else []
current_tokens = count_tokens(overlap_part, model)
current_chunk.append(para)
current_tokens += para_tokens
# Добавляем последний чанк
if current_chunk:
chunk_text = '\n\n'.join(current_chunk)
chunks.append((chunk_text, {
'type': 'paragraph',
'tokens': current_tokens,
'is_complete': True
}))
return chunks
@timeit
def create_embedding_for_item(item, chunked: bool = True):
"""
Создает эмбеддинг для элемента с учетом его структуры
Args:
item: Элемент для обработки
chunked: Если True, разбивает текст на чанки
Returns:
Словарь с эмбеддингами и метаданными
"""
try:
if 'item' in item:
item_id, parent_id, item_text = item['item']
text = item_text.strip() if item_text else ""
if not chunked:
# Старый вариант без разбиения (для обратной совместимости)
embedding = get_embedding(text)
return {
'embedding': embedding,
'text': text,
'item_id': item_id,
'chunked': False
}
else:
# Новый вариант с семантическим разбиением
chunks = semantic_chunking(
text,
max_tokens=RAG_SETTINGS.get('max_chunk_tokens', 500),
overlap=RAG_SETTINGS.get('chunk_overlap', 0.15)
)
embeddings = []
for chunk_text, metadata in chunks:
embedding = get_embedding(chunk_text)
embeddings.append({
'embedding': embedding,
'text': chunk_text,
'metadata': metadata,
'item_id': f"{item_id}_{len(embeddings)}"
})
return {
'embeddings': embeddings,
'original_text': text,
'item_id': item_id,
'chunked': True
}
else:
raise ValueError("Неверный формат элемента")
except Exception as e:
logger.error(f"Ошибка при создании эмбеддинга для элемента: {str(e)}")
return {
'embedding': [],
'text': '',
'item_id': 'error',
'chunked': False
}
def get_embedding(text: str, model: str = None) -> List[float]:
"""
Получает эмбеддинг для текста с помощью OpenAI API
Args:
text: Текст для векторизации
model: Модель для эмбеддинга (если None, берется из конфига)
Returns:
Список чисел с эмбеддингом
"""
if model is None:
model = MODELS['embedding']['name']
try:
response = client.embeddings.create(
input=text,
model=model
)
return response.data[0].embedding
except Exception as e:
logger.error(f"Ошибка при получении эмбеддинга: {str(e)}")
return []
def calculate_similarity(embedding1: List[float], embedding2: List[float]) -> float:
"""
Вычисляет косинусное сходство между двумя эмбеддингами
Args:
embedding1: Первый вектор эмбеддинга
embedding2: Второй вектор эмбеддинга
Returns:
Значение косинусного сходства от -1 до 1
"""
if not embedding1 or not embedding2:
return 0.0
dot_product = np.dot(embedding1, embedding2)
norm1 = np.linalg.norm(embedding1)
norm2 = np.linalg.norm(embedding2)
if norm1 == 0 or norm2 == 0:
return 0.0
return dot_product / (norm1 * norm2)
# Остальные существующие функции (get_query_embedding_from_cache, save_query_embedding_to_cache,
# decode_base64_embedding, save_embedding_to_db, get_embedding_from_db) остаются без изменений
def count_tokens(text: str, model: str) -> int:
"""Подсчитывает количество токенов в тексте для указанной модели
Args:
text: Текст для анализа
model: Идентификатор модели (например, "text-embedding-ada-002")
Returns:
Количество токенов в тексте
"""
try:
encoding = tiktoken.encoding_for_model(model)
return len(encoding.encode(text))
except Exception as e:
logger.error(f"Ошибка при подсчёте токенов: {str(e)}")
return 0