diff --git a/src/common/cron.ts b/src/common/cron.ts new file mode 100644 index 0000000..fab5b26 --- /dev/null +++ b/src/common/cron.ts @@ -0,0 +1,3 @@ +export enum FunkyCronExpression { + EVERY_3_MINUTES = '*/3 * * * *', +} diff --git a/src/common/date/date-buckets.ts b/src/common/date/date-buckets.ts index d4e2a46..e999a44 100644 --- a/src/common/date/date-buckets.ts +++ b/src/common/date/date-buckets.ts @@ -264,3 +264,31 @@ export const generateSeriesRecordPoints = >( ), })); }; + +export const generateSeriesTileCountPoints = ( + dates: DatePoint[], + counts: number[], + range: PartialDateRange, +): SeriesCountPoint[] => { + return generateSeriesPoints(counts, dates, range).map( + (e) => + new SeriesCountPoint({ + date: e.date, + value: e.value.reduce((sum, count) => sum + count, 0), + }), + ); +}; + +export const generateSeriesLastTileCountPoints = ( + dates: DatePoint[], + counts: number[], + range: PartialDateRange, +): SeriesCountPoint[] => { + return generateSeriesPoints(counts, dates, range).map( + (e) => + new SeriesCountPoint({ + date: e.date, + value: e.value.length > 0 ? e.value[e.value.length - 1]! : 0, + }), + ); +}; diff --git a/src/common/index.ts b/src/common/index.ts index 21e922c..89bb88c 100644 --- a/src/common/index.ts +++ b/src/common/index.ts @@ -1,6 +1,7 @@ +export * from './date'; export * from './case'; export * from './chart.dto'; -export * from './date'; +export * from './cron'; export * from './id-range.dto'; export * from './limit'; export * from './logs'; @@ -10,4 +11,5 @@ export * from './raw'; export * from './repository'; export * from './request-context'; export * from './seed'; +export * from './tile'; export * from './timezone'; diff --git a/src/common/tile.ts b/src/common/tile.ts new file mode 100644 index 0000000..b0bc648 --- /dev/null +++ b/src/common/tile.ts @@ -0,0 +1,209 @@ +import { add, isEqual, min } from 'date-fns'; +import { Repository } from 'typeorm'; + +import { DateRange, PartialDateRange, TimeScale, startOf } from './date'; + +export enum TileType { + uploadHourly = 'upload_hourly', + postPendingHourly = 'post_pending_hourly', +} + +export interface TileService { + /** + * Interval in hours between tiles of this type. + */ + interval: number; + + /** + * Get the available tiling ranges for this tile type. + */ + getRanges: () => Promise; + + /** + * Find missing tiles in the given date range. + */ + findMissing: (range: TilingRange) => Promise; + + /** + * Wipe all tiles of this type, or tiles within a date range if specified. + */ + wipe: (range?: PartialDateRange) => Promise; +} + +export interface TilingRange { + /** + * The date range for the tiles. + */ + dateRange: DateRange; + + /** + * The minimum updatedAt timestamp for the tiles in this range. + */ + updatedAt?: Date; + + /** + * Type identifier for this range. Undefined when not applicable. + */ + type?: string; +} + +export const getTilingRanges = ( + manifests: Array, + types: string[], +): TilingRange[] => { + const events: Array<{ + time: Date; + type: string; + delta: number; + updatedAt?: Date; + }> = []; + + for (const manifest of manifests) { + const type = manifest.type ?? 'default'; + const start = startOf(TimeScale.Hour, manifest.dateRange.startDate); + const endFloor = startOf(TimeScale.Hour, manifest.dateRange.endDate); + const end = + endFloor.getTime() === manifest.dateRange.endDate.getTime() + ? endFloor + : add(endFloor, { hours: 1 }); + + events.push({ time: start, type, delta: 1, updatedAt: manifest.updatedAt }); + events.push({ time: end, type, delta: -1, updatedAt: manifest.updatedAt }); + } + + events.sort((a, b) => { + const diff = a.time.getTime() - b.time.getTime(); + if (diff !== 0) return diff; + // We want splits in ranges, to make them manageable. + // To return one continuous range, this could be flipped. + return a.delta - b.delta; + }); + + const depths = new Map(); + + const ranges: TilingRange[] = []; + let start: Date | null = null; + let updated: Date | undefined = undefined; + + for (const event of events) { + const prevDepth = depths.get(event.type) ?? 0; + const newDepth = prevDepth + event.delta; + depths.set(event.type, newDepth); + + const active = types.every((type) => (depths.get(type) ?? 0) > 0); + + if (active) { + if (start === null) { + start = event.time; + } + if (event.updatedAt && (!updated || event.updatedAt > updated)) { + updated = event.updatedAt; + } + } else if (start !== null) { + ranges.push({ + dateRange: new DateRange({ + startDate: start, + endDate: event.time, + scale: TimeScale.Hour, + }), + updatedAt: updated, + }); + start = null; + updated = undefined; + } + } + + return ranges; +}; + +export const groupTimesIntoRanges = (times: Date[]): DateRange[] => { + if (times.length === 0) return []; + + const sorted = [...times].sort((a, b) => a.getTime() - b.getTime()); + const ranges: DateRange[] = []; + + let start = sorted[0]!; + let end = add(sorted[0]!, { hours: 1 }); + + for (const current of sorted.slice(1)) { + if (isEqual(current, end)) { + end = add(current, { hours: 1 }); + } else { + ranges.push( + new DateRange({ + startDate: start, + endDate: end, + scale: TimeScale.Hour, + }), + ); + start = current; + end = add(current, { hours: 1 }); + } + } + + ranges.push( + new DateRange({ + startDate: start, + endDate: end, + scale: TimeScale.Hour, + }), + ); + + return ranges; +}; + +export const chunkDateRange = ( + range: DateRange, + maxHours: number, +): DateRange[] => { + const chunks: DateRange[] = []; + let current = range.startDate; + + while (current < range.endDate) { + const chunkEnd = add(current, { hours: maxHours }); + const actualEnd = min([chunkEnd, range.endDate]); + + chunks.push( + new DateRange({ + startDate: current, + endDate: actualEnd, + scale: TimeScale.Hour, + }), + ); + + current = actualEnd; + } + + return chunks; +}; + +export interface WithTileTime { + time: Date; + updatedAt: Date; +} + +export async function findMissingOrStaleTiles( + repository: Repository, + range: TilingRange, +): Promise { + const startTime = startOf(TimeScale.Hour, range.dateRange.startDate); + const endTime = startOf(TimeScale.Hour, range.dateRange.endDate); + const tableName = repository.metadata.tableName; + + const query = ` + SELECT series.hour AS time + FROM generate_series( + $1::timestamptz, + $2::timestamptz - interval '1 hour', + interval '1 hour' + ) AS series(hour) + LEFT JOIN ${tableName} tile ON tile.time = series.hour + WHERE tile.time IS NULL + OR ($3::timestamptz IS NOT NULL AND tile.updated_at < $3::timestamptz) + `; + + const params = [startTime, endTime, range.updatedAt || null]; + const result = await repository.query(query, params); + + return result.map((row: { time: Date }) => row.time); +} diff --git a/src/health/health.module.ts b/src/health/health.module.ts index 92d9104..ee1cff2 100644 --- a/src/health/health.module.ts +++ b/src/health/health.module.ts @@ -2,9 +2,10 @@ import { Module } from '@nestjs/common'; import { HealthController } from './health.controller'; import { ManifestHealthModule } from './manifests/manifest-health.module'; +import { TileHealthModule } from './tiles/tile-health.module'; @Module({ - imports: [ManifestHealthModule], + imports: [ManifestHealthModule, TileHealthModule], controllers: [HealthController], }) export class HealthModule {} diff --git a/src/health/tiles/index.ts b/src/health/tiles/index.ts new file mode 100644 index 0000000..7a1d7b9 --- /dev/null +++ b/src/health/tiles/index.ts @@ -0,0 +1,5 @@ +export * from './tile-health.controller'; +export * from './tile-health.dto'; +export * from './tile-health.module'; +export * from './tile-health.service'; +export * from './tile-health.utils'; diff --git a/src/health/tiles/tile-health.controller.ts b/src/health/tiles/tile-health.controller.ts new file mode 100644 index 0000000..51379d9 --- /dev/null +++ b/src/health/tiles/tile-health.controller.ts @@ -0,0 +1,71 @@ +import { + Controller, + Delete, + Get, + Param, + Query, + UseGuards, +} from '@nestjs/common'; +import { + ApiBearerAuth, + ApiOperation, + ApiParam, + ApiResponse, + ApiTags, +} from '@nestjs/swagger'; +import { ServerAdminGuard } from 'src/auth/auth.guard'; +import { PartialDateRange, TileType } from 'src/common'; +import { PaginationParams } from 'src/common/pagination.dto'; + +import { TileHealth } from './tile-health.dto'; +import { TileHealthService } from './tile-health.service'; + +@ApiTags('Health') +@Controller('health/tiles') +export class TileHealthController { + constructor(private readonly tileHealthService: TileHealthService) {} + + @Get() + @ApiOperation({ + summary: 'Retrieve tile health', + description: 'Retrieve tile health and coverage information', + operationId: 'getTileHealth', + }) + @ApiResponse({ + status: 200, + description: 'Tile health information', + type: [TileHealth], + }) + @UseGuards(ServerAdminGuard) + @ApiBearerAuth() + async getTileHealth( + @Query() pages?: PaginationParams, + ): Promise { + return this.tileHealthService.tiles(pages); + } + + // This is kind of awkward, being handled in the health controller. + @Delete(':type') + @ApiOperation({ + summary: 'Delete all tiles of a type', + description: 'Delete all tiles of the specified type', + operationId: 'deleteTilesByType', + }) + @ApiParam({ + name: 'type', + enum: TileType, + description: 'The type of tiles to delete', + }) + @ApiResponse({ + status: 204, + description: 'Tiles deleted successfully', + }) + @UseGuards(ServerAdminGuard) + @ApiBearerAuth() + async deleteTilesByType( + @Param('type') type: TileType, + @Query() range?: PartialDateRange, + ): Promise { + return this.tileHealthService.deleteTilesByType(type, range); + } +} diff --git a/src/health/tiles/tile-health.dto.ts b/src/health/tiles/tile-health.dto.ts new file mode 100644 index 0000000..6f65cd2 --- /dev/null +++ b/src/health/tiles/tile-health.dto.ts @@ -0,0 +1,29 @@ +import { ApiProperty } from '@nestjs/swagger'; +import { Raw, TileType } from 'src/common'; + +export class TileSlice { + constructor(value: Raw) { + Object.assign(this, value); + } + + startDate: Date; + endDate: Date; + + available: number; + unavailable: number; + none: number; +} + +export class TileHealth { + constructor(value: Raw) { + Object.assign(this, value); + } + + @ApiProperty({ enum: TileType, enumName: 'TileType' }) + type: TileType; + startDate: Date; + endDate: Date; + expected: number; + actual: number; + slices: TileSlice[]; +} diff --git a/src/health/tiles/tile-health.module.ts b/src/health/tiles/tile-health.module.ts new file mode 100644 index 0000000..f48d95a --- /dev/null +++ b/src/health/tiles/tile-health.module.ts @@ -0,0 +1,15 @@ +import { Module } from '@nestjs/common'; +import { TypeOrmModule } from '@nestjs/typeorm'; +import { ManifestEntity } from 'src/manifest/manifest.entity'; +import { UploadTilesModule } from 'src/upload/tiles/upload-tiles.module'; + +import { TileHealthController } from './tile-health.controller'; +import { TileHealthService } from './tile-health.service'; + +@Module({ + imports: [TypeOrmModule.forFeature([ManifestEntity]), UploadTilesModule], + controllers: [TileHealthController], + providers: [TileHealthService], + exports: [TileHealthService], +}) +export class TileHealthModule {} diff --git a/src/health/tiles/tile-health.service.ts b/src/health/tiles/tile-health.service.ts new file mode 100644 index 0000000..43332eb --- /dev/null +++ b/src/health/tiles/tile-health.service.ts @@ -0,0 +1,93 @@ +import { Injectable } from '@nestjs/common'; +import { Cacheable } from 'src/app/browser.module'; +import { + PaginationParams, + PartialDateRange, + TileService, + TileType, +} from 'src/common'; +import { ManifestEntity } from 'src/manifest/manifest.entity'; +import { UploadTilesEntity } from 'src/upload/tiles/upload-tiles.entity'; +import { UploadTilesService } from 'src/upload/tiles/upload-tiles.service'; + +import { TileHealth } from './tile-health.dto'; +import { generateTileSlices } from './tile-health.utils'; + +@Injectable() +export class TileHealthService { + constructor(private readonly uploadTilesService: UploadTilesService) {} + + private tileServices: Partial> = { + [TileType.uploadHourly]: this.uploadTilesService, + }; + + @Cacheable({ + prefix: 'tile-health', + ttl: 15 * 60 * 1000, + dependencies: [ManifestEntity, UploadTilesEntity], + }) + async tiles(pages?: PaginationParams): Promise { + const health: TileHealth[] = []; + pages = new PaginationParams({ + limit: 5, + ...pages, + }); + + const tileTypes = Object.values(TileType).slice( + PaginationParams.calculateOffset(pages), + PaginationParams.calculateOffset(pages) + (pages.limit || 5), + ); + + for (const tileType of tileTypes) { + const service = this.tileServices[tileType]; + if (!service) continue; + + const ranges = await service.getRanges(); + if (ranges.length === 0) continue; + + for (const range of ranges) { + const { startDate, endDate } = range.dateRange; + + const totalHours = Math.ceil( + (endDate.getTime() - startDate.getTime()) / (1000 * 60 * 60), + ); + const expected = Math.ceil(totalHours / service.interval); + + const missingTimes = await service.findMissing(range); + const actual = expected - missingTimes.length; + + const slices = generateTileSlices({ + missingTimes: missingTimes.map((time) => ({ time })), + startDate, + endDate, + intervalHours: service.interval, + }); + + health.push( + new TileHealth({ + type: tileType, + startDate, + endDate, + expected, + actual, + slices, + }), + ); + } + } + + return health; + } + + async deleteTilesByType( + tileType: TileType, + range?: PartialDateRange, + ): Promise { + const service = this.tileServices[tileType]; + if (!service) { + throw new Error(`No service found for tile type: ${tileType}`); + } + + await service.wipe(range); + } +} diff --git a/src/health/tiles/tile-health.utils.ts b/src/health/tiles/tile-health.utils.ts new file mode 100644 index 0000000..5e5962a --- /dev/null +++ b/src/health/tiles/tile-health.utils.ts @@ -0,0 +1,72 @@ +import { TileSlice } from './tile-health.dto'; + +export interface TileSliceProps { + missingTimes: { time: Date }[]; + startDate: Date; + endDate: Date; + intervalHours?: number; + maxSlices?: number; +} + +export const generateTileSlices = ({ + missingTimes, + startDate, + endDate, + intervalHours = 1, + maxSlices = 30, +}: TileSliceProps): TileSlice[] => { + const totalHours = Math.ceil( + (endDate.getTime() - startDate.getTime()) / (1000 * 60 * 60), + ); + const totalTiles = Math.ceil(totalHours / intervalHours); + const sliceCount = Math.min(maxSlices, totalTiles); + const tilesPerSlice = Math.ceil(totalTiles / sliceCount); + + const slices: TileSlice[] = []; + let currentTime = new Date(startDate); + let timeIndex = 0; + + for (let i = 0; i < sliceCount; i++) { + const sliceStart = new Date(currentTime); + const sliceEnd = new Date( + Math.min( + currentTime.getTime() + tilesPerSlice * intervalHours * 60 * 60 * 1000, + endDate.getTime(), + ), + ); + + let available = 0; + let unavailable = 0; + + while ( + timeIndex < missingTimes.length && + missingTimes[timeIndex]!.time < sliceEnd + ) { + if (missingTimes[timeIndex]!.time >= sliceStart) { + unavailable++; + } + timeIndex++; + } + + const expectedInSlice = Math.ceil( + (sliceEnd.getTime() - sliceStart.getTime()) / + (intervalHours * 60 * 60 * 1000), + ); + available = Math.max(0, expectedInSlice - unavailable); + const none = tilesPerSlice - (available + unavailable); + + slices.push( + new TileSlice({ + startDate: sliceStart, + endDate: sliceEnd, + available, + unavailable, + none, + }), + ); + + currentTime = sliceEnd; + } + + return slices; +}; diff --git a/src/job/job.entity.ts b/src/job/job.entity.ts index 156cb3c..01de2f9 100644 --- a/src/job/job.entity.ts +++ b/src/job/job.entity.ts @@ -54,6 +54,11 @@ export interface JobOptions { */ key?: string; + /** + * The queue for this job. Jobs within a queue are processed sequentially. + */ + queue?: string; + /** * The metadata for this job. * Can contain any arbitrary data that the job needs to run. @@ -82,6 +87,7 @@ export class Job { this.id = Job.idCounter++; this.title = options.title || `Untitled Job`; this.key = options.key; + this.queue = options.queue || 'default'; this.metadata = options.metadata; this.cancelToken = options.cancelToken || new JobCancelToken(); this.timeout = options.timeout; @@ -109,6 +115,11 @@ export class Job { */ readonly key?: string; + /** + * The queue for this job. Jobs within a queue are processed sequentially. + */ + readonly queue: string; + /** * The metadata for this job. * Can contain any arbitrary data that the job needs to run. diff --git a/src/job/job.service.ts b/src/job/job.service.ts index 93039df..4f770fe 100644 --- a/src/job/job.service.ts +++ b/src/job/job.service.ts @@ -7,29 +7,38 @@ import { Job, JobCancelError } from './job.entity'; @Injectable() export class JobService { private jobs: Job[] = []; - private queue: Job[] = []; - private isProcessing = false; + private queues: Map[]> = new Map(); + private processingFlags: Map = new Map(); private readonly logger = new Logger(JobService.name); add(job: Job): void { + const queueKey = job.queue; + + if (!this.queues.has(queueKey)) { + this.queues.set(queueKey, []); + this.processingFlags.set(queueKey, false); + } + + const queue = this.queues.get(queueKey)!; + if (job.key) { - const existingJob = this.queue.find((j) => j.key === job.key); + const existingJob = queue.find((j) => j.key === job.key); if (existingJob) { this.logger.log( - `Job with key "${job.key}" already queued. Skipping duplicate.`, + `[${queueKey}] Job with key "${job.key}" already queued. Skipping duplicate.`, ); return; } } // limit the queue to 1000 items to prevent potential infinite backlog: - if (this.queue.length >= 1000) { + if (queue.length >= 1000) { this.logger.warn( - `Queue is full. Skipping job "${job.title}" with key "${job.key}".`, + `[${queueKey}] Queue is full. Skipping job "${job.title}" with key "${job.key}".`, ); return; } - this.queue.push(job as Job); + queue.push(job as Job); // limit the jobs array to 5000 items to prevent outrageous memory usage: if (this.jobs.length >= 5000) { @@ -38,19 +47,24 @@ export class JobService { this.jobs.push(job as Job); this.logger.log( - `[#${job.id}] [${job.title}] added to the queue. (${this.queue.length} jobs in queue)`, + `[#${job.id}] [${job.title}] added to [${queueKey}] queue. (${queue.length} jobs in queue)`, ); - void this.processQueue(); + void this.processQueue(queueKey); } - private async processQueue(): Promise { - if (this.isProcessing) return; + private async processQueue(queueKey: string): Promise { + if (this.processingFlags.get(queueKey)) return; - this.isProcessing = true; - while (this.queue.length > 0) { - const job = this.queue.shift(); + const queue = this.queues.get(queueKey); + if (!queue) return; + + this.processingFlags.set(queueKey, true); + while (queue.length > 0) { + const job = queue.shift(); if (job) { - this.logger.log(`[#${job.id}] [${job.title}] is starting`); + this.logger.log( + `[#${job.id}] [${job.title}] is starting on [${queueKey}] queue`, + ); try { job.cancelToken.ensureRunning(); let timeout: NodeJS.Timeout | undefined; @@ -81,20 +95,26 @@ export class JobService { } } } - this.logger.log(`(${this.queue.length} jobs remaining in queue)`); + this.logger.log( + `[${queueKey}] (${queue.length} jobs remaining in queue)`, + ); } - this.isProcessing = false; + this.processingFlags.delete(queueKey); + this.queues.delete(queueKey); } cancel(jobId: number, reason?: string): void { - const job = this.queue.find((j) => j.id === jobId); - if (job) { - job.cancelToken.cancel(reason); - this.logger.warn( - `Job [#${jobId}] ${job.title} has been marked as cancelled${ - reason ? `: ${reason}` : '.' - }`, - ); + for (const queue of this.queues.values()) { + const job = queue.find((j) => j.id === jobId); + if (job) { + job.cancelToken.cancel(reason); + this.logger.warn( + `Job [#${jobId}] ${job.title} has been marked as cancelled${ + reason ? `: ${reason}` : '.' + }`, + ); + return; + } } } diff --git a/src/migration/1763760586158-AddManifestUpdatedAt.ts b/src/migration/1763760586158-AddManifestUpdatedAt.ts new file mode 100644 index 0000000..ca9fda6 --- /dev/null +++ b/src/migration/1763760586158-AddManifestUpdatedAt.ts @@ -0,0 +1,15 @@ +import { MigrationInterface, QueryRunner } from 'typeorm'; + +export class AddManifestUpdatedAt1763760586158 implements MigrationInterface { + name = 'AddManifestUpdatedAt1763760586158'; + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `ALTER TABLE "manifests" ADD "updated_at" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now()`, + ); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`ALTER TABLE "manifests" DROP COLUMN "updated_at"`); + } +} diff --git a/src/migration/1766948259235-CreateTileTables.ts b/src/migration/1766948259235-CreateTileTables.ts new file mode 100644 index 0000000..167e54a --- /dev/null +++ b/src/migration/1766948259235-CreateTileTables.ts @@ -0,0 +1,31 @@ +import { MigrationInterface, QueryRunner } from 'typeorm'; + +export class CreateTileTables1766948259235 implements MigrationInterface { + name = 'CreateTileTables1766948259235'; + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `CREATE TABLE "upload_hourly_tiles" ("time" TIMESTAMP WITH TIME ZONE NOT NULL, "updated_at" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(), "count" integer NOT NULL DEFAULT '0', CONSTRAINT "PK_956c68f0e0657b1f8dbab067c0d" PRIMARY KEY ("time"))`, + ); + await queryRunner.query( + `CREATE INDEX "IDX_956c68f0e0657b1f8dbab067c0" ON "upload_hourly_tiles" ("time") `, + ); + await queryRunner.query( + `CREATE TABLE "post_pending_hourly_tiles" ("time" TIMESTAMP WITH TIME ZONE NOT NULL, "updated_at" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(), "count" integer NOT NULL DEFAULT '0', CONSTRAINT "PK_1d110b6bc773b0ce822abf25120" PRIMARY KEY ("time"))`, + ); + await queryRunner.query( + `CREATE INDEX "IDX_1d110b6bc773b0ce822abf2512" ON "post_pending_hourly_tiles" ("time") `, + ); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `DROP INDEX "public"."IDX_1d110b6bc773b0ce822abf2512"`, + ); + await queryRunner.query(`DROP TABLE "post_pending_hourly_tiles"`); + await queryRunner.query( + `DROP INDEX "public"."IDX_956c68f0e0657b1f8dbab067c0"`, + ); + await queryRunner.query(`DROP TABLE "upload_hourly_tiles"`); + } +} diff --git a/src/migration/1766949660258-CreatePostLifecycle.ts b/src/migration/1766949660258-CreatePostLifecycle.ts new file mode 100644 index 0000000..4791c38 --- /dev/null +++ b/src/migration/1766949660258-CreatePostLifecycle.ts @@ -0,0 +1,39 @@ +import { MigrationInterface, QueryRunner } from 'typeorm'; + +export class CreatePostLifecycle1766949660258 implements MigrationInterface { + name = 'CreatePostLifecycle1766949660258'; + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `CREATE TABLE "post_lifecycle" ("post_id" integer NOT NULL, "uploaded_at" TIMESTAMP WITH TIME ZONE, "approved_at" TIMESTAMP WITH TIME ZONE, "deleted_at" TIMESTAMP WITH TIME ZONE, "permitted_at" TIMESTAMP WITH TIME ZONE, "updated_at" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(), CONSTRAINT "PK_5d5823871bcb925dafaf8bd1217" PRIMARY KEY ("post_id"))`, + ); + await queryRunner.query( + `CREATE INDEX "IDX_f01b0384610fe312d0734fde2c" ON "post_lifecycle" ("permitted_at") `, + ); + await queryRunner.query( + `CREATE INDEX "IDX_651dc390d8b9d55b368d4b0d6e" ON "post_lifecycle" ("deleted_at") `, + ); + await queryRunner.query( + `CREATE INDEX "IDX_f39a0e7fa1d7263e1dcca02d67" ON "post_lifecycle" ("approved_at") `, + ); + await queryRunner.query( + `CREATE INDEX "IDX_b15705d49a00bc4257c9042ceb" ON "post_lifecycle" ("uploaded_at") `, + ); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `DROP INDEX "public"."IDX_b15705d49a00bc4257c9042ceb"`, + ); + await queryRunner.query( + `DROP INDEX "public"."IDX_f39a0e7fa1d7263e1dcca02d67"`, + ); + await queryRunner.query( + `DROP INDEX "public"."IDX_651dc390d8b9d55b368d4b0d6e"`, + ); + await queryRunner.query( + `DROP INDEX "public"."IDX_f01b0384610fe312d0734fde2c"`, + ); + await queryRunner.query(`DROP TABLE "post_lifecycle"`); + } +} diff --git a/src/migration/1766950226804-DropPostPendingTiles.ts b/src/migration/1766950226804-DropPostPendingTiles.ts new file mode 100644 index 0000000..b9f5056 --- /dev/null +++ b/src/migration/1766950226804-DropPostPendingTiles.ts @@ -0,0 +1,21 @@ +import { MigrationInterface, QueryRunner } from 'typeorm'; + +export class DropPostPendingTiles1766950226804 implements MigrationInterface { + name = 'DropPostPendingTiles1766950226804'; + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `DROP INDEX "public"."IDX_1d110b6bc773b0ce822abf2512"`, + ); + await queryRunner.query(`DROP TABLE "post_pending_hourly_tiles"`); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query( + `CREATE TABLE "post_pending_hourly_tiles" ("time" TIMESTAMP WITH TIME ZONE NOT NULL, "updated_at" TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT now(), "count" integer NOT NULL DEFAULT '0', CONSTRAINT "PK_1d110b6bc773b0ce822abf25120" PRIMARY KEY ("time"))`, + ); + await queryRunner.query( + `CREATE INDEX "IDX_1d110b6bc773b0ce822abf2512" ON "post_pending_hourly_tiles" ("time")`, + ); + } +} diff --git a/src/post/lifecycle/post-lifecycle.entity.ts b/src/post/lifecycle/post-lifecycle.entity.ts new file mode 100644 index 0000000..f73187f --- /dev/null +++ b/src/post/lifecycle/post-lifecycle.entity.ts @@ -0,0 +1,36 @@ +import { + Column, + Entity, + Index, + PrimaryColumn, + UpdateDateColumn, +} from 'typeorm'; + +@Entity('post_lifecycle') +@Index(['uploadedAt']) +@Index(['approvedAt']) +@Index(['deletedAt']) +@Index(['permittedAt']) +export class PostLifecycleEntity { + constructor(partial?: Partial) { + Object.assign(this, partial); + } + + @PrimaryColumn({ type: 'int' }) + postId: number; + + @Column({ type: 'timestamptz', nullable: true }) + uploadedAt: Date | null; + + @Column({ type: 'timestamptz', nullable: true }) + approvedAt: Date | null; + + @Column({ type: 'timestamptz', nullable: true }) + deletedAt: Date | null; + + @Column({ type: 'timestamptz', nullable: true }) + permittedAt: Date | null; + + @UpdateDateColumn({ type: 'timestamptz' }) + updatedAt: Date; +} diff --git a/src/post/lifecycle/post-lifecycle.module.ts b/src/post/lifecycle/post-lifecycle.module.ts new file mode 100644 index 0000000..a4498da --- /dev/null +++ b/src/post/lifecycle/post-lifecycle.module.ts @@ -0,0 +1,34 @@ +import { Module } from '@nestjs/common'; +import { TypeOrmModule } from '@nestjs/typeorm'; +import { JobModule } from 'src/job/job.module'; +import { ManifestEntity } from 'src/manifest/manifest.entity'; +import { PermitEntity } from 'src/permit/permit.entity'; +import { PostEventEntity } from 'src/post-event/post-event.entity'; +import { PostVersionEntity } from 'src/post-version/post-version.entity'; + +import { PostLifecycleEntity } from './post-lifecycle.entity'; +import { PostLifecycleService } from './post-lifecycle.service'; +import { PermitLifecycleWorker } from './workers/permit-lifecycle.worker'; +import { PostEventLifecycleWorker } from './workers/post-event-lifecycle.worker'; +import { UploadLifecycleWorker } from './workers/upload-lifecycle.worker'; + +@Module({ + imports: [ + TypeOrmModule.forFeature([ + PostLifecycleEntity, + PostVersionEntity, + PostEventEntity, + PermitEntity, + ManifestEntity, + ]), + JobModule, + ], + providers: [ + PostLifecycleService, + UploadLifecycleWorker, + PostEventLifecycleWorker, + PermitLifecycleWorker, + ], + exports: [PostLifecycleService], +}) +export class PostLifecycleModule {} diff --git a/src/post/lifecycle/post-lifecycle.service.ts b/src/post/lifecycle/post-lifecycle.service.ts new file mode 100644 index 0000000..4ca23c5 --- /dev/null +++ b/src/post/lifecycle/post-lifecycle.service.ts @@ -0,0 +1,111 @@ +import { Injectable } from '@nestjs/common'; +import { InjectRepository } from '@nestjs/typeorm'; +import { Invalidates } from 'src/app/browser.module'; +import { Repository } from 'typeorm'; + +import { PostLifecycleEntity } from './post-lifecycle.entity'; + +export interface LifecycleUploadData { + postId: number; + uploadedAt: Date; +} + +export interface LifecycleApprovalData { + postId: number; + approvedAt: Date | null; +} + +export interface LifecycleDeletionData { + postId: number; + deletedAt: Date | null; +} + +export interface LifecyclePermitData { + postId: number; + permittedAt: Date; +} + +@Injectable() +export class PostLifecycleService { + constructor( + @InjectRepository(PostLifecycleEntity) + private readonly lifecycleRepository: Repository, + ) {} + + @Invalidates(PostLifecycleEntity) + async upsertUploads(data: LifecycleUploadData[]): Promise { + if (data.length === 0) return; + + await this.lifecycleRepository + .createQueryBuilder() + .insert() + .into(PostLifecycleEntity) + .values( + data.map((d) => ({ + postId: d.postId, + uploadedAt: d.uploadedAt, + })), + ) + .orUpdate(['uploaded_at', 'updated_at'], ['post_id']) + .execute(); + } + + @Invalidates(PostLifecycleEntity) + async upsertApprovals(data: LifecycleApprovalData[]): Promise { + if (data.length === 0) return; + + await this.lifecycleRepository + .createQueryBuilder() + .insert() + .into(PostLifecycleEntity) + .values( + data.map((d) => ({ + postId: d.postId, + approvedAt: d.approvedAt, + })), + ) + .orUpdate(['approved_at', 'updated_at'], ['post_id']) + .execute(); + } + + @Invalidates(PostLifecycleEntity) + async upsertDeletions(data: LifecycleDeletionData[]): Promise { + if (data.length === 0) return; + + await this.lifecycleRepository + .createQueryBuilder() + .insert() + .into(PostLifecycleEntity) + .values( + data.map((d) => ({ + postId: d.postId, + deletedAt: d.deletedAt, + })), + ) + .orUpdate(['deleted_at', 'updated_at'], ['post_id']) + .execute(); + } + + @Invalidates(PostLifecycleEntity) + async upsertPermits(data: LifecyclePermitData[]): Promise { + if (data.length === 0) return; + + await this.lifecycleRepository + .createQueryBuilder() + .insert() + .into(PostLifecycleEntity) + .values( + data.map((d) => ({ + postId: d.postId, + permittedAt: d.permittedAt, + })), + ) + .orUpdate(['permitted_at', 'updated_at'], ['post_id']) + .execute(); + } + + @Invalidates(PostLifecycleEntity) + async wipe(): Promise { + await this.lifecycleRepository.clear(); + } +} diff --git a/src/post/lifecycle/workers/permit-lifecycle.worker.ts b/src/post/lifecycle/workers/permit-lifecycle.worker.ts new file mode 100644 index 0000000..c33aa1b --- /dev/null +++ b/src/post/lifecycle/workers/permit-lifecycle.worker.ts @@ -0,0 +1,89 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { Cron } from '@nestjs/schedule'; +import { InjectRepository } from '@nestjs/typeorm'; +import { DateRange, FunkyCronExpression, chunkDateRange } from 'src/common'; +import { Job } from 'src/job/job.entity'; +import { JobService } from 'src/job/job.service'; +import { ItemType } from 'src/label/label.entity'; +import { ManifestEntity } from 'src/manifest/manifest.entity'; +import { PermitEntity } from 'src/permit/permit.entity'; +import { MoreThan, Repository } from 'typeorm'; + +import { PostLifecycleService } from '../post-lifecycle.service'; + +@Injectable() +export class PermitLifecycleWorker { + constructor( + private readonly jobService: JobService, + private readonly lifecycleService: PostLifecycleService, + @InjectRepository(ManifestEntity) + private readonly manifestRepository: Repository, + @InjectRepository(PermitEntity) + private readonly permitRepository: Repository, + ) {} + + private readonly logger = new Logger(PermitLifecycleWorker.name); + // TODO: Persist this across restarts + private lastProcessedTime: Date | null = null; + + @Cron(FunkyCronExpression.EVERY_3_MINUTES) + async runSync() { + this.jobService.add( + new Job({ + title: 'Post Permit Lifecycle Sync', + key: `/post-lifecycle/permits`, + queue: 'tiling', + timeout: 1000 * 60 * 5, + execute: async ({ cancelToken }) => { + const manifests = await this.manifestRepository.find({ + where: { + type: ItemType.permits, + ...(this.lastProcessedTime && { + updatedAt: MoreThan(this.lastProcessedTime), + }), + }, + }); + + if (manifests.length === 0) return; + + for (const manifest of manifests) { + cancelToken.ensureRunning(); + + const range = new DateRange({ + startDate: manifest.startDate, + endDate: manifest.endDate, + }); + + const chunks = chunkDateRange(range, 30); + + for (const chunk of chunks) { + cancelToken.ensureRunning(); + + const permits = await this.permitRepository.find({ + where: { + createdAt: chunk.find(), + }, + select: ['id', 'createdAt'], + }); + + if (permits.length === 0) continue; + + this.logger.log( + `Syncing ${permits.length} permits for ${chunk.toE621RangeString()}`, + ); + + await this.lifecycleService.upsertPermits( + permits.map((permit) => ({ + postId: permit.id, + permittedAt: permit.createdAt, + })), + ); + } + } + + this.lastProcessedTime = new Date(); + }, + }), + ); + } +} diff --git a/src/post/lifecycle/workers/post-event-lifecycle.worker.ts b/src/post/lifecycle/workers/post-event-lifecycle.worker.ts new file mode 100644 index 0000000..c807c71 --- /dev/null +++ b/src/post/lifecycle/workers/post-event-lifecycle.worker.ts @@ -0,0 +1,147 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { Cron } from '@nestjs/schedule'; +import { InjectRepository } from '@nestjs/typeorm'; +import { PostEventAction } from 'src/api'; +import { DateRange, FunkyCronExpression, chunkDateRange } from 'src/common'; +import { Job } from 'src/job/job.entity'; +import { JobService } from 'src/job/job.service'; +import { ItemType } from 'src/label/label.entity'; +import { ManifestEntity } from 'src/manifest/manifest.entity'; +import { PostEventEntity } from 'src/post-event/post-event.entity'; +import { MoreThan, Repository } from 'typeorm'; + +import { PostLifecycleService } from '../post-lifecycle.service'; + +@Injectable() +export class PostEventLifecycleWorker { + constructor( + private readonly jobService: JobService, + private readonly lifecycleService: PostLifecycleService, + @InjectRepository(ManifestEntity) + private readonly manifestRepository: Repository, + @InjectRepository(PostEventEntity) + private readonly postEventRepository: Repository, + ) {} + + private readonly logger = new Logger(PostEventLifecycleWorker.name); + // TODO: Persist this across restarts + private lastProcessedTime: Date | null = null; + + @Cron(FunkyCronExpression.EVERY_3_MINUTES) + async runSync() { + this.jobService.add( + new Job({ + title: 'Post Event Lifecycle Sync', + key: `/post-lifecycle/post-events`, + queue: 'tiling', + timeout: 1000 * 60 * 5, + execute: async ({ cancelToken }) => { + const manifests = await this.manifestRepository.find({ + where: { + type: ItemType.postEvents, + ...(this.lastProcessedTime && { + updatedAt: MoreThan(this.lastProcessedTime), + }), + }, + }); + + if (manifests.length === 0) return; + + for (const manifest of manifests) { + cancelToken.ensureRunning(); + + const range = new DateRange({ + startDate: manifest.startDate, + endDate: manifest.endDate, + }); + + const chunks = chunkDateRange(range, 30); + + for (const chunk of chunks) { + cancelToken.ensureRunning(); + + const events: { + post_id: number; + created_at: Date; + action: string; + }[] = await this.postEventRepository.query( + ` + SELECT DISTINCT ON (pe.post_id) + pe.post_id, + pe.created_at, + pe.action + FROM post_events pe + WHERE pe.action IN ($1, $2, $3, $4) + AND pe.created_at >= $5 AND pe.created_at < $6 + ORDER BY pe.post_id, pe.created_at DESC + `, + [ + PostEventAction.approved, + PostEventAction.unapproved, + PostEventAction.deleted, + PostEventAction.undeleted, + chunk.startDate, + chunk.endDate, + ], + ); + + if (events.length === 0) continue; + + this.logger.log( + `Syncing ${events.length} post events for ${chunk.toE621RangeString()}`, + ); + + const approvals = events + .filter( + (e) => + e.action === PostEventAction.approved || + e.action === PostEventAction.undeleted, + ) + .map((e) => ({ + postId: e.post_id, + approvedAt: e.created_at, + })); + + const unapprovals = events + .filter((e) => e.action === PostEventAction.unapproved) + .map((e) => ({ + postId: e.post_id, + approvedAt: null, + })); + + const deletions = events + .filter((e) => e.action === PostEventAction.deleted) + .map((e) => ({ + postId: e.post_id, + deletedAt: e.created_at, + })); + + const undeletions = events + .filter((e) => e.action === PostEventAction.undeleted) + .map((e) => ({ + postId: e.post_id, + deletedAt: null, + })); + + if (approvals.length > 0 || unapprovals.length > 0) { + await this.lifecycleService.upsertApprovals([ + ...approvals, + ...unapprovals, + ]); + } + + if (deletions.length > 0 || undeletions.length > 0) { + await this.lifecycleService.upsertDeletions([ + ...deletions, + ...undeletions, + ]); + } + } + } + + this.lastProcessedTime = new Date(); + }, + }), + ); + } +} diff --git a/src/post/lifecycle/workers/upload-lifecycle.worker.ts b/src/post/lifecycle/workers/upload-lifecycle.worker.ts new file mode 100644 index 0000000..0d2b2db --- /dev/null +++ b/src/post/lifecycle/workers/upload-lifecycle.worker.ts @@ -0,0 +1,90 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { Cron } from '@nestjs/schedule'; +import { InjectRepository } from '@nestjs/typeorm'; +import { DateRange, FunkyCronExpression, chunkDateRange } from 'src/common'; +import { Job } from 'src/job/job.entity'; +import { JobService } from 'src/job/job.service'; +import { ItemType } from 'src/label/label.entity'; +import { ManifestEntity } from 'src/manifest/manifest.entity'; +import { PostVersionEntity } from 'src/post-version/post-version.entity'; +import { MoreThan, Repository } from 'typeorm'; + +import { PostLifecycleService } from '../post-lifecycle.service'; + +@Injectable() +export class UploadLifecycleWorker { + constructor( + private readonly jobService: JobService, + private readonly lifecycleService: PostLifecycleService, + @InjectRepository(ManifestEntity) + private readonly manifestRepository: Repository, + @InjectRepository(PostVersionEntity) + private readonly postVersionRepository: Repository, + ) {} + + private readonly logger = new Logger(UploadLifecycleWorker.name); + // TODO: Persist this across restarts + private lastProcessedTime: Date | null = null; + + @Cron(FunkyCronExpression.EVERY_3_MINUTES) + async runSync() { + this.jobService.add( + new Job({ + title: 'Post Upload Lifecycle Sync', + key: `/post-lifecycle/uploads`, + queue: 'tiling', + timeout: 1000 * 60 * 5, + execute: async ({ cancelToken }) => { + const manifests = await this.manifestRepository.find({ + where: { + type: ItemType.postVersions, + ...(this.lastProcessedTime && { + updatedAt: MoreThan(this.lastProcessedTime), + }), + }, + }); + + if (manifests.length === 0) return; + + for (const manifest of manifests) { + cancelToken.ensureRunning(); + + const range = new DateRange({ + startDate: manifest.startDate, + endDate: manifest.endDate, + }); + + const chunks = chunkDateRange(range, 30); + + for (const chunk of chunks) { + cancelToken.ensureRunning(); + + const uploads = await this.postVersionRepository.find({ + where: { + version: 1, + updatedAt: chunk.find(), + }, + select: ['postId', 'updatedAt'], + }); + + if (uploads.length === 0) continue; + + this.logger.log( + `Syncing ${uploads.length} uploads for ${chunk.toE621RangeString()}`, + ); + + await this.lifecycleService.upsertUploads( + uploads.map((upload) => ({ + postId: upload.postId, + uploadedAt: upload.updatedAt, + })), + ); + } + } + + this.lastProcessedTime = new Date(); + }, + }), + ); + } +} diff --git a/src/post/metric/post-metric.module.ts b/src/post/metric/post-metric.module.ts index 3117fa8..27e234d 100644 --- a/src/post/metric/post-metric.module.ts +++ b/src/post/metric/post-metric.module.ts @@ -4,6 +4,7 @@ import { PermitEntity } from 'src/permit/permit.entity'; import { PostVersionEntity } from 'src/post-version/post-version.entity'; import { PostEventEntity } from '../../post-event/post-event.entity'; +import { PostLifecycleEntity } from '../lifecycle/post-lifecycle.entity'; import { PostEntity } from '../post.entity'; import { PostMetricController } from './post-metric.controller'; import { PostMetricService } from './post-metric.service'; @@ -15,6 +16,7 @@ import { PostMetricService } from './post-metric.service'; PostVersionEntity, PostEventEntity, PermitEntity, + PostLifecycleEntity, ]), ], controllers: [PostMetricController], diff --git a/src/post/metric/post-metric.service.ts b/src/post/metric/post-metric.service.ts index 93ddb6f..556632b 100644 --- a/src/post/metric/post-metric.service.ts +++ b/src/post/metric/post-metric.service.ts @@ -1,28 +1,25 @@ import { Injectable } from '@nestjs/common'; import { InjectRepository } from '@nestjs/typeorm'; -import { max, min, startOfMonth, sub } from 'date-fns'; +import { startOfMonth, sub } from 'date-fns'; import { Cacheable } from 'src/app/browser.module'; import { DateRange, PartialDateRange, SeriesCountPoint, - collapseTimeScaleDuration, - convertKeysToCamelCase, - convertKeysToDate, - generateSeriesCountPoints, + TimeScale, + expandInto, + generateSeriesLastTileCountPoints, } from 'src/common'; -import { PermitEntity } from 'src/permit/permit.entity'; -import { PostVersionEntity } from 'src/post-version/post-version.entity'; -import { Brackets, LessThan, Repository } from 'typeorm'; +import { Repository } from 'typeorm'; -import { PostEventEntity } from '../../post-event/post-event.entity'; +import { PostLifecycleEntity } from '../lifecycle/post-lifecycle.entity'; import { PostStatusSummary } from './post-metric.dto'; @Injectable() export class PostMetricService { constructor( - @InjectRepository(PostVersionEntity) - private readonly postVersionRepository: Repository, + @InjectRepository(PostLifecycleEntity) + private readonly lifecycleRepository: Repository, ) {} /** @@ -41,7 +38,7 @@ export class PostMetricService { @Cacheable({ prefix: 'post', ttl: 5 * 60 * 1000, - dependencies: [PostVersionEntity, PostEventEntity, PermitEntity], + dependencies: [PostLifecycleEntity], }) async statusSummary( partialRange?: PartialDateRange, @@ -49,76 +46,43 @@ export class PostMetricService { const range = DateRange.fill(partialRange); const cutOff = this.pendingCutoffDate(range); - const posts = await this.postVersionRepository - .createQueryBuilder('post_version') - .select('post_version.post_id', 'post_id') - .addSelect('MAX(approval_event.created_at)', 'approval_date') - .addSelect('MAX(deletion_event.created_at)', 'deletion_date') - .addSelect('MAX(permit.created_at)', 'permit_date') - .leftJoin( - PostEventEntity, - 'approval_event', - `post_version.post_id = approval_event.post_id AND approval_event.action = 'approved' AND ${this.inRange('approval_event.created_at')}`, - { after: cutOff, before: range.endDate }, - ) - .leftJoin( - PostEventEntity, - 'deletion_event', - `post_version.post_id = deletion_event.post_id AND deletion_event.action = 'deleted' AND ${this.inRange('deletion_event.created_at')}`, - { after: cutOff, before: range.endDate }, - ) - .leftJoin( - PermitEntity, - 'permit', - `post_version.post_id = permit.id AND ${this.inRange('permit.created_at')}`, - { after: cutOff, before: range.endDate }, - ) - .where('post_version.version = 1') - .andWhere('post_version.updated_at >= :cutOff', { cutOff }) - .andWhere( - new Brackets((qb) => { - qb.where({ updatedAt: range.find() }).orWhere( - new Brackets((subQb) => { - subQb - .where('approval_event.created_at IS NULL') - .andWhere('deletion_event.created_at IS NULL') - .andWhere('permit.id IS NULL') - .andWhere('post_version.updated_at < :end', { - end: range.endDate, - }); - }), - ); - }), - ) - .groupBy('post_version.post_id') - .getRawMany<{ - post_id: number; - approval_date: Date | null; - deletion_date: Date | null; - permit_date: Date | null; - }>() - .then((results) => results.map(convertKeysToCamelCase)); - - const approved = posts.filter((result) => result.approvalDate).length; - const deleted = posts.filter((result) => result.deletionDate).length; - const permitted = posts.filter((result) => result.permitDate).length; - const pending = posts.filter( - (result) => - !result.approvalDate && !result.deletionDate && !result.permitDate, - ).length; + const result = await this.lifecycleRepository.query( + ` + SELECT + COUNT(*) FILTER (WHERE approved_at >= $1 AND approved_at < $2) as approved, + COUNT(*) FILTER (WHERE deleted_at >= $1 AND deleted_at < $2) as deleted, + COUNT(*) FILTER (WHERE permitted_at >= $1 AND permitted_at < $2) as permitted, + COUNT(*) FILTER (WHERE + (approved_at IS NULL OR approved_at >= $2) + AND (deleted_at IS NULL OR deleted_at >= $2) + AND (permitted_at IS NULL OR permitted_at >= $2) + ) as pending + FROM post_lifecycle + WHERE uploaded_at >= $1 + AND ( + (uploaded_at >= $3 AND uploaded_at < $2) + OR (uploaded_at < $2 + AND (approved_at IS NULL OR approved_at >= $2) + AND (deleted_at IS NULL OR deleted_at >= $2) + AND (permitted_at IS NULL OR permitted_at >= $2) + ) + ) + `, + [cutOff, range.endDate, range.startDate], + ); return new PostStatusSummary({ - approved, - deleted, - permitted, - pending, + approved: parseInt(result[0]?.approved || '0'), + deleted: parseInt(result[0]?.deleted || '0'), + permitted: parseInt(result[0]?.permitted || '0'), + pending: parseInt(result[0]?.pending || '0'), }); } @Cacheable({ prefix: 'post', - ttl: 10 * 60 * 1000, - dependencies: [PostVersionEntity, PostEventEntity, PermitEntity], + ttl: 5 * 60 * 1000, + dependencies: [PostLifecycleEntity], }) async pendingSeries( partialRange?: PartialDateRange, @@ -126,88 +90,79 @@ export class PostMetricService { const range = DateRange.fill(partialRange); const cutOff = this.pendingCutoffDate(range); - const posts = await this.postVersionRepository - .createQueryBuilder('post_version') - .select('post_version.post_id', 'post_id') - .addSelect('MAX(post_version.updated_at)', 'updated_at') - .addSelect('MIN(approval_event.created_at)', 'approval_date') - .addSelect('MIN(deletion_event.created_at)', 'deletion_date') - .leftJoin( - PostEventEntity, - 'approval_event', - `post_version.post_id = approval_event.post_id AND approval_event.action = 'approved' AND ${this.inRange('approval_event.created_at')}`, - { after: cutOff, before: range.endDate }, - ) - .leftJoin( - PostEventEntity, - 'deletion_event', - `post_version.post_id = deletion_event.post_id AND deletion_event.action = 'deleted' AND ${this.inRange('deletion_event.created_at')}`, - { after: cutOff, before: range.endDate }, - ) - .leftJoin( - PermitEntity, - 'permit', - `post_version.post_id = permit.id AND ${this.inRange('permit.created_at')}`, - { after: cutOff, before: range.endDate }, - ) - .where('post_version.version = 1') - .andWhere('post_version.updated_at >= :cutOff', { cutOff }) - .andWhere({ updatedAt: LessThan(range.endDate) }) - .andWhere('permit.id IS NULL') - .andWhere( - new Brackets((qb) => { - qb.where('approval_event.created_at IS NULL').orWhere( - 'approval_event.created_at > :start', - { start: range.startDate }, - ); - }), + const query = ` + WITH initial_count AS ( + SELECT COUNT(*) AS count + FROM post_lifecycle + WHERE uploaded_at >= $3 + AND uploaded_at < $1 + AND permitted_at IS NULL + AND (approved_at IS NULL OR approved_at >= $1) + AND (deleted_at IS NULL OR deleted_at >= $1) + ), + pending_posts AS ( + SELECT + post_id, + date_trunc('hour', uploaded_at) AS upload_hour, + date_trunc( + 'hour', + COALESCE( + LEAST( + COALESCE(approved_at, $2::timestamptz), + COALESCE(deleted_at, $2::timestamptz) + ), + $2::timestamptz + ) + ) AS handled_hour + FROM post_lifecycle + WHERE uploaded_at >= $3 + AND uploaded_at < $2 + AND permitted_at IS NULL + AND (approved_at IS NULL OR approved_at > $1) + AND (deleted_at IS NULL OR deleted_at > $1) + ), + events AS ( + SELECT upload_hour AS hour, 1 AS change + FROM pending_posts + WHERE upload_hour >= $1 + UNION ALL + SELECT handled_hour AS hour, -1 AS change + FROM pending_posts + WHERE handled_hour >= $1 AND handled_hour < $2 + ), + event_aggregates AS ( + SELECT hour, SUM(change) AS change + FROM events + GROUP BY hour + ), + hour_series AS ( + SELECT generate_series( + $1::timestamptz, + $2::timestamptz - interval '1 hour', + interval '1 hour' + ) AS hour ) - .andWhere( - new Brackets((qb) => { - qb.where('deletion_event.created_at IS NULL').orWhere( - 'deletion_event.created_at > :start', - { start: range.startDate }, - ); - }), - ) - .groupBy('post_version.post_id') - .getRawMany<{ - post_id: number; - updated_at: string; - approval_date: string | null; - deletion_date: string | null; - }>() - .then((results) => - results - .map(convertKeysToCamelCase) - .map((result) => - convertKeysToDate(result, [ - 'updatedAt', - 'approvalDate', - 'deletionDate', - ]), - ), - ); - - const scale = collapseTimeScaleDuration(range.scale); - - const dates = posts - .map((post) => { - const startDate = max([post.updatedAt, range.startDate]); - - const handledDate = post.approvalDate ?? post.deletionDate; - - const endDate = min([ - handledDate ? sub(handledDate, { [scale]: 1 }) : new Date(), - range.endDate, - ]); - - if (startDate > endDate) return null; - - return new DateRange({ startDate, endDate }); - }) - .filter((date): date is DateRange => date !== null); - - return generateSeriesCountPoints(dates, range); + SELECT + hour_series.hour AS time, + (SELECT count FROM initial_count) + COALESCE( + SUM(event_aggregates.change) OVER (ORDER BY hour_series.hour ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW), + 0 + ) AS count + FROM hour_series + LEFT JOIN event_aggregates ON hour_series.hour = event_aggregates.hour + ORDER BY hour_series.hour + `; + + const result = (await this.lifecycleRepository.query(query, [ + range.startDate, + range.endDate, + cutOff, + ])) as Array<{ time: Date; count: string }>; + + return generateSeriesLastTileCountPoints( + result.map((row) => new DateRange(expandInto(row.time, TimeScale.Hour))), + result.map((row) => parseInt(row.count, 10)), + range, + ); } } diff --git a/src/post/post.module.ts b/src/post/post.module.ts index fe39fc1..f670d01 100644 --- a/src/post/post.module.ts +++ b/src/post/post.module.ts @@ -1,8 +1,9 @@ import { Module } from '@nestjs/common'; +import { PostLifecycleModule } from './lifecycle/post-lifecycle.module'; import { PostMetricModule } from './metric/post-metric.module'; @Module({ - imports: [PostMetricModule], + imports: [PostMetricModule, PostLifecycleModule], }) export class PostModule {} diff --git a/src/upload/metric/upload-metric.controller.ts b/src/upload/metric/upload-metric.controller.ts index 4cf21b5..9b8c1ad 100644 --- a/src/upload/metric/upload-metric.controller.ts +++ b/src/upload/metric/upload-metric.controller.ts @@ -49,7 +49,11 @@ export class UploadMetricController { @Query() range?: PartialDateRange, @Query() query?: PostUploadSeriesQuery, ): Promise { - return this.uploadMetricService.count(range, query); + if (query?.uploaderId) { + return this.uploadMetricService.countUploader(query.uploaderId, range); + } + + return this.uploadMetricService.count(range); } @Get('uploader/summary') diff --git a/src/upload/metric/upload-metric.module.ts b/src/upload/metric/upload-metric.module.ts index 3fd191d..9d09c2b 100644 --- a/src/upload/metric/upload-metric.module.ts +++ b/src/upload/metric/upload-metric.module.ts @@ -1,13 +1,17 @@ import { Module } from '@nestjs/common'; import { TypeOrmModule } from '@nestjs/typeorm'; import { PostVersionEntity } from 'src/post-version/post-version.entity'; +import { UploadTilesEntity } from 'src/upload/tiles/upload-tiles.entity'; import { UserHeadModule } from 'src/user/head/user-head.module'; import { UploadMetricController } from './upload-metric.controller'; import { UploadMetricService } from './upload-metric.service'; @Module({ - imports: [TypeOrmModule.forFeature([PostVersionEntity]), UserHeadModule], + imports: [ + TypeOrmModule.forFeature([PostVersionEntity, UploadTilesEntity]), + UserHeadModule, + ], controllers: [UploadMetricController], providers: [UploadMetricService], }) diff --git a/src/upload/metric/upload-metric.service.ts b/src/upload/metric/upload-metric.service.ts index 38507eb..10c3e82 100644 --- a/src/upload/metric/upload-metric.service.ts +++ b/src/upload/metric/upload-metric.service.ts @@ -6,32 +6,62 @@ import { PaginationParams, PartialDateRange, SeriesCountPoint, + TimeScale, convertKeysToCamelCase, + expandInto, generateSeriesCountPoints, + generateSeriesTileCountPoints, } from 'src/common'; import { PostVersionEntity } from 'src/post-version/post-version.entity'; +import { UploadTilesEntity } from 'src/upload/tiles/upload-tiles.entity'; import { Repository } from 'typeorm'; -import { - PostUploadSeriesQuery, - PostUploaderSummary, -} from './upload-metric.dto'; +import { PostUploaderSummary } from './upload-metric.dto'; @Injectable() export class UploadMetricService { constructor( @InjectRepository(PostVersionEntity) private readonly postVersionRepository: Repository, + @InjectRepository(UploadTilesEntity) + private readonly uploadTilesRepository: Repository, ) {} + @Cacheable({ + prefix: 'upload', + ttl: 10 * 60 * 1000, + dependencies: [UploadTilesEntity], + disable: true, + }) + async count(range?: PartialDateRange): Promise { + range = DateRange.fill(range); + + const tiles = await this.uploadTilesRepository.find({ + where: { + time: range.find(), + }, + order: { + time: 'ASC', + }, + }); + + return generateSeriesTileCountPoints( + tiles.map((tile) => new DateRange(expandInto(tile.time, TimeScale.Hour))), + tiles.map((tile) => tile.count), + range, + ); + } + + // TODO: Tile by uploader? + @Cacheable({ prefix: 'upload', ttl: 10 * 60 * 1000, dependencies: [PostVersionEntity], }) - async count( + async countUploader( + uploaderId: number, range?: PartialDateRange, - query?: PostUploadSeriesQuery, ): Promise { range = DateRange.fill(range); @@ -40,7 +70,7 @@ export class UploadMetricService { where: { version: 1, // only uploads updatedAt: range.find(), - ...query?.where(), + updaterId: uploaderId, }, }); diff --git a/src/upload/tiles/upload-tiles.entity.ts b/src/upload/tiles/upload-tiles.entity.ts new file mode 100644 index 0000000..efa9ba8 --- /dev/null +++ b/src/upload/tiles/upload-tiles.entity.ts @@ -0,0 +1,26 @@ +import { + Column, + Entity, + Index, + PrimaryColumn, + UpdateDateColumn, +} from 'typeorm'; + +@Entity('upload_hourly_tiles') +@Index(['time']) +export class UploadTilesEntity { + constructor(partial?: Partial) { + Object.assign(this, partial); + } + + @PrimaryColumn({ type: 'timestamptz' }) + time: Date; + + @UpdateDateColumn({ type: 'timestamptz' }) + updatedAt: Date; + + @Column({ type: 'int', default: 0 }) + count: number; +} + +export type UploadTilesData = Pick; diff --git a/src/upload/tiles/upload-tiles.module.ts b/src/upload/tiles/upload-tiles.module.ts new file mode 100644 index 0000000..e2dac9e --- /dev/null +++ b/src/upload/tiles/upload-tiles.module.ts @@ -0,0 +1,23 @@ +import { Module } from '@nestjs/common'; +import { TypeOrmModule } from '@nestjs/typeorm'; +import { JobModule } from 'src/job/job.module'; +import { ManifestEntity } from 'src/manifest/manifest.entity'; +import { PostVersionEntity } from 'src/post-version/post-version.entity'; + +import { UploadTilesEntity } from './upload-tiles.entity'; +import { UploadTilesService } from './upload-tiles.service'; +import { UploadTilesWorker } from './upload-tiles.worker'; + +@Module({ + imports: [ + TypeOrmModule.forFeature([ + UploadTilesEntity, + PostVersionEntity, + ManifestEntity, + ]), + JobModule, + ], + providers: [UploadTilesService, UploadTilesWorker], + exports: [UploadTilesService], +}) +export class UploadTilesModule {} diff --git a/src/upload/tiles/upload-tiles.service.ts b/src/upload/tiles/upload-tiles.service.ts new file mode 100644 index 0000000..152b761 --- /dev/null +++ b/src/upload/tiles/upload-tiles.service.ts @@ -0,0 +1,98 @@ +import { Injectable } from '@nestjs/common'; +import { InjectRepository } from '@nestjs/typeorm'; +import { Invalidates } from 'src/app/browser.module'; +import { + PartialDateRange, + TileService, + TilingRange, + findMissingOrStaleTiles, + getTilingRanges, + groupTimesIntoRanges, +} from 'src/common'; +import { ItemType } from 'src/label/label.entity'; +import { ManifestEntity } from 'src/manifest/manifest.entity'; +import { PostVersionEntity } from 'src/post-version/post-version.entity'; +import { Repository } from 'typeorm'; + +import { UploadTilesData, UploadTilesEntity } from './upload-tiles.entity'; + +@Injectable() +export class UploadTilesService implements TileService { + readonly interval = 1; + + constructor( + @InjectRepository(UploadTilesEntity) + private readonly tileRepository: Repository, + @InjectRepository(PostVersionEntity) + private readonly postVersionRepository: Repository, + @InjectRepository(ManifestEntity) + private readonly manifestRepository: Repository, + ) {} + + async getRanges(): Promise { + const types = [ItemType.postVersions]; + + const manifests = await this.manifestRepository.find({ + where: types.map((type) => ({ type })), + }); + + return getTilingRanges(manifests, types); + } + + @Invalidates(UploadTilesEntity) + async upsert(tiles: UploadTilesEntity[]): Promise { + if (tiles.length === 0) return; + + await this.tileRepository + .createQueryBuilder() + .insert() + .into(UploadTilesEntity) + .values(tiles) + .orUpdate(['count', 'updated_at'], ['time']) + .execute(); + } + + async generate(times: Date[]): Promise> { + if (times.length === 0) { + return new Map(); + } + + const ranges = groupTimesIntoRanges(times); + const results = new Map(); + + for (const range of ranges) { + const result = await this.postVersionRepository + .createQueryBuilder('post_version') + .select("date_trunc('hour', post_version.updated_at)", 'time') + .addSelect('COUNT(*)', 'count') + .where('post_version.version = :version', { version: 1 }) + .andWhere('post_version.updated_at >= :start', { + start: range.startDate, + }) + .andWhere('post_version.updated_at < :end', { end: range.endDate }) + .groupBy("date_trunc('hour', post_version.updated_at)") + .getRawMany<{ time: Date; count: string }>(); + + result.forEach((row) => + results.set(new Date(row.time).toISOString(), { + count: parseInt(row.count, 10), + }), + ); + } + + return results; + } + + async findMissing(range: TilingRange): Promise { + return findMissingOrStaleTiles(this.tileRepository, range); + } + + @Invalidates(UploadTilesEntity) + async wipe(range?: PartialDateRange): Promise { + if (range?.find()) { + await this.tileRepository.delete({ time: range.find() }); + } else { + await this.tileRepository.clear(); + } + } +} diff --git a/src/upload/tiles/upload-tiles.worker.ts b/src/upload/tiles/upload-tiles.worker.ts new file mode 100644 index 0000000..cd63ef4 --- /dev/null +++ b/src/upload/tiles/upload-tiles.worker.ts @@ -0,0 +1,64 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { Cron } from '@nestjs/schedule'; +import { FunkyCronExpression } from 'src/common'; +import { Job } from 'src/job/job.entity'; +import { JobService } from 'src/job/job.service'; + +import { UploadTilesEntity } from './upload-tiles.entity'; +import { UploadTilesService } from './upload-tiles.service'; + +@Injectable() +export class UploadTilesWorker { + constructor( + private readonly jobService: JobService, + private readonly uploadTilesService: UploadTilesService, + ) {} + + private readonly logger = new Logger(UploadTilesWorker.name); + + @Cron(FunkyCronExpression.EVERY_3_MINUTES) + async runTiling() { + this.jobService.add( + new Job({ + title: 'Post Uploads Tiling', + key: `/uploads/tiles`, + queue: 'tiling', + timeout: 1000 * 60 * 5, + execute: async ({ cancelToken }) => { + const ranges = await this.uploadTilesService.getRanges(); + + if (ranges.length === 0) return; + + for (const { dateRange, updatedAt } of ranges) { + cancelToken.ensureRunning(); + + const targets = await this.uploadTilesService.findMissing({ + dateRange, + updatedAt, + }); + + if (targets.length === 0) continue; + + this.logger.log( + `Generating ${targets.length} tiles for ${dateRange.toE621RangeString()}`, + ); + + cancelToken.ensureRunning(); + + const tileData = await this.uploadTilesService.generate(targets); + + const tilesToSave = targets.map((time) => { + const data = tileData.get(time.toISOString()); + return new UploadTilesEntity({ + time, + count: data?.count ?? 0, + }); + }); + + await this.uploadTilesService.upsert(tilesToSave); + } + }, + }), + ); + } +} diff --git a/src/upload/upload.module.ts b/src/upload/upload.module.ts index e319417..4498d24 100644 --- a/src/upload/upload.module.ts +++ b/src/upload/upload.module.ts @@ -2,8 +2,9 @@ import { Module } from '@nestjs/common'; import { UploadMetricModule } from './metric/upload-metric.module'; import { UploadSyncModule } from './sync/upload-sync.module'; +import { UploadTilesModule } from './tiles/upload-tiles.module'; @Module({ - imports: [UploadSyncModule, UploadMetricModule], + imports: [UploadSyncModule, UploadMetricModule, UploadTilesModule], }) export class UploadModule {}