Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 140 additions & 16 deletions src/db_services/agentVersion.service.js
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,8 @@ async function publish(org_id, version_id, user_id) {

async function getAllConnectedAgents(id, org_id, type) {
const agentsMap = {};
const visited = new Set();
const visitedDownstream = new Set();
const visitedUpstream = new Set();

async function fetchDocument(docId, docType) {
try {
Expand All @@ -498,9 +499,14 @@ async function getAllConnectedAgents(id, org_id, type) {
doc = await bridgeVersionModel.findOne({ _id: docId, org_id }).lean();
actualType = "version";
} else {
// Default to bridge if type is not specified
// Try bridge first, then version
doc = await configurationModel.findOne({ _id: docId, org_id }).lean();
actualType = "bridge";
if (doc) {
actualType = "bridge";
} else {
doc = await bridgeVersionModel.findOne({ _id: docId, org_id }).lean();
actualType = doc ? "version" : null;
}
}

return { doc, type: actualType };
Expand All @@ -509,8 +515,111 @@ async function getAllConnectedAgents(id, org_id, type) {
}
}

async function processAgent(agentId, parentIds = [], docType = null) {
if (visited.has(agentId)) {
// Find all agents (bridges and versions) that have the given agentId in their connected_agents
async function findParentAgents(agentId) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The findParentAgents function is missing error handling. If the database queries fail, the error will propagate up and potentially crash the API call. Consider adding try/catch blocks:

const parentAgents = [];

// Search in bridges (configurationModel)
const parentBridges = await configurationModel
.find({
org_id,
$or: [{ "connected_agents": { $exists: true } }]
})
.lean();
Comment on lines +523 to +528
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The query in findParentAgents() currently fetches all documents with a connected_agents field and then filters them in memory. For large organizations with many agents, this could be inefficient.

Consider optimizing with a more targeted query that directly filters for documents containing the target agentId:

Suggested change
const parentBridges = await configurationModel
.find({
org_id,
$or: [{ "connected_agents": { $exists: true } }]
})
.lean();
// Search in bridges (configurationModel)
const parentBridges = await configurationModel
.find({
org_id,
connected_agents: {
$exists: true,
$elemMatch: { $or: [{ version_id: agentId }, { bridge_id: agentId }] }
}
})
.lean();


for (const bridge of parentBridges) {
const connectedAgents = bridge.connected_agents || {};
for (const [, info] of Object.entries(connectedAgents)) {
if (!info) continue;
const childId = info.version_id || info.bridge_id;
if (childId === agentId) {
parentAgents.push({ id: bridge._id.toString(), type: "bridge" });
break;
}
}
}

// Search in versions (bridgeVersionModel)
const parentVersions = await bridgeVersionModel
.find({
org_id,
$or: [{ "connected_agents": { $exists: true } }]
})
.lean();
Comment on lines +543 to +548
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to the bridge query, this version query could be optimized to directly filter for documents containing the target agentId:

Suggested change
const parentVersions = await bridgeVersionModel
.find({
org_id,
$or: [{ "connected_agents": { $exists: true } }]
})
.lean();
// Search in versions (bridgeVersionModel)
const parentVersions = await bridgeVersionModel
.find({
org_id,
connected_agents: {
$exists: true,
$elemMatch: { $or: [{ version_id: agentId }, { bridge_id: agentId }] }
}
})
.lean();


for (const version of parentVersions) {
const connectedAgents = version.connected_agents || {};
for (const [, info] of Object.entries(connectedAgents)) {
if (!info) continue;
const childId = info.version_id || info.bridge_id;
if (childId === agentId) {
parentAgents.push({ id: version._id.toString(), type: "version" });
break;
}
}
}

return parentAgents;
}

// Process upstream (parent agents) recursively
async function processUpstream(agentId, childIds = [], docType = null) {
if (visitedUpstream.has(agentId)) {
// Update childAgents if already visited
if (childIds && agentsMap[agentId]) {
childIds.forEach((cid) => {
if (!agentsMap[agentId].childAgents.includes(cid)) {
agentsMap[agentId].childAgents.push(cid);
}
});
}
return;
}

visitedUpstream.add(agentId);
const { doc, type: actualType } = await fetchDocument(agentId, docType);

if (!doc) {
return;
}

const agentName = doc.name || `Agent_${agentId}`;
const connectedAgentDetails = doc.connected_agent_details || {};
const threadId = connectedAgentDetails.thread_id || false;
const description = connectedAgentDetails.description;

if (!agentsMap[agentId]) {
agentsMap[agentId] = {
agent_name: agentName,
parentAgents: [],
childAgents: childIds || [],
thread_id: threadId,
document_type: actualType
};
if (description) agentsMap[agentId].description = description;
} else {
// Merge childAgents
childIds.forEach((cid) => {
if (!agentsMap[agentId].childAgents.includes(cid)) {
agentsMap[agentId].childAgents.push(cid);
}
});
}

// Find parents of this agent
const parents = await findParentAgents(agentId);
for (const parent of parents) {
if (!agentsMap[agentId].parentAgents.includes(parent.id)) {
agentsMap[agentId].parentAgents.push(parent.id);
}
await processUpstream(parent.id, [agentId], parent.type);
}
}

// Process downstream (child agents) recursively
async function processDownstream(agentId, parentIds = [], docType = null) {
if (visitedDownstream.has(agentId)) {
// Update parentAgents if already visited
if (parentIds && agentsMap[agentId]) {
parentIds.forEach((pid) => {
if (!agentsMap[agentId].parentAgents.includes(pid)) {
Expand All @@ -521,7 +630,7 @@ async function getAllConnectedAgents(id, org_id, type) {
return;
}

visited.add(agentId);
visitedDownstream.add(agentId);
const { doc, type: actualType } = await fetchDocument(agentId, docType);

if (!doc) {
Expand All @@ -533,15 +642,25 @@ async function getAllConnectedAgents(id, org_id, type) {
const threadId = connectedAgentDetails.thread_id || false;
const description = connectedAgentDetails.description;

agentsMap[agentId] = {
agent_name: agentName,
parentAgents: parentIds || [],
childAgents: [],
thread_id: threadId,
document_type: actualType
};
if (description) agentsMap[agentId].description = description;
if (!agentsMap[agentId]) {
agentsMap[agentId] = {
agent_name: agentName,
parentAgents: parentIds || [],
childAgents: [],
thread_id: threadId,
document_type: actualType
};
if (description) agentsMap[agentId].description = description;
} else {
// Merge parentAgents
parentIds.forEach((pid) => {
if (!agentsMap[agentId].parentAgents.includes(pid)) {
agentsMap[agentId].parentAgents.push(pid);
}
});
}

// Process children
const connectedAgents = doc.connected_agents || {};

for (const [, info] of Object.entries(connectedAgents)) {
Expand All @@ -555,12 +674,17 @@ async function getAllConnectedAgents(id, org_id, type) {
agentsMap[agentId].childAgents.push(childId);
}
const childType = info.version_id ? "version" : "bridge";
await processAgent(childId, [agentId], childType);
await processDownstream(childId, [agentId], childType);
}
}
}

await processAgent(id, null, type);
// Start by processing the initial agent in both directions
// First, process upstream to find all parent agents
await processUpstream(id, [], type);

// Then, process downstream to find all child agents
await processDownstream(id, [], type);

return agentsMap;
}
Expand Down