Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
"main": "./dist/bot.js",
"type": "module",
"scripts": {
"start": "NODE_ENV=production node dist/bot.js",
"start": "NODE_ENV=production node --import ./dist/instrumentation.js dist/bot.js",
"build": "tsup",
"dev:raw": "NODE_ENV=development tsx watch --clear-screen=false --env-file=.env src/bot.ts",
"dev:raw": "NODE_ENV=development tsx watch --clear-screen=false --env-file=.env --import ./src/instrumentation.ts src/bot.ts",
"dev": "pnpm run dev:raw | pino-pretty",
"test": "vitest",
"typecheck": "tsc --noEmit",
Expand Down Expand Up @@ -37,6 +37,20 @@
"@grammyjs/menu": "^1.3.1",
"@grammyjs/parse-mode": "^1.11.1",
"@grammyjs/runner": "^2.0.3",
"@opentelemetry/api": "^1.9.0",
"@opentelemetry/exporter-logs-otlp-proto": "^0.213.0",
"@opentelemetry/exporter-metrics-otlp-proto": "^0.213.0",
"@opentelemetry/exporter-trace-otlp-proto": "^0.213.0",
"@opentelemetry/instrumentation-http": "^0.213.0",
"@opentelemetry/instrumentation-pino": "^0.59.0",
"@opentelemetry/instrumentation-redis-4": "^0.49.0",
"@opentelemetry/resources": "^2.6.0",
"@opentelemetry/sdk-logs": "^0.213.0",
"@opentelemetry/sdk-metrics": "^2.6.0",
"@opentelemetry/sdk-node": "^0.213.0",
"@opentelemetry/sdk-trace-base": "^2.6.0",
"@opentelemetry/sdk-trace-node": "^2.6.0",
"@opentelemetry/semantic-conventions": "^1.40.0",
"@polinetwork/backend": "^0.15.3",
"@t3-oss/env-core": "^0.13.4",
"@trpc/client": "^11.5.1",
Expand Down
836 changes: 822 additions & 14 deletions pnpm-lock.yaml

Large diffs are not rendered by default.

35 changes: 34 additions & 1 deletion src/backend.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,46 @@
import { type AppRouter, TRPC_PATH } from "@polinetwork/backend"
import { createTRPCClient, httpBatchLink, TRPCClientError } from "@trpc/client"
import type { inferRouterInputs, inferRouterOutputs } from "@trpc/server"
import { observable } from "@trpc/server/observable"
import { SuperJSON } from "superjson"

import { env } from "./env"
import { logger } from "./logger"
import { BotAttributes, botMetrics } from "./telemetry"

const url = `http://${env.BACKEND_URL}${TRPC_PATH}`
export const api = createTRPCClient<AppRouter>({ links: [httpBatchLink({ url, transformer: SuperJSON })] })
export const api = createTRPCClient<AppRouter>({
links: [
// Custom link that measures tRPC call duration
() =>
({ op, next }) => {
const start = performance.now()
return observable((observer) => {
const sub = next(op).subscribe({
next(value) {
botMetrics.trpcDuration.record(performance.now() - start, {
[BotAttributes.TRPC_PROCEDURE]: op.path,
[BotAttributes.TRPC_SUCCESS]: true,
})
observer.next(value)
},
error(err) {
botMetrics.trpcDuration.record(performance.now() - start, {
[BotAttributes.TRPC_PROCEDURE]: op.path,
[BotAttributes.TRPC_SUCCESS]: false,
})
observer.error(err)
},
complete() {
observer.complete()
},
})
return sub.unsubscribe
})
},
httpBatchLink({ url, transformer: SuperJSON }),
],
})

export type ApiOutput = inferRouterOutputs<AppRouter>
export type ApiInput = inferRouterInputs<AppRouter>
Expand Down
18 changes: 17 additions & 1 deletion src/bot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@ import { checkUsername } from "./middlewares/check-username"
import { GroupSpecificActions } from "./middlewares/group-specific-actions"
import { messageLink } from "./middlewares/message-link"
import { MessageUserStorage } from "./middlewares/message-user-storage"
import { telemetryMiddleware } from "./middlewares/telemetry"
import { modules, sharedDataInit } from "./modules"
import { Moderation } from "./modules/moderation"
import { redis } from "./redis"
import { BotAttributes, recordException } from "./telemetry"
import { once } from "./utils/once"
import { setTelegramId } from "./utils/telegram-id"
import type { Context, ModuleShared } from "./utils/types"
Expand Down Expand Up @@ -63,6 +65,9 @@ bot.use(
})
)

// Telemetry: root span per update — must be first after sequentialize
bot.use(telemetryMiddleware)

bot.init().then(() => {
const sharedData: ModuleShared = {
api: bot.api,
Expand Down Expand Up @@ -94,6 +99,10 @@ bot.on("message", checkUsername)

bot.catch(async (err) => {
const { error } = err
recordException(error, {
name: "bot.error",
attributes: { [BotAttributes.IMPORTANCE]: "high" },
})
if (error instanceof GrammyError) {
await tgLogger.exception({ type: "BOT_ERROR", error }, "bot.catch() -- middleware stack")
} else if (error instanceof HttpError) {
Expand Down Expand Up @@ -123,7 +132,10 @@ const terminate = once(async (signal: NodeJS.Signals) => {
const p2 = redis.quit()
const p3 = runner.isRunning() && runner.stop()
const p4 = modules.stop()
await Promise.all([p1, p2, p3, p4])
// Flush pending telemetry (set by instrumentation.ts via globalThis)
const otelShutdown = (globalThis as Record<string, unknown>).__otelShutdown as (() => Promise<void>) | undefined
const p5 = otelShutdown?.() ?? Promise.resolve()
await Promise.all([p1, p2, p3, p4, p5])
logger.info("Bot stopped!")
process.exit(0)
})
Expand All @@ -132,6 +144,10 @@ process.on("SIGINT", () => terminate("SIGINT"))
process.on("SIGTERM", () => terminate("SIGTERM"))

process.on("unhandledRejection", (reason: Error, promise) => {
recordException(reason, {
name: "bot.unhandled_rejection",
attributes: { [BotAttributes.IMPORTANCE]: "high" },
})
logger.fatal({ reason, promise }, "UNHANDLED PROMISE REJECTION")
void tgLogger.exception({ type: "UNHANDLED_PROMISE", error: reason, promise })
})
5 changes: 5 additions & 0 deletions src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,13 @@ export const env = createEnv({
REDIS_PORT: z.coerce.number().min(1).max(65535).default(6379),
REDIS_USERNAME: z.string().min(1).optional(),
REDIS_PASSWORD: z.string().min(1).optional(),
LOG_LEVEL: z.enum(["fatal", "error", "warn", "info", "debug", "trace", "silent"]).default("debug"),
NODE_ENV: z.enum(["development", "production"]).default("development"),
OPENAI_API_KEY: z.string().optional(),
OTEL_EXPORTER_OTLP_ENDPOINT: z.string().default("http://localhost:4318"),
OTEL_SERVICE_NAME: z.string().default("polinetwork-telegram-bot"),
OTEL_SERVICE_VERSION: z.string().default("unknown"),
OTEL_STORAGE_SAMPLE_RATE: z.coerce.number().min(0).max(1).default(0.1),
},

runtimeEnv: process.env,
Expand Down
97 changes: 97 additions & 0 deletions src/instrumentation.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import type { Attributes } from "@opentelemetry/api"
import { OTLPLogExporter } from "@opentelemetry/exporter-logs-otlp-proto"
import { OTLPMetricExporter } from "@opentelemetry/exporter-metrics-otlp-proto"
import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-proto"
import { HttpInstrumentation, type HttpInstrumentationConfig } from "@opentelemetry/instrumentation-http"
import { PinoInstrumentation } from "@opentelemetry/instrumentation-pino"
import { RedisInstrumentation } from "@opentelemetry/instrumentation-redis-4"
import { resourceFromAttributes } from "@opentelemetry/resources"
import { BatchLogRecordProcessor } from "@opentelemetry/sdk-logs"
import { PeriodicExportingMetricReader } from "@opentelemetry/sdk-metrics"
import { NodeSDK } from "@opentelemetry/sdk-node"
import {
ParentBasedSampler,
type Sampler,
SamplingDecision,
type SamplingResult,
TraceIdRatioBasedSampler,
} from "@opentelemetry/sdk-trace-node"
import {
ATTR_SERVICE_NAME,
ATTR_SERVICE_VERSION,
SEMRESATTRS_DEPLOYMENT_ENVIRONMENT,
} from "@opentelemetry/semantic-conventions"
import { env } from "./env"

const endpoint = env.OTEL_EXPORTER_OTLP_ENDPOINT
const serviceName = env.OTEL_SERVICE_NAME
const serviceVersion = env.OTEL_SERVICE_VERSION
const storageRate = env.OTEL_STORAGE_SAMPLE_RATE
const nodeEnv = env.NODE_ENV

function shouldIgnoreOutgoingHttpRequest(
request: Parameters<NonNullable<HttpInstrumentationConfig["ignoreOutgoingRequestHook"]>>[0]
) {
if (typeof request.path === "string") return request.path.endsWith("/getUpdates")
return false
}

/**
* Custom sampler that always traces high-importance spans (commands, automod)
* and samples storage/caching operations at a configurable rate.
*/
class BotSampler implements Sampler {
private ratioSampler = new TraceIdRatioBasedSampler(storageRate)

shouldSample(
context: Parameters<Sampler["shouldSample"]>[0],
traceId: string,
_spanName: string,
_spanKind: Parameters<Sampler["shouldSample"]>[3],
attributes: Attributes
): SamplingResult {
const importance = attributes["bot.importance"] as string | undefined

if (importance === "high") {
return { decision: SamplingDecision.RECORD_AND_SAMPLED }
}

if (importance === "low") {
return this.ratioSampler.shouldSample(context, traceId)
}

// Default: always sample (covers auto-instrumented HTTP, Redis, etc.)
return { decision: SamplingDecision.RECORD_AND_SAMPLED }
}

toString(): string {
return `BotSampler{storageRate=${storageRate}}`
}
}

const sdk = new NodeSDK({
resource: resourceFromAttributes({
[ATTR_SERVICE_NAME]: serviceName,
[ATTR_SERVICE_VERSION]: serviceVersion,
[SEMRESATTRS_DEPLOYMENT_ENVIRONMENT]: nodeEnv,
}),
sampler: new ParentBasedSampler({ root: new BotSampler() }),
traceExporter: new OTLPTraceExporter({ url: `${endpoint}/v1/traces` }),
metricReader: new PeriodicExportingMetricReader({
exporter: new OTLPMetricExporter({ url: `${endpoint}/v1/metrics` }),
}),
logRecordProcessor: new BatchLogRecordProcessor(new OTLPLogExporter({ url: `${endpoint}/v1/logs` })),
instrumentations: [
new HttpInstrumentation({
ignoreOutgoingRequestHook: shouldIgnoreOutgoingHttpRequest,
}),
new RedisInstrumentation(),
new PinoInstrumentation(),
],
})

sdk.start()

// Expose shutdown via globalThis so the app can flush telemetry on exit
// without importing this file (which would cause tsup to bundle it twice).
;(globalThis as Record<string, unknown>).__otelShutdown = () => sdk.shutdown()
25 changes: 24 additions & 1 deletion src/lib/managed-commands/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import type { ChatMember, Message } from "grammy/types"
import type { Result } from "neverthrow"
import { err, ok } from "neverthrow"
import type { LogFn } from "pino"
import { BotAttributes, botMetrics, withSpan } from "@/telemetry"
import { fmt } from "@/utils/format"
import { wait } from "@/utils/wait"
import type {
Expand Down Expand Up @@ -375,6 +376,10 @@ export class ManagedCommands<
if (cmd.permissions) {
const allowed = await this.permissionHandler({ command: cmd, context: ctx })
if (!allowed) {
botMetrics.commandsCount.add(1, {
[BotAttributes.COMMAND_NAME]: cmd.trigger,
[BotAttributes.COMMAND_PERMITTED]: false,
})
this.logger.info(
{ command_permissions: cmd.permissions },
`[ManagedCommands] command '/${cmd.trigger}' invoked by ${this.printUsername(ctx)} without permissions`
Expand All @@ -387,8 +392,26 @@ export class ManagedCommands<
}
}

botMetrics.commandsCount.add(1, {
[BotAttributes.COMMAND_NAME]: cmd.trigger,
[BotAttributes.COMMAND_PERMITTED]: true,
})

// enter the conversation that handles the command execution
await ctx.conversation.enter(cmd.trigger)
await withSpan(
`bot.command.${cmd.trigger}`,
{
[BotAttributes.IMPORTANCE]: "high",
[BotAttributes.COMMAND_NAME]: cmd.trigger,
[BotAttributes.COMMAND_SCOPE]: cmd.scope ?? "both",
[BotAttributes.CHAT_ID]: ctx.chat.id,
[BotAttributes.USER_ID]: ctx.from?.id ?? 0,
[BotAttributes.USERNAME]: ctx.from?.username ?? "unknown",
},
async () => {
await ctx.conversation.enter(cmd.trigger)
}
)
})
return this
}
Expand Down
17 changes: 13 additions & 4 deletions src/logger.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
import { createEnv } from "@t3-oss/env-core"
import pino from "pino"
import { z } from "zod/v4"

const loggerEnv = createEnv({
server: {
LOG_LEVEL: z.enum(["fatal", "error", "warn", "info", "debug", "trace", "silent"]).default("debug"),
},
runtimeEnv: process.env,
emptyStringAsUndefined: true,
})

export const logger = pino({
// the reason why we use process.env instead of @/env is that
// we want the logger to be working also in tests where we do not have
// environment variables set. If we used @/env it would throw an error
level: process.env.LOG_LEVEL || "debug",
// Keep logger bootstrap independent from the main app env module:
// tests may import the logger without having the full runtime env set.
level: loggerEnv.LOG_LEVEL,
})
Loading
Loading