Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ ENV DEBIAN_FRONTEND=noninteractive
RUN mkdir -p ${APP_HOME} ${UI_HOME}

COPY frontend /tmp/frontend
RUN cd /tmp/frontend && npm install && npm run build && mv dist ${UI_HOME}/dist
RUN cd /tmp/frontend && npm config set registry https://registry.npmmirror.com && npm install && npm run build && mv dist ${UI_HOME}/dist


FROM registry.cn-qingdao.aliyuncs.com/dataease/sqlbot-base:latest AS sqlbot-builder
Expand Down Expand Up @@ -57,10 +57,11 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
libpixman-1-dev libfreetype6-dev \
&& rm -rf /var/lib/apt/lists/*

# configure npm
# configure npm (use China mirror for faster downloads)
RUN npm config set fund false \
&& npm config set audit false \
&& npm config set progress false
&& npm config set progress false \
&& npm config set registry https://registry.npmmirror.com

COPY g2-ssr/app.js g2-ssr/package.json /app/
COPY g2-ssr/charts/* /app/charts/
Expand Down
43 changes: 38 additions & 5 deletions backend/apps/datasource/api/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,19 @@
from common.core.deps import SessionDep, CurrentUser, Trans
from common.utils.utils import SQLBotLogUtil
from apps.ai_model.model_factory import create_llm
from ..crud.datasource import get_datasource_list, check_status, create_ds, update_ds, delete_ds, getTables, getFields, \
execSql, update_table_and_fields, getTablesByDs, chooseTables, preview, updateTable, updateField, get_ds, fieldEnum, \
check_status_by_id, sync_single_fields, copy_ds
from ..crud.datasource import (
get_datasource_list, check_status, create_ds, update_ds, delete_ds, getTables, getFields,
execSql, update_table_and_fields, getTablesByDs, chooseTables, preview, updateTable, updateField,
get_ds, fieldEnum, check_status_by_id, sync_single_fields, copy_ds,
export_datasources, import_datasources,
)
from ..crud.field import get_fields_by_table_id
from ..crud.table import get_tables_by_ds_id
from ..models.datasource import CoreDatasource, CreateDatasource, TableObj, CoreTable, CoreField, FieldObj, \
TableSchemaResponse, ColumnSchemaResponse, PreviewResponse
from ..models.datasource import (
CoreDatasource, CreateDatasource, TableObj, CoreTable, CoreField, FieldObj,
TableSchemaResponse, ColumnSchemaResponse, PreviewResponse,
DatasourceExportPayload, DatasourceImportPayload,
)
from common.audit.models.log_model import OperationType, OperationModules
from common.audit.schemas.logger_decorator import LogConfig, system_log

Expand Down Expand Up @@ -131,6 +137,33 @@ class CopyDatasourceRequest(BaseModel):
name: Optional[str] = None


class DatasourceExportRequest(BaseModel):
    """Bulk-export request body: the list of datasource ids to export."""
    # Empty list means "export nothing" — the endpoint returns an empty payload.
    ids: List[int] = []


@router.post("/export", response_model=DatasourceExportPayload, summary="批量导出数据源",
             description="导出数据源基础信息、选中表、表映射关系、标准元数据(表/字段备注)。")
@require_permissions(permission=SqlbotPermission(role=['ws_admin']))
async def export_datasources_api(
    session: SessionDep, user: CurrentUser, body: DatasourceExportRequest = Body(default=None),
):
    """Export the datasources listed in ``body.ids``.

    Returns an empty :class:`DatasourceExportPayload` when the body is missing
    or contains no ids; otherwise delegates to ``export_datasources``.
    """
    # `body` may be None because of Body(default=None), so guard before
    # touching body.ids. (The original trailing `or []` was redundant: the
    # conditional expression already yields [] in every falsy case.)
    ids = body.ids if body and body.ids else []
    if not ids:
        return DatasourceExportPayload(datasources=[])
    return export_datasources(session, user, ids)


@router.post("/import", response_model=List[CoreDatasource], summary="批量导入数据源",
             description="从导出 JSON 批量创建数据源,含表、字段元数据及表映射关系。")
@require_permissions(permission=SqlbotPermission(role=['ws_admin']))
@system_log(LogConfig(operation_type=OperationType.CREATE, module=OperationModules.DATASOURCE))
async def import_datasources_api(
    session: SessionDep, trans: Trans, user: CurrentUser, body: DatasourceImportPayload,
):
    """Bulk-import datasources from an export payload.

    Thin wrapper over ``crud.datasource.import_datasources`` (merge-by-name
    semantics); restricted to workspace admins and written to the audit log.
    """
    return await import_datasources(session, trans, user, body)


@router.post("/copy/{id}", response_model=CoreDatasource, summary=f"{PLACEHOLDER_PREFIX}ds_copy")
@require_permissions(permission=SqlbotPermission(role=['ws_admin'], keyExpression="id", type='ds'))
@system_log(LogConfig(operation_type=OperationType.CREATE, module=OperationModules.DATASOURCE, result_id_expr="id"))
Expand Down
179 changes: 177 additions & 2 deletions backend/apps/datasource/crud/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@
from .table import get_tables_by_ds_id
from ..crud.field import delete_field_by_ds_id, update_field
from ..crud.table import delete_table_by_ds_id, update_table
from ..models.datasource import CoreDatasource, CreateDatasource, CoreTable, CoreField, ColumnSchema, TableObj, \
DatasourceConf, TableAndFields
from ..models.datasource import (
CoreDatasource, CreateDatasource, CoreTable, CoreField, ColumnSchema, TableObj,
DatasourceConf, TableAndFields,
ExportDatasourceItem, ExportTableItem, ExportFieldItem, DatasourceExportPayload, DatasourceImportPayload,
)
from ...openapi.models.openapiModels import DatasourceResponse


Expand Down Expand Up @@ -690,6 +693,178 @@ def get_table_schema(session: SessionDep, current_user: CurrentUser, ds: CoreDat

return schema_str


def export_datasources(
    session: SessionDep, user: CurrentUser, ids: List[int]
) -> DatasourceExportPayload:
    """Bulk-export datasources: base configuration, chosen tables/fields
    (including standard and custom comments) and the table-relation graph.

    Datasources that do not exist, or that belong to a different workspace
    oid when the caller is not an admin, are silently skipped rather than
    raising — the caller only receives the exportable subset.
    """
    # Fall back to the default workspace when the user carries no oid.
    current_oid = user.oid if user.oid is not None else 1
    # (Removed a dead `if user.isAdmin: pass` no-op — the admin exception is
    # already expressed in the per-datasource skip condition below.)
    items: List[ExportDatasourceItem] = []
    for ds_id in ids:
        ds = session.exec(select(CoreDatasource).where(CoreDatasource.id == ds_id)).first()
        # Admins may export any datasource; others only those in their own oid.
        if not ds or (ds.oid != current_oid and not user.isAdmin):
            continue
        tables = session.query(CoreTable).filter(CoreTable.ds_id == ds_id).order_by(CoreTable.table_name).all()
        table_items = []
        for t in tables:
            fields = session.query(CoreField).filter(CoreField.table_id == t.id).order_by(CoreField.field_index).all()
            field_items = [
                ExportFieldItem(
                    id=f.id,  # original id kept so relation ports can be remapped on import
                    field_name=f.field_name or "",
                    field_type=f.field_type or "",
                    field_comment=f.field_comment or "",
                    custom_comment=f.custom_comment or "",
                    checked=f.checked,
                    field_index=f.field_index,
                )
                for f in fields
            ]
            table_items.append(
                ExportTableItem(
                    id=t.id,  # original id kept so relation cells can be remapped on import
                    table_name=t.table_name or "",
                    table_comment=t.table_comment or "",
                    custom_comment=t.custom_comment or "",
                    checked=t.checked,
                    fields=field_items,
                )
            )
        ds_dict = {
            "name": ds.name,
            "description": ds.description or "",
            "type": ds.type,
            "type_name": ds.type_name or "",
            "configuration": ds.configuration,
            "status": ds.status or "Success",
            "num": ds.num or "0/0",
            "recommended_config": ds.recommended_config or 1,
        }
        items.append(
            ExportDatasourceItem(
                version=1,
                datasource=ds_dict,
                # Deep-copy so later mutation of the payload cannot touch the ORM row.
                table_relation=copy.deepcopy(ds.table_relation) if ds.table_relation else [],
            )
        )
    return DatasourceExportPayload(datasources=items)


def _apply_import_to_ds(
    session: SessionDep,
    new_ds: CoreDatasource,
    item: ExportDatasourceItem,
) -> None:
    """Apply imported table/field metadata and the relation graph to an
    existing or freshly created datasource.

    Matches exported tables/fields to the target datasource's rows by name,
    builds old-id -> new-id maps, copies over custom comments, then rewrites
    the relation edges (``cell`` = table id, ``port`` = field id) to the new
    ids before saving the relation onto ``new_ds``.
    """
    new_tables = session.query(CoreTable).filter(CoreTable.ds_id == new_ds.id).order_by(CoreTable.table_name).all()
    old_to_new_table_id = {}
    old_to_new_field_id = {}
    for exp_t in item.tables:
        # Match strictly by table name; exported tables with no counterpart
        # in the target datasource are skipped.
        new_t = next((x for x in new_tables if x.table_name == exp_t.table_name), None)
        if not new_t:
            continue
        old_to_new_table_id[exp_t.id] = new_t.id
        new_fields = session.query(CoreField).filter(CoreField.table_id == new_t.id).order_by(CoreField.field_index).all()
        for exp_f in exp_t.fields:
            new_f = next((x for x in new_fields if x.field_name == exp_f.field_name), None)
            if new_f:
                old_to_new_field_id[exp_f.id] = new_f.id
                # Only overwrite the custom comment when the export carried one.
                if exp_f.custom_comment:
                    new_f.custom_comment = exp_f.custom_comment
                    session.add(new_f)
    session.commit()

    if item.table_relation:
        # Deep-copy before mutating: item.table_relation belongs to the payload.
        new_relation = copy.deepcopy(item.table_relation)
        for rel in new_relation:
            # Only edges carry source/target endpoints to remap.
            if rel.get("shape") != "edge":
                continue
            # NOTE(review): assumes cell/port values have the same type as the
            # exported int ids — confirm against the stored relation JSON.
            for key, mapping in [("source", old_to_new_table_id), ("target", old_to_new_table_id)]:
                cell = (rel.get(key) or {}).get("cell")
                if cell is not None and cell in mapping:
                    rel.setdefault(key, {})["cell"] = mapping[cell]
            for key, mapping in [("source", old_to_new_field_id), ("target", old_to_new_field_id)]:
                port = (rel.get(key) or {}).get("port")
                if port is not None and port in mapping:
                    rel.setdefault(key, {})["port"] = mapping[port]
        new_ds.table_relation = new_relation
        session.add(new_ds)
        session.commit()
    # Refresh the table/field counters and rebuild embeddings for this ds.
    updateNum(session, new_ds)
    run_save_ds_embeddings([new_ds.id])


@clear_cache(namespace=CacheNamespace.AUTH_INFO, cacheName=CacheName.DS_ID_LIST, keyExpression="user.oid")
async def import_datasources(
    session: SessionDep, trans: Trans, user: CurrentUser, payload: DatasourceImportPayload
) -> List[CoreDatasource]:
    """Bulk-import datasources in merge mode.

    A datasource with the same name inside the caller's workspace oid is
    updated in place; otherwise a new one is created. Duplicates never raise;
    per-item failures are logged via SQLBotLogUtil and skipped so one bad
    entry cannot abort the whole import.
    """
    # Fall back to the default workspace when the user carries no oid.
    current_oid = user.oid if user.oid is not None else 1
    result = []
    for item in payload.datasources:
        # Skip malformed entries that carry no datasource name.
        if not item.datasource or not item.datasource.get("name"):
            continue
        ds_dict = item.datasource
        name = ds_dict["name"]
        tables_payload = [
            CoreTable(
                table_name=t.table_name,
                table_comment=t.table_comment or "",
                custom_comment=t.custom_comment or "",
            )
            for t in item.tables
        ]

        # Merge key: (name, oid) within the current workspace.
        existing = session.exec(
            select(CoreDatasource).where(
                and_(CoreDatasource.name == name, CoreDatasource.oid == current_oid)
            )
        ).first()

        if existing:
            # Merge path: overwrite base attributes, re-sync tables, then
            # apply imported comments and the remapped relation graph.
            try:
                existing.description = ds_dict.get("description") or ""
                existing.type = ds_dict.get("type") or "mysql"
                existing.type_name = DB.get_db(existing.type).db_name
                existing.configuration = ds_dict.get("configuration") or ""
                existing.status = ds_dict.get("status") or "Success"
                existing.recommended_config = ds_dict.get("recommended_config") or 1
                session.add(existing)
                session.commit()
                # Configuration may have changed — drop any cached engine.
                clear_ds_engine_cache(existing.id)
                sync_table(session, existing, tables_payload)
                session.refresh(existing)
                _apply_import_to_ds(session, existing, item)
                session.refresh(existing)
                result.append(existing)
            except Exception as e:
                # Best-effort import: log and continue with the next item.
                SQLBotLogUtil.warning(f"import_datasources merge ds {name}: {e}")
                continue
        else:
            # Create path: reuse the standard create flow, then layer the
            # imported metadata and relation graph on top.
            try:
                create_ds_obj = CreateDatasource(
                    name=name,
                    description=ds_dict.get("description") or "",
                    type=ds_dict.get("type") or "mysql",
                    configuration=ds_dict.get("configuration") or "",
                    status=ds_dict.get("status") or "Success",
                    num=ds_dict.get("num") or "0/0",
                    recommended_config=ds_dict.get("recommended_config") or 1,
                    tables=tables_payload,
                )
                new_ds = await create_ds(session, trans, user, create_ds_obj)
                _apply_import_to_ds(session, new_ds, item)
                session.refresh(new_ds)
                result.append(new_ds)
            except Exception as e:
                SQLBotLogUtil.warning(f"import_datasources create ds {name}: {e}")
                continue
    return result


@cache(namespace=CacheNamespace.AUTH_INFO, cacheName=CacheName.DS_ID_LIST, keyExpression="oid")
async def get_ws_ds(session, oid) -> list:
stmt = select(CoreDatasource.id).distinct().where(CoreDatasource.oid == oid)
Expand Down
40 changes: 40 additions & 0 deletions backend/apps/datasource/models/datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,3 +190,43 @@ class PreviewResponse(BaseModel):
fields: List | None = []
data: List | None = []
sql: str | None = ''


# ---------- Datasource bulk import/export ----------
class ExportFieldItem(BaseModel):
    """Export-side field entry; keeps the original field id so relation
    ports can be remapped to the new field ids on import."""
    id: int = 0  # original CoreField.id (remap key on import)
    field_name: str = ''
    field_type: str = ''
    field_comment: str = ''   # comment read from the source database
    custom_comment: str = ''  # user-maintained comment, copied on import
    checked: bool = True
    field_index: int = 0


class ExportTableItem(BaseModel):
    """Export-side table entry; keeps the original table id so relation
    cells can be remapped to the new table ids on import."""
    id: int = 0  # original CoreTable.id (remap key on import)
    table_name: str = ''
    table_comment: str = ''   # comment read from the source database
    custom_comment: str = ''  # user-maintained comment, copied on import
    checked: bool = True
    fields: List[ExportFieldItem] = []


class ExportDatasourceItem(BaseModel):
    """One exported datasource: base info + chosen tables + relation graph."""
    version: int = 1  # export format version
    datasource: dict = {}  # name, description, type, type_name, configuration, status, num, recommended_config
    tables: List[ExportTableItem] = []
    table_relation: List = []  # same shape as CoreDatasource.table_relation; edge source/target carry cell (table id) and port (field id)


class DatasourceExportPayload(BaseModel):
    """Bulk-export response (and re-import input) envelope."""
    datasources: List[ExportDatasourceItem] = []


class DatasourceImportPayload(BaseModel):
    """Bulk-import request body — mirrors DatasourceExportPayload."""
    datasources: List[ExportDatasourceItem] = []
Loading
Loading