diff --git a/CHANGELOG.md b/CHANGELOG.md
index d315ee6..d00d9ed 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,9 @@
+## 1.5.0 (2025-12-29)
+
+- feat: add metrics router for prometheus
+- feat: add job routes for admin query
+- fix: enhance job handling and logging in crawler
+
 ## 1.4.7 (2025-12-15)
 
 - chore: update deps
diff --git a/blocklets/snap-kit/api/src/index.ts b/blocklets/snap-kit/api/src/index.ts
index e500830..7ef0120 100644
--- a/blocklets/snap-kit/api/src/index.ts
+++ b/blocklets/snap-kit/api/src/index.ts
@@ -10,13 +10,13 @@ import path from 'path';
 import env from './libs/env';
 import { logger } from './libs/logger';
 import routes from './routes';
+import adminRoutes from './routes/admin';
+import metricsRoutes from './routes/metrics';
 
 const { name, version } = require('../../package.json');
 
 dotenv.config();
 
-logger.debug('preferences', env.preferences);
-
 export const app = express();
 
 createLogger.setupAccessLogger(app);
@@ -26,10 +26,10 @@ app.use(express.json({ limit: '1 mb' }));
 app.use(express.urlencoded({ extended: true, limit: '1 mb' }));
 app.use(cors());
 
-const router = express.Router();
-router.use('/api', routes);
+app.use('/api/admin', adminRoutes);
+app.use('/api/metrics', metricsRoutes);
+app.use('/api', routes);
 
-app.use(router);
 app.use('/data', express.static(path.join(env.dataDir, 'data'), { maxAge: '365d', index: false }));
 
 // const isProduction = process.env.NODE_ENV === 'production' || process.env.ABT_NODE_SERVICE_ENV === 'production';
@@ -61,7 +61,7 @@ export const server = app.listen(port, async (err?: any) => {
 
   try {
     await initCrawler({
-      concurrency: Math.max(1, env.preferences.concurrency || 0),
+      concurrency: Math.max(1, env.preferences.crawlConcurrency || 0),
       siteCron: {
         enabled: !!env.preferences.cronEnabled,
         immediate: !!env.preferences.cronImmediate,
diff --git a/blocklets/snap-kit/api/src/routes/admin.ts b/blocklets/snap-kit/api/src/routes/admin.ts
new file mode 100644
index 0000000..89acdd5
--- /dev/null
+++ b/blocklets/snap-kit/api/src/routes/admin.ts
@@ -0,0 +1,69 @@
+import { Job } from '@arcblock/crawler';
+import { Joi } from '@arcblock/validator';
+import { auth, session } from '@blocklet/sdk/lib/middlewares';
+import { Router } from 'express';
+
+import { logger } from '../libs/logger';
+
+const router = Router();
+
+/**
+ * Admin API: Get job queue stats
+ */
+router.get('/jobs/stats', session({ accessKey: true }), auth({ roles: ['admin', 'owner'] }), async (_, res) => {
+  const result = await Job.stats();
+
+  logger.info('GET /admin/jobs/stats', result);
+
+  return res.json({
+    code: 'ok',
+    data: result,
+  });
+});
+
+/**
+ * Admin API: Get job list with pagination
+ */
+const jobsSchema = Joi.object({
+  page: Joi.number().integer().min(1).default(1),
+  pageSize: Joi.number().integer().min(1).max(100).default(20),
+  queue: Joi.string(),
+});
+router.get('/jobs', session({ accessKey: true }), auth({ roles: ['admin', 'owner'] }), async (req, res) => {
+  const params = await jobsSchema.validateAsync(req.query);
+  const result = await Job.paginate(params);
+
+  logger.info('GET /admin/jobs', { params, total: result.total });
+
+  return res.json({
+    code: 'ok',
+    data: result,
+  });
+});
+
+/**
+ * Admin API: Delete jobs by queue name or job ids
+ */
+const deleteJobsSchema = Joi.object({
+  queue: Joi.string(),
+  ids: Joi.array().items(Joi.string()),
+}).or('queue', 'ids');
+router.delete('/jobs', session({ accessKey: true }), auth({ roles: ['admin', 'owner'] }), async (req, res) => {
+  const params = await deleteJobsSchema.validateAsync(req.body);
+
+  let result;
+  if (params.queue) {
+    result = await Job.deleteByQueue(params.queue);
+    logger.info('DELETE /admin/jobs by queue', { queue: params.queue, ...result });
+  } else {
+    result = await Job.deleteByIds(params.ids);
+    logger.info('DELETE /admin/jobs by ids', { count: params.ids.length, ...result });
+  }
+
+  return res.json({
+    code: 'ok',
+    data: result,
+  });
+});
+
+export default router;
diff --git a/blocklets/snap-kit/api/src/routes/index.ts b/blocklets/snap-kit/api/src/routes/index.ts
index a3c20c9..481798c 100644
--- a/blocklets/snap-kit/api/src/routes/index.ts
+++ b/blocklets/snap-kit/api/src/routes/index.ts
@@ -1,29 +1,26 @@
 import { crawlCode, crawlUrl, getLatestSnapshot, getSnapshot } from '@arcblock/crawler';
 import { Joi } from '@arcblock/validator';
+import { auth, session } from '@blocklet/sdk/lib/middlewares';
 import { Router } from 'express';
 import qs from 'querystring';
 
 import { logger } from '../libs/logger';
 
-const { session, auth } = require('@blocklet/sdk/lib/middlewares');
-
 const router = Router();
 
-router.use(session({ accessKey: true }));
-
 /**
  * Crawl page html
  */
 const crawlSchema = Joi.object({
   url: Joi.string().uri().required(),
   headers: Joi.object().pattern(Joi.string(), Joi.string()).max(30),
-  timeout: Joi.number().integer().min(10).max(120).default(120),
+  timeout: Joi.number().integer().min(10).max(120).default(60),
   waitTime: Joi.number().integer().min(0).max(120).default(0),
   cookies: Joi.array().items(Joi.object({ name: Joi.string().required(), value: Joi.string().required() })),
   localStorage: Joi.array().items(Joi.object({ key: Joi.string().required(), value: Joi.string().required() })),
   sync: Joi.boolean().default(false),
 });
-router.post('/crawl', auth({ methods: ['accessKey'] }), async (req, res) => {
+router.post('/crawl', session({ accessKey: true }), auth({ methods: ['accessKey'] }), async (req, res) => {
   const params = await crawlSchema.validateAsync(req.body);
 
   res.setTimeout(params.timeout * 1000, () => {
@@ -71,7 +68,7 @@ const crawlGetSchema = Joi.object({
   url: Joi.string().uri(),
 }).or('jobId', 'url');
 
-router.get('/crawl', auth({ methods: ['accessKey'] }), async (req, res) => {
+router.get('/crawl', session({ accessKey: true }), auth({ methods: ['accessKey'] }), async (req, res) => {
   const params = await crawlGetSchema.validateAsync(req.query);
 
   const snapshot = params.jobId ? await getSnapshot(params.jobId) : await getLatestSnapshot(params.url);
@@ -94,7 +91,7 @@ const snapSchema = Joi.object({
   height: Joi.number().integer().min(500).default(900),
   quality: Joi.number().integer().min(1).max(100).default(80),
   format: Joi.string().valid('png', 'jpeg', 'webp').default('webp'),
-  timeout: Joi.number().integer().min(0).max(120).default(120),
+  timeout: Joi.number().integer().min(0).max(120).default(60),
   waitTime: Joi.number().integer().min(0).max(120).default(0),
   fullPage: Joi.boolean().default(false),
   headers: Joi.object().pattern(Joi.string(), Joi.string()).max(30),
@@ -102,7 +99,7 @@ const snapSchema = Joi.object({
   localStorage: Joi.array().items(Joi.object({ key: Joi.string().required(), value: Joi.string().required() })),
   sync: Joi.boolean().default(false),
 });
-router.post('/snap', auth({ methods: ['accessKey'] }), async (req, res) => {
+router.post('/snap', session({ accessKey: true }), auth({ methods: ['accessKey'] }), async (req, res) => {
   const params = await snapSchema.validateAsync(req.body);
 
   res.setTimeout(params.timeout * 1000, () => {
@@ -148,7 +145,7 @@ router.post('/snap', auth({ methods: ['accessKey'] }), async (req, res) => {
 const snapGetSchema = Joi.object({
   jobId: Joi.string().required(),
 });
-router.get('/snap', auth({ methods: ['accessKey'] }), async (req, res) => {
+router.get('/snap', session({ accessKey: true }), auth({ methods: ['accessKey'] }), async (req, res) => {
   const params = await snapGetSchema.validateAsync(req.query);
 
   const snapshot = await getSnapshot(params.jobId);
@@ -187,10 +184,10 @@ const carbonSchema = Joi.object({
   code: Joi.string().required(),
   format: Joi.string().valid('png', 'jpeg', 'webp').default('png'),
   sync: Joi.boolean().default(false),
-  timeout: Joi.number().integer().min(0).max(120).default(120),
+  timeout: Joi.number().integer().min(0).max(120).default(60),
 });
 
-router.post('/carbon', auth({ methods: ['accessKey'] }), async (req, res) => {
+router.post('/carbon', session({ accessKey: true }), auth({ methods: ['accessKey'] }), async (req, res) => {
   const params = await carbonSchema.validateAsync(req.body);
 
   const { sync, timeout, ...carbonParams } = params;
@@ -235,7 +232,7 @@ router.post('/carbon', auth({ methods: ['accessKey'] }), async (req, res) => {
 const carbonGetSchema = Joi.object({
   jobId: Joi.string().required(),
 });
-router.get('/carbon', auth({ methods: ['accessKey'] }), async (req, res) => {
+router.get('/carbon', session({ accessKey: true }), auth({ methods: ['accessKey'] }), async (req, res) => {
   const params = await carbonGetSchema.validateAsync(req.query);
 
   const snapshot = await getSnapshot(params.jobId);
diff --git a/blocklets/snap-kit/api/src/routes/metrics.ts b/blocklets/snap-kit/api/src/routes/metrics.ts
new file mode 100644
index 0000000..18479e2
--- /dev/null
+++ b/blocklets/snap-kit/api/src/routes/metrics.ts
@@ -0,0 +1,12 @@
+import { getContentType, getMetrics } from '@arcblock/crawler';
+import { Router } from 'express';
+
+const router = Router();
+
+router.get('/', async (_req, res) => {
+  const metrics = await getMetrics();
+  res.set('Content-Type', getContentType());
+  res.end(metrics);
+});
+
+export default router;
diff --git a/blocklets/snap-kit/blocklet.yml b/blocklets/snap-kit/blocklet.yml
index 36db329..6ae077c 100644
--- a/blocklets/snap-kit/blocklet.yml
+++ b/blocklets/snap-kit/blocklet.yml
@@ -16,7 +16,7 @@ repository:
   type: git
   url: git+https://github.com/blocklet/create-blocklet.git
 specVersion: 1.2.8
-version: 1.4.7
+version: 1.5.0
 logo: logo.png
 files:
   - dist
diff --git a/package.json b/package.json
index 506c75c..2c98d66 100644
--- a/package.json
+++ b/package.json
@@ -1,7 +1,7 @@
 {
   "name": "crawler",
   "private": true,
-  "version": "1.4.7",
+  "version": "1.5.0",
   "scripts": {
     "dev": "pnpm run --filter @arcblock/crawler dev & pnpm run --filter @arcblock/crawler-middleware dev & pnpm run --filter @blocklet/snap-kit dev",
     "build:packages": "pnpm -r build",
diff --git a/packages/crawler/package.json b/packages/crawler/package.json
index 33631cc..9628ae5 100644
--- a/packages/crawler/package.json
+++ b/packages/crawler/package.json
@@ -1,6 +1,6 @@
 {
   "name": "@arcblock/crawler",
-  "version": "1.4.7",
+  "version": "1.5.0",
   "main": "lib/cjs/index.js",
   "module": "lib/esm/index.js",
   "types": "lib/cjs/index.d.ts",
@@ -69,6 +69,7 @@
     "lodash": "^4.17.21",
     "lru-cache": "^10.4.3",
     "p-map": "^7.0.3",
+    "prom-client": "^15.1.3",
     "robots-parser": "^3.0.1",
     "sitemap": "^7.1.2",
     "sqlite3": "^5.1.7",
diff --git a/packages/crawler/src/crawler.ts b/packages/crawler/src/crawler.ts
index 8b7f40a..893d3f8 100644
--- a/packages/crawler/src/crawler.ts
+++ b/packages/crawler/src/crawler.ts
@@ -7,6 +7,7 @@ import fs from 'fs-extra';
 import path from 'path';
 
 import { config, logger } from './config';
+import { jobDurationSeconds, jobTotalLatencySeconds, jobsEnqueuedTotal, jobsTotal } from './metrics';
 import { initPage } from './puppeteer';
 import { createCarbonImage } from './services/carbon';
 import { convertJobToSnapshot, deleteSnapshots, formatSnapshot } from './services/snapshot';
@@ -15,20 +16,20 @@ import { findMaxScrollHeight, formatUrl, isAcceptCrawler, md5, sleep } from './u
 
 const { BaseState } = require('@abtnode/models');
 
-let crawlQueue;
-let syncQueue;
-let codeQueue;
-let cronQueue;
-
-export { crawlQueue, syncQueue, codeQueue, cronQueue };
+export const queueMap = {
+  urlCrawler: null as any,
+  syncCrawler: null as any,
+  codeCrawler: null as any,
+  cronJobs: null as any,
+};
 
 export function initQueue() {
-  crawlQueue = createCrawlQueue('urlCrawler');
-  syncQueue = createCrawlQueue('syncCrawler');
-  codeQueue = createCrawlQueue('codeCrawler', {
+  queueMap.urlCrawler = createCrawlQueue('urlCrawler');
+  queueMap.syncCrawler = createCrawlQueue('syncCrawler');
+  queueMap.codeCrawler = createCrawlQueue('codeCrawler', {
     handleScreenshot: createCarbonImage,
   });
-  cronQueue = createCrawlQueue('cronJobs');
+  queueMap.cronJobs = createCrawlQueue('cronJobs');
 }
 
 type PageHandler = {
 export function createCrawlQueue(queue: string, handler?: PageHandler) {
   return createQueue({
     store: new SequelizeStore(db, queue),
-    concurrency: config.concurrency,
+    options: {
+      concurrency: config.concurrency,
+      enableScheduledJob: true,
+    },
     onJob: async (job: JobState) => {
-      logger.info('Starting to execute crawl job', job);
-
-      // check robots.txt
-      if (!job.ignoreRobots) {
-        const canCrawl = await isAcceptCrawler(job.url);
-        if (!canCrawl) {
-          logger.error(`failed to crawl ${job.url}, denied by robots.txt`, job);
-          const snapshot = convertJobToSnapshot({
-            job,
-            snapshot: {
-              status: 'failed',
-              error: 'Denied by robots.txt',
-            },
-          });
-          await Snapshot.upsert(snapshot);
-          return snapshot;
-        }
-      }
+      const startTime = Date.now();
+      let status: 'success' | 'failed' = 'failed';
 
-      const formattedJob: JobState = {
+      const formattedJob = {
         ...job,
         cookies: (config.cookies || []).concat(job.cookies || []),
         localStorage: (config.localStorage || []).concat(job.localStorage || []),
       };
 
       try {
+        logger.info(`Starting to execute ${queue} job`, { ...job, queueSize: await Job.count() });
+
+        // check robots.txt
+        if (!job.ignoreRobots) {
+          const canCrawl = await isAcceptCrawler(job.url);
+          if (!canCrawl) {
+            logger.error(`failed to crawl ${job.url}, denied by robots.txt`, job);
+            const snapshot = convertJobToSnapshot({
+              job,
+              snapshot: {
+                status: 'failed',
+                error: 'Denied by robots.txt',
+              },
+            });
+            await Snapshot.upsert(snapshot);
+            return snapshot;
+          }
+        }
+
+        // get page content later
         const result = await getPageContent(formattedJob, handler);
@@ -86,23 +93,21 @@
           await Snapshot.upsert(snapshot);
           return snapshot;
         }
+
        const snapshot = await sequelize.transaction(async (txn) => {
          // delete old snapshot
          if (formattedJob.replace) {
-            try {
-              const deletedJobIds = await deleteSnapshots(
-                {
-                  url: formattedJob.url,
-                  replace: true,
-                },
-                { txn },
-              );
-              if (deletedJobIds) {
-                logger.info('Deleted old snapshot', { deletedJobIds });
-              }
-            } catch (error) {
+            const deletedJobIds = await deleteSnapshots(
+              {
+                url: formattedJob.url,
+                replace: true,
+              },
+              { txn },
+            ).catch((error) => {
              logger.error('Failed to delete old snapshot', { error, formattedJob });
-            }
+            });
+
+            if (deletedJobIds) logger.info('Deleted old snapshot', { deletedJobIds });
          }
 
          // save html and screenshot to data dir
@@ -126,10 +131,13 @@
          return snapshot;
        });
 
+        status = 'success';
        return snapshot;
      } catch (error) {
        logger.error(`Failed to crawl ${formattedJob.url}`, { error, formattedJob });
 
+        status = 'failed';
+
        const snapshot = convertJobToSnapshot({
          job: formattedJob,
          snapshot: {
@@ -139,6 +147,13 @@
        });
        await Snapshot.upsert(snapshot);
        return snapshot;
+      } finally {
+        const now = Date.now();
+        jobsTotal.inc({ queue, status });
+        jobDurationSeconds.observe({ queue, status }, (now - startTime) / 1000);
+        if (job.enqueuedAt) {
+          jobTotalLatencySeconds.observe({ queue, status }, (now - job.enqueuedAt) / 1000);
+        }
      }
    },
  });
@@ -200,7 +215,7 @@ export const getPageContent = async (
   height = 900,
   quality = 80,
   format = 'webp',
-  timeout = 90 * 1000,
+  timeout = 60 * 1000,
   waitTime = 0,
   fullPage = false,
   headers,
@@ -365,10 +380,15 @@
  */
 // eslint-disable-next-line require-await
 export async function enqueue(
-  queue,
+  queueName: keyof typeof queueMap,
   params: Omit<JobState, 'id'>,
   callback?: (snapshot: SnapshotModel | null) => void,
 ) {
+  const queue = queueMap[queueName];
+  if (!queue) {
+    throw new Error(`Queue ${queueName} not found`);
+  }
+
   // skip duplicate job
   const existsJob = await Job.isExists(params);
   if (existsJob && !params.sync) {
@@ -376,28 +396,55 @@
     return existsJob.id;
   }
 
-  logger.info('enqueue crawl job', params);
-
   const jobId = randomUUID();
-  const job = queue.push({ ...params, id: jobId });
+  const enqueuedAt = Date.now();
+  const job = queue.push({ job: { ...params, id: jobId, enqueuedAt }, jobId });
+  jobsEnqueuedTotal.inc({ queue: queueName });
+
+  // Get current queue size for logging
+  const queueSize = await Job.count();
+  logger.info('enqueue crawl job', { ...params, jobId, queueSize });
 
   job.on('finished', async ({ result }) => {
-    logger.info(`Crawl completed ${params.url}, status: ${result ? 'success' : 'failed'}`, { job: params, result });
-    callback?.(result ? await formatSnapshot(result) : null);
+    try {
+      const isSuccess = result?.status === 'success';
+      const queueSize = await Job.count();
+
+      logger.info(`Crawl completed ${params.url}, status: ${isSuccess ? 'success' : 'failed'}`, {
+        job: params,
+        result,
+        queueSize,
+      });
+
+      callback?.(result ? await formatSnapshot(result) : null);
+    } catch (error) {
+      logger.error(`Error in finished event handler for ${params.url}`, { error });
+      callback?.(null);
+    }
   });
 
-  job.on('failed', ({ error }) => {
-    logger.error(`Failed to execute job for ${params.url}`, { error, job: params });
-    callback?.(null);
+  job.on('failed', async ({ error }) => {
+    try {
+      const queueSize = await Job.count();
+      logger.error(`Failed to execute job for ${params.url}`, { error, job: params, queueSize });
+    } catch (err) {
+      logger.error(`Error in failed event handler for ${params.url}`, { error: err });
+    } finally {
+      callback?.(null);
+    }
   });
 
   return jobId;
 }
 
 export function crawlUrl(params: Omit<JobState, 'id'>, callback?: (snapshot: SnapshotModel | null) => void) {
-  return enqueue(params.sync ? syncQueue : crawlQueue, params, callback);
+  return enqueue(params.sync ? 'syncCrawler' : 'urlCrawler', params, callback);
 }
 
 export function crawlCode(params: Omit<JobState, 'id'>, callback?: (snapshot: SnapshotModel | null) => void) {
-  return enqueue(codeQueue, { ignoreRobots: true, includeHtml: false, includeScreenshot: true, ...params }, callback);
+  return enqueue(
+    'codeCrawler',
+    { ignoreRobots: true, includeHtml: false, includeScreenshot: true, ...params },
+    callback,
+  );
 }
diff --git a/packages/crawler/src/index.ts b/packages/crawler/src/index.ts
index 634349e..1bd2620 100644
--- a/packages/crawler/src/index.ts
+++ b/packages/crawler/src/index.ts
@@ -9,7 +9,9 @@ import { migrate } from './store/migrate';
 
 export * from './crawler';
 export * from './services/snapshot';
+export * from './store/job';
 export * as utils from './utils';
+export * from './metrics';
 
 export async function initCrawler(
   params: Pick,
diff --git a/packages/crawler/src/metrics.ts b/packages/crawler/src/metrics.ts
new file mode 100644
index 0000000..8b6f52a
--- /dev/null
+++ b/packages/crawler/src/metrics.ts
@@ -0,0 +1,78 @@
+import { Counter, Gauge, Histogram, Registry } from 'prom-client';
+
+import { Job } from './store';
+
+// Create a new registry
+const register = new Registry();
+
+// ========== Counter - crawl jobs processed ==========
+export const jobsTotal = new Counter({
+  name: 'crawler_jobs_total',
+  help: 'Total number of crawl jobs processed',
+  labelNames: ['queue', 'status'] as const,
+  registers: [register],
+});
+
+// ========== Counter - jobs enqueued ==========
+export const jobsEnqueuedTotal = new Counter({
+  name: 'crawler_jobs_enqueued_total',
+  help: 'Total number of crawl jobs enqueued',
+  labelNames: ['queue'] as const,
+  registers: [register],
+});
+
+// ========== Histogram - job execution duration ==========
+export const jobDurationSeconds = new Histogram({
+  name: 'crawler_job_duration_seconds',
+  help: 'Duration of crawl job execution in seconds',
+  labelNames: ['queue', 'status'] as const,
+  buckets: [10, 30, 60, 120, 300, 600, 900, 1800, 3600],
+  registers: [register],
+});
+
+// ========== Histogram - enqueue-to-completion latency ==========
+export const jobTotalLatencySeconds = new Histogram({
+  name: 'crawler_job_total_latency_seconds',
+  help: 'Total latency from enqueue to completion in seconds',
+  labelNames: ['queue', 'status'] as const,
+  buckets: [10, 30, 60, 120, 300, 600, 900, 1800, 3600],
+  registers: [register],
+});
+
+// ========== Gauge - queue size ==========
+export const queueSize = new Gauge({
+  name: 'crawler_queue_size',
+  help: 'Current number of jobs in queue',
+  labelNames: ['queue'] as const,
+  registers: [register],
+});
+
+/**
+ * Collect all metrics by querying database
+ */
+export async function collectMetrics() {
+  try {
+    // collect queue sizes
+    const jobStats = await Job.stats();
+    jobStats.queues.forEach((q) => {
+      queueSize.set({ queue: q.queue }, q.count);
+    });
+  } catch (error) {
+    console.error('Error collecting metrics:', error);
+  }
+}
+
+/**
+ * Get metrics in Prometheus format
+ */
+export async function getMetrics() {
+  await collectMetrics();
+  return register.metrics();
+}
+
+/**
+ * Get content type for metrics endpoint
+ */
+export function getContentType() {
+  return register.contentType;
+}
diff --git a/packages/crawler/src/puppeteer.ts b/packages/crawler/src/puppeteer.ts
index fa5981d..867215a 100644
--- a/packages/crawler/src/puppeteer.ts
+++ b/packages/crawler/src/puppeteer.ts
@@ -4,6 +4,7 @@ import path from 'path';
 import { clearInterval, setInterval } from 'timers';
 
 import { config, logger } from './config';
+import { Job } from './store';
 import { CRAWLER_FLAG, sleep } from './utils';
 
 const BrowserStatus = {
@@ -120,21 +121,25 @@ export async function launchBrowser() {
       '--no-sandbox',
       '--no-zygote',
       '--disable-setuid-sandbox',
-      '--disable-gpu',
       '--disable-dev-shm-usage',
       '--disable-site-isolation-trials',
-      '--disable-accelerated-2d-canvas',
       '--disable-extensions',
-      '--js-flags=--max_old_space_size=512', // 限制V8内存
+      '--js-flags=--max_old_space_size=768', // cap the V8 heap
       '--disable-background-networking',
       '--disable-default-apps',
       // '--disable-web-security', // 允许跨域请求
-      '--disable-software-rasterizer',
       '--disable-crash-reporter',
       '--disable-service-workers',
       '--disable-notifications',
       '--disable-infobars',
       '--font-render-hinting=none',
+      // WebGL: use software GL fallback for servers without GPU
+      '--enable-webgl',
+      '--ignore-gpu-blocklist',
+      '--use-gl=swiftshader',
+      '--use-angle=swiftshader',
+      '--enable-unsafe-swiftshader',
+      '--disable-gpu-sandbox',
     ],
   });
   logger.info('Launch browser');
@@ -160,12 +165,21 @@ function checkBrowserActivated() {
   browserActivatedTimer = setInterval(async () => {
     if (browser) {
       const pages = await browser.pages().catch(() => [] as Page[]);
-      if (pages.length === 1 && pages[0]?.url() === 'about:blank') {
+      const jobCount = await Job.count().catch(() => 0);
+
+      // Check if browser is inactive: only blank page AND no pending jobs
+      const isInactive = pages.length === 1 && pages[0]?.url() === 'about:blank' && jobCount === 0;
+
+      if (isInactive) {
         count++;
         logger.debug(`Browser inactive count: ${count}/3`);
       } else {
-        count = 0; // 重置计数器!
+        count = 0;
+        if (jobCount > 0) {
+          logger.debug(`Browser has ${jobCount} pending jobs, keeping active`);
+        }
       }
+
       if (count >= 3) {
         logger.info('Browser inactive for 3 minutes, closing...');
         await closeBrowser({
diff --git a/packages/crawler/src/site.ts b/packages/crawler/src/site.ts
index adf5d04..f8d82cf 100644
--- a/packages/crawler/src/site.ts
+++ b/packages/crawler/src/site.ts
@@ -3,8 +3,9 @@ import { randomUUID } from 'node:crypto';
 import { SitemapItem } from 'sitemap';
 
 import { Site, config, logger } from './config';
-import { cronQueue } from './crawler';
-import { Snapshot } from './store';
+import { queueMap } from './crawler';
+import { jobsEnqueuedTotal } from './metrics';
+import { Job, Snapshot } from './store';
 import { formatUrl, getSitemapList } from './utils';
 
 const crawlBlockletRunningMap = new Map();
@@ -74,25 +75,35 @@ export const crawlSite = async ({ url, pathname, interval = 0 }: Site) => {
 
       const jobId = randomUUID();
 
-      cronQueue.push({
-        id: jobId,
-        url,
-        lastModified: sitemapItem.lastmod,
-        includeScreenshot: false,
-        includeHtml: true,
-        replace: true,
+      queueMap.cronJobs.push({
+        job: {
+          id: jobId,
+          url,
+          lastModified: sitemapItem.lastmod,
+          includeScreenshot: false,
+          includeHtml: true,
+          replace: true,
+          enqueuedAt: Date.now(),
+        },
+        jobId,
+        delay: 5,
       });
+      jobsEnqueuedTotal.inc({ queue: 'cronJobs' });
 
       return jobId;
     },
     { concurrency: config.siteCron?.concurrency || 30 },
   );
 
+  // Get current queue size for logging
+  const queueSize = await Job.count();
+
   logger.info('Enqueued jobs from sitemap finished', {
     url,
     pathname,
     processCount,
     crawlCount,
+    queueSize,
   });
 
   return jobIds;
diff --git a/packages/crawler/src/store/job.ts b/packages/crawler/src/store/job.ts
index cb12db4..f516f97 100644
--- a/packages/crawler/src/store/job.ts
+++ b/packages/crawler/src/store/job.ts
@@ -19,6 +19,7 @@ export interface JobState {
   replace?: boolean;
   sync?: boolean;
   ignoreRobots?: boolean;
+  enqueuedAt?: number;
   headers?: Record<string, string>;
   cookies?: CookieParam[];
   localStorage?: { key: string; value: string }[];
@@ -32,6 +33,8 @@ export interface JobModel {
   willRunAt: number;
   delay: number;
   cancelled: boolean;
+  processingBy: string | null;
+  processingAt: number | null;
 }
 
 export class Job extends Model implements JobModel {
@@ -43,12 +46,16 @@
   public retryCount!: JobModel['retryCount'];
 
-  public willRunAt!: JobModel['willRunAt'];
-
-  public delay!: JobModel['delay'];
+  public willRunAt!: JobModel['willRunAt'];
+
+  public cancelled!: JobModel['cancelled'];
+
+  public processingBy!: JobModel['processingBy'];
+
+  public processingAt!: JobModel['processingAt'];
+
   static initModel(sequelize: Sequelize) {
     return Job.init(
       {
@@ -77,6 +84,14 @@
         type: DataTypes.BOOLEAN,
         defaultValue: false,
       },
+      processingBy: {
+        type: DataTypes.STRING(32),
+        allowNull: true,
+      },
+      processingAt: {
+        type: DataTypes.INTEGER,
+        allowNull: true,
+      },
       createdAt: {
         type: DataTypes.DATE,
         defaultValue: DataTypes.NOW,
@@ -133,4 +148,60 @@
 
     return existsJob?.get();
   }
+
+  static async paginate({
+    page = 1,
+    pageSize = 20,
+    queue,
+  }: {
+    page?: number;
+    pageSize?: number;
+    queue?: string;
+  } = {}) {
+    const where = queue ? { queue } : {};
+    const offset = (page - 1) * pageSize;
+
+    const { count, rows } = await Job.findAndCountAll({
+      where,
+      order: [['createdAt', 'DESC']],
+      limit: pageSize,
+      offset,
+    });
+
+    return {
+      total: count,
+      page,
+      pageSize,
+      totalPages: Math.ceil(count / pageSize),
+      data: rows.map((row) => row.toJSON()),
+    };
+  }
+
+  static async stats() {
+    const results = (await Job.findAll({
+      attributes: ['queue', [sequelize.fn('COUNT', sequelize.col('id')), 'count']],
+      group: ['queue'],
+      raw: true,
+    })) as unknown as Array<{ queue: string; count: number }>;
+
+    const total = results.reduce((sum, item) => sum + Number(item.count), 0);
+
+    return {
+      total,
+      queues: results.map((item) => ({
+        queue: item.queue,
+        count: Number(item.count),
+      })),
+    };
+  }
+
+  static async deleteByQueue(queue: string) {
+    const count = await Job.destroy({ where: { queue } });
+    return { deleted: count };
+  }
+
+  static async deleteByIds(ids: string[]) {
+    const count = await Job.destroy({ where: { id: ids } });
+    return { deleted: count };
+  }
 }
diff --git a/packages/crawler/src/store/migrate.ts b/packages/crawler/src/store/migrate.ts
index 474ceb4..c43e753 100644
--- a/packages/crawler/src/store/migrate.ts
+++ b/packages/crawler/src/store/migrate.ts
@@ -4,6 +4,7 @@ import { SequelizeStorage, Umzug } from 'umzug';
 import { sequelize } from './index';
 import * as migration20250615 from './migrations/20250615-genesis';
 import * as migration20250616Replace from './migrations/20250616-replace';
+import * as migration20251226JobProcessing from './migrations/20251226-job-processing';
 
 const umzug = new Umzug({
   migrations: [
@@ -17,6 +18,11 @@ const umzug = new Umzug({
       up: ({ context }) => migration20250616Replace.up({ context }),
       down: ({ context }) => migration20250616Replace.down({ context }),
     },
+    {
+      name: '20251226-job-processing',
+      up: ({ context }) => migration20251226JobProcessing.up({ context }),
+      down: ({ context }) => migration20251226JobProcessing.down({ context }),
+    },
   ],
   context: sequelize.getQueryInterface(),
   storage: new SequelizeStorage({ sequelize }),
diff --git a/packages/crawler/src/store/migrations/20251226-job-processing.ts b/packages/crawler/src/store/migrations/20251226-job-processing.ts
new file mode 100644
index 0000000..093a428
--- /dev/null
+++ b/packages/crawler/src/store/migrations/20251226-job-processing.ts
@@ -0,0 +1,27 @@
+/* eslint-disable no-console */
+import { DataTypes } from '@sequelize/core';
+
+export async function up({ context }) {
+  console.log('[20251226-job-processing:up] Migrating...');
+
+  await context.addColumn('jobs', 'processingBy', {
+    type: DataTypes.STRING(32),
+    allowNull: true,
+  });
+
+  await context.addColumn('jobs', 'processingAt', {
+    type: DataTypes.INTEGER,
+    allowNull: true,
+  });
+
+  console.log('[20251226-job-processing:up] Migrated successfully!');
+}
+
+export async function down({ context }) {
+  console.log('[20251226-job-processing:down] Migrating...');
+
+  await context.removeColumn('jobs', 'processingBy');
+  await context.removeColumn('jobs', 'processingAt');
+
+  console.log('[20251226-job-processing:down] Migrated successfully!');
+}
diff --git a/packages/middleware/package.json b/packages/middleware/package.json
index f5a30a0..d3396fa 100644
--- a/packages/middleware/package.json
+++ b/packages/middleware/package.json
@@ -1,6 +1,6 @@
 {
   "name": "@arcblock/crawler-middleware",
-  "version": "1.4.7",
+  "version": "1.5.0",
   "main": "lib/cjs/index.js",
   "module": "lib/esm/index.js",
   "types": "lib/cjs/index.d.ts",
diff --git a/packages/middleware/src/index.ts b/packages/middleware/src/index.ts
index 557f4dc..894a491 100644
--- a/packages/middleware/src/index.ts
+++ b/packages/middleware/src/index.ts
@@ -65,6 +65,7 @@ export function createSnapshotMiddleware({
 
   const fullUrl = getFullUrl(req);
 
+  // @ts-ignore
   if (!isSpider(req) || isSelfCrawler(req) || isStaticFile(req)) {
     return next();
   }
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index ca9f418..d518612 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -207,6 +207,9 @@ importers:
       p-map:
         specifier: ^7.0.3
        version: 7.0.3
+      prom-client:
+        specifier: ^15.1.3
+        version: 15.1.3
       robots-parser:
        specifier: ^3.0.1
        version: 3.0.1
@@ -893,6 +896,10 @@
   '@ocap/wallet@1.27.14':
    resolution: {integrity: sha512-Axn83Apqd8lUMJxiWx23HYpw7McN7na1Xjw1NkStcJRxA4CVum9Xr96nT8V2RlEkWASzoUsYnH5fvzetQDvpfA==}
 
+  '@opentelemetry/api@1.9.0':
+    resolution: {integrity: sha512-3giAOQvZiH5F9bMlMiv8+GSPMeqg0dbaeo58/0SlA9sxSqZhnUtxzX9/2FzyhS9sWQf5S0GJE0AKBrFqjpeYcg==}
+    engines: {node: '>=8.0.0'}
+
   '@peculiar/asn1-android@2.3.16':
    resolution: {integrity: sha512-a1viIv3bIahXNssrOIkXZIlI2ePpZaNmR30d4aBL99mu2rO+mT9D6zBsp7H6eROWGtmwv0Ionp5olJurIo09dw==}
 
@@ -1792,6 +1799,9 @@
   bindings@1.5.0:
    resolution: {integrity: sha512-p2q/t/mhvuOj/UeLlV6566GD/guowlr0hHxClI0W9m7MWYkL1F0hLo+0Aexs9HSPCtR1SXQ0TD3MMKrXZajbiQ==}
 
+  bintrees@1.0.2:
+    resolution: {integrity: sha512-VOMgTMwjAaUG580SXn3LacVgjurrbMme7ZZNYGSSV7mmtY6QQRh0Eg3pwIcntQ77DErK1L0NxkbetjcoXzVwKw==}
+
   bl@1.2.3:
    resolution: {integrity: sha512-pvcNpa0UU69UT341rO6AYy4FVAIkUHuZXRIWbq+zHnsVcRzDDjIAhGuuYoi0d//cwIwtt4pkpKycWEfjdV+vww==}
 
@@ -4697,6 +4707,10 @@
    resolution: {integrity: sha512-7PiHtLll5LdnKIMw100I+8xJXR5gW2QwWYkT6iJva0bXitZKa/XMrSbdmg3r2Xnaidz9Qumd0VPaMrZlF9V9sA==}
    engines: {node: '>=0.4.0'}
 
+  prom-client@15.1.3:
+    resolution: {integrity: sha512-6ZiOBfCywsD4k1BN9IX0uZhF+tJkV8q8llP64G5Hajs4JOeVLPCwpPVcpXy3BwYiUGgyJzsJJQeOIv7+hDSq8g==}
+    engines: {node: ^16 || ^18 || >=20}
+
   promise-inflight@1.0.1:
    resolution: {integrity: sha512-6zWPyEOFaQBJYcGMHBKTKJ3u6TBsnMFOIZSa6ce1e/ZrrsOlnHRHbabMjLiBYKp+n44X9eUI6VUPaukCXHuG4g==}
    peerDependencies:
@@ -5538,6 +5552,9 @@
    resolution: {integrity: sha512-5KcOFzPvd1nGFVrmB7H4+QAWVjYOf//+QTbOj0GpXbqtqbKGWVczG+rq6VhXAtdtlKLTs16NAmHRyF5vbggQ2w==}
    engines: {node: '>=8'}
 
+  tdigest@0.1.2:
+    resolution: {integrity: sha512-+G0LLgjjo9BZX2MfdvPfH+MKLCrxlXSYec5DaPYP1fe6Iyhf0/fSmJ0bFiZ1F8BT6cGXl2LpltQptzjXKWEkKA==}
+
   text-decoder@1.2.3:
    resolution: {integrity: sha512-3/o9z3X0X0fTupwsYvR03pJ/DjWuqqrfwBgTQzdWDiQSm9KitAyz/9WqsT2JQW7KV2m+bC2ol/zqpW37NHxLaA==}
 
@@ -7301,6 +7318,8 @@
    transitivePeerDependencies:
      - supports-color
 
+  '@opentelemetry/api@1.9.0': {}
+
   '@peculiar/asn1-android@2.3.16':
    dependencies:
      '@peculiar/asn1-schema': 2.3.15
@@ -8361,6 +8380,8 @@
    dependencies:
      file-uri-to-path: 1.0.0
 
+  bintrees@1.0.2: {}
+
   bl@1.2.3:
    dependencies:
      readable-stream: 2.3.8
@@ -11671,6 +11692,11 @@
 
   progress@2.0.3: {}
 
+  prom-client@15.1.3:
+    dependencies:
+      '@opentelemetry/api': 1.9.0
+      tdigest: 0.1.2
+
   promise-inflight@1.0.1:
    optional: true
 
@@ -12709,6 +12735,10 @@
      arrify: 2.0.1
      execa: 3.4.0
 
+  tdigest@0.1.2:
+    dependencies:
+      bintrees: 1.0.2
+
   text-decoder@1.2.3:
    dependencies:
      b4a: 1.6.7
diff --git a/version b/version
index b000a6a..3e1ad72 100644
--- a/version
+++ b/version
@@ -1 +1 @@
-1.4.7
\ No newline at end of file
+1.5.0
\ No newline at end of file
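---

For reviewers: a minimal TypeScript sketch of how the routes added above fit together end to end. `BASE_URL` and the `x-access-key` header name are illustrative assumptions — the diff only shows that the admin routes sit behind `session({ accessKey: true })` and `auth({ roles: ['admin', 'owner'] })`, not how the access key is transmitted. The paths, response shapes (`code`/`data`, `total`/`queues`, `totalPages`, `deleted`) and the `crawler_*` metric names come straight from the diff; the sketch assumes Node 18+ for the global `fetch`.

```ts
// sketch.ts — exercises the new admin and metrics routes (assumptions noted above)
const BASE_URL = process.env.SNAP_KIT_URL || 'http://localhost:3030'; // hypothetical
const headers = { 'x-access-key': process.env.ACCESS_KEY || '' }; // hypothetical header name

async function main() {
  // GET /api/admin/jobs/stats -> { code: 'ok', data: { total, queues: [{ queue, count }] } }
  const stats = await fetch(`${BASE_URL}/api/admin/jobs/stats`, { headers }).then((r) => r.json());
  console.log('queues:', stats.data.queues, 'total:', stats.data.total);

  // GET /api/admin/jobs?page=1&pageSize=20&queue=urlCrawler
  // -> { code: 'ok', data: { total, page, pageSize, totalPages, data: [...] } }
  const jobs = await fetch(`${BASE_URL}/api/admin/jobs?page=1&pageSize=20&queue=urlCrawler`, {
    headers,
  }).then((r) => r.json());
  console.log('jobs:', jobs.data.total, 'pages:', jobs.data.totalPages);

  // DELETE /api/admin/jobs with { queue } or { ids } in the body -> { code: 'ok', data: { deleted } }
  const deleted = await fetch(`${BASE_URL}/api/admin/jobs`, {
    method: 'DELETE',
    headers: { ...headers, 'Content-Type': 'application/json' },
    body: JSON.stringify({ queue: 'cronJobs' }),
  }).then((r) => r.json());
  console.log('deleted:', deleted.data.deleted);

  // GET /api/metrics -> Prometheus text exposition: crawler_jobs_total,
  // crawler_jobs_enqueued_total, crawler_job_duration_seconds,
  // crawler_job_total_latency_seconds, crawler_queue_size
  const metrics = await fetch(`${BASE_URL}/api/metrics`).then((r) => r.text());
  console.log(metrics.split('\n').slice(0, 10).join('\n'));
}

main().catch(console.error);
```

Note that `/api/metrics` is mounted without the session/auth middlewares in this diff, so the sketch scrapes it unauthenticated; deployments exposing the blocklet publicly may want to restrict that path at the gateway.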