14 changes: 13 additions & 1 deletion .gitignore
@@ -204,4 +204,16 @@ conf/local.service_conf.yaml
docker/.env
docker/launch_backend_service.sh
docker/.env.oceanbase
local.service_conf.yaml

# Generated by scripts/deploy.sh (runtime configs)
conf/service_conf_ragflow_*.yaml
nginx_conf/

logs/
pods/
upload_wiki_json.pid
.ragflow_secret_key
setup_tools_venv.sh
build_tools_bundle.sh
upload_snapshot.json
191 changes: 161 additions & 30 deletions api/apps/sdk/doc.py
@@ -14,11 +14,13 @@
# limitations under the License.
#
import datetime
import io
Copilot AI (Jan 9, 2026): Module 'io' is imported with both 'import' and 'import from'.
import json
import logging
import pathlib
import re
from io import BytesIO
import time

import xxhash
from quart import request, send_file
@@ -34,7 +36,7 @@
from api.db.services.knowledgebase_service import KnowledgebaseService
from api.db.services.llm_service import LLMBundle
from api.db.services.tenant_llm_service import TenantLLMService
from api.db.services.task_service import TaskService, queue_tasks, cancel_all_task_of
from api.db.services.task_service import TaskService, queue_tasks, cancel_all_task_of, queue_tasks_batch
Copilot AI (Jan 9, 2026): Import of 'TaskService' is not used. Import of 'queue_tasks' is not used.

Suggested change:
- from api.db.services.task_service import TaskService, queue_tasks, cancel_all_task_of, queue_tasks_batch
+ from api.db.services.task_service import cancel_all_task_of, queue_tasks_batch
from common.metadata_utils import meta_filter, convert_conditions
from api.utils.api_utils import check_duplicate_ids, construct_json_result, get_error_data_result, get_parser_config, get_result, server_error_response, token_required, \
    get_request_json
@@ -181,6 +183,82 @@ async def upload(dataset_id, tenant_id):
    return get_result(data=renamed_doc_list)


@manager.route("/datasets/<dataset_id>/documents_with_meta", methods=["POST"]) # noqa: F821
@token_required
async def upload_with_meta(dataset_id, tenant_id):
e, kb = KnowledgebaseService.get_by_id(dataset_id)
if not e:
raise LookupError(f"Can't find the dataset with ID {dataset_id}!")

req = await request.json
docs = req.get("docs")
if not docs:
return get_error_data_result(
message="No docs in request params!", code=RetCode.ARGUMENT_ERROR
)
parse = req.get("parse", True)

group_id_field = req.get("group_id_field")
file_extension = req.get("file_extension", "html")

file_objs = []
for doc in docs:
title = doc["title"]
file_obj = io.BytesIO(doc["content"].encode("utf-8"))
# If the title already has an extension, do not add another extension
if "." in title:
filename = title
else:
filename = f"{title}.{file_extension}"
file_obj.filename = filename
metadata = doc.get("metadata", {})
if not metadata.get("_group_id") and group_id_field and group_id_field in metadata:
metadata["_group_id"] = metadata[group_id_field]
if not metadata.get("_title"):
metadata["_title"] = title
file_objs.append((
file_obj,
metadata,
))
err, files = FileService.upload_document(kb, file_objs, tenant_id)
if err:
return get_result(message="\n".join(err), code=RetCode.SERVER_ERROR)
# rename key's name
renamed_doc_list = []
docs_to_parse = []
for file in files:
doc = file[0]
key_mapping = {
"chunk_num": "chunk_count",
"kb_id": "dataset_id",
"token_num": "token_count",
"parser_id": "chunk_method",
}
renamed_doc = {}
for key, value in doc.items():
new_key = key_mapping.get(key, key)
renamed_doc[new_key] = value
renamed_doc["run"] = "UNSTART"
renamed_doc_list.append(renamed_doc)
if parse:
doc["tenant_id"] = tenant_id
docs_to_parse.append(doc)

# Batch parse documents
if docs_to_parse:
doc_ids = [doc["id"] for doc in docs_to_parse]
storage_addresses = File2DocumentService.get_storage_addresses(doc_ids)
docs_with_storage = []
for doc in docs_to_parse:
bucket, name = storage_addresses.get(doc["id"], (None, None))
if bucket and name:
docs_with_storage.append((doc, bucket, name))
if docs_with_storage:
queue_tasks_batch(docs_with_storage, 0)

return get_result(data=renamed_doc_list)
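
For reviewers trying the new route, here is a minimal client-side sketch. The payload shape mirrors the handler above; the `/api/v1` prefix and the Bearer-token header are assumptions based on how the SDK's other HTTP endpoints are usually called, not something visible in this diff:

```python
import requests  # hypothetical smoke test for POST .../documents_with_meta

payload = {
    "docs": [
        {
            "title": "intro",  # no extension, so the handler stores "intro.html"
            "content": "<h1>Hello</h1>",
            "metadata": {"wiki_id": "W42", "tags": ["demo"]},
        }
    ],
    "parse": True,                # queue parsing right after upload (the default)
    "group_id_field": "wiki_id",  # copied into metadata["_group_id"]
    "file_extension": "html",     # fallback extension for bare titles
}
resp = requests.post(
    "http://localhost:9380/api/v1/datasets/<dataset_id>/documents_with_meta",
    headers={"Authorization": "Bearer YOUR_API_KEY"},
    json=payload,
)
print(resp.json())  # uploaded documents come back with run == "UNSTART"
```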


@manager.route("/datasets/<dataset_id>/documents/<document_id>", methods=["PUT"]) # noqa: F821
@token_required
async def update_doc(tenant_id, dataset_id, document_id):
@@ -825,27 +903,59 @@ async def parse(tenant_id, dataset_id):
    unique_doc_ids, duplicate_messages = check_duplicate_ids(doc_list, "document")
    doc_list = unique_doc_ids

    not_found = []
    success_count = 0
    for id in doc_list:
        doc = DocumentService.query(id=id, kb_id=dataset_id)
        if not doc:
            not_found.append(id)
            continue
        if not doc:
            return get_error_data_result(message=f"You don't own the document {id}.")
        if 0.0 < doc[0].progress < 1.0:
            return get_error_data_result("Can't parse document that is currently being processed")
        info = {"run": "1", "progress": 0, "progress_msg": "", "chunk_num": 0, "token_num": 0}
        DocumentService.update_by_id(id, info)
        settings.docStoreConn.delete({"doc_id": id}, search.index_name(tenant_id), dataset_id)
        TaskService.filter_delete([Task.doc_id == id])
        e, doc = DocumentService.get_by_id(id)
        doc = doc.to_dict()
        doc["tenant_id"] = tenant_id
        bucket, name = File2DocumentService.get_storage_address(doc_id=doc["id"])
        queue_tasks(doc, bucket, name, 0)
        success_count += 1
    if not doc_list:
        if duplicate_messages:
            return get_error_data_result(message=";".join(duplicate_messages))
        return get_error_data_result("No valid document IDs provided")

    # Batch query all documents
    docs = list(DocumentService.model.select().where(
        DocumentService.model.id.in_(doc_list),
        DocumentService.model.kb_id == dataset_id
    ).dicts())

    found_doc_ids = {doc["id"] for doc in docs}
    not_found = [doc_id for doc_id in doc_list if doc_id not in found_doc_ids]

    # Check for documents currently being processed
    processing_docs = [doc for doc in docs if 0.0 < doc.get("progress", 0) < 1.0]
    if processing_docs:
        processing_ids = [doc["id"] for doc in processing_docs]
        return get_error_data_result(f"Can't parse documents that are currently being processed: {processing_ids}")

    # All found documents are ready to parse
    docs_to_parse = docs

    if not docs_to_parse:
        if not_found:
            return get_result(message=f"Documents not found: {not_found}", code=RetCode.DATA_ERROR)
        if duplicate_messages:
            return get_error_data_result(message=";".join(duplicate_messages))
        return get_error_data_result("No documents available for parsing")

    doc_ids_to_parse = [doc["id"] for doc in docs_to_parse]

    # Batch delete chunks from index (before queue_tasks_batch, which handles old task chunks)
    settings.docStoreConn.delete({"doc_id": doc_ids_to_parse}, search.index_name(tenant_id), dataset_id)

    # Batch get storage addresses
    storage_addresses = File2DocumentService.get_storage_addresses(doc_ids_to_parse)

    # Prepare documents with storage addresses for batch processing
    docs_with_storage = []
    for doc in docs_to_parse:
        bucket, name = storage_addresses.get(doc["id"], (None, None))
        if bucket and name:
            doc["tenant_id"] = tenant_id
            docs_with_storage.append((doc, bucket, name))

    # Batch queue tasks (queue_tasks_batch handles task deletion internally)
    if docs_with_storage:
        queue_tasks_batch(docs_with_storage, 0)

    success_count = len(docs_with_storage)

    # Handle response with errors
    if not_found:
        return get_result(message=f"Documents not found: {not_found}", code=RetCode.DATA_ERROR)
    if duplicate_messages:
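
A reviewer-side sketch of exercising the batched path. The route and body key below are assumptions inferred from the SDK's other document endpoints (the decorator for this handler sits outside the visible hunk):

```python
import requests  # hypothetical smoke test for the batched parse endpoint

resp = requests.post(
    "http://localhost:9380/api/v1/datasets/<dataset_id>/chunks",  # assumed route
    headers={"Authorization": "Bearer YOUR_API_KEY"},
    json={"document_ids": ["<doc_id_1>", "<doc_id_2>"]},
)
# One queue_tasks_batch call now covers every resolvable document,
# rather than one queue_tasks round-trip per document.
print(resp.json())
```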
@@ -1493,6 +1603,7 @@ async def retrieval_test(tenant_id):
              format: float
              description: Similarity score.
    """
    start_time = time.time()
    req = await get_request_json()
    if not req.get("dataset_ids"):
        return get_error_data_result("`dataset_ids` is required.")
@@ -1511,6 +1622,11 @@
    )
    if "question" not in req:
        return get_error_data_result("`question` is required.")

    end_time = time.time()
    logging.info(f"retrieval_test prepare1 elapsed time: {end_time - start_time:.3f} seconds")
    start_time = time.time()

    page = int(req.get("page", 1))
    size = int(req.get("page_size", 30))
    question = req["question"]
@@ -1520,14 +1636,17 @@
langs = req.get("cross_languages", [])
if not isinstance(doc_ids, list):
return get_error_data_result("`documents` should be a list")
doc_ids_list = KnowledgebaseService.list_documents_by_ids(kb_ids)
for doc_id in doc_ids:
if doc_id not in doc_ids_list:
return get_error_data_result(f"The datasets don't own the document {doc_id}")

if doc_ids:
is_valid, _, invalid_doc_ids = KnowledgebaseService.verify_documents_belong_to_kbs(doc_ids, kb_ids)
if not is_valid:
return get_error_data_result(f"The datasets don't own the documents {invalid_doc_ids}")

if not doc_ids:
metadata_condition = req.get("metadata_condition", {}) or {}
metas = DocumentService.get_meta_by_kbs(kb_ids)
doc_ids = meta_filter(metas, convert_conditions(metadata_condition), metadata_condition.get("logic", "and"))
if metadata_condition:
metas = DocumentService.get_meta_by_kbs(kb_ids)
doc_ids = meta_filter(metas, convert_conditions(metadata_condition), metadata_condition.get("logic", "and"))
# If metadata_condition has conditions but no docs match, return empty result
if not doc_ids and metadata_condition.get("conditions"):
return get_result(data={"total": 0, "chunks": [], "doc_aggs": {}})
@@ -1536,6 +1655,11 @@
    similarity_threshold = float(req.get("similarity_threshold", 0.2))
    vector_similarity_weight = float(req.get("vector_similarity_weight", 0.3))
    top = int(req.get("top_k", 1024))

    end_time = time.time()
    logging.info(f"retrieval_test prepare2 elapsed time: {end_time - start_time:.3f} seconds")
    start_time = time.time()

    if req.get("highlight") == "False" or req.get("highlight") == "false":
        highlight = False
    else:
@@ -1550,7 +1674,6 @@
        rerank_mdl = None
        if req.get("rerank_id"):
            rerank_mdl = LLMBundle(kb.tenant_id, LLMType.RERANK, llm_name=req["rerank_id"])

        if langs:
            question = await cross_languages(kb.tenant_id, None, question, langs)

@@ -1573,6 +1696,10 @@
            highlight=highlight,
            rank_feature=label_question(question, kbs),
        )
        end_time = time.time()
        logging.info(f"retrieval_test retrieval elapsed time: {end_time - start_time:.3f} seconds")
        start_time = time.time()

        if toc_enhance:
            chat_mdl = LLMBundle(kb.tenant_id, LLMType.CHAT)
            cks = settings.retriever.retrieval_by_toc(question, ranks["chunks"], tenant_ids, chat_mdl, size)
@@ -1604,11 +1731,15 @@
                rename_chunk[new_key] = value
            renamed_chunks.append(rename_chunk)
        ranks["chunks"] = renamed_chunks

        end_time = time.time()
        logging.info(f"retrieval_test postprocess elapsed time: {end_time - start_time:.3f} seconds")

        return get_result(data=ranks)
    except Exception as e:
        if str(e).find("not_found") > 0:
            return get_result(
                message="No chunk found! Check the chunk status please!",
                code=RetCode.DATA_ERROR,
            )
        return server_error_response(e)
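
The repeated `start_time`/`end_time` bookkeeping added throughout `retrieval_test` could be collapsed into a small context manager. A minimal sketch, not part of this PR, that mirrors the log format used above:

```python
import logging
import time
from contextlib import contextmanager


@contextmanager
def timed(stage: str):
    """Log how long the wrapped block took, in the same format as the logs above."""
    start = time.time()
    try:
        yield
    finally:
        logging.info(f"retrieval_test {stage} elapsed time: {time.time() - start:.3f} seconds")


# Usage:
# with timed("prepare1"):
#     ...validate the request...
```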
65 changes: 65 additions & 0 deletions sdk/python/README.md
@@ -0,0 +1,65 @@
# ragflow-sdk

The RAGFlow Python SDK provides a Python interface for interacting with a RAGFlow service, including dataset management, document upload, and chat.

## Installation

```shell
pip install ragflow-sdk
```

## Quick start

```python
from ragflow_sdk import RAGFlow

# Initialize the client
rag = RAGFlow(api_key="YOUR_API_KEY", base_url="http://localhost:9380")

# Create a dataset
dataset = rag.create_dataset(name="My Dataset")

# Upload documents
documents = dataset.upload_documents_with_meta([
    {
        "title": "Document Title",
        "content": "Document content...",
        "metadata": {
            "tags": ["tag1", "tag2"]
        }
    }
])
```

## Documentation

- [Tool module API reference](ragflow_sdk/tools/README.md) - detailed API documentation for FileReader, DocumentExtractor, FieldMapper, BatchUploader, and the other tools
- [Example scripts](examples/README.md) - usage notes for the batch-upload and other example scripts

## Tool modules

The SDK ships with tool modules for batch processing and document management (a usage sketch follows the list below):

- **FileReader**: a file reader that supports multiple file formats
- **DocumentExtractor**: extracts documents from files and directories
- **FieldMapper**: a flexible field mapper with automatic field detection
- **BatchUploader**: a batch uploader with resumable uploads and automatic retries

For details, see the [tool module API reference](ragflow_sdk/tools/README.md).
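
As a taste of how these pieces compose, here is a minimal, hypothetical sketch. The class names come from the list above, but every constructor argument and method shown (`extract()`, `map()`, `upload()`, `batch_size`, ...) is an illustrative assumption; consult the [tool module API reference](ragflow_sdk/tools/README.md) for the real signatures.

```python
from ragflow_sdk import RAGFlow
from ragflow_sdk.tools import BatchUploader, DocumentExtractor, FieldMapper

rag = RAGFlow(api_key="YOUR_API_KEY", base_url="http://localhost:9380")
dataset = rag.create_dataset(name="Wiki Snapshot")

# Hypothetical wiring: pull documents out of a directory, map the source
# fields onto the {title, content, metadata} shape that
# upload_documents_with_meta expects, then upload in retried batches.
docs = DocumentExtractor("/path/to/wiki_dump").extract()            # assumed API
docs = FieldMapper({"name": "title", "body": "content"}).map(docs)  # assumed API
BatchUploader(dataset, batch_size=50, max_retries=3).upload(docs)   # assumed API
```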

## Build and publish

### Build the Python SDK

```shell
uv build
```

### Publish to PyPI

```shell
uv pip install twine
export TWINE_USERNAME="__token__"
export TWINE_PASSWORD=$YOUR_PYPI_API_TOKEN
twine upload dist/*.whl
```