Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
4804e55
feat: create script to enqueue inactive organizations for deletion
wilsonrivera Dec 16, 2025
a0c978a
chore: update script and tests
wilsonrivera Dec 18, 2025
ea9ecda
Merge branch 'main' into wilson/eng-7753-delete-inactive-organizations
wilsonrivera Dec 18, 2025
e82d01a
chore: linting and tests
wilsonrivera Dec 18, 2025
67c0a22
chore: remove non-null assertion
wilsonrivera Dec 18, 2025
9b98d21
Merge branch 'main' into wilson/eng-7753-delete-inactive-organizations
wilsonrivera Dec 19, 2025
2394618
Merge branch 'main' into wilson/eng-7753-delete-inactive-organizations
wilsonrivera Dec 22, 2025
df04df0
Merge branch 'main' into wilson/eng-7753-delete-inactive-organizations
wilsonrivera Jan 5, 2026
27dcdf9
chore: implement as organization cleanup as cron job
wilsonrivera Jan 7, 2026
d7cd912
Merge branch 'main' into wilson/eng-7753-delete-inactive-organizations
wilsonrivera Jan 7, 2026
b24bb91
chore: fix test
wilsonrivera Jan 7, 2026
e1c4068
Merge remote-tracking branch 'origin/wilson/eng-7753-delete-inactive-…
wilsonrivera Jan 7, 2026
8ac01c0
chore: fix commented `continue`
wilsonrivera Jan 7, 2026
4cf6c72
Merge branch 'main' into wilson/eng-7753-delete-inactive-organizations
wilsonrivera Jan 7, 2026
499ec46
chore: update schedule to run once a month
wilsonrivera Jan 7, 2026
d0b68fb
Merge branch 'main' into wilson/eng-7753-delete-inactive-organizations
wilsonrivera Jan 8, 2026
9911158
Merge branch 'main' into wilson/eng-7753-delete-inactive-organizations
wilsonrivera Jan 16, 2026
cee6641
Merge branch 'main' into wilson/eng-7753-delete-inactive-organizations
wilsonrivera Jan 20, 2026
70b005c
Merge branch 'main' into wilson/eng-7753-delete-inactive-organizations
wilsonrivera Jan 20, 2026
f709e35
Merge branch 'main' into wilson/eng-7753-delete-inactive-organizations
wilsonrivera Jan 23, 2026
0a2bfb8
Merge branch 'main' into wilson/eng-7753-delete-inactive-organizations
wilsonrivera Jan 27, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions controlplane/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
"@wundergraph/protographic": "workspace:*",
"axios": "^1.12.2",
"axios-retry": "^4.5.0",
"bullmq": "^5.10.0",
"bullmq": "5.66.4",
"cookie": "^0.7.2",
"date-fns": "^3.6.0",
"dotenv": "^16.4.5",
Expand All @@ -75,7 +75,7 @@
"fastify-plugin": "^4.5.1",
"fastify-raw-body": "^4.3.0",
"graphql": "^16.9.0",
"ioredis": "^5.4.1",
"ioredis": "5.8.2",
"isomorphic-dompurify": "^2.33.0",
"jose": "^5.2.4",
"lodash": "^4.17.21",
Expand Down
204 changes: 103 additions & 101 deletions controlplane/src/core/bufservices/organization/deleteOrganization.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,116 +25,118 @@ export function deleteOrganization(
const authContext = await opts.authenticator.authenticate(ctx.requestHeader);
logger = enrichLogger(ctx, logger, authContext);

const orgRepo = new OrganizationRepository(logger, opts.db, opts.billingDefaultPlanId);
const auditLogRepo = new AuditLogRepository(opts.db);
const billingRepo = new BillingRepository(opts.db);

const memberships = await orgRepo.memberships({ userId: authContext.userId });
const orgCount = memberships.length;

const org = await orgRepo.byId(authContext.organizationId);
if (!org) {
return {
response: {
code: EnumStatusCode.ERR_NOT_FOUND,
details: `Organization not found`,
},
};
}

const subscription = await billingRepo.getActiveSubscriptionOfOrganization(org.id);
if (subscription?.id) {
return {
response: {
code: EnumStatusCode.ERR,
details: 'The organization subscription must be cancelled before the organization is deleted.',
},
};
}

const user = await orgRepo.getOrganizationMember({
organizationID: authContext.organizationId,
userID: authContext.userId || req.userID,
});
return opts.db.transaction(async (tx) => {
const orgRepo = new OrganizationRepository(logger, tx, opts.billingDefaultPlanId);
const auditLogRepo = new AuditLogRepository(tx);
const billingRepo = new BillingRepository(tx);

const memberships = await orgRepo.memberships({ userId: authContext.userId });
const orgCount = memberships.length;

const org = await orgRepo.byId(authContext.organizationId);
if (!org) {
return {
response: {
code: EnumStatusCode.ERR_NOT_FOUND,
details: `Organization not found`,
},
};
}

const subscription = await billingRepo.getActiveSubscriptionOfOrganization(org.id);
if (subscription?.id) {
return {
response: {
code: EnumStatusCode.ERR,
details: 'The organization subscription must be cancelled before the organization is deleted.',
},
};
}

const user = await orgRepo.getOrganizationMember({
organizationID: authContext.organizationId,
userID: authContext.userId || req.userID,
});

if (!user) {
return {
response: {
code: EnumStatusCode.ERR,
details: 'User is not a part of this organization.',
},
};
}
if (!user) {
return {
response: {
code: EnumStatusCode.ERR,
details: 'User is not a part of this organization.',
},
};
}

// non admins cannot delete the organization
if (!user.rbac.isOrganizationAdmin) {
throw new UnauthorizedError();
}

// Minimum one organization is required for a user
if (orgCount <= 1) {
return {
response: {
code: EnumStatusCode.ERR_NOT_FOUND,
details: 'Minimum one organization is required for a user.',
},
};
}

// If the organization deletion have already been queued we shouldn't do it again
if (org.deletion) {
return {
response: {
code: EnumStatusCode.OK,
},
};
}

const organizationMembers = await orgRepo.getMembers({ organizationID: org.id });
const orgAdmins = organizationMembers.filter((m) => m.rbac.isOrganizationAdmin);

const now = new Date();
const oneMonthFromNow = addDays(now, delayForManualOrgDeletionInDays);

await orgRepo.queueOrganizationDeletion({
organizationId: org.id,
queuedBy: authContext.userDisplayName,
deleteOrganizationQueue: opts.queues.deleteOrganizationQueue,
});

// non admins cannot delete the organization
if (!user.rbac.isOrganizationAdmin) {
throw new UnauthorizedError();
}
await auditLogRepo.addAuditLog({
organizationId: org.id,
organizationSlug: authContext.organizationSlug,
auditAction: 'organization.deletion_queued',
action: 'queued_deletion',
actorId: authContext.userId,
auditableType: 'organization',
auditableDisplayName: org.name,
actorDisplayName: authContext.userDisplayName,
apiKeyName: authContext.apiKeyName,
actorType: authContext.auth === 'api_key' ? 'api_key' : 'user',
});

// Minimum one organization is required for a user
if (orgCount <= 1) {
return {
response: {
code: EnumStatusCode.ERR_NOT_FOUND,
details: 'Minimum one organization is required for a user.',
},
};
}
if (opts.mailerClient && orgAdmins.length > 0) {
const intl = Intl.DateTimeFormat(undefined, {
dateStyle: 'medium',
timeStyle: 'short',
});

await opts.mailerClient.sendOrganizationDeletionQueuedEmail({
receiverEmails: orgAdmins.map((m) => m.email),
organizationName: org.name,
userDisplayName: authContext.userDisplayName,
queuedOnDate: intl.format(now),
deletionDate: intl.format(oneMonthFromNow),
restoreLink: `${process.env.WEB_BASE_URL}/${org.slug}/settings`,
});
}

// If the organization deletion have already been queued we shouldn't do it again
if (org.deletion) {
return {
response: {
code: EnumStatusCode.OK,
},
};
}

const organizationMembers = await orgRepo.getMembers({ organizationID: org.id });
const orgAdmins = organizationMembers.filter((m) => m.rbac.isOrganizationAdmin);

const now = new Date();
const oneMonthFromNow = addDays(now, delayForManualOrgDeletionInDays);

await orgRepo.queueOrganizationDeletion({
organizationId: org.id,
queuedBy: authContext.userDisplayName,
deleteOrganizationQueue: opts.queues.deleteOrganizationQueue,
});

await auditLogRepo.addAuditLog({
organizationId: org.id,
organizationSlug: authContext.organizationSlug,
auditAction: 'organization.deletion_queued',
action: 'queued_deletion',
actorId: authContext.userId,
auditableType: 'organization',
auditableDisplayName: org.name,
actorDisplayName: authContext.userDisplayName,
apiKeyName: authContext.apiKeyName,
actorType: authContext.auth === 'api_key' ? 'api_key' : 'user',
});

if (opts.mailerClient && orgAdmins.length > 0) {
const intl = Intl.DateTimeFormat(undefined, {
dateStyle: 'medium',
timeStyle: 'short',
});

await opts.mailerClient.sendOrganizationDeletionQueuedEmail({
receiverEmails: orgAdmins.map((m) => m.email),
organizationName: org.name,
userDisplayName: authContext.userDisplayName,
queuedOnDate: intl.format(now),
deletionDate: intl.format(oneMonthFromNow),
restoreLink: `${process.env.WEB_BASE_URL}/${org.slug}/settings`,
});
}

return {
response: {
code: EnumStatusCode.OK,
},
};
});
}
41 changes: 40 additions & 1 deletion controlplane/src/core/build-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import { App } from 'octokit';
import { Worker } from 'bullmq';
import routes from './routes.js';
import fastifyHealth from './plugins/health.js';
import fastifyMetrics, { MetricsPluginOptions } from './plugins/metrics.js';
import fastifyMetrics from './plugins/metrics.js';
import fastifyDatabase from './plugins/database.js';
import fastifyClickHouse from './plugins/clickhouse.js';
import fastifyRedis from './plugins/redis.js';
Expand Down Expand Up @@ -53,6 +53,14 @@ import {
createReactivateOrganizationWorker,
ReactivateOrganizationQueue,
} from './workers/ReactivateOrganizationWorker.js';
import {
QueueInactiveOrganizationsDeletionQueue,
createQueueInactiveOrganizationsDeletionWorker,
} from './workers/QueueInactiveOrganizationsDeletionWorker.js';
import {
createNotifyOrganizationDeletionQueuedWorker,
NotifyOrganizationDeletionQueuedQueue,
} from './workers/NotifyOrganizationDeletionQueuedWorker.js';

export interface BuildConfig {
logger: LoggerOptions;
Expand Down Expand Up @@ -394,6 +402,37 @@ export default async function build(opts: BuildConfig) {
}),
);

const notifyOrganizationDeletionQueuedQueue = new NotifyOrganizationDeletionQueuedQueue(
logger,
fastify.redisForQueue,
);
bullWorkers.push(
createNotifyOrganizationDeletionQueuedWorker({
redisConnection: fastify.redisForWorker,
db: fastify.db,
logger,
mailer: mailerClient,
}),
);

const queueInactiveOrganizationsDeletionQueue = new QueueInactiveOrganizationsDeletionQueue(
logger,
fastify.redisForQueue,
);
bullWorkers.push(
createQueueInactiveOrganizationsDeletionWorker({
redisConnection: fastify.redisForWorker,
db: fastify.db,
realm: opts.keycloak.realm,
keycloak: keycloakClient,
deleteOrganizationQueue,
notifyOrganizationDeletionQueuedQueue,
logger,
}),
);

await queueInactiveOrganizationsDeletionQueue.scheduleJob();

// required to verify webhook payloads
await fastify.register(import('fastify-raw-body'), {
field: 'rawBody',
Expand Down
41 changes: 20 additions & 21 deletions controlplane/src/core/repositories/OrganizationRepository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -939,33 +939,32 @@ export class OrganizationRepository {
return result[0];
}

public queueOrganizationDeletion(input: {
public async queueOrganizationDeletion(input: {
organizationId: string;
queuedBy?: string;
deleteOrganizationQueue: DeleteOrganizationQueue;
deleteDelayInDays?: number;
}) {
return this.db.transaction(async (tx) => {
const now = new Date();
await tx
.update(schema.organizations)
.set({
queuedForDeletionAt: now,
queuedForDeletionBy: input.queuedBy,
})
.where(eq(schema.organizations.id, input.organizationId));
const now = new Date();
await this.db
.update(schema.organizations)
.set({
queuedForDeletionAt: now,
queuedForDeletionBy: input.queuedBy,
})
.where(eq(schema.organizations.id, input.organizationId));

const deleteAt = addDays(now, delayForManualOrgDeletionInDays);
const delay = Number(deleteAt) - Number(now);
const deleteAt = addDays(now, input.deleteDelayInDays || delayForManualOrgDeletionInDays);
const delay = Number(deleteAt) - Number(now);

return await input.deleteOrganizationQueue.addJob(
{
organizationId: input.organizationId,
},
{
delay,
},
);
});
return await input.deleteOrganizationQueue.addJob(
{
organizationId: input.organizationId,
},
{
delay,
},
);
}

public restoreOrganization(input: { organizationId: string; deleteOrganizationQueue: DeleteOrganizationQueue }) {
Expand Down
2 changes: 1 addition & 1 deletion controlplane/src/core/workers/CacheWarmerWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ export const createCacheWarmerWorker = (input: {
concurrency: 10,
});
worker.on('stalled', (job) => {
log.warn({ joinId: job }, `Job stalled`);
log.warn({ jobId: job }, `Job stalled`);
});
worker.on('error', (err) => {
log.error(err, 'Worker error');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ export const createDeactivateOrganizationWorker = (input: {
},
);
worker.on('stalled', (job) => {
log.warn({ joinId: job }, `Job stalled`);
log.warn({ jobId: job }, `Job stalled`);
});
worker.on('error', (err) => {
log.error(err, 'Worker error');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ export const createDeleteOrganizationAuditLogsWorker = (input: {
);

worker.on('stalled', (job) => {
log.warn({ joinId: job }, 'Job stalled');
log.warn({ jobId: job }, 'Job stalled');
});
worker.on('error', (err) => {
log.error(err, 'Worker error');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ export const createDeleteOrganizationWorker = (input: {
},
);
worker.on('stalled', (job) => {
log.warn({ joinId: job }, `Job stalled`);
log.warn({ jobId: job }, `Job stalled`);
});
worker.on('error', (err) => {
log.error(err, 'Worker error');
Expand Down
2 changes: 1 addition & 1 deletion controlplane/src/core/workers/DeleteUserQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ export const createDeleteUserWorker = (input: {
concurrency: 10,
});
worker.on('stalled', (job) => {
log.warn({ joinId: job }, `Job stalled`);
log.warn({ jobId: job }, `Job stalled`);
});
worker.on('error', (err) => {
log.error(err, 'Worker error');
Expand Down
Loading
Loading