Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/controllers/agentVersion.controller.js
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,10 @@ const suggestModel = async (req, res, next) => {

const getConnectedAgents = async (req, res, next) => {
const { version_id } = req.params; // Changed from 'id' to 'version_id'
const { type } = req.query;
const { type, key = "orchestral" } = req.query;
const org_id = req.profile.org.id;

const result = await agentVersionDbService.getAllConnectedAgents(version_id, org_id, type);
const result = await agentVersionDbService.getAllConnectedAgents(version_id, org_id, type, key);
res.locals = { success: true, data: result };
req.statusCode = 200;
return next();
Expand Down
133 changes: 117 additions & 16 deletions src/db_services/agentVersion.service.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,19 @@ import { getReqOptVariablesInPrompt, transformAgentVariableToToolCallFormat } fr
import { convertPromptToString } from "../utils/promptWrapper.utils.js";
const ObjectId = mongoose.Types.ObjectId;

// Constants for agent traversal modes
const AGENT_TRAVERSAL_MODES = {
ORCHESTRAL: "orchestral",
FLOW: "flow"
};

// Constants for traversal directions
const TRAVERSAL_DIRECTIONS = {
BOTH: "both",
CHILDREN_ONLY: "children-only",
PARENTS_ONLY: "parents-only"
};

async function getVersion(version_id) {
try {
const version = await bridgeVersionModel.findById(version_id).lean();
Expand Down Expand Up @@ -474,7 +487,7 @@ async function publish(org_id, version_id, user_id) {
return { success: true, message: "Version published successfully" };
}

async function getAllConnectedAgents(id, org_id, type) {
async function getAllConnectedAgents(id, org_id, type, key = AGENT_TRAVERSAL_MODES.ORCHESTRAL) {
const agentsMap = {};
const visited = new Set();

Expand All @@ -499,9 +512,66 @@ async function getAllConnectedAgents(id, org_id, type) {
}
}

async function processAgent(agentId, parentIds = [], docType = null) {
if (visited.has(agentId)) {
if (parentIds && agentsMap[agentId]) {
// Find all agents that reference a given agent as a child (i.e., find parents)
async function findParentAgents(agentId) {
const parents = [];
const agentIdStr = agentId.toString();

try {
// Search in bridges (configurationModel) for agents that have this agent in connected_agents
// Note: connected_agents is an Object keyed by agent name, with values containing version_id/bridge_id
// We query for documents with non-empty connected_agents and filter in JS
const bridgeParents = await configurationModel
.find({
org_id,
connected_agents: { $exists: true, $ne: {} }
})
.lean();

for (const parent of bridgeParents) {
const connectedAgents = parent.connected_agents || {};
for (const [, info] of Object.entries(connectedAgents)) {
if (info && (info.version_id === agentIdStr || info.bridge_id === agentIdStr)) {
parents.push({ id: parent._id.toString(), type: "bridge" });
break;
}
}
}
} catch (error) {
console.error("Error finding parent bridges:", error);
}

try {
// Search in versions (bridgeVersionModel) for agents that have this agent in connected_agents
const versionParents = await bridgeVersionModel
.find({
org_id,
connected_agents: { $exists: true, $ne: {} }
})
.lean();

for (const parent of versionParents) {
const connectedAgents = parent.connected_agents || {};
for (const [, info] of Object.entries(connectedAgents)) {
if (info && (info.version_id === agentIdStr || info.bridge_id === agentIdStr)) {
parents.push({ id: parent._id.toString(), type: "version" });
break;
}
}
}
} catch (error) {
console.error("Error finding parent versions:", error);
}

return parents;
}

async function processAgent(agentId, parentIds = [], docType = null, direction = TRAVERSAL_DIRECTIONS.CHILDREN_ONLY) {
const visitKey = `${agentId.toString()}_${direction}`;

if (visited.has(visitKey)) {
// Agent already visited in this direction, just update parent references if needed
if (parentIds && parentIds.length > 0 && agentsMap[agentId]) {
parentIds.forEach((pid) => {
if (!agentsMap[agentId].parentAgents.includes(pid)) {
agentsMap[agentId].parentAgents.push(pid);
Expand All @@ -511,7 +581,7 @@ async function getAllConnectedAgents(id, org_id, type) {
return;
}

visited.add(agentId);
visited.add(visitKey);
const { doc, type: actualType } = await fetchDocument(agentId, docType);

if (!doc) {
Expand All @@ -523,17 +593,29 @@ async function getAllConnectedAgents(id, org_id, type) {
const threadId = connectedAgentDetails.thread_id || false;
const description = connectedAgentDetails.description;

agentsMap[agentId] = {
agent_name: agentName,
parentAgents: parentIds || [],
childAgents: [],
thread_id: threadId,
document_type: actualType
};
if (description) agentsMap[agentId].description = description;
// Initialize or update agent entry in map
if (!agentsMap[agentId]) {
agentsMap[agentId] = {
agent_name: agentName,
parentAgents: parentIds || [],
childAgents: [],
thread_id: threadId,
document_type: actualType
};
if (description) agentsMap[agentId].description = description;
} else {
// Update parent references if this agent already exists
if (parentIds && parentIds.length > 0) {
parentIds.forEach((pid) => {
if (!agentsMap[agentId].parentAgents.includes(pid)) {
agentsMap[agentId].parentAgents.push(pid);
}
});
}
}

// Process children (connected_agents)
const connectedAgents = doc.connected_agents || {};

for (const [, info] of Object.entries(connectedAgents)) {
if (!info) {
continue;
Expand All @@ -545,12 +627,31 @@ async function getAllConnectedAgents(id, org_id, type) {
agentsMap[agentId].childAgents.push(childId);
}
const childType = info.version_id ? "version" : "bridge";
await processAgent(childId, [agentId], childType);
await processAgent(childId, [agentId], childType, TRAVERSAL_DIRECTIONS.CHILDREN_ONLY);
}
}

// For "flow" key, also traverse parent agents
if (key === AGENT_TRAVERSAL_MODES.FLOW && direction !== TRAVERSAL_DIRECTIONS.CHILDREN_ONLY) {
const parentAgentsList = await findParentAgents(agentId);
for (const parent of parentAgentsList) {
if (!agentsMap[parent.id]) {
// Process parent agent (going "up")
await processAgent(parent.id, [], parent.type, TRAVERSAL_DIRECTIONS.PARENTS_ONLY);
}
// Ensure the parent-child relationship is properly set
if (agentsMap[parent.id] && !agentsMap[parent.id].childAgents.includes(agentId)) {
agentsMap[parent.id].childAgents.push(agentId);
}
if (agentsMap[agentId] && !agentsMap[agentId].parentAgents.includes(parent.id)) {
agentsMap[agentId].parentAgents.push(parent.id);
}
}
}
}

await processAgent(id, null, type);
// Start processing from the given agent
await processAgent(id, [], type, key === AGENT_TRAVERSAL_MODES.FLOW ? TRAVERSAL_DIRECTIONS.BOTH : TRAVERSAL_DIRECTIONS.CHILDREN_ONLY);

return agentsMap;
}
Expand Down
3 changes: 2 additions & 1 deletion src/validation/joi_validation/bridgeVersion.validation.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ const getConnectedAgents = {
.unknown(true),
query: Joi.object()
.keys({
type: Joi.string().optional()
type: Joi.string().optional(),
key: Joi.string().valid("orchestral", "flow").optional().default("orchestral")
})
.unknown(true)
};
Expand Down