diff --git a/src/modules/billing/services/payment-email.service.ts b/src/modules/billing/services/payment-email.service.ts index 57049ad02..88c9b9cef 100644 --- a/src/modules/billing/services/payment-email.service.ts +++ b/src/modules/billing/services/payment-email.service.ts @@ -17,6 +17,8 @@ export enum PaymentEmailType { SUBSCRIPTION_EXPIRED = "subscription_expired", PAYMENT_INFO_UPDATED = "payment_info_updated", DOWNGRADED_TO_COMMUNITY = "downgraded_to_community", + TRIAL_EXTENDED = "trial_extended", + PLAN_ADDED = "plan_added", } export interface PaymentEmailData { @@ -50,6 +52,10 @@ export interface PaymentEmailData { workspaces?: any; users?: any; sendEmails?: string[]; + price?: number; + features?: string[]; + upgradeDate?: string; + nextBillingDate?: string; } @Injectable() @@ -108,6 +114,12 @@ export class PaymentEmailService { case PaymentEmailType.DOWNGRADED_TO_COMMUNITY: await this.sendDowngradedToCommunityEmail(data); break; + case PaymentEmailType.TRIAL_EXTENDED: + await this.sendTrialExtendedEmail(data); + break; + case PaymentEmailType.PLAN_ADDED: + await this.sendPlanAddedEmail(data); + break; default: console.warn(`Unknown payment email type: ${emailType}`); } @@ -634,7 +646,7 @@ export class PaymentEmailService { date: Date | number, options?: { grace_period?: boolean }, ): string { - let d = typeof date === "number" ? new Date(date * 1000) : new Date(date); + const d = typeof date === "number" ? new Date(date * 1000) : new Date(date); if (options?.grace_period) { d.setDate(d.getDate() + 3); @@ -646,4 +658,52 @@ export class PaymentEmailService { day: "numeric", }); } + + private async sendTrialExtendedEmail(data: PaymentEmailData): Promise { + const transporter = this.emailService.createTransporter(); + const emailsToSend = data.sendEmails || [data.ownerEmail]; + + for (const email of emailsToSend) { + const mailOptions = { + from: this.configService.get("app.senderEmail"), + to: email, + template: "trialExtendedEmail", + context: { + hubName: data.hubName, + planName: data.planName, + trialStart: this.formatDate(data.billingPeriodStart), + trialEnd: this.formatDate(data.billingPeriodEnd), + seats: data.totalSeats, + }, + subject: `Your trial for ${data.hubName} has been extended`, + }; + + await this.emailService.sendEmail(transporter, mailOptions); + } + } + + private async sendPlanAddedEmail(data: PaymentEmailData): Promise { + const transporter = this.emailService.createTransporter(); + + const mailOptions = { + from: this.configService.get("app.senderEmail"), + to: data.ownerEmail, + text: "Plan Updated", + template: "planUpgradedEmail", + context: { + firstName: this.extractFirstName(data.ownerName), + hubName: data.hubName, + newPlanName: data.planName, + features: data.features, + price: data.price, + interval: data.interval, + upgradeDate: data.upgradeDate, + nextBillingDate: data.nextBillingDate, + sparrowEmail: this.configService.get("support.sparrowEmail"), + }, + subject: `Your hub ${data.hubName} has been upgraded to ${data.planName}`, + }; + + await this.emailService.sendEmail(transporter, mailOptions); + } } diff --git a/src/modules/billing/services/stripe-subscription.service.ts b/src/modules/billing/services/stripe-subscription.service.ts index a82355ddd..80b904367 100644 --- a/src/modules/billing/services/stripe-subscription.service.ts +++ b/src/modules/billing/services/stripe-subscription.service.ts @@ -514,6 +514,10 @@ export class StripeSubscriptionService { const isTrialOngoing = trialEndDateStr && new Date(trialEndDateStr).getTime() > Date.now(); + const isTrialExtension = + metadata?.trialExtension === "true" || + (team?.billing?.in_trial === true && metadata?.trial_end_date); + // Initialize or update the licenses object based on billing seats const currentSeats = latestSubscription?.quantity || metadata?.userCount || 1; @@ -630,10 +634,17 @@ export class StripeSubscriptionService { // Create billing details object with successful payment status const billingDetails = { - current_period_start: period.start - ? new Date(period.start * 1000) - : new Date(), - current_period_end: period.end ? new Date(period.end * 1000) : null, + current_period_start: isTrialExtension + ? team.billing?.current_period_start + : period.start + ? new Date(period.start * 1000) + : new Date(), + + current_period_end: isTrialExtension + ? new Date(metadata.trial_end_date) + : period.end + ? new Date(period.end * 1000) + : null, amount_billed: amount, currency: invoice.currency, status: SubscriptionStatus.ACTIVE, @@ -663,7 +674,21 @@ export class StripeSubscriptionService { ), }; - await this.updateTeamPlanWithBilling(metadata.hubId, plan, billingDetails); + // Get current team plan + const existingTeam = await this.stripeSubscriptionRepo.findTeamById( + metadata.hubId, + ); + + // Skip webhook overwrite ONLY if admin changed plan recently + if (existingTeam?.billing?.updatedBy === BillingSource.API_CALL) { + console.log("Skipping webhook overwrite due to admin plan change"); + } else { + await this.updateTeamPlanWithBilling( + metadata.hubId, + plan, + billingDetails, + ); + } if (!isDowngrading) { const teamIdObject = new ObjectId(metadata.hubId); const updateTeam = diff --git a/src/modules/common/models/user.model.ts b/src/modules/common/models/user.model.ts index 68854fd7b..afba276a4 100644 --- a/src/modules/common/models/user.model.ts +++ b/src/modules/common/models/user.model.ts @@ -166,6 +166,10 @@ export class User { @ValidateNested() @Type(() => TourGuideDto) tourGuide?: TourGuideDto; + + @IsBoolean() + @IsOptional() + isWeeklyDigestEnabled?: boolean; } export class UserDto { diff --git a/src/modules/identity/controllers/user.controller.ts b/src/modules/identity/controllers/user.controller.ts index 0fbeb4262..9239ed304 100644 --- a/src/modules/identity/controllers/user.controller.ts +++ b/src/modules/identity/controllers/user.controller.ts @@ -10,6 +10,7 @@ import { Req, Res, UseGuards, + Query, } from "@nestjs/common"; import { ApiBearerAuth, @@ -614,4 +615,29 @@ export class UserController { result, ); } + + @Get("unsubscribe-weekly-digest") + @ApiOperation({ + summary: "Unsubscribe from weekly digest emails", + }) + async unsubscribeWeeklyDigest( + @Query("userId") userId: string, + @Res() res: FastifyReply, + ) { + await this.userService.disableWeeklyDigest(userId); + + return res.header("Content-Type", "text/html; charset=utf-8").send(` +
+

βœ… You have been unsubscribed

+

+ You will no longer receive weekly digest emails. +

+
+ `); + } } diff --git a/src/modules/identity/repositories/user.repository.ts b/src/modules/identity/repositories/user.repository.ts index 3c7da9a91..7070a32c4 100644 --- a/src/modules/identity/repositories/user.repository.ts +++ b/src/modules/identity/repositories/user.repository.ts @@ -3,7 +3,11 @@ import { Db, InsertOneResult, ModifyResult, ObjectId, WithId } from "mongodb"; import { Collections } from "@src/modules/common/enum/database.collection.enum"; import { createHmac } from "crypto"; import { RegisterPayload } from "../payloads/register.payload"; -import { UpdateUserDto, UserDto, UserTourGuideDto } from "../payloads/user.payload"; +import { + UpdateUserDto, + UserDto, + UserTourGuideDto, +} from "../payloads/user.payload"; import { EarlyAccessEmail, EmailServiceProvider, @@ -468,4 +472,85 @@ export class UserRepository { return false; } } + + async getAllUsers(): Promise[]> { + return await this.db + .collection(Collections.USER) + .find( + { isEmailVerified: true }, // only verified users + { projection: { password: 0 } }, + ) + .toArray(); + } + + async getUsersForWeeklyDigest(email?: string): Promise[]> { + return await this.db + .collection(Collections.USER) + .find( + { + isEmailVerified: true, + isWeeklyDigestEnabled: { $ne: false }, + ...(email ? { email } : {}), + }, + { + projection: { + email: 1, + name: 1, + isWeeklyDigestEnabled: 1, + }, + }, + ) + .toArray(); + } + + async disableWeeklyDigest(userId: string) { + return this.db + .collection(Collections.USER) + .updateOne( + { _id: new ObjectId(userId) }, + { $set: { isWeeklyDigestEnabled: false } }, + ); + } + + async updateUserByQuery(filter: any, update: any) { + return this.db.collection(Collections.USER).updateOne(filter, update); + } + + /** + * Fetch users for weekly digest in batches using cursor-based pagination. + * @param batchSize Number of users to fetch per batch + * @param lastCursor The _id of the last user from the previous batch (for cursor-based pagination) + * @param qaEmail Optional email for QA testing (to fetch a single user) + * @returns Array of users for the current batch + */ + async getUsersBatchForWeeklyDigest( + batchSize: number, + lastCursor?: ObjectId, + qaEmail?: string, + ): Promise[]> { + const query: any = { + isEmailVerified: true, + isWeeklyDigestEnabled: { $ne: false }, + ...(qaEmail ? { email: qaEmail } : {}), + }; + + // Cursor-based pagination: fetch users with _id greater than lastCursor + if (lastCursor) { + query._id = { $gt: lastCursor }; + } + + return await this.db + .collection(Collections.USER) + .find(query, { + projection: { + _id: 1, + email: 1, + name: 1, + isWeeklyDigestEnabled: 1, + }, + }) + .sort({ _id: 1 }) + .limit(batchSize) + .toArray(); + } } diff --git a/src/modules/identity/repositories/userInvites.repository.ts b/src/modules/identity/repositories/userInvites.repository.ts index 2a5b84451..362a6fa67 100644 --- a/src/modules/identity/repositories/userInvites.repository.ts +++ b/src/modules/identity/repositories/userInvites.repository.ts @@ -71,4 +71,65 @@ export class UserInvitesRepository { .deleteOne({ email }); return result; } + + async getPendingInvites(start: Date, end: Date, email: string) { + return this.db + .collection(Collections.USERINVITES) + .find({ + createdAt: { $gte: start, $lte: end }, + email: email, + }) + .limit(5) + .toArray(); + } + + /** + * Get pending invites for a batch of emails using aggregation. + * Returns invites grouped by email for efficient batch processing. + * @param start Start date range + * @param end End date range + * @param emails Array of email addresses to fetch pending invites for + * @returns Map of email to array of pending action strings + */ + async getPendingInvitesForBatch( + start: Date, + end: Date, + emails: string[], + ): Promise> { + const results = await this.db + .collection(Collections.USERINVITES) + .aggregate([ + { + $match: { + createdAt: { $gte: start, $lte: end }, + email: { $in: emails }, + }, + }, + { + $sort: { createdAt: -1 }, + }, + { + $group: { + _id: "$email", + invites: { $push: "$email" }, + }, + }, + { + $project: { + _id: 1, + invites: { $slice: ["$invites", 5] }, + }, + }, + ]) + .toArray(); + + const invitesMap = new Map(); + for (const result of results) { + const pendingActions = (result.invites || []).map( + (email: string) => `Invitation sent to ${email}`, + ); + invitesMap.set(result._id, pendingActions); + } + return invitesMap; + } } diff --git a/src/modules/identity/services/user.service.ts b/src/modules/identity/services/user.service.ts index 08c996653..2a79b87d7 100644 --- a/src/modules/identity/services/user.service.ts +++ b/src/modules/identity/services/user.service.ts @@ -840,4 +840,11 @@ export class UserService { }); return response; } + + async disableWeeklyDigest(userId: string) { + return this.userRepository.updateUserByQuery( + { _id: new ObjectId(userId) }, + { $set: { isWeeklyDigestEnabled: false } }, + ); + } } diff --git a/src/modules/user-admin/controllers/user-admin.hubs.controller.ts b/src/modules/user-admin/controllers/user-admin.hubs.controller.ts index cf8da42ba..433881a9e 100644 --- a/src/modules/user-admin/controllers/user-admin.hubs.controller.ts +++ b/src/modules/user-admin/controllers/user-admin.hubs.controller.ts @@ -22,6 +22,7 @@ import { ApiConsumes, ApiBody, ApiResponse, + ApiParam, } from "@nestjs/swagger"; import { FastifyReply } from "fastify"; import { ApiResponseService } from "@src/modules/common/services/api-response.service"; @@ -41,6 +42,9 @@ import { TeamService } from "@src/modules/identity/services/team.service"; import { ExtendedFastifyRequest } from "@src/types/fastify"; import { CreateOrUpdateAdminHubDto } from "../payloads/hub.payload"; import { SalesEmailService } from "@src/modules/workspace/services/sales-email.service"; +import { ExtendTrialDto } from "../payloads/trial-extension.payload"; +import { AddPlanDto } from "../payloads/add-plan.payload"; +import { ChangePlanDto } from "../payloads/change-plan.payload"; @Controller("api/admin") @ApiTags("admin hubs") @@ -379,4 +383,129 @@ export class AdminHubsController { return res.status(statusCode).send(responseData); } } + + @UseGuards(JwtAuthGuard, RolesGuard) + @Roles("super-admin") + @ApiBearerAuth() + @ApiOperation({ summary: "Extend trial period for a hub" }) + @ApiParam({ + name: "hubId", + description: "Unique Hub ID", + example: "69ae736e7ef406283329e75d", + }) + @ApiBody({ + type: ExtendTrialDto, + description: "Trial extension request body", + }) + @ApiResponse({ + status: 200, + description: "Trial extended successfully", + }) + @Post("hubs/:hubId/trial/extend") + async extendTrial( + @Param("hubId") hubId: string, + @Body() body: ExtendTrialDto, + @Req() request: any, + + @Res() res: FastifyReply, + ) { + console.log("USER:", request.user); + const result = await this.hubsService.extendTrial( + hubId, + body.extensionDays, + body.reason, + body.notifyCustomer, + ); + + const response = new ApiResponseService( + "Trial extended successfully", + HttpStatusCode.OK, + result, + ); + + return res.status(response.httpStatusCode).send(response); + } + + @UseGuards(JwtAuthGuard, RolesGuard) + @Roles("super-admin") + @ApiBearerAuth() + @ApiOperation({ summary: "Add a subscription plan to a hub" }) + @ApiParam({ + name: "hubId", + description: "Hub ID", + example: "69ae736e7ef406283329e75d", + }) + @ApiBody({ + type: AddPlanDto, + description: "Plan addition request body", + }) + @ApiResponse({ + status: 200, + description: "Plan added successfully", + }) + @Post("hubs/:hubId/plans/add") + async addPlan( + @Param("hubId") hubId: string, + @Body() body: AddPlanDto, + @Req() request: any, + @Res() res: FastifyReply, + ) { + const result = await this.hubsService.addPlanToHub( + hubId, + body.planId, + body.effectiveDate, + body.billingCycle, + body.notes, + ); + + const response = new ApiResponseService( + "Plan added successfully", + HttpStatusCode.OK, + result, + ); + + return res.status(response.httpStatusCode).send(response); + } + + @UseGuards(JwtAuthGuard, RolesGuard) + @Roles("super-admin") + @ApiBearerAuth() + @ApiOperation({ summary: "Change hub subscription plan (upgrade/downgrade)" }) + @ApiParam({ + name: "hubId", + description: "Hub ID", + example: "69ae736e7ef406283329e75d", + }) + @ApiBody({ + type: ChangePlanDto, + description: "Plan change request body", + }) + @ApiResponse({ + status: 200, + description: "Plan changed successfully", + }) + @Put("hubs/:hubId/plan/change") + async changePlan( + @Param("hubId") hubId: string, + @Body() body: ChangePlanDto, + @Req() request: any, + @Res() res: FastifyReply, + ) { + const result = await this.hubsService.changeHubPlan( + hubId, + body.currentPlanId, + body.newPlanId, + body.changeType, + body.effectiveDate, + body.prorate, + ); + + const response = new ApiResponseService( + "Plan changed successfully", + HttpStatusCode.OK, + result, + ); + + return res.status(response.httpStatusCode).send(response); + } } diff --git a/src/modules/user-admin/payloads/add-plan.payload.ts b/src/modules/user-admin/payloads/add-plan.payload.ts new file mode 100644 index 000000000..67096b5e5 --- /dev/null +++ b/src/modules/user-admin/payloads/add-plan.payload.ts @@ -0,0 +1,21 @@ +import { ApiProperty } from "@nestjs/swagger"; +import { IsString, IsOptional, IsDateString } from "class-validator"; + +export class AddPlanDto { + @ApiProperty({ example: "premium_plan_123" }) + @IsString() + planId: string; + + @ApiProperty({ example: "2024-01-15" }) + @IsDateString() + effectiveDate: string; + + @ApiProperty({ example: "monthly" }) + @IsString() + billingCycle: string; + + @ApiProperty({ example: "Adding premium features" }) + @IsOptional() + @IsString() + notes?: string; +} diff --git a/src/modules/user-admin/payloads/change-plan.payload.ts b/src/modules/user-admin/payloads/change-plan.payload.ts new file mode 100644 index 000000000..b2b5e2b28 --- /dev/null +++ b/src/modules/user-admin/payloads/change-plan.payload.ts @@ -0,0 +1,24 @@ +import { ApiProperty } from "@nestjs/swagger"; +import { IsString, IsBoolean, IsDateString } from "class-validator"; + +export class ChangePlanDto { + @ApiProperty({ example: "69a57d17ce77429c5623abcb" }) + @IsString() + currentPlanId: string; + + @ApiProperty({ example: "69a57d17ce77429c5623abcc" }) + @IsString() + newPlanId: string; + + @ApiProperty({ example: "upgrade" }) + @IsString() + changeType: string; + + @ApiProperty({ example: "2024-02-01" }) + @IsDateString() + effectiveDate: string; + + @ApiProperty({ example: true }) + @IsBoolean() + prorate: boolean; +} diff --git a/src/modules/user-admin/payloads/trial-extension.payload.ts b/src/modules/user-admin/payloads/trial-extension.payload.ts new file mode 100644 index 000000000..5148e7d9d --- /dev/null +++ b/src/modules/user-admin/payloads/trial-extension.payload.ts @@ -0,0 +1,24 @@ +import { ApiProperty } from "@nestjs/swagger"; +import { + IsBoolean, + IsNumber, + IsOptional, + IsString, + Min, +} from "class-validator"; + +export class ExtendTrialDto { + @ApiProperty({ example: 30 }) + @IsNumber() + @Min(1) + extensionDays: number; + + @ApiProperty({ example: "Customer requested extension" }) + @IsString() + reason: string; + + @ApiProperty({ example: true }) + @IsOptional() + @IsBoolean() + notifyCustomer?: boolean; +} diff --git a/src/modules/user-admin/repositories/user-admin.hubs.repository.ts b/src/modules/user-admin/repositories/user-admin.hubs.repository.ts index e080fd551..0c93534d3 100644 --- a/src/modules/user-admin/repositories/user-admin.hubs.repository.ts +++ b/src/modules/user-admin/repositories/user-admin.hubs.repository.ts @@ -248,4 +248,63 @@ export class AdminHubsRepository { throw new InternalServerErrorException("Failed to update team feedback"); } } + + async updateHubBillingPeriod( + hubId: string, + newTrialEnd: Date, + ): Promise { + try { + const hubObjectId = new ObjectId(hubId); + + await this.db.collection(Collections.TEAM).updateOne( + { _id: hubObjectId }, + { + $set: { + "billing.current_period_end": newTrialEnd, + }, + }, + ); + } catch (error) { + console.error("Error updating hub billing period:", error); + throw new InternalServerErrorException( + "Failed to update hub billing period", + ); + } + } + + async findPlanById(planId: string) { + try { + const planObjectId = new ObjectId(planId); + + const plan = await this.db + .collection(Collections.PLAN) + .findOne({ _id: planObjectId, active: true }); + + return plan; + } catch (error) { + console.error("Error fetching plan:", error); + throw new InternalServerErrorException("Failed to fetch plan"); + } + } + + async updateHubPlan(hubId: string, plan: any): Promise { + try { + const hubObjectId = new ObjectId(hubId); + + await this.db.collection(Collections.TEAM).updateOne( + { _id: hubObjectId }, + { + $set: { + plan: { + ...plan, + id: plan._id, + }, + }, + }, + ); + } catch (error) { + console.error("Error updating hub plan:", error); + throw new InternalServerErrorException("Failed to update hub plan"); + } + } } diff --git a/src/modules/user-admin/services/user-admin.hubs.service.ts b/src/modules/user-admin/services/user-admin.hubs.service.ts index 52d9276ac..85352bdce 100644 --- a/src/modules/user-admin/services/user-admin.hubs.service.ts +++ b/src/modules/user-admin/services/user-admin.hubs.service.ts @@ -1,4 +1,8 @@ -import { Injectable, NotFoundException } from "@nestjs/common"; +import { + BadRequestException, + Injectable, + NotFoundException, +} from "@nestjs/common"; import { ObjectId } from "mongodb"; import { AdminHubsRepository } from "../repositories/user-admin.hubs.repository"; @@ -6,6 +10,19 @@ import { AdminWorkspaceRepository } from "../repositories/user-admin.workspace.r import { TeamRole } from "@src/modules/common/enum/roles.enum"; import { PlanName } from "@src/modules/common/enum/plan.enum"; import { UserRepository } from "@src/modules/identity/repositories/user.repository"; +import { StripeSubscriptionService } from "@src/modules/billing/services/stripe-subscription.service"; +import { BillingAuditService } from "@src/modules/billing/services/billing-audit.service"; +import { + PaymentEmailService, + PaymentEmailType, +} from "@src/modules/billing/services/payment-email.service"; +import { + BillingActorType, + BillingSource, + PaymentProvider, +} from "@src/modules/common/enum/billing.enum"; +import { ConfigService } from "@nestjs/config"; +import { HttpService } from "@nestjs/axios"; interface SortOptions { sortBy: string; @@ -18,6 +35,11 @@ export class AdminHubsService { private readonly teamsRepo: AdminHubsRepository, private readonly workspaceRepo: AdminWorkspaceRepository, private readonly userRepo: UserRepository, + private readonly stripeSubscriptionService: StripeSubscriptionService, + private readonly billingAuditService: BillingAuditService, + private readonly paymentEmailService: PaymentEmailService, + private readonly configService: ConfigService, + private readonly httpService: HttpService, ) {} async getHubsForUser(userId: string) { @@ -39,7 +61,7 @@ export class AdminHubsService { role: matchedUser?.role, users: team.users, workspaces: team.workspaces, - plan: team?.plan?.name + plan: team?.plan?.name, }; }); } @@ -304,4 +326,424 @@ export class AdminHubsService { return { success: true }; } + + async extendTrial( + hubId: string, + extensionDays: number, + reason: string, + notifyCustomer?: boolean, + ) { + // Validate hub exists + const team = await this.teamsRepo.findHubById(hubId); + + if (!team) { + throw new NotFoundException("Hub not found"); + } + + // Validate trial status + if (team.billing?.in_trial !== true) { + throw new BadRequestException("Hub is not currently in trial"); + } + + // Validate extension limits + const MAX_TOTAL_TRIAL_DAYS = 180; + + const currentTrialEnd = new Date(team.billing.current_period_end); + + if (!currentTrialEnd) { + throw new BadRequestException("Trial end date not found"); + } + + // Calculate new trial end + const newTrialEnd = new Date( + currentTrialEnd.getTime() + extensionDays * 24 * 60 * 60 * 1000, + ); + + // Validate total trial duration + const trialStart = new Date(team.billing.current_period_start); + + const maxAllowedTrialEnd = new Date( + trialStart.getTime() + MAX_TOTAL_TRIAL_DAYS * 24 * 60 * 60 * 1000, + ); + + if (newTrialEnd > maxAllowedTrialEnd) { + throw new BadRequestException( + `Trial cannot exceed ${MAX_TOTAL_TRIAL_DAYS} days from start`, + ); + } + + // Get Stripe subscription + const stripeProvider = team.billing?.paymentProviders?.find( + (p: any) => p.provider === PaymentProvider.STRIPE, + ); + + if (!stripeProvider?.subscriptionId) { + throw new BadRequestException("Stripe subscription not found for hub"); + } + + const subscriptionId = stripeProvider.subscriptionId; + + if (!currentTrialEnd) { + throw new BadRequestException("Trial end date not found"); + } + + // Update Stripe if real subscription + if (!subscriptionId.startsWith("sub_test")) { + await this.stripeSubscriptionService["stripeService"].updateSubscription( + subscriptionId, + undefined, + { + hubId: hubId, + planName: team.plan?.name, + trial_end_date: newTrialEnd.toISOString(), + trialExtension: "true", + extensionDays: extensionDays.toString(), + }, + ); + } + + // Update database billing + await this.teamsRepo.updateHubBillingPeriod(hubId, newTrialEnd); + console.log("ADMIN API updating billing to:", newTrialEnd); + + // Record audit event + await this.billingAuditService.recordTrialStarted( + hubId, + team.plan?.name, + { + trialEndDate: newTrialEnd, + seats: team.billing?.seats || 1, + }, + { + actor: { + type: BillingActorType.SYSTEM, + name: "Admin Trial Extension", + }, + source: BillingSource.API_CALL, + reason, + }, + ); + + // Optional email notification + if (notifyCustomer) { + try { + const emails = team.users?.map((u: any) => u.email) || []; + + if (emails) { + await this.paymentEmailService.sendPaymentEmail( + PaymentEmailType.TRIAL_EXTENDED, + { + sendEmails: emails, + hubName: team.name, + planName: team.plan?.name, + billingPeriodStart: team.billing.current_period_start, + billingPeriodEnd: newTrialEnd, + totalSeats: team.billing?.seats || 1, + }, + ); + } + } catch (error) { + console.warn("Failed to send trial extension email", error); + } + } + + // Return response + return { + hubId, + previousTrialEnd: currentTrialEnd, + newTrialEnd, + extensionDays, + }; + } + + async addPlanToHub( + hubId: string, + planId: string, + effectiveDate: string, + billingCycle: string, + notes?: string, + ) { + // Validate hub exists + const team = await this.teamsRepo.findHubById(hubId); + + if (!team) { + throw new NotFoundException("Hub not found"); + } + + // Validate plan exists + const plan = await this.teamsRepo.findPlanById(planId); + + if (!plan) { + throw new BadRequestException("Plan not found or inactive"); + } + //Prevent adding same plan again + const currentPlan = team.plan?.name; + + if (currentPlan === plan.name) { + throw new BadRequestException(`Hub already has ${plan.name} plan`); + } + + // Calculate proration + let proratedAmount = 0; + + const billingStart = new Date(team.billing?.current_period_start); + const billingEnd = new Date(team.billing?.current_period_end); + const effective = new Date(effectiveDate); + + const totalPeriod = billingEnd.getTime() - billingStart.getTime(); + + const remainingPeriod = billingEnd.getTime() - effective.getTime(); + + if (remainingPeriod > 0) { + const remainingRatio = remainingPeriod / totalPeriod; + + const planPrice = plan.price || 0; + + proratedAmount = Math.round(planPrice * remainingRatio); + } + // Get Stripe subscription + const stripeProvider = team.billing?.paymentProviders?.find( + (p: any) => p.provider === PaymentProvider.STRIPE, + ); + + const subscriptionId = stripeProvider?.subscriptionId; + + // If no Stripe subscription (community/self-host hubs) + if (!subscriptionId) { + proratedAmount = 0; + } + + // Update Stripe subscription with new plan + if (subscriptionId && !subscriptionId.startsWith("sub_test")) { + await this.stripeSubscriptionService["stripeService"].updateSubscription( + subscriptionId, + undefined, + { + hubId: hubId, + newPlan: plan.name, + billingCycle, + effectiveDate, + notes, + }, + ); + } + + // Update hub plan in database + await this.teamsRepo.updateHubPlan(hubId, plan); + + // Record billing audit + await this.billingAuditService.recordSubscriptionCreated( + hubId, + plan.name, + { + proratedAmount, + billingCycle, + effectiveDate, + }, + { + actor: { + type: BillingActorType.SYSTEM, + name: "Admin Plan Addition", + }, + source: BillingSource.API_CALL, + reason: notes || "Admin added plan", + }, + ); + + // Send notification email + try { + const owner = team.users?.find((u: any) => u.role === "owner"); + + if (owner) { + await this.paymentEmailService.sendPaymentEmail( + PaymentEmailType.PLAN_ADDED, + { + ownerEmail: owner.email, + ownerName: owner.name, + hubName: team.name, + planName: plan.name, + + price: plan.price || 0, + interval: billingCycle || "month", + + upgradeDate: new Date(effectiveDate).toDateString(), + + nextBillingDate: team.billing?.current_period_end + ? new Date(team.billing.current_period_end).toDateString() + : "", + + features: [ + `Up to ${plan.limits?.workspacesPerHub?.value || "multiple"} workspaces`, + "Unlimited collaborators", + "Private hubs", + "Unlimited collections", + ], + }, + ); + } + } catch (error) { + console.warn("Failed to send plan addition email", error); + } + + return { + hubId, + previousPlan: currentPlan, + newPlan: plan.name, + effectiveDate, + billingCycle, + proratedAmount, + }; + } + + async changeHubPlan( + hubId: string, + currentPlanId: string, + newPlanId: string, + changeType: string, + effectiveDate: string, + prorate: boolean, + ) { + // Validate hub + const team = await this.teamsRepo.findHubById(hubId); + + if (!team) { + throw new NotFoundException("Hub not found"); + } + + // Fetch plans + const currentPlan = await this.teamsRepo.findPlanById(currentPlanId); + const newPlan = await this.teamsRepo.findPlanById(newPlanId); + + if (!currentPlan || !newPlan) { + throw new BadRequestException("Invalid plan"); + } + + // Validate hub current plan + if (team.plan?.id.toString() !== currentPlanId) { + throw new BadRequestException( + "Hub does not currently have the specified plan", + ); + } + + // Determine plan hierarchy + const currentPlanTier = currentPlan.limits?.workspacesPerHub?.value || 0; + + const newPlanTier = newPlan.limits?.workspacesPerHub?.value || 0; + + if (changeType === "upgrade" && newPlanTier <= currentPlanTier) { + throw new BadRequestException( + "New plan must be higher than current plan for upgrade", + ); + } + + if (changeType === "downgrade" && newPlanTier >= currentPlanTier) { + throw new BadRequestException( + "New plan must be lower than current plan for downgrade", + ); + } + + let proratedAmount = 0; + + if (prorate) { + const billingStart = new Date(team.billing?.current_period_start); + const billingEnd = new Date(team.billing?.current_period_end); + const effective = new Date(effectiveDate); + + const totalPeriod = billingEnd.getTime() - billingStart.getTime(); + + const remainingPeriod = billingEnd.getTime() - effective.getTime(); + + if (remainingPeriod > 0) { + const ratio = remainingPeriod / totalPeriod; + + const currentPrice = currentPlan.price || 0; + const newPrice = newPlan.price || 0; + + proratedAmount = Math.round((newPrice - currentPrice) * ratio); + } + } + + const stripeProvider = team.billing?.paymentProviders?.find( + (p: any) => p.provider === PaymentProvider.STRIPE, + ); + + const subscriptionId = stripeProvider?.subscriptionId; + + // Update Stripe subscription if exists + if (subscriptionId && !subscriptionId.startsWith("sub_test")) { + await this.stripeSubscriptionService["stripeService"].updateSubscription( + subscriptionId, + undefined, + { + hubId, + newPlan: newPlan.name, + changeType, + proratedAmount, + effectiveDate, + updatedByAdmin: true, + }, + ); + } + + await this.teamsRepo.updateHubPlan(hubId, newPlan); + + await this.billingAuditService.recordSubscriptionCreated( + hubId, + newPlan.name, + { + changeType, + proratedAmount, + effectiveDate, + }, + { + actor: { + type: BillingActorType.SYSTEM, + name: "Admin Plan Change", + }, + source: BillingSource.API_CALL, + reason: `Admin ${changeType}`, + }, + ); + try { + const owner = team.users?.find((u: any) => u.role === "owner"); + + if (owner) { + await this.paymentEmailService.sendPaymentEmail( + PaymentEmailType.PLAN_ADDED, + { + ownerEmail: owner.email, + ownerName: owner.name, + hubName: team.name, + planName: newPlan.name, + price: newPlan.price || 0, + interval: "month", + + upgradeDate: new Date(effectiveDate).toDateString(), + + nextBillingDate: team.billing?.current_period_end + ? new Date(team.billing.current_period_end).toDateString() + : "", + + features: [ + `Up to ${newPlan.limits?.workspacesPerHub?.value || "multiple"} workspaces`, + "Unlimited collaborators", + "Private hubs", + "Unlimited collections", + ], + }, + ); + } + } catch (error) { + console.warn("Failed to send plan change email", error); + } + + return { + hubId, + previousPlan: currentPlan.name, + newPlan: newPlan.name, + changeType, + effectiveDate, + proratedAmount, + }; + } } diff --git a/src/modules/user-admin/user-admin.module.ts b/src/modules/user-admin/user-admin.module.ts index b8ea9870a..adf5c7392 100644 --- a/src/modules/user-admin/user-admin.module.ts +++ b/src/modules/user-admin/user-admin.module.ts @@ -23,13 +23,19 @@ import { AdminUsersController } from "./controllers/user-admin.enterprise-user.c import { AdminUsersService } from "./services/user-admin.enterprise-user.service"; import { AdminUpdatesRepository } from "./repositories/user-admin.updates.repository"; import { BillingModule } from "../billing/billing.module"; +import { HttpModule } from "@nestjs/axios"; /** * Admin Module provides all necessary services, handlers, repositories, * and controllers related to the admin dashboard functionality. */ @Module({ - imports: [IdentityModule, WorkspaceModule, BillingModule.register()], + imports: [ + IdentityModule, + WorkspaceModule, + BillingModule.register(), + HttpModule, + ], providers: [ WorkspaceService, JwtService, diff --git a/src/modules/views/trialExtendedEmail.handlebars b/src/modules/views/trialExtendedEmail.handlebars new file mode 100644 index 000000000..e5f4dca93 --- /dev/null +++ b/src/modules/views/trialExtendedEmail.handlebars @@ -0,0 +1,187 @@ +{{!< layoutName}} + + + + + + Trial Email + + + + + + +
+
+
+ + {{> headerV2}} + + + + +
+ + + + +
+
+

+ Your Sparrow Trial Has Been Extended +

+
+
+
+
+ + + + + +
+ +
+ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
+ + + + +
+
+ +

+ What’s Next? +

+

Click the button below to navigate to your hub and start exploring:

+
+ + + + +
+ +
+
+
+
+
+ + + + +
+ +
+ + +
+ + + {{> footerV2}} + + +
+
+
+ + + \ No newline at end of file diff --git a/src/modules/views/weeklyDigestEmail.handlebars b/src/modules/views/weeklyDigestEmail.handlebars new file mode 100644 index 000000000..29dec296e --- /dev/null +++ b/src/modules/views/weeklyDigestEmail.handlebars @@ -0,0 +1,278 @@ + + + + + + + + +
+ + + + + + + + + + + + + + + {{!-- Date --}} + + + + + + + + + + + + + + + + + + + + {{!-- Collaboration Updates --}} + + {{#if collaborationUpdates.length}} + + + + + + {{/if}} + + + + {{!-- Pending Actions --}} + + {{#if pendingActions.length}} + + + + + + {{/if}} + + + + + + + + + + + + + + +{{> footer}} + +
+ Hey {{userName}}, how did your APIs behave this week? +
+ Sparrow – Weekly Digest +
+ {{dateRange}} +
+ + + + + + + + + +
+ +
+ Execution trend +
+ +
+ {{execution.total}} +
+ +
+ ↑ {{execution.percent}}% from last week +
+ +
+ + + + + {{#each execution.graph}} + + {{/each}} + + + + + + + + + + + + + + +
+
+
+
MTWTFSS
+ +
+ +
+ +
+ Additional highlights +
+ + + + + + {{!-- Repeat this block --}} + + + + + + + + + + + + + +
+ +
+ πŸ“‘ +
+ +
+ {{metrics.apisCreated}} +
+ +
+ API's created +
+ +
+
+ ⚑ +
+
+ {{metrics.testflowsExecuted}} +
+
+ Test Flows Executed +
+
+
+ πŸ‘€ +
+
+ {{metrics.activeWorkspaces}} +
+
+ Active Workspaces +
+
+
+ πŸ“ +
+
+ {{metrics.newCollections}} +
+
+ New Collections +
+
+
+ βž• +
+
+ {{metrics.newWorkspaces}} +
+
+ New Workspace +
+
+ +
+ +
+ Collaboration Updates +
+ +
    + {{#each collaborationUpdates}} +
  • {{this}}
  • + {{/each}} +
+ +
+ +
+ Pending Actions +
+ +
    + {{#each pendingActions}} +
  • {{this}}
  • + {{/each}} +
+ +
+ +
+ Continue in Sparrow.. +
+ + + +
+ +
+ You're receiving this email because you're part of an active Sparrow workspace. +
+ + + +
+ +
+ + + \ No newline at end of file diff --git a/src/modules/workspace/repositories/collection.repository.ts b/src/modules/workspace/repositories/collection.repository.ts index 252c6d858..172105311 100644 --- a/src/modules/workspace/repositories/collection.repository.ts +++ b/src/modules/workspace/repositories/collection.repository.ts @@ -2107,4 +2107,35 @@ export class CollectionRepository { }; } } + + // New Collections Count + async getNewCollectionsCount(start: Date, end: Date): Promise { + return await this.db.collection(Collections.COLLECTION).countDocuments({ + createdAt: { $gte: start, $lte: end }, + }); + } + + // APIs Created Count + async getApisCreatedCount(start: Date, end: Date): Promise { + const [result] = await this.db + .collection(Collections.COLLECTION) + .aggregate<{ count: number }>([ + { + $unwind: "$items", + }, + { + $match: { + "items.type": { $ne: "FOLDER" }, + "items.isDeleted": { $ne: true }, + "items.createdAt": { $gte: start, $lte: end }, + }, + }, + { + $count: "count", + }, + ]) + .toArray(); + + return result?.count ?? 0; + } } diff --git a/src/modules/workspace/repositories/testflow.repository.ts b/src/modules/workspace/repositories/testflow.repository.ts index c76cdae81..e99ed5fc1 100644 --- a/src/modules/workspace/repositories/testflow.repository.ts +++ b/src/modules/workspace/repositories/testflow.repository.ts @@ -612,4 +612,99 @@ export class TestflowRepository { ); return { datasets: result.value?.datasets || null }; } + + async getTestflowsExecutionCount(start: Date, end: Date): Promise { + return this.db.collection(Collections.TESTFLOW).countDocuments({ + $or: [ + { createdAt: { $gte: start, $lte: end } }, + { updatedAt: { $gte: start, $lte: end } }, + ], + }); + } + + /** + * Get testflow execution metrics for a batch of users. + * Aggregates testflow activity by workspaceId and maps to users. + * @param workspaceIds Array of workspace IDs the users have access to + * @param start Start date for activity range + * @param end End date for activity range + * @returns Map of workspaceId to testflow execution count + */ + async getTestflowMetricsForWorkspaces( + workspaceIds: string[], + start: Date, + end: Date, + ): Promise> { + const results = await this.db + .collection(Collections.TESTFLOW) + .aggregate([ + { + $match: { + workspaceId: { $in: workspaceIds }, + $or: [ + { createdAt: { $gte: start, $lte: end } }, + { updatedAt: { $gte: start, $lte: end } }, + ], + }, + }, + { + $group: { + _id: "$workspaceId", + executionCount: { $sum: 1 }, + }, + }, + ]) + .toArray(); + + const metricsMap = new Map(); + for (const result of results) { + metricsMap.set(result._id, result.executionCount || 0); + } + return metricsMap; + } + + /** + * Get testflow execution metrics for a batch of users. + * Uses the user's workspaces to compute aggregated testflow metrics. + * @param userWorkspacesMap Map of userId to array of workspaceIds + * @param start Start date for activity range + * @param end End date for activity range + * @returns Map of userId to testflow execution count + */ + async getTestflowMetricsForUserBatch( + userWorkspacesMap: Map, + start: Date, + end: Date, + ): Promise> { + // Collect all unique workspace IDs + const allWorkspaceIds = new Set(); + for (const workspaceIds of userWorkspacesMap.values()) { + for (const wsId of workspaceIds) { + allWorkspaceIds.add(wsId); + } + } + + if (allWorkspaceIds.size === 0) { + return new Map(); + } + + // Get testflow counts per workspace + const workspaceMetrics = await this.getTestflowMetricsForWorkspaces( + Array.from(allWorkspaceIds), + start, + end, + ); + + // Map workspace metrics back to users + const userMetrics = new Map(); + for (const [userId, workspaceIds] of userWorkspacesMap) { + let totalExecutions = 0; + for (const wsId of workspaceIds) { + totalExecutions += workspaceMetrics.get(wsId) || 0; + } + userMetrics.set(userId, totalExecutions); + } + + return userMetrics; + } } diff --git a/src/modules/workspace/repositories/updates.repository.ts b/src/modules/workspace/repositories/updates.repository.ts index cc9e97c5f..39ac58769 100644 --- a/src/modules/workspace/repositories/updates.repository.ts +++ b/src/modules/workspace/repositories/updates.repository.ts @@ -52,4 +52,82 @@ export class UpdatesRepository { .toArray(); return resposne; } + + async getWeeklyActivity(start: Date, end: Date) { + return this.db + .collection(Collections.UPDATES) + .aggregate([ + { + $match: { + createdAt: { $gte: start, $lte: end }, + }, + }, + { + $group: { + _id: { $dayOfWeek: "$createdAt" }, + count: { $sum: 1 }, + }, + }, + ]) + .toArray(); + } + + async getUpdatesForEmail(start: Date, end: Date, userId: string) { + return this.db + .collection(Collections.UPDATES) + .find({ + createdAt: { $gte: start, $lte: end }, + createdBy: userId, + }) + .sort({ createdAt: -1 }) + .limit(5) + .toArray(); + } + + /** + * Get updates for a batch of users using aggregation. + * Returns updates grouped by userId for efficient batch processing. + * @param start Start date range + * @param end End date range + * @param userIds Array of user IDs to fetch updates for + * @returns Map of userId to array of update messages + */ + async getUpdatesForBatch( + start: Date, + end: Date, + userIds: string[], + ): Promise> { + const results = await this.db + .collection(Collections.UPDATES) + .aggregate([ + { + $match: { + createdAt: { $gte: start, $lte: end }, + createdBy: { $in: userIds }, + }, + }, + { + $sort: { createdAt: -1 }, + }, + { + $group: { + _id: "$createdBy", + updates: { $push: "$message" }, + }, + }, + { + $project: { + _id: 1, + updates: { $slice: ["$updates", 5] }, + }, + }, + ]) + .toArray(); + + const updatesMap = new Map(); + for (const result of results) { + updatesMap.set(result._id, result.updates || []); + } + return updatesMap; + } } diff --git a/src/modules/workspace/repositories/workspace.repository.ts b/src/modules/workspace/repositories/workspace.repository.ts index b8900f766..88681e5de 100644 --- a/src/modules/workspace/repositories/workspace.repository.ts +++ b/src/modules/workspace/repositories/workspace.repository.ts @@ -458,4 +458,200 @@ export class WorkspaceRepository { return { workspaces, total }; } + + async getNewWorkspacesCount(start: Date, end: Date): Promise { + return await this.db + .collection(Collections.WORKSPACE) + .countDocuments({ + createdAt: { $gte: start, $lte: end }, + isRestricted: { $ne: true }, + isFreezed: { $ne: true }, + }); + } + + async getActiveWorkspacesCount(start: Date, end: Date): Promise { + return await this.db.collection(Collections.WORKSPACE).countDocuments({ + $or: [ + { createdAt: { $gte: start, $lte: end } }, + { updatedAt: { $gte: start, $lte: end } }, + ], + isRestricted: { $ne: true }, + isFreezed: { $ne: true }, + }); + } + + /** + * Get aggregated workspace metrics for a batch of users. + * Uses MongoDB aggregation to compute per-user metrics efficiently. + * @param userIds Array of user IDs to fetch metrics for + * @param start Start date for activity range + * @param end End date for activity range + * @returns Map of userId to metrics object + */ + async getWorkspaceMetricsForUserBatch( + userIds: string[], + start: Date, + end: Date, + ): Promise< + Map< + string, + { + activeWorkspaces: number; + newWorkspaces: number; + collectionsCount: number; + apisCount: number; + } + > + > { + const results = await this.db + .collection(Collections.WORKSPACE) + .aggregate([ + // Match workspaces where user is a member (in users array) + { + $match: { + "users.id": { $in: userIds }, + isRestricted: { $ne: true }, + isFreezed: { $ne: true }, + }, + }, + // Unwind users to get individual user-workspace pairs + { + $unwind: "$users", + }, + // Filter only the users we care about + { + $match: { + "users.id": { $in: userIds }, + }, + }, + // Lookup collections for each workspace + { + $lookup: { + from: Collections.COLLECTION, + let: { collectionIds: "$collection" }, + pipeline: [ + { + $match: { + $expr: { + $in: [ + "$_id", + { + $ifNull: [ + { + $map: { + input: "$$collectionIds", + as: "c", + in: "$$c.id", + }, + }, + [], + ], + }, + ], + }, + }, + }, + // Count non-folder and non-deleted items (APIs) + { + $project: { + apisCount: { + $size: { + $filter: { + input: { $ifNull: ["$items", []] }, + as: "item", + cond: { + $and: [ + { $ne: ["$$item.type", "FOLDER"] }, + { $ne: ["$$item.isDeleted", true] }, + ], + }, + }, + }, + }, + }, + }, + ], + as: "collectionsData", + }, + }, + // Group by userId and compute metrics + { + $group: { + _id: "$users.id", + activeWorkspaces: { + $sum: { + $cond: [ + { + $or: [ + { + $and: [ + { $gte: ["$createdAt", start] }, + { $lte: ["$createdAt", end] }, + ], + }, + { + $and: [ + { $gte: ["$updatedAt", start] }, + { $lte: ["$updatedAt", end] }, + ], + }, + ], + }, + 1, + 0, + ], + }, + }, + newWorkspaces: { + $sum: { + $cond: [ + { + $and: [ + { $gte: ["$createdAt", start] }, + { $lte: ["$createdAt", end] }, + ], + }, + 1, + 0, + ], + }, + }, + collectionsCount: { + $sum: { $size: { $ifNull: ["$collection", []] } }, + }, + apisCount: { + $sum: { + $reduce: { + input: "$collectionsData", + initialValue: 0, + in: { $add: ["$$value", "$$this.apisCount"] }, + }, + }, + }, + }, + }, + ]) + .toArray(); + + const metricsMap = new Map< + string, + { + activeWorkspaces: number; + newWorkspaces: number; + collectionsCount: number; + apisCount: number; + } + >(); + + for (const result of results) { + metricsMap.set(result._id, { + activeWorkspaces: result.activeWorkspaces || 0, + newWorkspaces: result.newWorkspaces || 0, + collectionsCount: result.collectionsCount || 0, + apisCount: result.apisCount || 0, + }); + } + + return metricsMap; + } } diff --git a/src/modules/workspace/schedulers/weekly-digest.scheduler.ts b/src/modules/workspace/schedulers/weekly-digest.scheduler.ts new file mode 100644 index 000000000..fc81a6194 --- /dev/null +++ b/src/modules/workspace/schedulers/weekly-digest.scheduler.ts @@ -0,0 +1,25 @@ +import { Injectable, Logger } from "@nestjs/common"; +import { Cron, CronExpression } from "@nestjs/schedule"; +import { WeeklyDigestService } from "../services/weekly-digest.service"; + +@Injectable() +export class WeeklyDigestScheduler { + private readonly logger = new Logger(WeeklyDigestScheduler.name); + + constructor(private readonly weeklyDigestService: WeeklyDigestService) {} + + // Disabled until we are sure it works correctly and doesn't cause issues with the database load. We can enable it later once we have confidence in its stability. + // @Cron(CronExpression.EVERY_30_MINUTES, { + // name: "weekly-digest", + // waitForCompletion: true, + // }) + async handleWeeklyDigest() { + this.logger.log("Starting Weekly Digest Job..."); + + try { + await this.weeklyDigestService.processWeeklyDigest(); + } catch (error) { + this.logger.error("Weekly Digest job failed", error); + } + } +} diff --git a/src/modules/workspace/services/weekly-digest.service.ts b/src/modules/workspace/services/weekly-digest.service.ts new file mode 100644 index 000000000..432e303dd --- /dev/null +++ b/src/modules/workspace/services/weekly-digest.service.ts @@ -0,0 +1,506 @@ +import { Injectable, Logger } from "@nestjs/common"; +import { UserRepository } from "@src/modules/identity/repositories/user.repository"; +import { TestflowRepository } from "../repositories/testflow.repository"; +import { WorkspaceRepository } from "../repositories/workspace.repository"; +import { CollectionRepository } from "../repositories/collection.repository"; +import { EmailService } from "@src/modules/common/services/email.service"; +import { ConfigService } from "@nestjs/config"; +import { UpdatesRepository } from "../repositories/updates.repository"; +import { UserInvitesRepository } from "@src/modules/identity/repositories/userInvites.repository"; +import { ObjectId, WithId } from "mongodb"; +import { User } from "@src/modules/common/models/user.model"; + +/** Configuration for batch processing and concurrency */ +interface BatchConfig { + userBatchSize: number; + emailConcurrency: number; +} + +/** Per-user metrics computed via batch aggregation */ +interface UserMetrics { + activeWorkspaces: number; + newWorkspaces: number; + collectionsCount: number; + apisCount: number; + testflowExecutions: number; +} + +/** Activity graph data for the digest */ +interface ActivityGraph { + totalExecutions: number; + percentChange: number; + graph: Array<{ height: number; isMax: boolean }>; +} + +/** Email data for a single user including their metrics */ +interface UserEmailData { + user: WithId; + metrics: UserMetrics; + collaborationUpdates: string[]; + pendingActions: string[]; +} + +@Injectable() +export class WeeklyDigestService { + private static readonly QA_DIGEST_EMAIL = ""; + private static readonly DEFAULT_BATCH_SIZE = 100; + private static readonly DEFAULT_EMAIL_CONCURRENCY = 5; + + private readonly logger = new Logger(WeeklyDigestService.name); + + constructor( + private readonly userRepository: UserRepository, + private readonly testflowRepository: TestflowRepository, + private readonly workspaceRepository: WorkspaceRepository, + private readonly collectionRepository: CollectionRepository, + private readonly updatesRepository: UpdatesRepository, + private readonly emailService: EmailService, + private readonly configService: ConfigService, + private readonly userInvitesRepository: UserInvitesRepository, + ) {} + + /** + * Main entry point for processing weekly digest emails. + * Uses batching and cursor-based pagination to handle large user counts efficiently. + * All metrics are computed per-batch using MongoDB aggregation pipelines. + */ + async processWeeklyDigest(): Promise { + this.logger.log("Processing weekly digest emails..."); + + const config: BatchConfig = { + userBatchSize: WeeklyDigestService.DEFAULT_BATCH_SIZE, + emailConcurrency: WeeklyDigestService.DEFAULT_EMAIL_CONCURRENCY, + }; + + const qaDigestEmail = WeeklyDigestService.QA_DIGEST_EMAIL; + + // Time range for the digest (last 30 mins for testing, or use getLastWeekRange() for production) + const end = new Date(); + const start = new Date(end.getTime() - 30 * 60 * 1000); + const prevEnd = new Date(start); + const prevStart = new Date(prevEnd.getTime() - 30 * 60 * 1000); + + // Fetch lightweight global activity graph (only updates collection, not heavy) + const activityGraph = await this.fetchActivityGraph( + start, + end, + prevStart, + prevEnd, + ); + + // Process users in batches using cursor-based pagination + let lastCursor: ObjectId | undefined; + let totalUsersProcessed = 0; + let batchNumber = 0; + + while (true) { + batchNumber++; + this.logger.log(`Starting batch ${batchNumber}...`); + + // Fetch the next batch of users + const usersBatch = await this.getUsersBatch( + config.userBatchSize, + lastCursor, + qaDigestEmail, + ); + + if (usersBatch.length === 0) { + this.logger.log(`No more users to process. Ending batch processing.`); + break; + } + + this.logger.log( + `Batch ${batchNumber}: Processing ${usersBatch.length} users...`, + ); + + // Extract user IDs and emails for batch queries + const userIds = usersBatch.map((u) => u._id.toString()); + const emails = usersBatch.map((u) => u.email); + + // Fetch per-user data and metrics in bulk using aggregation + const userEmailDataMap = await this.getMetricsForUserBatch( + start, + end, + userIds, + emails, + usersBatch, + ); + + // Send emails with controlled concurrency + await this.sendEmailsBatch( + userEmailDataMap, + activityGraph, + start, + end, + config.emailConcurrency, + ); + + totalUsersProcessed += usersBatch.length; + this.logger.log( + `Batch ${batchNumber} complete. Total users processed: ${totalUsersProcessed}`, + ); + + // Update cursor for next batch + lastCursor = usersBatch[usersBatch.length - 1]._id; + + // If we got fewer users than the batch size, we've reached the end + if (usersBatch.length < config.userBatchSize) { + this.logger.log(`Reached end of users. Stopping batch processing.`); + break; + } + } + + this.logger.log( + `Weekly digest processing complete. Total users processed: ${totalUsersProcessed}`, + ); + } + + /** + * Fetch a batch of users using cursor-based pagination. + */ + private async getUsersBatch( + batchSize: number, + lastCursor?: ObjectId, + qaEmail?: string, + ): Promise[]> { + return this.userRepository.getUsersBatchForWeeklyDigest( + batchSize, + lastCursor, + qaEmail, + ); + } + + /** + * Fetch lightweight activity graph data. + * Only queries the updates collection which is lightweight compared to workspace/collection scans. + */ + private async fetchActivityGraph( + start: Date, + end: Date, + prevStart: Date, + prevEnd: Date, + ): Promise { + const [activityData, prevActivityData] = await Promise.all([ + this.updatesRepository.getWeeklyActivity(start, end), + this.updatesRepository.getWeeklyActivity(prevStart, prevEnd), + ]); + + const dailyExecutions = this.formatWeeklyGraph(activityData); + const totalExecutions = dailyExecutions.reduce((a, b) => a + b, 0); + + const prevDailyExecutions = this.formatWeeklyGraph(prevActivityData); + const previousCount = prevDailyExecutions.reduce((a, b) => a + b, 0); + + let percentChange = 0; + if (previousCount === 0 && totalExecutions > 0) { + percentChange = 100; + } else if (previousCount > 0) { + percentChange = Math.round( + ((totalExecutions - previousCount) / previousCount) * 100, + ); + } + + const graphHeights = this.normalizeGraphData(dailyExecutions); + const max = Math.max(...graphHeights); + const graph = graphHeights.map((height) => ({ + height, + isMax: height === max, + })); + + return { + totalExecutions, + percentChange, + graph, + }; + } + + /** + * Fetch per-user metrics for a batch of users using bulk aggregation queries. + * Computes workspace, collection, API, and testflow metrics using MongoDB aggregation. + * Avoids N+1 queries by fetching all data in bulk. + */ + private async getMetricsForUserBatch( + start: Date, + end: Date, + userIds: string[], + emails: string[], + users: WithId[], + ): Promise> { + // Build user-to-workspaces map for testflow metrics + const userWorkspacesMap = new Map(); + for (const user of users) { + const workspaceIds = (user.workspaces || []).map((w) => w.workspaceId); + userWorkspacesMap.set(user._id.toString(), workspaceIds); + } + + // Fetch all metrics in parallel using aggregation pipelines + const [workspaceMetricsMap, testflowMetricsMap, updatesMap, invitesMap] = + await Promise.all([ + this.workspaceRepository.getWorkspaceMetricsForUserBatch( + userIds, + start, + end, + ), + this.testflowRepository.getTestflowMetricsForUserBatch( + userWorkspacesMap, + start, + end, + ), + this.updatesRepository.getUpdatesForBatch(start, end, userIds), + this.userInvitesRepository.getPendingInvitesForBatch( + start, + end, + emails, + ), + ]); + + // Build the email data map for each user + const userEmailDataMap = new Map(); + + for (const user of users) { + if (user.isWeeklyDigestEnabled === false) { + continue; + } + + const userId = user._id.toString(); + const collaborationUpdates = updatesMap.get(userId) || []; + const pendingActions = invitesMap.get(user.email) || []; + + // Get workspace metrics for this user + const workspaceMetrics = workspaceMetricsMap.get(userId) || { + activeWorkspaces: 0, + newWorkspaces: 0, + collectionsCount: 0, + apisCount: 0, + }; + + // Get testflow metrics for this user + const testflowExecutions = testflowMetricsMap.get(userId) || 0; + + const metrics: UserMetrics = { + activeWorkspaces: workspaceMetrics.activeWorkspaces, + newWorkspaces: workspaceMetrics.newWorkspaces, + collectionsCount: workspaceMetrics.collectionsCount, + apisCount: workspaceMetrics.apisCount, + testflowExecutions, + }; + + userEmailDataMap.set(userId, { + user, + metrics, + collaborationUpdates, + pendingActions, + }); + } + + return userEmailDataMap; + } + + /** + * Send emails to a batch of users with controlled concurrency. + * Uses a promise pool pattern to limit concurrent email sends. + */ + private async sendEmailsBatch( + userEmailDataMap: Map, + activityGraph: ActivityGraph, + start: Date, + end: Date, + concurrency: number, + ): Promise { + const transporter = this.emailService.createTransporter(); + const senderEmail = this.configService.get("app.senderEmail"); + const appUrl = this.configService.get("app.url"); + + const users = Array.from(userEmailDataMap.values()); + + // Process emails with controlled concurrency using a promise pool + await this.processWithConcurrency( + users, + concurrency, + async (userData: UserEmailData) => { + try { + const { user, metrics, collaborationUpdates, pendingActions } = + userData; + + const unsubscribeLink = `${appUrl}/api/user/unsubscribe-weekly-digest?userId=${user._id}`; + + const mailOptions = { + from: senderEmail, + to: user.email, + template: "weeklyDigestEmail", + subject: "Your Weekly Digest πŸ“Š", + headers: { + "List-Unsubscribe": `<${unsubscribeLink}>`, + "List-Unsubscribe-Post": "List-Unsubscribe=One-Click", + }, + context: { + userName: user.name || user.email, + dateRange: `${start.toDateString()} - ${end.toDateString()}`, + // Activity graph is shared (lightweight global data) + execution: { + total: activityGraph.totalExecutions, + percent: activityGraph.percentChange, + graph: activityGraph.graph, + }, + // Per-user metrics computed via batch aggregation + metrics: { + newWorkspaces: metrics.newWorkspaces, + newCollections: metrics.collectionsCount, + apisCreated: metrics.apisCount, + testflowsExecuted: metrics.testflowExecutions, + activeWorkspaces: metrics.activeWorkspaces, + }, + ctaLink: "https://sparrowapp.dev", + collaborationUpdates, + pendingActions, + unsubscribeLink, + }, + }; + + await this.emailService.sendEmail(transporter, mailOptions); + this.logger.log(`Weekly digest sent to ${user.email}`); + } catch (error) { + this.logger.error( + `Failed to send weekly digest to ${userData.user.email}: ${error.message}`, + ); + } + }, + ); + } + + /** + * Process items with controlled concurrency using a promise pool pattern. + * This ensures we don't overwhelm the email service with too many concurrent requests. + */ + private async processWithConcurrency( + items: T[], + concurrency: number, + processor: (item: T) => Promise, + ): Promise { + const queue = [...items]; + const executing: Promise[] = []; + + while (queue.length > 0 || executing.length > 0) { + // Fill up to concurrency limit + while (executing.length < concurrency && queue.length > 0) { + const item = queue.shift()!; + const promise = processor(item).then(() => { + executing.splice(executing.indexOf(promise), 1); + }); + executing.push(promise); + } + + // Wait for at least one to complete + if (executing.length > 0) { + await Promise.race(executing); + } + } + } + + private getLastWeekRange() { + const now = new Date(); + + // Start = last Monday + const start = new Date(now); + start.setDate(now.getDate() - now.getDay() - 6); + start.setHours(0, 0, 0, 0); + + // End = last Sunday + const end = new Date(now); + end.setDate(now.getDate() - now.getDay()); + end.setHours(23, 59, 59, 999); + + return { start, end }; + } + + private getPreviousWeekRange() { + const now = new Date(); + + const end = new Date(now); + end.setDate(now.getDate() - now.getDay() - 7); + end.setHours(23, 59, 59, 999); + + const start = new Date(end); + start.setDate(end.getDate() - 6); + start.setHours(0, 0, 0, 0); + + return { start, end }; + } + + async getExecutionTrend( + userId: string, + currentStart: Date, + currentEnd: Date, + previousStart: Date, + previousEnd: Date, + testflows: any[], + ) { + let currentCount = 0; + let previousCount = 0; + + const dailyExecutions = Array(7).fill(0); // Mon β†’ Sun + + for (const testflow of testflows) { + if (!testflow.schedules) continue; + + for (const schedule of testflow.schedules) { + if (!schedule.schedularRunHistory) continue; + + for (const run of schedule.schedularRunHistory) { + const runDate = new Date(run.createdAt); + + const count = (run.successRequests || 0) + (run.failedRequests || 0); + + // Current week + if (runDate >= currentStart && runDate <= currentEnd) { + currentCount += count; + + const dayIndex = (runDate.getDay() + 6) % 7; // convert Sun=0 β†’ Mon=0 + dailyExecutions[dayIndex] += count; + } + + // Previous week + if (runDate >= previousStart && runDate <= previousEnd) { + previousCount += count; + } + } + } + } + + const percentChange = + previousCount === 0 + ? currentCount > 0 + ? 100 + : 0 + : Math.round(((currentCount - previousCount) / previousCount) * 100); + + return { + totalExecutions: currentCount, + percentChange, + dailyExecutions, + }; + } + + private normalizeGraphData(data: number[]) { + const max = Math.max(...data, 1); + + return data.map((value) => { + if (max === 0) return 12; + + const height = Math.round((value / max) * 40); + return height < 10 ? 10 : height; + }); + } + + private formatWeeklyGraph(data: any[]) { + const result = Array(7).fill(0); + + data.forEach((item) => { + const mongoDay = item._id; + const index = (mongoDay + 5) % 7; + result[index] = item.count; + }); + + return result; + } +} diff --git a/src/modules/workspace/workspace.module.ts b/src/modules/workspace/workspace.module.ts index 10277409c..69c781444 100644 --- a/src/modules/workspace/workspace.module.ts +++ b/src/modules/workspace/workspace.module.ts @@ -58,6 +58,7 @@ import { TestflowService } from "./services/testflow.service"; import { TeamUserService } from "../identity/services/team-user.service"; import { SalesEmailService } from "./services/sales-email.service"; import { PricingService } from "./services/pricing.repository"; +import { WeeklyDigestService } from "./services/weekly-digest.service"; // ---- Gateway import { @@ -83,6 +84,7 @@ import { TestflowSchedulerService } from "./services/testflow-schedular.service" import { TestflowRunService } from "./services/testflow-run.service"; import { ScheduleModule } from "@nestjs/schedule"; import { TestflowDataSetService } from "./services/testflow-dataset.service"; +import { WeeklyDigestScheduler } from "./schedulers/weekly-digest.scheduler"; /** * Workspace Module provides all necessary services, handlers, repositories, @@ -147,6 +149,8 @@ import { TestflowDataSetService } from "./services/testflow-dataset.service"; PricingService, PricingRepository, AiConsumptionScheduler, + WeeklyDigestScheduler, + WeeklyDigestService, ], exports: [ CollectionService,