Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion packages/api-routes/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ export interface ApiRoutesOptions {
skipAuth?: boolean

/** Callback when a run is created (wire up job runner) */
onRunCreated?: (runId: string, projectId: string, providers?: string[], location?: import('@ainyc/canonry-contracts').LocationContext | null) => void
onRunCreated?: (runId: string, projectId: string, providers?: string[], location?: import('@ainyc/canonry-contracts').LocationContext | null, kind?: string) => void
/** Provider configuration summary for settings endpoint */
providerSummary?: ProviderSummaryEntry[]
/** Callback when a provider config is updated via API */
Expand Down
12 changes: 6 additions & 6 deletions packages/api-routes/src/runs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { resolveProject, writeAuditLog } from './helpers.js'
import { queueRunIfProjectIdle } from './run-queue.js'

export interface RunRoutesOptions {
onRunCreated?: (runId: string, projectId: string, providers?: string[], location?: LocationContext | null) => void
onRunCreated?: (runId: string, projectId: string, providers?: string[], location?: LocationContext | null, kind?: string) => void
}

export async function runRoutes(app: FastifyInstance, opts: RunRoutesOptions) {
Expand All @@ -21,7 +21,7 @@ export async function runRoutes(app: FastifyInstance, opts: RunRoutesOptions) {
if (!project) return

const kind = request.body?.kind ?? 'answer-visibility'
if (kind !== 'answer-visibility') {
if (kind !== 'answer-visibility' && kind !== 'social-monitor') {
const err = unsupportedKind(kind)
return reply.status(err.statusCode).send(err.toJSON())
}
Expand Down Expand Up @@ -92,7 +92,7 @@ export async function runRoutes(app: FastifyInstance, opts: RunRoutesOptions) {
})
const r = app.db.select().from(runs).where(eq(runs.id, runId)).get()!
if (opts.onRunCreated) {
opts.onRunCreated(runId, project.id, providers, loc)
opts.onRunCreated(runId, project.id, providers, loc, kind)
}
results.push({ ...formatRun(r), location: loc.label })
}
Expand Down Expand Up @@ -126,7 +126,7 @@ export async function runRoutes(app: FastifyInstance, opts: RunRoutesOptions) {
const run = app.db.select().from(runs).where(eq(runs.id, runId)).get()!

if (opts.onRunCreated) {
opts.onRunCreated(runId, project.id, providers, resolvedLocation)
opts.onRunCreated(runId, project.id, providers, resolvedLocation, kind)
}

return reply.status(201).send(formatRun(run))
Expand Down Expand Up @@ -156,7 +156,7 @@ export async function runRoutes(app: FastifyInstance, opts: RunRoutesOptions) {
}

const kind = request.body?.kind ?? 'answer-visibility'
if (kind !== 'answer-visibility') {
if (kind !== 'answer-visibility' && kind !== 'social-monitor') {
const err = unsupportedKind(kind)
return reply.status(err.statusCode).send(err.toJSON())
}
Expand Down Expand Up @@ -200,7 +200,7 @@ export async function runRoutes(app: FastifyInstance, opts: RunRoutesOptions) {

const run = app.db.select().from(runs).where(eq(runs.id, runId)).get()!
if (opts.onRunCreated) {
opts.onRunCreated(runId, project.id, providers)
opts.onRunCreated(runId, project.id, providers, undefined, kind)
}

results.push({ ...formatRun(run), projectName: project.name })
Expand Down
187 changes: 184 additions & 3 deletions packages/canonry/src/job-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,23 @@ import path from 'node:path'
import os from 'node:os'
import { eq, inArray } from 'drizzle-orm'
import type { DatabaseClient } from '@ainyc/canonry-db'
import { runs, keywords, competitors, projects, querySnapshots, usageCounters } from '@ainyc/canonry-db'
import type { ProviderName, NormalizedQueryResult, LocationContext } from '@ainyc/canonry-contracts'
import { runs, keywords, competitors, projects, querySnapshots, usageCounters, socialMentions } from '@ainyc/canonry-db'
import type { ProviderName, NormalizedQueryResult, LocationContext, SocialPlatformName } from '@ainyc/canonry-contracts'
import { effectiveDomains, normalizeProjectDomain, isBrowserProvider } from '@ainyc/canonry-contracts'
import type { ProviderRegistry, RegisteredProvider } from './provider-registry.js'
import type { SocialPlatformRegistry } from './social-registry.js'
import { trackEvent } from './telemetry.js'

export class JobRunner {
private db: DatabaseClient
private registry: ProviderRegistry
private socialRegistry: SocialPlatformRegistry | undefined
onRunCompleted?: (runId: string, projectId: string) => Promise<void>

constructor(db: DatabaseClient, registry: ProviderRegistry) {
constructor(db: DatabaseClient, registry: ProviderRegistry, socialRegistry?: SocialPlatformRegistry) {
this.db = db
this.registry = registry
this.socialRegistry = socialRegistry
}

recoverStaleRuns(): void {
Expand Down Expand Up @@ -414,6 +417,184 @@ export class JobRunner {
}
}

async executeSocialMonitorRun(runId: string, projectId: string): Promise<void> {
const now = new Date().toISOString()
const startTime = Date.now()

try {
// Mark run as running
this.db
.update(runs)
.set({ status: 'running', startedAt: now })
.where(eq(runs.id, runId))
.run()

// Fetch project
const project = this.db
.select()
.from(projects)
.where(eq(projects.id, projectId))
.get()

if (!project) {
throw new Error(`Project ${projectId} not found`)
}

if (!this.socialRegistry || this.socialRegistry.size === 0) {
throw new Error('No social platform adapters configured.')
}

// Resolve which platforms to use (all configured, since no per-project platform config yet)
const activePlatforms = this.socialRegistry.getAll()

console.log(`[JobRunner] Social run ${runId}: dispatching to ${activePlatforms.length} platforms: ${activePlatforms.map(p => p.adapter.name).join(', ')}`)

// Fetch keywords and canonical domains
const projectKeywords = this.db
.select()
.from(keywords)
.where(eq(keywords.projectId, projectId))
.all()

const allDomains = effectiveDomains({
canonicalDomain: project.canonicalDomain,
ownedDomains: JSON.parse(project.ownedDomains || '[]') as string[],
})

const keywordTexts = projectKeywords.map(k => k.keyword)

// Per-platform rate limiting windows
const minuteWindows = new Map<SocialPlatformName, number[]>()
for (const p of activePlatforms) {
minuteWindows.set(p.adapter.name, [])
}

const platformErrors = new Map<SocialPlatformName, string>()
let totalMentionsInserted = 0

for (const registeredPlatform of activePlatforms) {
const { adapter, quotaPolicy } = registeredPlatform
const platformName = adapter.name

try {
// Rate-limit per platform
await this.waitForRateLimit(
minuteWindows.get(platformName)!,
quotaPolicy.maxRequestsPerMinute,
)

const mentions = await adapter.searchMentions(
{ keywords: keywordTexts, domains: allDomains },
quotaPolicy,
)

console.log(`[JobRunner] ${platformName}: fetched ${mentions.length} mentions`)

for (const mention of mentions) {
const mentionId = crypto.randomUUID()
const createdAt = new Date().toISOString()

// UPSERT on (platform, external_id) — skip already-captured posts
this.db
.insert(socialMentions)
.values({
id: mentionId,
projectId,
runId,
platform: mention.platform,
externalId: mention.externalId,
url: mention.url,
author: mention.author,
title: mention.title ?? null,
content: mention.content,
postedAt: mention.postedAt,
raw: mention.raw ? JSON.stringify(mention.raw) : null,
createdAt,
})
.onConflictDoNothing()
.run()

totalMentionsInserted++
}
} catch (err: unknown) {
const msg = err instanceof Error ? err.message : String(err)
console.error(`[JobRunner] ${platformName}: social-monitor FAILED: ${msg}`)
platformErrors.set(platformName, msg)
}
}

// Determine final run status
const allFailed = totalMentionsInserted === 0 && platformErrors.size > 0
const someFailed = platformErrors.size > 0

if (allFailed) {
const errorDetail = JSON.stringify(Object.fromEntries(platformErrors))
this.db
.update(runs)
.set({ status: 'failed', finishedAt: new Date().toISOString(), error: errorDetail })
.where(eq(runs.id, runId))
.run()
} else if (someFailed) {
const errorDetail = JSON.stringify(Object.fromEntries(platformErrors))
this.db
.update(runs)
.set({ status: 'partial', finishedAt: new Date().toISOString(), error: errorDetail })
.where(eq(runs.id, runId))
.run()
} else {
this.db
.update(runs)
.set({ status: 'completed', finishedAt: new Date().toISOString() })
.where(eq(runs.id, runId))
.run()
}

const finalStatus = allFailed ? 'failed' : someFailed ? 'partial' : 'completed'
trackEvent('run.completed', {
status: finalStatus,
kind: 'social-monitor',
platformCount: activePlatforms.length,
platforms: activePlatforms.map(p => p.adapter.name),
mentionsInserted: totalMentionsInserted,
durationMs: Date.now() - startTime,
})

this.incrementUsage(projectId, 'social-monitor-runs', 1)

if (this.onRunCompleted) {
this.onRunCompleted(runId, projectId).catch((err: unknown) => {
console.error('[JobRunner] Notification callback failed:', err)
})
}
} catch (err: unknown) {
const errorMessage = err instanceof Error ? err.message : String(err)
this.db
.update(runs)
.set({
status: 'failed',
finishedAt: new Date().toISOString(),
error: errorMessage,
})
.where(eq(runs.id, runId))
.run()

trackEvent('run.completed', {
status: 'failed',
kind: 'social-monitor',
platformCount: 0,
platforms: [],
mentionsInserted: 0,
durationMs: Date.now() - startTime,
})

if (this.onRunCompleted) {
this.onRunCompleted(runId, projectId).catch((notifErr: unknown) => {
console.error('[JobRunner] Notification callback failed:', notifErr)
})
}
}
}

private async waitForRateLimit(window: number[], maxPerMinute: number): Promise<void> {
const now = Date.now()
const windowStart = now - 60_000
Expand Down
22 changes: 17 additions & 5 deletions packages/canonry/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import { JobRunner } from './job-runner.js'
import { executeGscSync } from './gsc-sync.js'
import { executeInspectSitemap } from './gsc-inspect-sitemap.js'
import { ProviderRegistry } from './provider-registry.js'
import { SocialPlatformRegistry } from './social-registry.js'
import { Scheduler } from './scheduler.js'
import { Notifier } from './notifier.js'
import { fetchSiteText } from './site-fetch.js'
Expand Down Expand Up @@ -147,7 +148,12 @@ export async function createServer(opts: {
const port = opts.config.port ?? 4100
const serverUrl = `http://localhost:${port}`

const jobRunner = new JobRunner(opts.db, registry)
// Build social platform registry from config
const socialRegistry = new SocialPlatformRegistry()
// Social platform adapters are registered here as they become available.
// e.g. if (opts.config.social?.reddit?.clientId) { socialRegistry.register(redditAdapter, ...) }

const jobRunner = new JobRunner(opts.db, registry, socialRegistry)
jobRunner.recoverStaleRuns()
const notifier = new Notifier(opts.db, serverUrl)
jobRunner.onRunCompleted = (runId, projectId) => notifier.onRunCompleted(runId, projectId)
Expand Down Expand Up @@ -325,11 +331,17 @@ export async function createServer(opts: {
googleSettingsSummary,
bingSettingsSummary,
bingConnectionStore,
onRunCreated: (runId: string, projectId: string, providers?: string[], location?: import('@ainyc/canonry-contracts').LocationContext | null) => {
onRunCreated: (runId: string, projectId: string, providers?: string[], location?: import('@ainyc/canonry-contracts').LocationContext | null, kind?: string) => {
// Fire and forget — run executes in background
jobRunner.executeRun(runId, projectId, providers as ProviderName[] | undefined, location).catch((err: unknown) => {
app.log.error({ runId, err }, 'Job runner failed')
})
if (kind === 'social-monitor') {
jobRunner.executeSocialMonitorRun(runId, projectId).catch((err: unknown) => {
app.log.error({ runId, err }, 'Social monitor job runner failed')
})
} else {
jobRunner.executeRun(runId, projectId, providers as ProviderName[] | undefined, location).catch((err: unknown) => {
app.log.error({ runId, err }, 'Job runner failed')
})
}
},
onProviderUpdate: (providerName: string, apiKey: string, model?: string, baseUrl?: string, incomingQuota?: Partial<import('@ainyc/canonry-contracts').ProviderQuotaPolicy>) => {
const name = providerName as keyof typeof adapterMap
Expand Down
46 changes: 46 additions & 0 deletions packages/canonry/src/social-registry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import type { SocialPlatformAdapter, SocialPlatformName, SocialQuotaPolicy } from '@ainyc/canonry-contracts'

export interface RegisteredSocialPlatform {
adapter: SocialPlatformAdapter
quotaPolicy: SocialQuotaPolicy
}

/**
* Registry of configured social platform adapters.
* Mirrors the ProviderRegistry pattern used for AI providers.
*/
export class SocialPlatformRegistry {
private platforms = new Map<SocialPlatformName, RegisteredSocialPlatform>()

register(adapter: SocialPlatformAdapter, quotaPolicy: SocialQuotaPolicy): void {
this.platforms.set(adapter.name, { adapter, quotaPolicy })
}

get(name: SocialPlatformName): RegisteredSocialPlatform | undefined {
return this.platforms.get(name)
}

getAll(): RegisteredSocialPlatform[] {
return [...this.platforms.values()]
}

getForProject(platformNames: SocialPlatformName[]): RegisteredSocialPlatform[] {
// Empty array means "use all configured platforms"
if (platformNames.length === 0) {
return this.getAll()
}
const result: RegisteredSocialPlatform[] = []
const seen = new Set<SocialPlatformName>()
for (const name of platformNames) {
if (seen.has(name)) continue
seen.add(name)
const platform = this.platforms.get(name)
if (platform) result.push(platform)
}
return result
}

get size(): number {
return this.platforms.size
}
}
1 change: 1 addition & 0 deletions packages/contracts/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ export * from './run.js'
export * from './schedule.js'
export * from './analytics.js'
export * from './source-categories.js'
export * from './social.js'
2 changes: 1 addition & 1 deletion packages/contracts/src/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { providerNameSchema } from './provider.js'
export const runStatusSchema = z.enum(['queued', 'running', 'completed', 'partial', 'failed'])
export type RunStatus = z.infer<typeof runStatusSchema>

export const runKindSchema = z.enum(['answer-visibility', 'site-audit', 'gsc-sync', 'inspect-sitemap'])
export const runKindSchema = z.enum(['answer-visibility', 'site-audit', 'gsc-sync', 'inspect-sitemap', 'social-monitor'])
export type RunKind = z.infer<typeof runKindSchema>

export const runTriggerSchema = z.enum(['manual', 'scheduled', 'config-apply'])
Expand Down
Loading
Loading