From a421e99f9c0694f391d01f51475fe2cc77415705 Mon Sep 17 00:00:00 2001 From: Shuhui Luo <107524008+shuhuiluo@users.noreply.github.com> Date: Thu, 4 Dec 2025 00:08:22 -0500 Subject: [PATCH] feat: add thread-based event grouping for PRs and issues MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Group PR and issue events into threads: - PR/issue opened creates anchor thread - Follow-up events (comments, reviews, closed) reply to thread - 30-day thread expiration with daily cleanup - Filter expired threads in lookups Changes: - Add event_threads table for thread mappings - Add ThreadService for storage/lookup - Update EventProcessor with threading context - Include PR review comments in threading 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- drizzle/0006_certain_micromacro.sql | 16 + drizzle/meta/0006_snapshot.json | 963 +++++++++++++++++++++++++++ drizzle/meta/_journal.json | 7 + src/db/schema.ts | 41 ++ src/github-app/event-processor.ts | 128 +++- src/index.ts | 15 +- src/services/subscription-service.ts | 3 + src/services/thread-service.ts | 142 ++++ 8 files changed, 1289 insertions(+), 26 deletions(-) create mode 100644 drizzle/0006_certain_micromacro.sql create mode 100644 drizzle/meta/0006_snapshot.json create mode 100644 src/services/thread-service.ts diff --git a/drizzle/0006_certain_micromacro.sql b/drizzle/0006_certain_micromacro.sql new file mode 100644 index 0000000..c4cc426 --- /dev/null +++ b/drizzle/0006_certain_micromacro.sql @@ -0,0 +1,16 @@ +CREATE TABLE "event_threads" ( + "id" serial PRIMARY KEY NOT NULL, + "space_id" text NOT NULL, + "channel_id" text NOT NULL, + "repo_full_name" text NOT NULL, + "anchor_type" text NOT NULL, + "anchor_number" integer NOT NULL, + "thread_event_id" text NOT NULL, + "created_at" timestamp with time zone NOT NULL, + "expires_at" timestamp with time zone NOT NULL, + CONSTRAINT "anchor_type_check" CHECK ("event_threads"."anchor_type" IN ('pr', 'issue')) +); +--> statement-breakpoint +CREATE UNIQUE INDEX "event_threads_unique_idx" ON "event_threads" USING btree ("space_id","channel_id","repo_full_name","anchor_type","anchor_number");--> statement-breakpoint +CREATE INDEX "idx_event_threads_repo_anchor" ON "event_threads" USING btree ("repo_full_name","anchor_type","anchor_number");--> statement-breakpoint +CREATE INDEX "idx_event_threads_expires" ON "event_threads" USING btree ("expires_at"); \ No newline at end of file diff --git a/drizzle/meta/0006_snapshot.json b/drizzle/meta/0006_snapshot.json new file mode 100644 index 0000000..fe3324e --- /dev/null +++ b/drizzle/meta/0006_snapshot.json @@ -0,0 +1,963 @@ +{ + "id": "64e2be0e-e932-428c-a783-360d02b82ff1", + "prevId": "7e8206de-d629-4058-9115-5ff2bb607d75", + "version": "7", + "dialect": "postgresql", + "tables": { + "public.event_threads": { + "name": "event_threads", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "serial", + "primaryKey": true, + "notNull": true + }, + "space_id": { + "name": "space_id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "channel_id": { + "name": "channel_id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "repo_full_name": { + "name": "repo_full_name", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "anchor_type": { + "name": "anchor_type", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "anchor_number": { + "name": "anchor_number", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "thread_event_id": { + "name": "thread_event_id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "created_at": { + "name": "created_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true + }, + "expires_at": { + "name": "expires_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true + } + }, + "indexes": { + "event_threads_unique_idx": { + "name": "event_threads_unique_idx", + "columns": [ + { + "expression": "space_id", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "channel_id", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "repo_full_name", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "anchor_type", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "anchor_number", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": true, + "concurrently": false, + "method": "btree", + "with": {} + }, + "idx_event_threads_repo_anchor": { + "name": "idx_event_threads_repo_anchor", + "columns": [ + { + "expression": "repo_full_name", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "anchor_type", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "anchor_number", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + }, + "idx_event_threads_expires": { + "name": "idx_event_threads_expires", + "columns": [ + { + "expression": "expires_at", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": { + "anchor_type_check": { + "name": "anchor_type_check", + "value": "\"event_threads\".\"anchor_type\" IN ('pr', 'issue')" + } + }, + "isRLSEnabled": false + }, + "public.github_installations": { + "name": "github_installations", + "schema": "", + "columns": { + "installation_id": { + "name": "installation_id", + "type": "integer", + "primaryKey": true, + "notNull": true + }, + "account_login": { + "name": "account_login", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "account_type": { + "name": "account_type", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "installed_at": { + "name": "installed_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true + }, + "suspended_at": { + "name": "suspended_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": false + }, + "app_slug": { + "name": "app_slug", + "type": "text", + "primaryKey": false, + "notNull": true, + "default": "'towns-github-bot'" + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": { + "account_type_check": { + "name": "account_type_check", + "value": "\"github_installations\".\"account_type\" IN ('Organization', 'User')" + } + }, + "isRLSEnabled": false + }, + "public.github_subscriptions": { + "name": "github_subscriptions", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "serial", + "primaryKey": true, + "notNull": true + }, + "space_id": { + "name": "space_id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "channel_id": { + "name": "channel_id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "repo_full_name": { + "name": "repo_full_name", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "delivery_mode": { + "name": "delivery_mode", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "is_private": { + "name": "is_private", + "type": "boolean", + "primaryKey": false, + "notNull": true + }, + "created_by_towns_user_id": { + "name": "created_by_towns_user_id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "created_by_github_login": { + "name": "created_by_github_login", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "installation_id": { + "name": "installation_id", + "type": "integer", + "primaryKey": false, + "notNull": false + }, + "enabled": { + "name": "enabled", + "type": "boolean", + "primaryKey": false, + "notNull": true, + "default": true + }, + "event_types": { + "name": "event_types", + "type": "text", + "primaryKey": false, + "notNull": true, + "default": "'pr,issues,commits,releases'" + }, + "branch_filter": { + "name": "branch_filter", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true + } + }, + "indexes": { + "github_subscriptions_unique_idx": { + "name": "github_subscriptions_unique_idx", + "columns": [ + { + "expression": "space_id", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "channel_id", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "repo_full_name", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": true, + "concurrently": false, + "method": "btree", + "with": {} + }, + "idx_github_subscriptions_channel": { + "name": "idx_github_subscriptions_channel", + "columns": [ + { + "expression": "channel_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + }, + "idx_github_subscriptions_repo": { + "name": "idx_github_subscriptions_repo", + "columns": [ + { + "expression": "repo_full_name", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": { + "github_subscriptions_created_by_towns_user_id_github_user_tokens_towns_user_id_fk": { + "name": "github_subscriptions_created_by_towns_user_id_github_user_tokens_towns_user_id_fk", + "tableFrom": "github_subscriptions", + "tableTo": "github_user_tokens", + "columnsFrom": [ + "created_by_towns_user_id" + ], + "columnsTo": [ + "towns_user_id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + }, + "github_subscriptions_installation_id_github_installations_installation_id_fk": { + "name": "github_subscriptions_installation_id_github_installations_installation_id_fk", + "tableFrom": "github_subscriptions", + "tableTo": "github_installations", + "columnsFrom": [ + "installation_id" + ], + "columnsTo": [ + "installation_id" + ], + "onDelete": "set null", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": { + "delivery_mode_check": { + "name": "delivery_mode_check", + "value": "\"github_subscriptions\".\"delivery_mode\" IN ('webhook', 'polling')" + } + }, + "isRLSEnabled": false + }, + "public.github_user_tokens": { + "name": "github_user_tokens", + "schema": "", + "columns": { + "towns_user_id": { + "name": "towns_user_id", + "type": "text", + "primaryKey": true, + "notNull": true + }, + "github_user_id": { + "name": "github_user_id", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "github_login": { + "name": "github_login", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "access_token": { + "name": "access_token", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "token_type": { + "name": "token_type", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "expires_at": { + "name": "expires_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": false + }, + "refresh_token": { + "name": "refresh_token", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "refresh_token_expires_at": { + "name": "refresh_token_expires_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": false + }, + "created_at": { + "name": "created_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true + } + }, + "indexes": { + "github_user_tokens_github_user_id_unique": { + "name": "github_user_tokens_github_user_id_unique", + "columns": [ + { + "expression": "github_user_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": true, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.installation_repositories": { + "name": "installation_repositories", + "schema": "", + "columns": { + "installation_id": { + "name": "installation_id", + "type": "integer", + "primaryKey": false, + "notNull": true + }, + "repo_full_name": { + "name": "repo_full_name", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "added_at": { + "name": "added_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true + } + }, + "indexes": { + "idx_installation_repos_by_name": { + "name": "idx_installation_repos_by_name", + "columns": [ + { + "expression": "repo_full_name", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + }, + "idx_installation_repos_by_install": { + "name": "idx_installation_repos_by_install", + "columns": [ + { + "expression": "installation_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": { + "installation_repositories_installation_id_github_installations_installation_id_fk": { + "name": "installation_repositories_installation_id_github_installations_installation_id_fk", + "tableFrom": "installation_repositories", + "tableTo": "github_installations", + "columnsFrom": [ + "installation_id" + ], + "columnsTo": [ + "installation_id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": { + "installation_repositories_installation_id_repo_full_name_pk": { + "name": "installation_repositories_installation_id_repo_full_name_pk", + "columns": [ + "installation_id", + "repo_full_name" + ] + } + }, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.oauth_states": { + "name": "oauth_states", + "schema": "", + "columns": { + "state": { + "name": "state", + "type": "text", + "primaryKey": true, + "notNull": true + }, + "towns_user_id": { + "name": "towns_user_id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "channel_id": { + "name": "channel_id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "space_id": { + "name": "space_id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "redirect_action": { + "name": "redirect_action", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "redirect_data": { + "name": "redirect_data", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "expires_at": { + "name": "expires_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true + }, + "created_at": { + "name": "created_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true + } + }, + "indexes": { + "idx_oauth_states_expires": { + "name": "idx_oauth_states_expires", + "columns": [ + { + "expression": "expires_at", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + }, + "idx_oauth_states_towns_user_id": { + "name": "idx_oauth_states_towns_user_id", + "columns": [ + { + "expression": "towns_user_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.pending_subscriptions": { + "name": "pending_subscriptions", + "schema": "", + "columns": { + "id": { + "name": "id", + "type": "serial", + "primaryKey": true, + "notNull": true + }, + "towns_user_id": { + "name": "towns_user_id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "space_id": { + "name": "space_id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "channel_id": { + "name": "channel_id", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "repo_full_name": { + "name": "repo_full_name", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "event_types": { + "name": "event_types", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "created_at": { + "name": "created_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true + }, + "expires_at": { + "name": "expires_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true + } + }, + "indexes": { + "idx_pending_subscriptions_expires": { + "name": "idx_pending_subscriptions_expires", + "columns": [ + { + "expression": "expires_at", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + }, + "idx_pending_subscriptions_user": { + "name": "idx_pending_subscriptions_user", + "columns": [ + { + "expression": "towns_user_id", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + }, + "idx_pending_subscriptions_repo": { + "name": "idx_pending_subscriptions_repo", + "columns": [ + { + "expression": "repo_full_name", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + }, + "pending_subscriptions_unique_idx": { + "name": "pending_subscriptions_unique_idx", + "columns": [ + { + "expression": "space_id", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "channel_id", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "repo_full_name", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": true, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": { + "pending_subscriptions_towns_user_id_github_user_tokens_towns_user_id_fk": { + "name": "pending_subscriptions_towns_user_id_github_user_tokens_towns_user_id_fk", + "tableFrom": "pending_subscriptions", + "tableTo": "github_user_tokens", + "columnsFrom": [ + "towns_user_id" + ], + "columnsTo": [ + "towns_user_id" + ], + "onDelete": "cascade", + "onUpdate": "no action" + } + }, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.repo_polling_state": { + "name": "repo_polling_state", + "schema": "", + "columns": { + "repo": { + "name": "repo", + "type": "text", + "primaryKey": true, + "notNull": true + }, + "etag": { + "name": "etag", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "last_event_id": { + "name": "last_event_id", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "last_polled_at": { + "name": "last_polled_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": false + }, + "default_branch": { + "name": "default_branch", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "updated_at": { + "name": "updated_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": {}, + "isRLSEnabled": false + }, + "public.webhook_deliveries": { + "name": "webhook_deliveries", + "schema": "", + "columns": { + "delivery_id": { + "name": "delivery_id", + "type": "text", + "primaryKey": true, + "notNull": true + }, + "installation_id": { + "name": "installation_id", + "type": "integer", + "primaryKey": false, + "notNull": false + }, + "event_type": { + "name": "event_type", + "type": "text", + "primaryKey": false, + "notNull": true + }, + "delivered_at": { + "name": "delivered_at", + "type": "timestamp with time zone", + "primaryKey": false, + "notNull": true + }, + "status": { + "name": "status", + "type": "text", + "primaryKey": false, + "notNull": true, + "default": "'pending'" + }, + "error": { + "name": "error", + "type": "text", + "primaryKey": false, + "notNull": false + }, + "retry_count": { + "name": "retry_count", + "type": "integer", + "primaryKey": false, + "notNull": true, + "default": 0 + } + }, + "indexes": { + "idx_deliveries_status": { + "name": "idx_deliveries_status", + "columns": [ + { + "expression": "status", + "isExpression": false, + "asc": true, + "nulls": "last" + }, + { + "expression": "delivered_at", + "isExpression": false, + "asc": true, + "nulls": "last" + } + ], + "isUnique": false, + "concurrently": false, + "method": "btree", + "with": {} + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "policies": {}, + "checkConstraints": { + "status_check": { + "name": "status_check", + "value": "\"webhook_deliveries\".\"status\" IN ('pending', 'success', 'failed')" + } + }, + "isRLSEnabled": false + } + }, + "enums": {}, + "schemas": {}, + "sequences": {}, + "roles": {}, + "policies": {}, + "views": {}, + "_meta": { + "columns": {}, + "schemas": {}, + "tables": {} + } +} \ No newline at end of file diff --git a/drizzle/meta/_journal.json b/drizzle/meta/_journal.json index 74bd3ca..7316829 100644 --- a/drizzle/meta/_journal.json +++ b/drizzle/meta/_journal.json @@ -43,6 +43,13 @@ "when": 1764410325645, "tag": "0005_neat_blindfold", "breakpoints": true + }, + { + "idx": 6, + "version": "7", + "when": 1764615938698, + "tag": "0006_certain_micromacro", + "breakpoints": true } ] } \ No newline at end of file diff --git a/src/db/schema.ts b/src/db/schema.ts index ff33067..91799ad 100644 --- a/src/db/schema.ts +++ b/src/db/schema.ts @@ -229,3 +229,44 @@ export const pendingSubscriptions = pgTable( ), }) ); + +/** + * Maps GitHub PR/issue events to Towns thread IDs for grouping related events + * Enables threading: PR opened events start threads, subsequent events reply + */ +export const eventThreads = pgTable( + "event_threads", + { + id: serial("id").primaryKey(), + spaceId: text("space_id").notNull(), + channelId: text("channel_id").notNull(), + repoFullName: text("repo_full_name").notNull(), + anchorType: text("anchor_type").notNull(), // 'pr' | 'issue' + anchorNumber: integer("anchor_number").notNull(), // PR/issue number + threadEventId: text("thread_event_id").notNull(), // Towns eventId of anchor message + createdAt: timestamp("created_at", { withTimezone: true }).notNull(), + expiresAt: timestamp("expires_at", { withTimezone: true }).notNull(), // Auto-cleanup after 30 days + }, + table => ({ + anchorTypeCheck: check( + "anchor_type_check", + sql`${table.anchorType} IN ('pr', 'issue')` + ), + // Unique constraint on (space, channel, repo, type, number) + uniqueThread: uniqueIndex("event_threads_unique_idx").on( + table.spaceId, + table.channelId, + table.repoFullName, + table.anchorType, + table.anchorNumber + ), + // Index for looking up threads by repo and PR/issue number + repoAnchorIndex: index("idx_event_threads_repo_anchor").on( + table.repoFullName, + table.anchorType, + table.anchorNumber + ), + // Index for cleanup job to find expired threads + expiresIndex: index("idx_event_threads_expires").on(table.expiresAt), + }) +); diff --git a/src/github-app/event-processor.ts b/src/github-app/event-processor.ts index 52b8fd6..4c3a5a7 100644 --- a/src/github-app/event-processor.ts +++ b/src/github-app/event-processor.ts @@ -19,6 +19,7 @@ import type { BranchFilter, SubscriptionService, } from "../services/subscription-service"; +import type { AnchorType, ThreadService } from "../services/thread-service"; import type { TownsBot } from "../types/bot"; import type { CreatePayload, @@ -35,18 +36,34 @@ import type { WorkflowRunPayload, } from "../types/webhooks"; +/** + * Threading context for events that can be grouped into threads + */ +interface ThreadingContext { + anchorType: AnchorType; + anchorNumber: number; + isAnchor: boolean; // true for "opened" events that start a thread +} + /** * EventProcessor - Routes webhook events to formatters and sends to subscribed channels * * Maps webhook event types to subscription event types and filters by user preferences. + * Supports threading: PR/issue opened events start threads, follow-up events reply to them. */ export class EventProcessor { private bot: TownsBot; private subscriptionService: SubscriptionService; + private threadService: ThreadService; - constructor(bot: TownsBot, subscriptionService: SubscriptionService) { + constructor( + bot: TownsBot, + subscriptionService: SubscriptionService, + threadService: ThreadService + ) { this.bot = bot; this.subscriptionService = subscriptionService; + this.threadService = threadService; } /** @@ -58,6 +75,7 @@ export class EventProcessor { * @param formatter - Function to format the event as a message * @param logContext - Optional context string for logging * @param branchContext - Optional branch context for branch-specific filtering + * @param threadingContext - Optional threading context for PR/issue grouping */ private async processEvent< T extends { repository: { full_name: string; default_branch: string } }, @@ -66,7 +84,8 @@ export class EventProcessor { eventType: EventType, formatter: (event: T) => string, logContext?: string, - branchContext?: { branch: string } + branchContext?: { branch: string }, + threadingContext?: ThreadingContext ) { if (logContext) { console.log(`Processing ${logContext}`); @@ -80,9 +99,11 @@ export class EventProcessor { ); } + const repoFullName = event.repository.full_name; + // Get subscribed channels for this repo (webhook mode only) const channels = await this.subscriptionService.getRepoSubscribers( - event.repository.full_name, + repoFullName, "webhook" ); @@ -119,37 +140,67 @@ export class EventProcessor { return; } - // Send to all interested channels in parallel - const results = await Promise.allSettled( - interestedChannels.map(channel => - this.bot.sendMessage(channel.channelId, message) - ) - ); + // Send to all interested channels + // For threaded events, look up or create thread mappings per channel + const sendPromises = interestedChannels.map(async channel => { + const { spaceId, channelId } = channel; + + try { + // For follow-up events, look up existing thread (undefined if not found or anchor) + const threadId = + threadingContext && !threadingContext.isAnchor + ? ((await this.threadService.getThreadId( + spaceId, + channelId, + repoFullName, + threadingContext.anchorType, + threadingContext.anchorNumber + )) ?? undefined) + : undefined; + + // Send message (threaded for follow-ups, top-level for anchors/no-context) + const { eventId } = await this.bot.sendMessage(channelId, message, { + threadId, + }); - // Log failures - results.forEach((result, index) => { - if (result.status === "rejected") { - console.error( - `Failed to send to ${interestedChannels[index].channelId}:`, - result.reason - ); + // Store thread mapping for anchor events + if (threadingContext?.isAnchor && eventId) { + await this.threadService.storeThread({ + spaceId, + channelId, + repoFullName, + anchorType: threadingContext.anchorType, + anchorNumber: threadingContext.anchorNumber, + threadEventId: eventId, + }); + } + } catch (error) { + console.error(`Failed to send to ${channel.channelId}:`, error); } }); + + await Promise.allSettled(sendPromises); } /** * Process a pull request webhook event * Branch filter applies to base branch (merge target) + * Threading: "opened" starts thread, other actions reply to it */ async onPullRequest(event: PullRequestPayload) { - const { pull_request, repository } = event; + const { pull_request, repository, action } = event; const baseBranch = pull_request.base.ref; await this.processEvent( event, "pr", formatPullRequest, - `PR event: ${event.action} - ${repository.full_name}#${pull_request.number}`, - { branch: baseBranch } + `PR event: ${action} - ${repository.full_name}#${pull_request.number}`, + { branch: baseBranch }, + { + anchorType: "pr", + anchorNumber: pull_request.number, + isAnchor: action === "opened", + } ); } @@ -172,14 +223,21 @@ export class EventProcessor { /** * Process an issues webhook event + * Threading: "opened" starts thread, other actions reply to it */ async onIssues(event: IssuesPayload) { - const { issue, repository } = event; + const { issue, repository, action } = event; await this.processEvent( event, "issues", formatIssue, - `issue event: ${event.action} - ${repository.full_name}#${issue.number}` + `issue event: ${action} - ${repository.full_name}#${issue.number}`, + undefined, + { + anchorType: "issue", + anchorNumber: issue.number, + isAnchor: action === "opened", + } ); } @@ -215,20 +273,32 @@ export class EventProcessor { /** * Process an issue comment webhook event + * Threading: comments thread to parent PR or issue + * Note: GitHub fires issue_comment for both issues AND PRs */ async onIssueComment(event: IssueCommentPayload) { const { issue, repository } = event; + // Determine if this is a comment on a PR or an issue + // GitHub includes pull_request field in the issue object for PR comments + const isPrComment = "pull_request" in issue && issue.pull_request != null; await this.processEvent( event, "comments", formatIssueComment, - `issue comment event: ${event.action} - ${repository.full_name}#${issue.number}` + `issue comment event: ${event.action} - ${repository.full_name}#${issue.number}`, + undefined, + { + anchorType: isPrComment ? "pr" : "issue", + anchorNumber: issue.number, + isAnchor: false, // Comments are never anchors + } ); } /** * Process a pull request review webhook event * Branch filter applies to the PR's base branch (merge target) + * Threading: reviews thread to parent PR */ async onPullRequestReview(event: PullRequestReviewPayload) { const { pull_request, repository } = event; @@ -238,7 +308,12 @@ export class EventProcessor { "reviews", formatPullRequestReview, `PR review event: ${event.action} - ${repository.full_name}#${pull_request.number}`, - { branch: baseBranch } + { branch: baseBranch }, + { + anchorType: "pr", + anchorNumber: pull_request.number, + isAnchor: false, // Reviews are never anchors + } ); } @@ -254,7 +329,12 @@ export class EventProcessor { "review_comments", formatPullRequestReviewComment, `PR review comment event: ${event.action} - ${repository.full_name}#${pull_request.number}`, - { branch: baseBranch } + { branch: baseBranch }, + { + anchorType: "pr", + anchorNumber: pull_request.number, + isAnchor: false, + } ); } diff --git a/src/index.ts b/src/index.ts index 01c9200..cb11c02 100644 --- a/src/index.ts +++ b/src/index.ts @@ -17,6 +17,7 @@ import { handleOAuthCallback } from "./routes/oauth-callback"; import { GitHubOAuthService } from "./services/github-oauth-service"; import { PollingService } from "./services/polling-service"; import { SubscriptionService } from "./services/subscription-service"; +import { ThreadService } from "./services/thread-service"; await runMigrations(); console.log("✅ Database ready (schema ensured)"); @@ -48,8 +49,15 @@ const subscriptionService = new SubscriptionService( // Enable automatic subscription upgrades when repos are added to GitHub App installationService.setSubscriptionService(subscriptionService); +// Thread service for grouping related events +const threadService = new ThreadService(); + // Event processing service -const eventProcessor = new EventProcessor(bot, subscriptionService); +const eventProcessor = new EventProcessor( + bot, + subscriptionService, + threadService +); // Polling service (5 minute intervals) const pollingService = new PollingService( @@ -230,7 +238,10 @@ console.log("✅ GitHub polling service started (5 minute intervals)"); // Start periodic cleanup of expired OAuth states (every hour) oauthService.startOAuthStateCleanup(); - console.log("✅ OAuth state cleanup started (hourly)"); +// Start periodic cleanup of expired thread mappings (every 24 hours) +threadService.startPeriodicCleanup(); +console.log("✅ Thread cleanup service started (daily cleanup)"); + export default app; diff --git a/src/services/subscription-service.ts b/src/services/subscription-service.ts index d0948c9..5924562 100644 --- a/src/services/subscription-service.ts +++ b/src/services/subscription-service.ts @@ -616,6 +616,7 @@ export class SubscriptionService { deliveryMode?: "webhook" | "polling" ): Promise< Array<{ + spaceId: string; channelId: string; eventTypes: EventType[]; branchFilter: BranchFilter; @@ -629,6 +630,7 @@ export class SubscriptionService { const results = await db .select({ + spaceId: githubSubscriptions.spaceId, channelId: githubSubscriptions.channelId, eventTypes: githubSubscriptions.eventTypes, branchFilter: githubSubscriptions.branchFilter, @@ -637,6 +639,7 @@ export class SubscriptionService { .where(and(...conditions)); return results.map(r => ({ + spaceId: r.spaceId, channelId: r.channelId, eventTypes: parseEventTypes(r.eventTypes), branchFilter: r.branchFilter, diff --git a/src/services/thread-service.ts b/src/services/thread-service.ts new file mode 100644 index 0000000..5e92a13 --- /dev/null +++ b/src/services/thread-service.ts @@ -0,0 +1,142 @@ +import { and, eq, gte, lt } from "drizzle-orm"; + +import { db } from "../db"; +import { eventThreads } from "../db/schema"; + +export type AnchorType = "pr" | "issue"; + +/** Default thread expiration: 30 days */ +const DEFAULT_EXPIRY_DAYS = 30; + +/** + * ThreadService - Manages thread mappings for GitHub event grouping + * + * Stores mappings from (repo, PR/issue number) to Towns thread IDs, + * enabling related events to be sent as thread replies. + */ +export class ThreadService { + /** + * Get the thread event ID for a PR or issue + * Returns null if no thread exists (anchor wasn't seen or expired) + */ + async getThreadId( + spaceId: string, + channelId: string, + repoFullName: string, + anchorType: AnchorType, + anchorNumber: number + ): Promise { + const results = await db + .select({ threadEventId: eventThreads.threadEventId }) + .from(eventThreads) + .where( + and( + eq(eventThreads.spaceId, spaceId), + eq(eventThreads.channelId, channelId), + eq(eventThreads.repoFullName, repoFullName), + eq(eventThreads.anchorType, anchorType), + eq(eventThreads.anchorNumber, anchorNumber), + gte(eventThreads.expiresAt, new Date()) // Exclude expired threads + ) + ) + .limit(1); + + return results[0]?.threadEventId ?? null; + } + + /** + * Store a thread mapping when an anchor event (PR/issue opened) is sent + */ + async storeThread( + params: { + spaceId: string; + channelId: string; + repoFullName: string; + anchorType: AnchorType; + anchorNumber: number; + threadEventId: string; + }, + expiryDays = DEFAULT_EXPIRY_DAYS + ): Promise { + const now = new Date(); + const expiresAt = new Date(now); + expiresAt.setDate(expiresAt.getDate() + expiryDays); + + // Upsert: update if exists, insert if not + await db + .insert(eventThreads) + .values({ + ...params, + createdAt: now, + expiresAt, + }) + .onConflictDoUpdate({ + target: [ + eventThreads.spaceId, + eventThreads.channelId, + eventThreads.repoFullName, + eventThreads.anchorType, + eventThreads.anchorNumber, + ], + set: { + threadEventId: params.threadEventId, + expiresAt, + }, + }); + } + + /** + * Delete expired thread mappings + * Should be called periodically (e.g., daily cleanup job) + */ + async cleanupExpired(): Promise { + const result = await db + .delete(eventThreads) + .where(lt(eventThreads.expiresAt, new Date())) + .returning({ id: eventThreads.id }); + + return result.length; + } + + /** + * Start periodic cleanup of expired thread mappings + * + * @param intervalMs - Cleanup interval in milliseconds (default: 24 hours) + * @returns Timer ID that can be used to stop the cleanup + */ + startPeriodicCleanup( + intervalMs: number = 24 * 60 * 60 * 1000 + ): ReturnType { + console.log( + `[Thread Cleanup] Starting periodic cleanup (every ${intervalMs / 1000 / 60 / 60} hours)` + ); + + // Run cleanup immediately on start + this.cleanupExpired() + .then(count => { + if (count > 0) { + console.log( + `[Thread Cleanup] Removed ${count} expired thread mappings` + ); + } + }) + .catch(error => { + console.error("[Thread Cleanup] Initial cleanup failed:", error); + }); + + // Schedule periodic cleanup + return setInterval(() => { + this.cleanupExpired() + .then(count => { + if (count > 0) { + console.log( + `[Thread Cleanup] Removed ${count} expired thread mappings` + ); + } + }) + .catch(error => { + console.error("[Thread Cleanup] Periodic cleanup failed:", error); + }); + }, intervalMs); + } +}