Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/configs/constant.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ const bridge_ids = {
improve_prompt_optimizer: "68e4ac02739a8b89ba27b22a",
generate_test_cases: "68e8d1fbf8c9ba2043cf7afd",
prompt_checker: "692ee19da04fbf2a132b252c",
rich_ui_template: "6967b36c17a69473fa7fdb90"
rich_ui_template: "6967b36c17a69473fa7fdb90",
canonicalizer: "6973200cf60dd5bf64eeb325"
};

const redis_keys = {
Expand Down
9 changes: 8 additions & 1 deletion src/consumers/index.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
import dotenv from "dotenv";
import logger from "../logger.js";
import rabbitmqService from "../services/rabbitmq.service.js";
import { logQueueProcessor } from "./logQueueConsumer.js";

dotenv.config();
const CONSUMER_ENABLED = process.env.CONSUMER_ENABLED?.toLowerCase() === "true";
const CONSUMERS = [];
const CONSUMERS = [
{
queueName: process.env.LOG_QUEUE_NAME,
process: logQueueProcessor,
batchSize: 1
}
];

class Consumer {
constructor(obj, connectionString) {
Expand Down
67 changes: 67 additions & 0 deletions src/consumers/logQueueConsumer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import logger from "../logger.js";
import { saveSubThreadIdAndName } from "../services/logQueue/saveSubThreadIdAndName.service.js";
import { validateResponse } from "../services/logQueue/validateResponse.service.js";
import { totalTokenCalculation } from "../services/logQueue/totalTokenCalculation.service.js";
import { chatbotSuggestions } from "../services/logQueue/chatbotSuggestions.service.js";
import { handleGptMemory } from "../services/logQueue/handleGptMemory.service.js";
import { saveToAgentMemory } from "../services/logQueue/saveToAgentMemory.service.js";
import { saveFilesToRedis } from "../services/logQueue/saveFilesToRedis.service.js";
import { sendApiHitEvent } from "../services/logQueue/sendApiHitEvent.service.js";
import { broadcastResponseWebhook } from "../services/logQueue/broadcastResponseWebhook.service.js";

/**
 * Runs every post-response side effect for one log-queue payload, in a fixed
 * order: thread bookkeeping -> agent memory -> API-hit event -> validation ->
 * token accounting -> GPT memory -> suggestions -> file cache -> webhooks.
 * Each step awaits the previous one; a throw anywhere aborts the remainder
 * and is handled by the caller.
 *
 * @param {object} messages - Parsed queue payload; each key holds the input
 *   for one downstream service. NOTE(review): the exact schema is defined by
 *   the producer — confirm against the publishing service.
 */
async function processLogQueueMessage(messages) {
  // Thread naming runs for every message type, including images.
  await saveSubThreadIdAndName(messages.save_sub_thread_id_and_name);

  // Image responses need no further bookkeeping.
  if (messages.type === "image") {
    return;
  }

  const memory_payload = messages.save_agent_memory || {};
  if (memory_payload.chatbot_auto_answers) {
    await saveToAgentMemory({
      user_question: memory_payload.user_message || "",
      assistant_answer: memory_payload.assistant_message || "",
      agent_id: memory_payload.bridge_id || "",
      bridge_name: memory_payload.bridge_name || "",
      system_prompt: memory_payload.system_prompt || ""
    });
  }

  const validation_payload = messages.validateResponse;
  // Only non-alert responses are counted as successful API-hit events.
  if (!validation_payload?.alert_flag) {
    await sendApiHitEvent({
      message_id: validation_payload?.message_id,
      org_id: validation_payload?.org_id
    });
  }

  await validateResponse(validation_payload);
  await totalTokenCalculation(messages.total_token_calculation);

  if (messages.check_handle_gpt_memory?.gpt_memory) {
    await handleGptMemory(messages.handle_gpt_memory);
  }

  if (messages.check_chatbot_suggestions?.bridgeType) {
    await chatbotSuggestions(messages.chatbot_suggestions);
  }

  await saveFilesToRedis(messages.save_files_to_redis);

  if (messages.broadcast_response_webhook) {
    await broadcastResponseWebhook(messages.broadcast_response_webhook);
  }
}

/**
 * RabbitMQ consumer entry point for the log queue. Parses the delivery body
 * as JSON, runs the side-effect pipeline, then acks. Any failure — malformed
 * JSON or a throw from the pipeline — is logged and the message is rejected.
 *
 * @param {{content: Buffer}} message - Raw delivery from the channel.
 * @param {object} channel - AMQP channel used for ack/nack.
 */
async function logQueueProcessor(message, channel) {
  try {
    const payload = JSON.parse(message.content.toString());
    await processLogQueueMessage(payload);
    channel.ack(message);
  } catch (err) {
    logger.error(`Error processing log queue message: ${err.message}`);
    // Reject without requeue (third arg false) so a poison message
    // cannot loop forever.
    channel.nack(message, false, false);
  }
}

export { logQueueProcessor };
1 change: 1 addition & 0 deletions src/mongoModel/Thread.model.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ const threadSchema = new Schema({
sub_thread_id: { type: String, required: true },
display_name: { type: String, required: true },
org_id: { type: String, required: true },
bridge_id: { type: String },
created_at: { type: Date }
});

Expand Down
46 changes: 46 additions & 0 deletions src/services/logQueue/broadcastResponseWebhook.service.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { get_webhook_data } from "../../db_services/webhookAlert.service.js";
import { sendResponse } from "../utils/utility.service.js";
import logger from "../../logger.js";

/**
 * Fans a response out to every org webhook whose alert type and bridge filter
 * match, plus a built-in default alert endpoint that always listens on all
 * bridges. Deliveries run sequentially in registration order; any failure is
 * logged and swallowed (broadcast is best-effort).
 *
 * @param {object} args
 * @param {string} args.bridge_id - Bridge the response originated from.
 * @param {string} args.org_id - Organisation whose webhooks are loaded.
 * @param {object} args.response - Response body forwarded to each webhook.
 * @param {string} args.user_question - Original user message.
 * @param {object} args.variables - Variables forwarded alongside the payload.
 * @param {string} args.error_type - Alert category to match against each
 *   webhook's `alertType` list.
 */
async function broadcastResponseWebhook({ bridge_id, org_id, response, user_question, variables, error_type }) {
  try {
    const result = await get_webhook_data(org_id);
    if (!result?.webhook_data) {
      return;
    }

    // Platform-wide default alert hook, always appended last.
    const default_hook = {
      org_id,
      name: "default alert",
      webhookConfiguration: { url: "https://flow.sokt.io/func/scriSmH2QaBH", headers: {} },
      alertType: ["Error", "Variable", "retry_mechanism"],
      bridges: ["all"]
    };
    const hooks = [...result.webhook_data, default_hook];

    for (const hook of hooks) {
      const allowed_types = hook.alertType || [];
      const target_bridges = hook.bridges || [];

      if (!allowed_types.includes(error_type)) continue;
      if (!target_bridges.includes(bridge_id) && !target_bridges.includes("all")) continue;

      const config = hook.webhookConfiguration;
      // A per-hook `user_url` override wins over the stored configuration URL.
      const destination = {
        type: "webhook",
        cred: { url: hook.user_url || config?.url, headers: config?.headers || {} }
      };

      // Fresh payload per delivery so a receiver mutating it cannot leak
      // changes into the next hook.
      const payload = {
        response: response || {},
        user_question: user_question || "",
        variables: variables || {}
      };

      await sendResponse(destination, payload, variables || {});
    }
  } catch (err) {
    logger.error(`Error in broadcastResponseWebhook: ${err.message}`);
  }
}

export { broadcastResponseWebhook };
40 changes: 40 additions & 0 deletions src/services/logQueue/chatbotSuggestions.service.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { v4 as uuidv4 } from "uuid";
import { callAiMiddleware } from "../utils/aiCall.utils.js";
import { sendResponse } from "../utils/utility.service.js";
import { bridge_ids } from "../../configs/constant.js";
import prebuiltPromptDbService from "../../db_services/prebuiltPrompt.service.js";
import logger from "../../logger.js";

/**
 * Generates follow-up chat suggestions from the latest user/assistant
 * exchange via the AI middleware and pushes them through the caller's
 * response channel. Errors are logged and swallowed (suggestions are
 * best-effort).
 *
 * @param {object} args
 * @param {object} args.response_format - Delivery target for sendResponse.
 * @param {object} args.assistant - Assistant reply; content is read from
 *   `assistant.data.content`.
 * @param {string} args.user - Latest user message.
 * @param {string} [args.bridge_summary] - Preferred prompt summary; falls
 *   back to `configuration.prompt` when nullish.
 * @param {string} [args.thread_id] - Thread id; a random UUID stands in
 *   when missing.
 * @param {string} [args.sub_thread_id] - Sub-thread id; same fallback.
 * @param {object} [args.configuration] - Bridge configuration.
 * @param {string} args.org_id - Org used to look up a prompt override.
 */
async function chatbotSuggestions({ response_format, assistant, user, bridge_summary, thread_id, sub_thread_id, configuration, org_id }) {
  try {
    // Prefer the bridge summary; only a nullish summary falls back to the
    // raw configured prompt.
    const summary_or_prompt = bridge_summary ?? configuration?.prompt;

    const recent_turns = [
      { role: "user", content: user },
      { role: "assistant", content: assistant?.data?.content }
    ];

    // Org-specific prompt override, when one has been saved.
    const prebuilt = await prebuiltPromptDbService.getSpecificPrebuiltPrompt(org_id, "chatbot_suggestions");
    const ai_configuration = prebuilt?.chatbot_suggestions
      ? { prompt: prebuilt.chatbot_suggestions }
      : null;

    const fallback_id = uuidv4();
    const composed_thread_id = `${thread_id || fallback_id}-${sub_thread_id || fallback_id}`;

    const message = `Generate suggestions based on the user conversations. \n **User Conversations**: ${JSON.stringify(recent_turns.slice(-2))}`;

    const result = await callAiMiddleware(
      message,
      bridge_ids.chatbot_suggestions,
      { prompt_summary: summary_or_prompt },
      ai_configuration,
      null,
      composed_thread_id
    );

    await sendResponse(response_format, { data: { suggestions: result?.suggestions } });
  } catch (err) {
    logger.error(`Error calling function chatbotSuggestions: ${err.message}`);
  }
}

export { chatbotSuggestions };
40 changes: 40 additions & 0 deletions src/services/logQueue/handleGptMemory.service.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { storeInCache } from "../../cache_service/index.js";
import { callAiMiddleware } from "../utils/aiCall.utils.js";
import { bridge_ids, redis_keys } from "../../configs/constant.js";
import prebuiltPromptDbService from "../../db_services/prebuiltPrompt.service.js";
import logger from "../../logger.js";

/**
 * Asks the AI middleware to distill the latest user/assistant exchange into
 * a long-term "GPT memory" entry. The model signals "nothing worth storing"
 * with the literal string "False"; any other string response is cached under
 * the thread id. Errors are logged and swallowed, returning undefined.
 *
 * @param {object} args
 * @param {string} args.id - Thread id; also the cache-key suffix.
 * @param {string} args.user - Latest user message.
 * @param {object} args.assistant - Assistant reply; content is read from
 *   `assistant.data.content`.
 * @param {string} [args.purpose] - Existing memory purpose, if any.
 * @param {*} args.gpt_memory_context - Extra context forwarded as a variable.
 * @param {string} args.org_id - Org used to look up a prompt override.
 * @returns {Promise<*>} The middleware response, or undefined on error.
 */
async function handleGptMemory({ id, user, assistant, purpose, gpt_memory_context, org_id }) {
  try {
    const assistant_text = assistant?.data?.content || "";

    const configuration = {
      conversation: [
        { role: "user", content: user },
        { role: "assistant", content: assistant_text }
      ]
    };

    // Apply an org-specific prompt override when one exists.
    const prebuilt = await prebuiltPromptDbService.getSpecificPrebuiltPrompt(org_id, "gpt_memory");
    if (prebuilt?.gpt_memory) {
      configuration.prompt = prebuilt.gpt_memory;
    }

    // NOTE: instruction text is sent verbatim to the model (typos included);
    // changing it changes model behavior, so it is reproduced exactly.
    const message =
      "use the function to store the memory if the user message and history is related to the context or is important to store else don't call the function and ignore it. is purpose is not there than think its the begining of the conversation. Only return the exact memory as output no an extra text jusy memory if present or Just return False";

    const response = await callAiMiddleware(
      message,
      bridge_ids.gpt_memory,
      { threadID: id, memory: purpose, gpt_memory_context },
      configuration,
      "text"
    );

    // "False" is the model's sentinel for "do not store anything".
    if (typeof response === "string" && response !== "False") {
      await storeInCache(`${redis_keys.gpt_memory_}${id}`, response);
    }

    return response;
  } catch (err) {
    logger.error(`Error calling function handleGptMemory: ${err.message}`);
  }
}

export { handleGptMemory };
35 changes: 35 additions & 0 deletions src/services/logQueue/saveFilesToRedis.service.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import client from "../cache.service.js";
import { findInCache, storeInCache } from "../../cache_service/index.js";
import { redis_keys } from "../../configs/constant.js";
import logger from "../../logger.js";

// Key namespace applied by the low-level cache client, and the TTL used for
// cached file lists.
const REDIS_PREFIX = "AIMIDDLEWARE_";
const FILES_TTL = 604800; // 7 days

/**
 * Caches the file list for a (bridge, thread, sub-thread) triple. When the
 * cached list is identical (compared via JSON serialization) only the TTL is
 * refreshed; otherwise — including when the cached entry is unparseable —
 * the new list overwrites it. All failures are logged and swallowed.
 *
 * @param {object} args
 * @param {string} args.thread_id
 * @param {string} args.sub_thread_id
 * @param {string} args.bridge_id
 * @param {*} args.files - File list to cache.
 */
async function saveFilesToRedis({ thread_id, sub_thread_id, bridge_id, files }) {
  try {
    const cache_key = `${redis_keys.files_}${bridge_id}_${thread_id}_${sub_thread_id}`;
    const cached_raw = await findInCache(cache_key);

    let is_unchanged = false;
    if (cached_raw) {
      try {
        // Canonicalize both sides through JSON; a corrupt cache entry simply
        // counts as "changed" and gets rewritten below.
        is_unchanged = JSON.stringify(JSON.parse(cached_raw)) === JSON.stringify(files);
      } catch {
        is_unchanged = false;
      }
    }

    if (is_unchanged) {
      // Same content: just extend the TTL. The expire call targets the
      // prefixed key because it bypasses the storeInCache wrapper.
      if (client.isReady) {
        await client.expire(`${REDIS_PREFIX}${cache_key}`, FILES_TTL);
      }
    } else {
      await storeInCache(cache_key, files, FILES_TTL);
    }
  } catch (err) {
    logger.error(`Error in saveFilesToRedis: ${err.message}`);
  }
}

export { saveFilesToRedis };
62 changes: 62 additions & 0 deletions src/services/logQueue/saveSubThreadIdAndName.service.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import { findInCache, storeInCache } from "../../cache_service/index.js";
import { callAiMiddleware } from "../utils/aiCall.utils.js";
import { sendResponse } from "../utils/utility.service.js";
import { bridge_ids } from "../../configs/constant.js";
import Thread from "../../mongoModel/Thread.model.js";
import logger from "../../logger.js";

/**
 * Upserts the display name for a (thread, sub-thread) pair and caches the
 * result for 48h so repeat messages skip both the AI call and the DB write.
 * When `thread_flag` is set, a short description is generated by the AI
 * middleware; otherwise the sub-thread id doubles as the display name.
 * A generated, non-default name is also pushed back to the caller via
 * `response_format`. Errors are logged and swallowed (best-effort).
 *
 * @param {object} args
 * @param {string} args.thread_id
 * @param {string} args.sub_thread_id
 * @param {string} args.org_id
 * @param {boolean} args.thread_flag - Whether to generate a description.
 * @param {object} args.response_format - Delivery target for sendResponse.
 * @param {string} args.bridge_id
 * @param {string} args.user - User message fed to the description generator.
 */
async function saveSubThreadIdAndName({ thread_id, sub_thread_id, org_id, thread_flag, response_format, bridge_id, user }) {
  try {
    const cache_key = `sub_thread_${org_id}_${bridge_id}_${thread_id}_${sub_thread_id}`;

    // A cache hit means this sub-thread was already named recently.
    if (await findInCache(cache_key)) {
      logger.info(`Found cached sub_thread_id for key: ${cache_key}`);
      return;
    }

    const now = new Date();
    let display_name = sub_thread_id;
    if (thread_flag) {
      // Ask the middleware for a human-readable thread description.
      display_name = await callAiMiddleware("generate description", bridge_ids.generate_description, { user }, null, "text");
    }

    await Thread.findOneAndUpdate(
      { org_id, thread_id, sub_thread_id },
      {
        // bridge_id and display_name update on every write; identity fields
        // and created_at are only written when the document is inserted.
        $set: { bridge_id, display_name: display_name || sub_thread_id },
        $setOnInsert: { org_id, thread_id, sub_thread_id, created_at: now }
      },
      { upsert: true }
    );

    await storeInCache(
      cache_key,
      {
        org_id,
        bridge_id,
        thread_id,
        sub_thread_id,
        display_name,
        created_at: now.toISOString()
      },
      172800 // 48 hours
    );

    // Only announce names that differ from the default (the sub-thread id).
    if (display_name && display_name !== sub_thread_id) {
      await sendResponse(response_format, {
        data: {
          display_name,
          sub_thread_id,
          thread_id,
          bridge_id,
          created_at: now.toISOString()
        }
      });
    }
  } catch (err) {
    logger.error(`Error in saving sub thread id and name: ${err.message}`);
  }
}

export { saveSubThreadIdAndName };
Loading