diff --git a/packages/api-routes/src/index.ts b/packages/api-routes/src/index.ts index 76b1688..31c385c 100644 --- a/packages/api-routes/src/index.ts +++ b/packages/api-routes/src/index.ts @@ -42,7 +42,7 @@ export interface ApiRoutesOptions { skipAuth?: boolean /** Callback when a run is created (wire up job runner) */ - onRunCreated?: (runId: string, projectId: string, providers?: string[], location?: import('@ainyc/canonry-contracts').LocationContext | null) => void + onRunCreated?: (runId: string, projectId: string, providers?: string[], location?: import('@ainyc/canonry-contracts').LocationContext | null, kind?: string) => void /** Provider configuration summary for settings endpoint */ providerSummary?: ProviderSummaryEntry[] /** Callback when a provider config is updated via API */ diff --git a/packages/api-routes/src/runs.ts b/packages/api-routes/src/runs.ts index 5fe0e34..611d3a9 100644 --- a/packages/api-routes/src/runs.ts +++ b/packages/api-routes/src/runs.ts @@ -8,7 +8,7 @@ import { resolveProject, writeAuditLog } from './helpers.js' import { queueRunIfProjectIdle } from './run-queue.js' export interface RunRoutesOptions { - onRunCreated?: (runId: string, projectId: string, providers?: string[], location?: LocationContext | null) => void + onRunCreated?: (runId: string, projectId: string, providers?: string[], location?: LocationContext | null, kind?: string) => void } export async function runRoutes(app: FastifyInstance, opts: RunRoutesOptions) { @@ -21,7 +21,7 @@ export async function runRoutes(app: FastifyInstance, opts: RunRoutesOptions) { if (!project) return const kind = request.body?.kind ?? 'answer-visibility' - if (kind !== 'answer-visibility') { + if (kind !== 'answer-visibility' && kind !== 'social-monitor') { const err = unsupportedKind(kind) return reply.status(err.statusCode).send(err.toJSON()) } @@ -92,7 +92,7 @@ export async function runRoutes(app: FastifyInstance, opts: RunRoutesOptions) { }) const r = app.db.select().from(runs).where(eq(runs.id, runId)).get()! if (opts.onRunCreated) { - opts.onRunCreated(runId, project.id, providers, loc) + opts.onRunCreated(runId, project.id, providers, loc, kind) } results.push({ ...formatRun(r), location: loc.label }) } @@ -126,7 +126,7 @@ export async function runRoutes(app: FastifyInstance, opts: RunRoutesOptions) { const run = app.db.select().from(runs).where(eq(runs.id, runId)).get()! if (opts.onRunCreated) { - opts.onRunCreated(runId, project.id, providers, resolvedLocation) + opts.onRunCreated(runId, project.id, providers, resolvedLocation, kind) } return reply.status(201).send(formatRun(run)) @@ -156,7 +156,7 @@ export async function runRoutes(app: FastifyInstance, opts: RunRoutesOptions) { } const kind = request.body?.kind ?? 'answer-visibility' - if (kind !== 'answer-visibility') { + if (kind !== 'answer-visibility' && kind !== 'social-monitor') { const err = unsupportedKind(kind) return reply.status(err.statusCode).send(err.toJSON()) } @@ -200,7 +200,7 @@ export async function runRoutes(app: FastifyInstance, opts: RunRoutesOptions) { const run = app.db.select().from(runs).where(eq(runs.id, runId)).get()! if (opts.onRunCreated) { - opts.onRunCreated(runId, project.id, providers) + opts.onRunCreated(runId, project.id, providers, undefined, kind) } results.push({ ...formatRun(run), projectName: project.name }) diff --git a/packages/canonry/src/job-runner.ts b/packages/canonry/src/job-runner.ts index 0970ef7..8f53499 100644 --- a/packages/canonry/src/job-runner.ts +++ b/packages/canonry/src/job-runner.ts @@ -4,20 +4,23 @@ import path from 'node:path' import os from 'node:os' import { eq, inArray } from 'drizzle-orm' import type { DatabaseClient } from '@ainyc/canonry-db' -import { runs, keywords, competitors, projects, querySnapshots, usageCounters } from '@ainyc/canonry-db' -import type { ProviderName, NormalizedQueryResult, LocationContext } from '@ainyc/canonry-contracts' +import { runs, keywords, competitors, projects, querySnapshots, usageCounters, socialMentions } from '@ainyc/canonry-db' +import type { ProviderName, NormalizedQueryResult, LocationContext, SocialPlatformName } from '@ainyc/canonry-contracts' import { effectiveDomains, normalizeProjectDomain, isBrowserProvider } from '@ainyc/canonry-contracts' import type { ProviderRegistry, RegisteredProvider } from './provider-registry.js' +import type { SocialPlatformRegistry } from './social-registry.js' import { trackEvent } from './telemetry.js' export class JobRunner { private db: DatabaseClient private registry: ProviderRegistry + private socialRegistry: SocialPlatformRegistry | undefined onRunCompleted?: (runId: string, projectId: string) => Promise - constructor(db: DatabaseClient, registry: ProviderRegistry) { + constructor(db: DatabaseClient, registry: ProviderRegistry, socialRegistry?: SocialPlatformRegistry) { this.db = db this.registry = registry + this.socialRegistry = socialRegistry } recoverStaleRuns(): void { @@ -414,6 +417,184 @@ export class JobRunner { } } + async executeSocialMonitorRun(runId: string, projectId: string): Promise { + const now = new Date().toISOString() + const startTime = Date.now() + + try { + // Mark run as running + this.db + .update(runs) + .set({ status: 'running', startedAt: now }) + .where(eq(runs.id, runId)) + .run() + + // Fetch project + const project = this.db + .select() + .from(projects) + .where(eq(projects.id, projectId)) + .get() + + if (!project) { + throw new Error(`Project ${projectId} not found`) + } + + if (!this.socialRegistry || this.socialRegistry.size === 0) { + throw new Error('No social platform adapters configured.') + } + + // Resolve which platforms to use (all configured, since no per-project platform config yet) + const activePlatforms = this.socialRegistry.getAll() + + console.log(`[JobRunner] Social run ${runId}: dispatching to ${activePlatforms.length} platforms: ${activePlatforms.map(p => p.adapter.name).join(', ')}`) + + // Fetch keywords and canonical domains + const projectKeywords = this.db + .select() + .from(keywords) + .where(eq(keywords.projectId, projectId)) + .all() + + const allDomains = effectiveDomains({ + canonicalDomain: project.canonicalDomain, + ownedDomains: JSON.parse(project.ownedDomains || '[]') as string[], + }) + + const keywordTexts = projectKeywords.map(k => k.keyword) + + // Per-platform rate limiting windows + const minuteWindows = new Map() + for (const p of activePlatforms) { + minuteWindows.set(p.adapter.name, []) + } + + const platformErrors = new Map() + let totalMentionsInserted = 0 + + for (const registeredPlatform of activePlatforms) { + const { adapter, quotaPolicy } = registeredPlatform + const platformName = adapter.name + + try { + // Rate-limit per platform + await this.waitForRateLimit( + minuteWindows.get(platformName)!, + quotaPolicy.maxRequestsPerMinute, + ) + + const mentions = await adapter.searchMentions( + { keywords: keywordTexts, domains: allDomains }, + quotaPolicy, + ) + + console.log(`[JobRunner] ${platformName}: fetched ${mentions.length} mentions`) + + for (const mention of mentions) { + const mentionId = crypto.randomUUID() + const createdAt = new Date().toISOString() + + // UPSERT on (platform, external_id) — skip already-captured posts + this.db + .insert(socialMentions) + .values({ + id: mentionId, + projectId, + runId, + platform: mention.platform, + externalId: mention.externalId, + url: mention.url, + author: mention.author, + title: mention.title ?? null, + content: mention.content, + postedAt: mention.postedAt, + raw: mention.raw ? JSON.stringify(mention.raw) : null, + createdAt, + }) + .onConflictDoNothing() + .run() + + totalMentionsInserted++ + } + } catch (err: unknown) { + const msg = err instanceof Error ? err.message : String(err) + console.error(`[JobRunner] ${platformName}: social-monitor FAILED: ${msg}`) + platformErrors.set(platformName, msg) + } + } + + // Determine final run status + const allFailed = totalMentionsInserted === 0 && platformErrors.size > 0 + const someFailed = platformErrors.size > 0 + + if (allFailed) { + const errorDetail = JSON.stringify(Object.fromEntries(platformErrors)) + this.db + .update(runs) + .set({ status: 'failed', finishedAt: new Date().toISOString(), error: errorDetail }) + .where(eq(runs.id, runId)) + .run() + } else if (someFailed) { + const errorDetail = JSON.stringify(Object.fromEntries(platformErrors)) + this.db + .update(runs) + .set({ status: 'partial', finishedAt: new Date().toISOString(), error: errorDetail }) + .where(eq(runs.id, runId)) + .run() + } else { + this.db + .update(runs) + .set({ status: 'completed', finishedAt: new Date().toISOString() }) + .where(eq(runs.id, runId)) + .run() + } + + const finalStatus = allFailed ? 'failed' : someFailed ? 'partial' : 'completed' + trackEvent('run.completed', { + status: finalStatus, + kind: 'social-monitor', + platformCount: activePlatforms.length, + platforms: activePlatforms.map(p => p.adapter.name), + mentionsInserted: totalMentionsInserted, + durationMs: Date.now() - startTime, + }) + + this.incrementUsage(projectId, 'social-monitor-runs', 1) + + if (this.onRunCompleted) { + this.onRunCompleted(runId, projectId).catch((err: unknown) => { + console.error('[JobRunner] Notification callback failed:', err) + }) + } + } catch (err: unknown) { + const errorMessage = err instanceof Error ? err.message : String(err) + this.db + .update(runs) + .set({ + status: 'failed', + finishedAt: new Date().toISOString(), + error: errorMessage, + }) + .where(eq(runs.id, runId)) + .run() + + trackEvent('run.completed', { + status: 'failed', + kind: 'social-monitor', + platformCount: 0, + platforms: [], + mentionsInserted: 0, + durationMs: Date.now() - startTime, + }) + + if (this.onRunCompleted) { + this.onRunCompleted(runId, projectId).catch((notifErr: unknown) => { + console.error('[JobRunner] Notification callback failed:', notifErr) + }) + } + } + } + private async waitForRateLimit(window: number[], maxPerMinute: number): Promise { const now = Date.now() const windowStart = now - 60_000 diff --git a/packages/canonry/src/server.ts b/packages/canonry/src/server.ts index dd47bda..80deb49 100644 --- a/packages/canonry/src/server.ts +++ b/packages/canonry/src/server.ts @@ -32,6 +32,7 @@ import { JobRunner } from './job-runner.js' import { executeGscSync } from './gsc-sync.js' import { executeInspectSitemap } from './gsc-inspect-sitemap.js' import { ProviderRegistry } from './provider-registry.js' +import { SocialPlatformRegistry } from './social-registry.js' import { Scheduler } from './scheduler.js' import { Notifier } from './notifier.js' import { fetchSiteText } from './site-fetch.js' @@ -147,7 +148,12 @@ export async function createServer(opts: { const port = opts.config.port ?? 4100 const serverUrl = `http://localhost:${port}` - const jobRunner = new JobRunner(opts.db, registry) + // Build social platform registry from config + const socialRegistry = new SocialPlatformRegistry() + // Social platform adapters are registered here as they become available. + // e.g. if (opts.config.social?.reddit?.clientId) { socialRegistry.register(redditAdapter, ...) } + + const jobRunner = new JobRunner(opts.db, registry, socialRegistry) jobRunner.recoverStaleRuns() const notifier = new Notifier(opts.db, serverUrl) jobRunner.onRunCompleted = (runId, projectId) => notifier.onRunCompleted(runId, projectId) @@ -325,11 +331,17 @@ export async function createServer(opts: { googleSettingsSummary, bingSettingsSummary, bingConnectionStore, - onRunCreated: (runId: string, projectId: string, providers?: string[], location?: import('@ainyc/canonry-contracts').LocationContext | null) => { + onRunCreated: (runId: string, projectId: string, providers?: string[], location?: import('@ainyc/canonry-contracts').LocationContext | null, kind?: string) => { // Fire and forget — run executes in background - jobRunner.executeRun(runId, projectId, providers as ProviderName[] | undefined, location).catch((err: unknown) => { - app.log.error({ runId, err }, 'Job runner failed') - }) + if (kind === 'social-monitor') { + jobRunner.executeSocialMonitorRun(runId, projectId).catch((err: unknown) => { + app.log.error({ runId, err }, 'Social monitor job runner failed') + }) + } else { + jobRunner.executeRun(runId, projectId, providers as ProviderName[] | undefined, location).catch((err: unknown) => { + app.log.error({ runId, err }, 'Job runner failed') + }) + } }, onProviderUpdate: (providerName: string, apiKey: string, model?: string, baseUrl?: string, incomingQuota?: Partial) => { const name = providerName as keyof typeof adapterMap diff --git a/packages/canonry/src/social-registry.ts b/packages/canonry/src/social-registry.ts new file mode 100644 index 0000000..e493190 --- /dev/null +++ b/packages/canonry/src/social-registry.ts @@ -0,0 +1,46 @@ +import type { SocialPlatformAdapter, SocialPlatformName, SocialQuotaPolicy } from '@ainyc/canonry-contracts' + +export interface RegisteredSocialPlatform { + adapter: SocialPlatformAdapter + quotaPolicy: SocialQuotaPolicy +} + +/** + * Registry of configured social platform adapters. + * Mirrors the ProviderRegistry pattern used for AI providers. + */ +export class SocialPlatformRegistry { + private platforms = new Map() + + register(adapter: SocialPlatformAdapter, quotaPolicy: SocialQuotaPolicy): void { + this.platforms.set(adapter.name, { adapter, quotaPolicy }) + } + + get(name: SocialPlatformName): RegisteredSocialPlatform | undefined { + return this.platforms.get(name) + } + + getAll(): RegisteredSocialPlatform[] { + return [...this.platforms.values()] + } + + getForProject(platformNames: SocialPlatformName[]): RegisteredSocialPlatform[] { + // Empty array means "use all configured platforms" + if (platformNames.length === 0) { + return this.getAll() + } + const result: RegisteredSocialPlatform[] = [] + const seen = new Set() + for (const name of platformNames) { + if (seen.has(name)) continue + seen.add(name) + const platform = this.platforms.get(name) + if (platform) result.push(platform) + } + return result + } + + get size(): number { + return this.platforms.size + } +} diff --git a/packages/contracts/src/index.ts b/packages/contracts/src/index.ts index abfccca..7f67b2e 100644 --- a/packages/contracts/src/index.ts +++ b/packages/contracts/src/index.ts @@ -10,3 +10,4 @@ export * from './run.js' export * from './schedule.js' export * from './analytics.js' export * from './source-categories.js' +export * from './social.js' diff --git a/packages/contracts/src/run.ts b/packages/contracts/src/run.ts index 379450b..b97fa1a 100644 --- a/packages/contracts/src/run.ts +++ b/packages/contracts/src/run.ts @@ -4,7 +4,7 @@ import { providerNameSchema } from './provider.js' export const runStatusSchema = z.enum(['queued', 'running', 'completed', 'partial', 'failed']) export type RunStatus = z.infer -export const runKindSchema = z.enum(['answer-visibility', 'site-audit', 'gsc-sync', 'inspect-sitemap']) +export const runKindSchema = z.enum(['answer-visibility', 'site-audit', 'gsc-sync', 'inspect-sitemap', 'social-monitor']) export type RunKind = z.infer export const runTriggerSchema = z.enum(['manual', 'scheduled', 'config-apply']) diff --git a/packages/contracts/src/social.ts b/packages/contracts/src/social.ts new file mode 100644 index 0000000..90ef178 --- /dev/null +++ b/packages/contracts/src/social.ts @@ -0,0 +1,61 @@ +import { z } from 'zod' + +/** Known social platform identifiers */ +export const SOCIAL_PLATFORM_NAMES = ['reddit'] as const +export const socialPlatformNameSchema = z.enum(SOCIAL_PLATFORM_NAMES) +export type SocialPlatformName = z.infer + +export const socialQuotaPolicySchema = z.object({ + maxRequestsPerMinute: z.number().int().positive(), + maxRequestsPerDay: z.number().int().positive(), +}) +export type SocialQuotaPolicy = z.infer + +/** A single social mention returned by an adapter */ +export interface SocialMention { + /** Platform that produced this mention */ + platform: SocialPlatformName + /** Unique identifier on the source platform (used for deduplication) */ + externalId: string + /** URL to the original post / comment */ + url: string + /** Author handle or username */ + author: string + /** Post or comment body */ + content: string + /** Post title (for Reddit posts; empty for comments) */ + title?: string + /** ISO-8601 timestamp of the original post */ + postedAt: string + /** Raw JSON from the platform API (for debugging / future enrichment) */ + raw?: Record +} + +/** Input passed to every platform adapter */ +export interface SocialSearchInput { + /** Project keywords to search for */ + keywords: string[] + /** Canonical and owned domains of the project */ + domains: string[] +} + +export interface SocialPlatformAdapter { + name: SocialPlatformName + searchMentions(input: SocialSearchInput, quotaPolicy: SocialQuotaPolicy): Promise +} + +/** DTO returned by the API for a social mention */ +export const socialMentionDtoSchema = z.object({ + id: z.string(), + projectId: z.string(), + runId: z.string(), + platform: socialPlatformNameSchema, + externalId: z.string(), + url: z.string(), + author: z.string(), + title: z.string().nullable().optional(), + content: z.string(), + postedAt: z.string(), + createdAt: z.string(), +}) +export type SocialMentionDto = z.infer diff --git a/packages/db/src/schema.ts b/packages/db/src/schema.ts index c73819e..9e1be45 100644 --- a/packages/db/src/schema.ts +++ b/packages/db/src/schema.ts @@ -243,6 +243,25 @@ export const bingKeywordStats = sqliteTable('bing_keyword_stats', { index('idx_bing_keyword_query').on(table.query), ]) +export const socialMentions = sqliteTable('social_mentions', { + id: text('id').primaryKey(), + projectId: text('project_id').notNull().references(() => projects.id, { onDelete: 'cascade' }), + runId: text('run_id').notNull().references(() => runs.id, { onDelete: 'cascade' }), + platform: text('platform').notNull(), + externalId: text('external_id').notNull(), + url: text('url').notNull(), + author: text('author').notNull(), + title: text('title'), + content: text('content').notNull(), + postedAt: text('posted_at').notNull(), + raw: text('raw'), + createdAt: text('created_at').notNull(), +}, (table) => [ + uniqueIndex('idx_social_mentions_platform_external').on(table.platform, table.externalId), + index('idx_social_mentions_project').on(table.projectId), + index('idx_social_mentions_run').on(table.runId), +]) + export const usageCounters = sqliteTable('usage_counters', { id: text('id').primaryKey(), scope: text('scope').notNull(),