Skip to content

Updated worker for Notifications #2

@Systemized

Description

@Systemized

Updated the worker so that completed deals show up in notifications.

import { createClient } from "redis";
import http from "http";

import prismaDB from "./lib/prisma";
import { splitContentIntoChunks } from "./lib/utils";
import { generateObject, generateText } from "ai";
import { openai } from "./lib/ai/available-models";
import { z } from "zod";

// Processing queue (worker consumes from here)
const QUEUE = "dealQueue";
// Redis pub/sub channel on which start() publishes a completion message
// after a submission has been processed successfully.
const DONE_CHANNEL = "problem_done";

/**
 * Shape the worker assumes for a deal payload popped from the processing
 * queue. The payload is produced by JSON.parse in start(), so these field
 * types are a compile-time expectation only.
 */
type Submission = {
  // Used as `dealId` when the AI screening row is created and as `productId`
  // in the completion notification.
  id: string;
  brokerage: string;
  firstName: string;
  lastName: string;
  tags: string[];
  email: string;
  linkedinUrl: string;
  workPhone: string;
  dealCaption: string;
  revenue: number;
  ebitda: number;
  // Sent as `productName` in the completion notification (falls back to `id`).
  title: string;
  dealTeaser: string | null;
  grossRevenue: number | null;
  askingPrice: number | null;
  ebitdaMargin: number;
  industry: string;
  dealType: string;
  sourceWebsite: string;
  companyLocation: string;
  // NOTE(review): presumably serialized timestamps — format not visible here; confirm with the producer.
  createdAt: string;
  updatedAt: string;
  bitrixLink: string | null;
  // Overwritten with "done" in the copy written back to the listings list.
  status: string;
  isReviewed: boolean;
  isPublished: boolean;
  seen: boolean;
  bitrixId: string | null;
  bitrixCreatedAt: string | null;
  // Included in the completion notification.
  userId: string;
  screenerId: string;
  // Text that processSubmission splits into chunks and sends to the model.
  screenerContent: string;
  screenerName: string;
};

/**
 * Structured screening verdict. The fields mirror the zod schema passed to
 * generateObject in generateFinalSummary and are persisted field-by-field by
 * saveAIScreeningResult.
 */
interface AIScreeningResult {
  title: string;
  // NOTE(review): the schema only requires a number — no range or scale is enforced here.
  score: number;
  sentiment: "POSITIVE" | "NEGATIVE" | "NEUTRAL";
  explanation: string;
}

/**
 * Ask the model to merge the per-chunk summaries into one structured
 * screening result.
 *
 * @param combinedSummary - All chunk summaries joined into a single string.
 * @returns The object produced by the model, or `null` when the call or the
 *   schema validation throws (the error is logged, never rethrown).
 */
async function generateFinalSummary(
  combinedSummary: string
): Promise<AIScreeningResult | null> {
  try {
    console.log("Generating final AI screening result...");

    const { object: screening } = await generateObject({
      model: openai("gpt-4o-mini"),
      prompt: `Combine the following summaries into a single summary: ${combinedSummary}`,
      // Shape the model output must conform to
      schema: z.object({
        title: z.string(),
        score: z.number(),
        sentiment: z.enum(["POSITIVE", "NEGATIVE", "NEUTRAL"]),
        explanation: z.string(),
      }),
    });

    console.log("Final AI screening result generated successfully:", screening);
    return screening;
  } catch (err) {
    console.error("Error generating final summary:", err);
    return null;
  }
}

/**
 * Persist one AI screening row for a deal.
 *
 * @param submissionId - Stored as the row's `dealId`.
 * @param result - Structured screening result to store.
 * @param combinedSummary - Stored as the row's `content`.
 * @returns `true` when the insert succeeds, `false` when it throws (the error
 *   is logged, never rethrown).
 */
async function saveAIScreeningResult(
  submissionId: string,
  result: AIScreeningResult,
  combinedSummary: string
): Promise<boolean> {
  try {
    console.log(
      `Saving AI screening result to database for submission: ${submissionId}`
    );

    const { title, explanation, score, sentiment } = result;
    const row = {
      dealId: submissionId,
      title,
      explanation,
      score,
      sentiment,
      content: combinedSummary,
    };
    await prismaDB.aiScreening.create({ data: row });

    console.log("AI screening result saved successfully to database");
    return true;
  } catch (err) {
    console.error("Error saving AI screening result:", err);
    return false;
  }
}

/**
 * Summarize each content chunk with the model, in order, one at a time.
 *
 * @param chunks - Content pieces to evaluate; empty or missing entries are
 *   skipped with a warning and produce no summary.
 * @param dealInfo - Deal whose key fields are sent as context with every chunk.
 * @returns One entry per non-empty chunk: the model's text, or the
 *   placeholder `[Error processing chunk]` when that chunk's call failed.
 *   A failing chunk never aborts the remaining chunks.
 */
async function processContentChunks(
  chunks: string[],
  dealInfo: Submission
): Promise<string[]> {
  console.log(
    `Processing ${chunks.length} content chunks for deal: ${dealInfo.id}`
  );

  // The deal context is identical for every chunk, so build and serialize it
  // once instead of on every iteration.
  // NOTE(review): `caption` duplicates `dealCaption`; kept so the prompt sent
  // to the model is unchanged.
  const dealContextJson = JSON.stringify({
    title: dealInfo.title,
    brokerage: dealInfo.brokerage,
    dealCaption: dealInfo.dealCaption,
    dealType: dealInfo.dealType,
    ebitda: dealInfo.ebitda,
    ebitdaMargin: dealInfo.ebitdaMargin,
    companyLocation: dealInfo.companyLocation,
    revenue: dealInfo.revenue,
    caption: dealInfo.dealCaption,
    industry: dealInfo.industry,
  });

  const summaries: string[] = [];

  for (let i = 0; i < chunks.length; i++) {
    const chunk = chunks[i];
    if (!chunk) {
      // `!chunk` is also true for an empty string, not just undefined
      console.warn(`Chunk ${i + 1} is empty or undefined, skipping...`);
      continue;
    }
    try {
      console.log(
        `Processing chunk ${i + 1}/${chunks.length} (${
          chunk.length
        } characters)`
      );

      const prompt = `Based on this deal context: ${dealContextJson}, evaluate the following text: ${chunk}`;

      const summary = await generateText({
        system:
          "You are an expert AI Assistant that specializes in deal sourcing, evaluation and private equity in general",
        model: openai("gpt-4o-mini"),
        prompt,
      });

      console.log("Summary generated by AI", summary.text);

      summaries.push(summary.text);
      console.log(`Chunk ${i + 1} processed successfully`);
    } catch (error) {
      // Best effort: record a placeholder so the remaining chunks still run
      console.error(`Error processing chunk ${i + 1}:`, error);
      summaries.push(`[Error processing chunk]`);
    }
  }

  return summaries;
}

/**
 * Run the full screening pipeline for one submission: split its screener
 * content into chunks, summarize each chunk, merge the summaries into a
 * final structured result and store that result.
 *
 * @param submission - Deal payload taken from the queue.
 * @returns `true` only when the result was saved; `false` when there were no
 *   chunks, the final summary could not be generated, the save failed, or
 *   any step threw (errors are logged, never rethrown).
 */
async function processSubmission(submission: Submission): Promise<boolean> {
  try {
    console.log(`=== Starting to process submission: ${submission.id} ===`);

    // Step 1: chunk the screener content
    console.log("Splitting content into chunks...");
    const contentChunks = await splitContentIntoChunks(
      submission.screenerContent
    );
    console.log(`Content split into ${contentChunks.length} chunks`);
    if (contentChunks.length === 0) {
      console.warn("No content chunks generated - submission will be skipped");
      return false;
    }

    // Step 2: summarize every chunk and stitch the summaries together
    console.log("Processing content chunks...");
    const chunkSummaries = await processContentChunks(contentChunks, submission);
    const combinedSummary = chunkSummaries.join("\n\n=== Next Section ===\n\n");
    console.log(
      `Combined summary length: ${combinedSummary.length} characters`
    );

    // Step 3: turn the combined text into the structured result
    console.log("Generating final AI screening result...");
    const screening = await generateFinalSummary(combinedSummary);
    if (!screening) {
      console.error(
        "Failed to generate final summary - submission processing failed"
      );
      return false;
    }

    // Step 4: persist
    console.log("Saving result to database...");
    const persisted = await saveAIScreeningResult(
      submission.id,
      screening,
      combinedSummary
    );
    if (!persisted) {
      console.error("Database save failed - submission processing incomplete");
      return false;
    }

    console.log(
      "Database save successful, publishing completion notification..."
    );
    console.log(`=== Submission ${submission.id} processed successfully ===`);
    return true;
  } catch (err) {
    console.error(`=== Error processing submission ${submission.id}:`, err);
    return false;
  }
}

/**
 * Worker entry point.
 *
 * Connects to Redis, starts a small HTTP server for health checks, then
 * loops forever: block on the processing queue, parse and process each
 * submission and, on success, mark the deal as "done" in the listings list
 * and publish a completion message on DONE_CHANNEL.
 *
 * The returned promise never resolves. It rejects only if setup (for example
 * the initial Redis connect) fails; errors inside the loop are logged and
 * followed by a one-second pause before the next iteration.
 */
async function start() {
  // Redis list holding the deals shown in the UI (not the processing queue)
  const LISTINGS_KEY = "dealListings";

  const redis = createClient({ url: process.env.REDIS_URL });
  redis.on("error", (e) => console.error("Redis error", e));
  await redis.connect();
  console.log("Worker connected to Redis");

  // Start a simple HTTP server for Cloud Run health checks
  const port = Number(process.env.PORT) || 8080;
  const server = http.createServer((req, res) => {
    // Always answer: a request without a URL falls through to the 404 branch
    // instead of being left hanging without a response.
    if (req.url === "/health" || req.url === "/") {
      res.writeHead(200, { "Content-Type": "text/plain" });
      res.end("OK");
    } else {
      res.writeHead(404, { "Content-Type": "text/plain" });
      res.end("Not Found");
    }
  });

  server.listen(port, () =>
    console.log(`Worker HTTP server listening on :${port}`)
  );

  while (true) {
    try {
      console.log("Waiting for items with BRPOP on:", QUEUE);
      const popped = await redis.brPop(QUEUE, 0);
      if (!popped) continue;

      let parsed: unknown;
      try {
        parsed = JSON.parse(popped.element);
      } catch {
        console.error("Invalid JSON item, skipping");
        continue;
      }

      // JSON.parse also accepts null, numbers, strings and arrays; only a
      // plain object carrying a string id can be processed as a submission.
      if (
        typeof parsed !== "object" ||
        parsed === null ||
        typeof (parsed as { id?: unknown }).id !== "string"
      ) {
        console.error(
          "Queue item is not a submission object with a string id, skipping"
        );
        continue;
      }
      const submission = parsed as Submission;

      console.log("Received and parsed submission", submission);

      const success = await processSubmission(submission);
      if (!success) continue;

      try {
        // Update the deal status in UI listings list (not the processing queue)
        // NOTE: lRange followed by lSet is not atomic. If the list changes
        // between the two calls, the index can point at a different entry.
        const updatedSubmission = { ...submission, status: "done" };
        const listItems = await redis.lRange(LISTINGS_KEY, 0, -1);
        let updated = false;

        for (let i = 0; i < listItems.length; i++) {
          const rawListing = listItems[i];
          if (!rawListing) continue;
          try {
            const listing: unknown = JSON.parse(rawListing);
            if (
              typeof listing === "object" &&
              listing !== null &&
              (listing as { id?: unknown }).id === submission.id
            ) {
              await redis.lSet(
                LISTINGS_KEY,
                i,
                JSON.stringify(updatedSubmission)
              );
              console.log(
                `Updated status for submission ${submission.id} in Redis list`
              );
              updated = true;
              break;
            }
          } catch (e) {
            console.error("Error parsing item in Redis list:", e);
          }
        }

        if (!updated) {
          console.warn(
            `Submission ${submission.id} not found in ${LISTINGS_KEY}; status not updated`
          );
        }
      } catch (e) {
        // The listing update is best effort; the notification is still sent
        console.error("Error updating Redis list:", e);
      }

      // Publish completion notification
      await redis.publish(
        DONE_CHANNEL,
        JSON.stringify({
          userId: submission.userId,
          productId: submission.id,
          status: "done",
          productName: submission.title || submission.id,
        })
      );
    } catch (e) {
      console.error("Worker loop error", e);
      // Brief pause so a persistent failure does not spin the loop
      await new Promise((r) => setTimeout(r, 1000));
    }
  }
}

// Launch the worker. start() loops forever, so this handler only runs when
// start() rejects (for example when the initial Redis connect fails); it
// logs the error and exits with status 1.
start().catch((e) => {
  console.error("Worker failed to start", e);
  process.exit(1);
});

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions