diff --git a/packages/apps/job-launcher/server/.env.example b/packages/apps/job-launcher/server/.env.example index 55590e127e..1541e5f6d8 100644 --- a/packages/apps/job-launcher/server/.env.example +++ b/packages/apps/job-launcher/server/.env.example @@ -89,13 +89,3 @@ PAYMENT_PROVIDER_APP_INFO_URL=http://local.app # Sendgrid SENDGRID_API_KEY=sendgrid-disabled - -# Vision -GOOGLE_PROJECT_ID=disabled -GOOGLE_PRIVATE_KEY=disabled -GOOGLE_CLIENT_EMAIL=disabled -GCV_MODERATION_RESULTS_FILES_PATH=disabled -GCV_MODERATION_RESULTS_BUCKET=disabled - -# Slack -SLACK_ABUSE_NOTIFICATION_WEBHOOK_URL=disabled diff --git a/packages/apps/job-launcher/server/src/common/config/config.module.ts b/packages/apps/job-launcher/server/src/common/config/config.module.ts index 82692b8851..189136e888 100644 --- a/packages/apps/job-launcher/server/src/common/config/config.module.ts +++ b/packages/apps/job-launcher/server/src/common/config/config.module.ts @@ -11,8 +11,6 @@ import { S3ConfigService } from './s3-config.service'; import { SendgridConfigService } from './sendgrid-config.service'; import { PaymentProviderConfigService } from './payment-provider-config.service'; import { Web3ConfigService } from './web3-config.service'; -import { SlackConfigService } from './slack-config.service'; -import { VisionConfigService } from './vision-config.service'; @Global() @Module({ @@ -28,8 +26,6 @@ import { VisionConfigService } from './vision-config.service'; CvatConfigService, PGPConfigService, NetworkConfigService, - SlackConfigService, - VisionConfigService, ], exports: [ ConfigService, @@ -43,8 +39,6 @@ import { VisionConfigService } from './vision-config.service'; CvatConfigService, PGPConfigService, NetworkConfigService, - SlackConfigService, - VisionConfigService, ], }) export class EnvConfigModule {} diff --git a/packages/apps/job-launcher/server/src/common/config/env-schema.ts b/packages/apps/job-launcher/server/src/common/config/env-schema.ts index 195f9becb1..1304d47ade 100644 --- 
a/packages/apps/job-launcher/server/src/common/config/env-schema.ts +++ b/packages/apps/job-launcher/server/src/common/config/env-schema.ts @@ -67,11 +67,6 @@ export const envValidator = Joi.object({ SENDGRID_API_KEY: Joi.string().required(), SENDGRID_FROM_EMAIL: Joi.string(), SENDGRID_FROM_NAME: Joi.string(), - // CVAT - CVAT_JOB_SIZE: Joi.string(), - CVAT_MAX_TIME: Joi.string(), - CVAT_VAL_SIZE: Joi.string(), - CVAT_SKELETONS_JOB_SIZE_MULTIPLIER: Joi.string(), //PGP PGP_ENCRYPT: Joi.boolean(), PGP_PRIVATE_KEY: Joi.string().optional(), @@ -82,12 +77,4 @@ export const envValidator = Joi.object({ //COIN API KEYS RATE_CACHE_TIME: Joi.number().optional(), COINGECKO_API_KEY: Joi.string().optional(), - // Google - GOOGLE_PROJECT_ID: Joi.string().required(), - GOOGLE_PRIVATE_KEY: Joi.string().required(), - GOOGLE_CLIENT_EMAIL: Joi.string().required(), - GCV_MODERATION_RESULTS_FILES_PATH: Joi.string().required(), - GCV_MODERATION_RESULTS_BUCKET: Joi.string().required(), - // Slack - SLACK_ABUSE_NOTIFICATION_WEBHOOK_URL: Joi.string().required(), }); diff --git a/packages/apps/job-launcher/server/src/common/config/slack-config.service.ts b/packages/apps/job-launcher/server/src/common/config/slack-config.service.ts deleted file mode 100644 index 4bccf51aee..0000000000 --- a/packages/apps/job-launcher/server/src/common/config/slack-config.service.ts +++ /dev/null @@ -1,17 +0,0 @@ -import { Injectable } from '@nestjs/common'; -import { ConfigService } from '@nestjs/config'; - -@Injectable() -export class SlackConfigService { - constructor(private configService: ConfigService) {} - - /** - * The abuse notification webhook URL for sending messages to a Slack channel. 
- * Required - */ - get abuseNotificationWebhookUrl(): string { - return this.configService.getOrThrow( - 'SLACK_ABUSE_NOTIFICATION_WEBHOOK_URL', - ); - } -} diff --git a/packages/apps/job-launcher/server/src/common/config/vision-config.service.ts b/packages/apps/job-launcher/server/src/common/config/vision-config.service.ts deleted file mode 100644 index 3d39b184b6..0000000000 --- a/packages/apps/job-launcher/server/src/common/config/vision-config.service.ts +++ /dev/null @@ -1,51 +0,0 @@ -import { Injectable } from '@nestjs/common'; -import { ConfigService } from '@nestjs/config'; - -@Injectable() -export class VisionConfigService { - constructor(private configService: ConfigService) {} - - /** - * The Google Cloud Storage (GCS) path name where temporary async moderation results will be saved. - * Required - */ - get moderationResultsFilesPath(): string { - return this.configService.getOrThrow( - 'GCV_MODERATION_RESULTS_FILES_PATH', - ); - } - - /** - * The Google Cloud Storage (GCS) bucket name where moderation results will be saved. - * Required - */ - get moderationResultsBucket(): string { - return this.configService.getOrThrow( - 'GCV_MODERATION_RESULTS_BUCKET', - ); - } - - /** - * The project ID for connecting to the Google Cloud Vision API. - * Required - */ - get projectId(): string { - return this.configService.getOrThrow('GOOGLE_PROJECT_ID'); - } - - /** - * The private key for authenticating with the Google Cloud Vision API. - * Required - */ - get privateKey(): string { - return this.configService.getOrThrow('GOOGLE_PRIVATE_KEY'); - } - - /** - * The client email used for authenticating requests to the Google Cloud Vision API. 
- * Required - */ - get clientEmail(): string { - return this.configService.getOrThrow('GOOGLE_CLIENT_EMAIL'); - } -} diff --git a/packages/apps/job-launcher/server/src/common/constants/errors.ts b/packages/apps/job-launcher/server/src/common/constants/errors.ts index a12a28d52a..29c15d826f 100644 --- a/packages/apps/job-launcher/server/src/common/constants/errors.ts +++ b/packages/apps/job-launcher/server/src/common/constants/errors.ts @@ -26,23 +26,6 @@ export enum ErrorJob { NoRefundFound = 'No refund found for this escrow', } -/** - * Represents error messages associated with a job moderation. - */ -export enum ErrorContentModeration { - ErrorProcessingDataset = 'Error processing dataset', - InappropriateContent = 'Job cannot be processed due to inappropriate content', - ContentModerationFailed = 'Job cannot be processed due to failure in content moderation', - NoDestinationURIFound = 'No destination URI found in the response', - InvalidBucketUrl = 'Invalid bucket URL', - DataMustBeStoredInGCS = 'Data must be stored in Google Cloud Storage', - NoResultsFound = 'No results found', - ResultsParsingFailed = 'Results parsing failed', - JobModerationFailed = 'Job moderation failed', - ProcessContentModerationRequestFailed = 'Process content moderation request failed', - CompleteContentModerationFailed = 'Complete content moderation failed', -} - /** * Represents error messages associated to webhook. 
*/ diff --git a/packages/apps/job-launcher/server/src/common/constants/index.ts b/packages/apps/job-launcher/server/src/common/constants/index.ts index 72ada66583..7bbb8b9a5e 100644 --- a/packages/apps/job-launcher/server/src/common/constants/index.ts +++ b/packages/apps/job-launcher/server/src/common/constants/index.ts @@ -66,6 +66,14 @@ export const LOGOUT_PATH = '/auth/logout'; export const MUTEX_TIMEOUT = 2000; //ms -export const GS_PROTOCOL = 'gs://'; -export const GCV_CONTENT_MODERATION_ASYNC_BATCH_SIZE = 100; -export const GCV_CONTENT_MODERATION_BATCH_SIZE_PER_TASK = 2000; +/** + * Regex for GCS URL in subdomain format: https://<bucket>.storage.googleapis.com/<object_path> (capture group 1 is the bucket, group 2 the possibly empty object path) + */ +export const GCS_HTTP_REGEX_SUBDOMAIN = + /^https:\/\/([a-zA-Z0-9\-.]+)\.storage\.googleapis\.com\/?(.*)$/; + +/** + * Regex for GCS URL in path-based format: https://storage.googleapis.com/<bucket>/<object_path> (capture group 1 is the bucket, group 2 the possibly empty object path) + */ +export const GCS_HTTP_REGEX_PATH_BASED = + /^https:\/\/storage\.googleapis\.com\/([^/]+)\/?(.*)$/; diff --git a/packages/apps/job-launcher/server/src/common/enums/content-moderation.ts b/packages/apps/job-launcher/server/src/common/enums/content-moderation.ts deleted file mode 100644 index d772b1774f..0000000000 --- a/packages/apps/job-launcher/server/src/common/enums/content-moderation.ts +++ /dev/null @@ -1,7 +0,0 @@ -export enum ContentModerationRequestStatus { - PENDING = 'pending', - PROCESSED = 'processed', - POSITIVE_ABUSE = 'positive_abuse', - PASSED = 'passed', - FAILED = 'failed', -} diff --git a/packages/apps/job-launcher/server/src/common/enums/cron-job.ts b/packages/apps/job-launcher/server/src/common/enums/cron-job.ts index 6cb71352ec..da9fd197f3 100644 --- a/packages/apps/job-launcher/server/src/common/enums/cron-job.ts +++ b/packages/apps/job-launcher/server/src/common/enums/cron-job.ts @@ -1,5 +1,4 @@ export enum CronJobType { - ContentModeration = 'content-moderation', CreateEscrow = 'create-escrow', CancelEscrow = 'cancel-escrow', ProcessPendingWebhook = 'process-pending-webhook', diff --git
a/packages/apps/job-launcher/server/src/common/enums/gcv.ts b/packages/apps/job-launcher/server/src/common/enums/gcv.ts deleted file mode 100644 index 600b82abae..0000000000 --- a/packages/apps/job-launcher/server/src/common/enums/gcv.ts +++ /dev/null @@ -1,9 +0,0 @@ -export enum ContentModerationLevel { - VERY_LIKELY = 'VERY_LIKELY', - LIKELY = 'LIKELY', - POSSIBLE = 'POSSIBLE', -} - -export enum ContentModerationFeature { - SAFE_SEARCH_DETECTION = 'SAFE_SEARCH_DETECTION', -} diff --git a/packages/apps/job-launcher/server/src/common/enums/job.ts b/packages/apps/job-launcher/server/src/common/enums/job.ts index e2ad6ca78e..db16799d11 100644 --- a/packages/apps/job-launcher/server/src/common/enums/job.ts +++ b/packages/apps/job-launcher/server/src/common/enums/job.ts @@ -1,8 +1,5 @@ export enum JobStatus { PAID = 'paid', - UNDER_MODERATION = 'under_moderation', - MODERATION_PASSED = 'moderation_passed', - POSSIBLE_ABUSE_IN_REVIEW = 'possible_abuse_in_review', LAUNCHED = 'launched', PARTIAL = 'partial', COMPLETED = 'completed', diff --git a/packages/apps/job-launcher/server/src/common/utils/gcstorage.spec.ts b/packages/apps/job-launcher/server/src/common/utils/gcstorage.spec.ts deleted file mode 100644 index 5100ed5e18..0000000000 --- a/packages/apps/job-launcher/server/src/common/utils/gcstorage.spec.ts +++ /dev/null @@ -1,191 +0,0 @@ -import { - constructGcsPath, - convertToGCSPath, - convertToHttpUrl, - isGCSBucketUrl, -} from './gcstorage'; -import { ErrorBucket } from '../constants/errors'; - -describe('Google Cloud Storage utils', () => { - describe('isGCSBucketUrl', () => { - it('should return true for a valid GCS HTTP URL', () => { - expect( - isGCSBucketUrl( - 'https://valid-bucket-with-file.storage.googleapis.com/object.jpg', - ), - ).toBe(true); - expect( - isGCSBucketUrl('https://valid-bucket.storage.googleapis.com/'), - ).toBe(true); - expect( - isGCSBucketUrl('https://valid-bucket.storage.googleapis.com'), - ).toBe(true); - }); - - it('should return 
true for a valid GCS gs:// URL', () => { - expect(isGCSBucketUrl('gs://valid-bucket-with-file/object.jpg')).toBe( - true, - ); - expect(isGCSBucketUrl('gs://valid-bucket/')).toBe(true); - expect(isGCSBucketUrl('gs://valid-bucket')).toBe(true); - }); - - it('should return false for an invalid GCS HTTP URL', () => { - expect(isGCSBucketUrl('https://invalid-url.com/object.jpg')).toBe(false); - }); - - it('should return false for an invalid gs:// URL', () => { - expect(isGCSBucketUrl('gs:/invalid-bucket/object.jpg')).toBe(false); - }); - - it('should return false for a completely invalid URL', () => { - expect(isGCSBucketUrl('randomstring')).toBe(false); - }); - - it('should return false for a GCS URL with an invalid bucket name', () => { - expect(isGCSBucketUrl('https://_invalid.storage.googleapis.com')).toBe( - false, - ); - expect(isGCSBucketUrl('gs://sh.storage.googleapis.com')).toBe(false); - expect(isGCSBucketUrl('https://test-.storage.googleapis.com')).toBe( - false, - ); - expect(isGCSBucketUrl('https://-test.storage.googleapis.com')).toBe( - false, - ); - }); - }); - - describe('convertToGCSPath', () => { - it('should convert a valid GCS HTTP URL to a gs:// path', () => { - expect( - convertToGCSPath( - 'https://valid-bucket.storage.googleapis.com/object.jpg', - ), - ).toBe('gs://valid-bucket/object.jpg'); - }); - - it('should convert a valid GCS HTTP URL without an object path to a gs:// bucket path', () => { - expect( - convertToGCSPath('https://valid-bucket.storage.googleapis.com'), - ).toBe('gs://valid-bucket'); - - expect( - convertToGCSPath('https://valid-bucket.storage.googleapis.com/'), - ).toBe('gs://valid-bucket'); - }); - - it('should throw a Error for an invalid GCS URL', () => { - expect(() => - convertToGCSPath('https://invalid-url.com/object.jpg'), - ).toThrow(new Error(ErrorBucket.InvalidGCSUrl)); - }); - - it('should throw a Error for a URL with an invalid bucket name', () => { - expect(() => - 
convertToGCSPath('https://invalid_bucket.storage.googleapis.com'), - ).toThrow(new Error(ErrorBucket.InvalidGCSUrl)); - }); - }); - - describe('convertToHttpUrl', () => { - it('should convert a gs:// path to a valid HTTP URL', () => { - const result = convertToHttpUrl('gs://valid-bucket/object.jpg'); - expect(result).toBe( - 'https://valid-bucket.storage.googleapis.com/object.jpg', - ); - }); - - it('should convert a gs:// bucket path without an object to an HTTP bucket URL', () => { - expect(convertToHttpUrl('gs://valid-bucket/')).toBe( - 'https://valid-bucket.storage.googleapis.com/', - ); - expect(convertToHttpUrl('gs://valid-bucket')).toBe( - 'https://valid-bucket.storage.googleapis.com/', - ); - }); - - it('should throw a Error for an invalid gs:// path', () => { - expect(() => convertToHttpUrl('invalid-gcs-path')).toThrow( - new Error(ErrorBucket.InvalidGCSUrl), - ); - }); - - it('should throw a Error if the gs:// format is incorrect', () => { - expect(() => convertToHttpUrl('gs:/missing-slash/object.jpg')).toThrow( - new Error(ErrorBucket.InvalidGCSUrl), - ); - }); - - it('should throw a Error for an invalid bucket name in gs:// path', () => { - expect(() => convertToHttpUrl('gs://_invalid/object.jpg')).toThrow( - new Error(ErrorBucket.InvalidGCSUrl), - ); - expect(() => convertToHttpUrl('gs://test-/object.jpg')).toThrow( - new Error(ErrorBucket.InvalidGCSUrl), - ); - }); - }); - - describe('constructGcsPath', () => { - it('should correctly construct a GCS path with multiple segments', () => { - expect(constructGcsPath('my-bucket', 'folder', 'file.jpg')).toBe( - 'gs://my-bucket/folder/file.jpg', - ); - }); - - it('should handle leading and trailing slashes properly', () => { - expect(constructGcsPath('my-bucket/', '/folder/', '/file.jpg')).toBe( - 'gs://my-bucket/folder/file.jpg', - ); - }); - - it('should remove extra slashes and normalize path segments', () => { - expect( - constructGcsPath('my-bucket', '///folder///', '///file.jpg///'), - 
).toBe('gs://my-bucket/folder/file.jpg'); - }); - - it('should handle cases where no additional paths are provided', () => { - expect(constructGcsPath('my-bucket')).toBe('gs://my-bucket'); - }); - - it('should handle empty segments gracefully', () => { - expect(constructGcsPath('my-bucket', '', 'file.jpg')).toBe( - 'gs://my-bucket/file.jpg', - ); - }); - - it('should construct a path with nested directories correctly', () => { - expect( - constructGcsPath('my-bucket', 'folder1', 'folder2', 'file.jpg'), - ).toBe('gs://my-bucket/folder1/folder2/file.jpg'); - }); - - it('should not add an extra slash if the base path already ends with one', () => { - expect(constructGcsPath('my-bucket/', 'file.jpg')).toBe( - 'gs://my-bucket/file.jpg', - ); - }); - - it('should correctly handle a single trailing slash in the base path', () => { - expect(constructGcsPath('my-bucket/', '')).toBe('gs://my-bucket'); - }); - - it('should correctly handle a bucket name that already includes gs://', () => { - expect(constructGcsPath('gs://my-bucket', 'folder', 'file.jpg')).toBe( - 'gs://my-bucket/folder/file.jpg', - ); - }); - - it('should correctly handle a bucket name with gs:// and a trailing slash', () => { - expect(constructGcsPath('gs://my-bucket/', 'folder', 'file.jpg')).toBe( - 'gs://my-bucket/folder/file.jpg', - ); - }); - - it('should handle paths that contain only slashes', () => { - expect(constructGcsPath('my-bucket', '/', '/')).toBe('gs://my-bucket'); - }); - }); -}); diff --git a/packages/apps/job-launcher/server/src/common/utils/gcstorage.ts b/packages/apps/job-launcher/server/src/common/utils/gcstorage.ts deleted file mode 100644 index 87f57e9086..0000000000 --- a/packages/apps/job-launcher/server/src/common/utils/gcstorage.ts +++ /dev/null @@ -1,193 +0,0 @@ -import { isURL } from 'validator'; -import { GS_PROTOCOL } from '../constants'; -import { ErrorBucket } from '../constants/errors'; - -// Step 1: Define your regular expressions, bucket validation, and URL validation 
helpers - -/** - * Regex for GCS URL in subdomain format: https://.storage.googleapis.com/ - */ -export const GCS_HTTP_REGEX_SUBDOMAIN = - /^https:\/\/([a-zA-Z0-9\-.]+)\.storage\.googleapis\.com\/?(.*)$/; - -/** - * Regex for GCS URL in path-based format: https://storage.googleapis.com// - */ -export const GCS_HTTP_REGEX_PATH_BASED = - /^https:\/\/storage\.googleapis\.com\/([^/]+)\/?(.*)$/; - -/** - * Regex for GCS URI format: gs:/// - */ -export const GCS_GS_REGEX = /^gs:\/\/([a-zA-Z0-9\-.]+)\/?(.*)$/; - -/** - * Regex that ensures the bucket name follows Google Cloud Storage (GCS) naming rules: - * - Must be between 3 and 63 characters long. - * - Can contain lowercase letters, numbers, dashes (`-`), and dots (`.`). - * - Cannot begin or end with a dash (`-`). - * - Cannot have consecutive periods (`..`). - * - Cannot resemble an IP address (e.g., "192.168.1.1"). - */ -const BUCKET_NAME_REGEX = /^[a-z0-9][a-z0-9-]{1,61}[a-z0-9]$/; - -// Step 2: Implement the main validation function - -/** - * Validates if a given URL is a valid Google Cloud Storage URL. - * - * Supports: - * - Subdomain format: https://.storage.googleapis.com[/] - * - Path-based format: https://storage.googleapis.com/[/] - * - GCS URI format: gs://[/] - * - * @param url - The URL to validate. - * @returns {boolean} - Returns true if the URL is valid, otherwise false. - */ -export function isGCSBucketUrl(url: string): boolean { - // 1) Quickly check if it's a valid URL in general - if (!isValidUrl(url)) { - return false; - } - - // 2) Try subdomain-based regex first - let httpMatch = url.match(GCS_HTTP_REGEX_SUBDOMAIN); - - // 3) If that fails, try path-based regex - if (!httpMatch) { - httpMatch = url.match(GCS_HTTP_REGEX_PATH_BASED); - } - - // 4) Also check if it matches the gs:// scheme - const gsMatch = url.match(GCS_GS_REGEX); - - // 5) If any HTTP or GS regex matched - if (httpMatch || gsMatch) { - // For HTTP matches, the bucket is captured in group [1]. 
- // For GS matches, it's also in group [1]. - const bucketName = httpMatch ? httpMatch[1] : gsMatch ? gsMatch[1] : null; - - if (!bucketName || !isValidBucketName(bucketName)) { - return false; - } - - return true; - } - - return false; -} - -/** - * Validates a URL to check if it is a valid Google Cloud Storage URL. - * This function ensures the URL is well-formed and its protocol is one of: - * - `http:` (HTTP URL) - * - `https:` (HTTPS URL) - * - `gs:` (Google Cloud Storage URI) - * - * @param maybeUrl The URL string to be validated. - * @returns A boolean indicating whether the URL is valid and has an allowed protocol. - */ -export function isValidUrl(maybeUrl: string): boolean { - try { - const url = new URL(maybeUrl); - if (url.protocol === 'gs:') { - return true; - } else { - return isURL(maybeUrl, { - require_protocol: true, - protocols: ['http', 'https'], - }); - } - } catch { - return false; - } -} - -/** - * Validates a Google Cloud Storage bucket name. - * GCS requires bucket names to: - * - Be 3-63 characters long - * - Contain only lowercase letters, numbers, dashes - * - Not start or end with a dash - */ -function isValidBucketName(bucket: string): boolean { - return BUCKET_NAME_REGEX.test(bucket); -} - -/** - * Converts a valid Google Cloud Storage HTTP URL to a GCS path. - * - * @param url - The HTTP URL to convert. - * @returns {string} - The converted GCS path. - * @throws Error - If the URL is not a valid GCS URL. 
- */ -export function convertToGCSPath(url: string): string { - if (!isGCSBucketUrl(url)) { - throw new Error(ErrorBucket.InvalidGCSUrl); - } - - let match = url.match(GCS_HTTP_REGEX_SUBDOMAIN); - let bucketName: string | null = null; - let objectPath: string | null = null; - - if (match) { - bucketName = match[1]; - objectPath = match[2] || ''; - } else { - match = url.match(GCS_HTTP_REGEX_PATH_BASED); - if (match) { - bucketName = match[1]; - objectPath = match[2] || ''; - } - } - - if (!bucketName) { - throw new Error(ErrorBucket.InvalidGCSUrl); - } - - let gcsPath = `gs://${bucketName}`; - if (objectPath) { - gcsPath += `/${objectPath}`; - } - return gcsPath; -} - -/** - * Converts a GCS path to a valid Google Cloud Storage HTTP URL. - * - * @param gcsPath - The GCS path to convert (e.g., "gs://bucket-name/object-path"). - * @returns {string} - The converted HTTP URL. - * @throws Error - If the GCS path is not valid. - */ -export function convertToHttpUrl(gcsPath: string): string { - if (!isGCSBucketUrl(gcsPath)) { - throw new Error(ErrorBucket.InvalidGCSUrl); - } - - const match = gcsPath.match(GCS_GS_REGEX); - - const bucketName = match![1]; - const objectPath = match![2] || ''; - - return `https://${bucketName}.storage.googleapis.com/${objectPath}`; -} - -/** - * Constructs a GCS path with a variable number of segments. - * - * @param bucket - The GCS bucket name (without `gs://`). - * @param paths - Additional path segments to append. - * @returns {string} - The constructed GCS path. - */ -export function constructGcsPath(bucket: string, ...paths: string[]): string { - const cleanBucket = bucket.replace(/^gs:\/\//, '').replace(/\/+$/, ''); - - const fullPath = paths - .map((segment) => segment.replace(/^\/+|\/+$/g, '')) - .filter((segment) => segment) - .join('/'); - - return fullPath - ? 
`${GS_PROTOCOL}${cleanBucket}/${fullPath}` - : `${GS_PROTOCOL}${cleanBucket}`; -} diff --git a/packages/apps/job-launcher/server/src/common/utils/storage.ts b/packages/apps/job-launcher/server/src/common/utils/storage.ts index ec7100b4ae..fbe86382ec 100644 --- a/packages/apps/job-launcher/server/src/common/utils/storage.ts +++ b/packages/apps/job-launcher/server/src/common/utils/storage.ts @@ -2,14 +2,14 @@ import { HttpStatus } from '@nestjs/common'; import axios, { AxiosError } from 'axios'; import { parseString } from 'xml2js'; import { StorageDataDto } from '../../modules/job/job.dto'; +import { + GCS_HTTP_REGEX_PATH_BASED, + GCS_HTTP_REGEX_SUBDOMAIN, +} from '../constants'; import { ErrorBucket } from '../constants/errors'; import { CvatJobType, JobRequestType } from '../enums/job'; import { AWSRegions, StorageProviders } from '../enums/storage'; import { ValidationError } from '../errors'; -import { - GCS_HTTP_REGEX_PATH_BASED, - GCS_HTTP_REGEX_SUBDOMAIN, -} from './gcstorage'; import { formatAxiosError } from './http'; function parseXml(xml: string): Promise { diff --git a/packages/apps/job-launcher/server/src/database/database.module.ts b/packages/apps/job-launcher/server/src/database/database.module.ts index 66d72e30d2..9e8d36a174 100644 --- a/packages/apps/job-launcher/server/src/database/database.module.ts +++ b/packages/apps/job-launcher/server/src/database/database.module.ts @@ -8,7 +8,6 @@ import { UserEntity } from '../modules/user/user.entity'; import { TypeOrmLoggerModule, TypeOrmLoggerService } from './typeorm'; import { JobEntity } from '../modules/job/job.entity'; -import { ContentModerationRequestEntity } from '../modules/content-moderation/content-moderation-request.entity'; import { PaymentEntity } from '../modules/payment/payment.entity'; import { DatabaseConfigService } from '../common/config/database-config.service'; import { ApiKeyEntity } from '../modules/auth/apikey.entity'; @@ -40,7 +39,6 @@ import { WhitelistEntity } from 
'../modules/whitelist/whitelist.entity'; ApiKeyEntity, UserEntity, JobEntity, - ContentModerationRequestEntity, PaymentEntity, WebhookEntity, CronJobEntity, diff --git a/packages/apps/job-launcher/server/src/database/migrations/1774453578372-removeContentModeration.ts b/packages/apps/job-launcher/server/src/database/migrations/1774453578372-removeContentModeration.ts new file mode 100644 index 0000000000..b9ec19d063 --- /dev/null +++ b/packages/apps/job-launcher/server/src/database/migrations/1774453578372-removeContentModeration.ts @@ -0,0 +1,162 @@ +import { MigrationInterface, QueryRunner } from 'typeorm'; + +export class RemoveContentModeration1774453578372 implements MigrationInterface { + name = 'RemoveContentModeration1774453578372'; + + /** + * Moves jobs out of the moderation statuses ('moderation_passed' and 'under_moderation' become 'paid', 'possible_abuse_in_review' becomes 'failed'), deletes the 'content-moderation' cron job rows, + * drops the content-moderation-requests table together with its status enum, + * and rebuilds the jobs status and cron job type enums without the moderation values. + */ + public async up(queryRunner: QueryRunner): Promise<void> { + await queryRunner.query(` + UPDATE "hmt"."jobs" + SET "status" = 'paid' + WHERE "status" IN ('moderation_passed', 'under_moderation') + `); + await queryRunner.query(` + UPDATE "hmt"."jobs" + SET "status" = 'failed' + WHERE "status" = 'possible_abuse_in_review' + `); + await queryRunner.query(` + DELETE FROM "hmt"."cron-jobs" + WHERE "cron_job_type" = 'content-moderation' + `); + /* IF EXISTS on the table too, so a database without this table does not abort the migration before DROP TABLE IF EXISTS runs. */ + await queryRunner.query(` + ALTER TABLE IF EXISTS "hmt"."content-moderation-requests" + DROP CONSTRAINT IF EXISTS "FK_d4f313caf54945a83b00abc02af" + `); + await queryRunner.query(` + DROP TABLE IF EXISTS "hmt"."content-moderation-requests" + `); + await queryRunner.query(` + DROP TYPE IF EXISTS "hmt"."content-moderation-requests_status_enum" + `); + await queryRunner.query(` + ALTER TYPE "hmt"."jobs_status_enum" + RENAME TO "jobs_status_enum_old" + `); + await queryRunner.query(` + CREATE TYPE "hmt"."jobs_status_enum" AS ENUM( + 'paid', + 'launched', + 'partial', + 'completed', + 'failed', + 'to_cancel', + 'canceling', + 'canceled' + ) + `); + await queryRunner.query(` + ALTER TABLE "hmt"."jobs" + ALTER COLUMN "status" TYPE "hmt"."jobs_status_enum" USING "status"::"text"::"hmt"."jobs_status_enum" + `); +
await queryRunner.query(` + DROP TYPE "hmt"."jobs_status_enum_old" + `); + await queryRunner.query(` + ALTER TYPE "hmt"."cron-jobs_cron_job_type_enum" + RENAME TO "cron-jobs_cron_job_type_enum_old" + `); + await queryRunner.query(` + CREATE TYPE "hmt"."cron-jobs_cron_job_type_enum" AS ENUM( + 'create-escrow', + 'cancel-escrow', + 'process-pending-webhook', + 'sync-job-statuses', + 'abuse' + ) + `); + await queryRunner.query(` + ALTER TABLE "hmt"."cron-jobs" + ALTER COLUMN "cron_job_type" TYPE "hmt"."cron-jobs_cron_job_type_enum" USING "cron_job_type"::"text"::"hmt"."cron-jobs_cron_job_type_enum" + `); + await queryRunner.query(` + DROP TYPE "hmt"."cron-jobs_cron_job_type_enum_old" + `); + } + + /** + * Recreates the content-moderation-requests table, its status enum and its foreign key to jobs, + * and puts the moderation values back into the jobs status and cron job type enums. + * Rows changed or deleted by up() are not restored. + */ + public async down(queryRunner: QueryRunner): Promise<void> { + await queryRunner.query(` + CREATE TYPE "hmt"."content-moderation-requests_status_enum" AS ENUM( + 'pending', + 'processed', + 'positive_abuse', + 'passed', + 'failed' + ) + `); + await queryRunner.query(` + CREATE TABLE "hmt"."content-moderation-requests" ( + "id" SERIAL NOT NULL, + "created_at" TIMESTAMP WITH TIME ZONE NOT NULL, + "updated_at" TIMESTAMP WITH TIME ZONE NOT NULL, + "data_url" character varying NOT NULL, + "from" integer NOT NULL, + "to" integer NOT NULL, + "status" "hmt"."content-moderation-requests_status_enum" NOT NULL, + "job_id" integer NOT NULL, + CONSTRAINT "PK_e81154211cbfb9f8dcd56158313" PRIMARY KEY ("id") + ) + `); + await queryRunner.query(` + CREATE TYPE "hmt"."cron-jobs_cron_job_type_enum_old" AS ENUM( + 'abuse', + 'cancel-escrow', + 'content-moderation', + 'create-escrow', + 'process-pending-webhook', + 'sync-job-statuses' + ) + `); + await queryRunner.query(` + ALTER TABLE "hmt"."cron-jobs" + ALTER COLUMN "cron_job_type" TYPE "hmt"."cron-jobs_cron_job_type_enum_old" USING "cron_job_type"::"text"::"hmt"."cron-jobs_cron_job_type_enum_old" + `); + await queryRunner.query(` + DROP TYPE "hmt"."cron-jobs_cron_job_type_enum" + `); + await queryRunner.query(` + ALTER TYPE
"hmt"."cron-jobs_cron_job_type_enum_old" + RENAME TO "cron-jobs_cron_job_type_enum" + `); + await queryRunner.query(` + CREATE TYPE "hmt"."jobs_status_enum_old" AS ENUM( + 'canceled', + 'canceling', + 'completed', + 'failed', + 'launched', + 'moderation_passed', + 'paid', + 'partial', + 'possible_abuse_in_review', + 'to_cancel', + 'under_moderation' + ) + `); + await queryRunner.query(` + ALTER TABLE "hmt"."jobs" + ALTER COLUMN "status" TYPE "hmt"."jobs_status_enum_old" USING "status"::"text"::"hmt"."jobs_status_enum_old" + `); + await queryRunner.query(` + DROP TYPE "hmt"."jobs_status_enum" + `); + await queryRunner.query(` + ALTER TYPE "hmt"."jobs_status_enum_old" + RENAME TO "jobs_status_enum" + `); + await queryRunner.query(` + ALTER TABLE "hmt"."content-moderation-requests" + ADD CONSTRAINT "FK_d4f313caf54945a83b00abc02af" FOREIGN KEY ("job_id") REFERENCES "hmt"."jobs"("id") ON DELETE NO ACTION ON UPDATE NO ACTION + `); + } +} diff --git a/packages/apps/job-launcher/server/src/modules/content-moderation/content-moderation-request.entity.ts b/packages/apps/job-launcher/server/src/modules/content-moderation/content-moderation-request.entity.ts deleted file mode 100644 index 70a268f4a7..0000000000 --- a/packages/apps/job-launcher/server/src/modules/content-moderation/content-moderation-request.entity.ts +++ /dev/null @@ -1,31 +0,0 @@ -import { Column, Entity, ManyToOne } from 'typeorm'; -import { NS } from '../../common/constants'; -import { BaseEntity } from '../../database/base.entity'; -import { ContentModerationRequestStatus } from '../../common/enums/content-moderation'; -import { JobEntity } from '../job/job.entity'; - -@Entity({ schema: NS, name: 'content-moderation-requests' }) -export class ContentModerationRequestEntity extends BaseEntity { - @Column({ type: 'varchar', nullable: false }) - public dataUrl: string; - - @Column({ type: 'int', nullable: false }) - public from: number; - - @Column({ type: 'int', nullable: false }) - public to: number; - -
@Column({ - type: 'enum', - enum: ContentModerationRequestStatus, - }) - public status: ContentModerationRequestStatus; - - @ManyToOne(() => JobEntity, (job) => job.contentModerationRequests, { - eager: true, - }) - job: JobEntity; - - @Column({ type: 'int', nullable: false }) - public jobId: number; -} diff --git a/packages/apps/job-launcher/server/src/modules/content-moderation/content-moderation-request.repository.ts b/packages/apps/job-launcher/server/src/modules/content-moderation/content-moderation-request.repository.ts deleted file mode 100644 index 179b6c1092..0000000000 --- a/packages/apps/job-launcher/server/src/modules/content-moderation/content-moderation-request.repository.ts +++ /dev/null @@ -1,94 +0,0 @@ -import { Injectable } from '@nestjs/common'; -import { DataSource } from 'typeorm'; -import { ServerConfigService } from '../../common/config/server-config.service'; -import { SortDirection } from '../../common/enums/collection'; -import { ContentModerationRequestStatus } from '../../common/enums/content-moderation'; -import { BaseRepository } from '../../database/base.repository'; -import { ContentModerationRequestEntity } from './content-moderation-request.entity'; -import { QueryFailedError } from 'typeorm'; -import { handleQueryFailedError } from '../../common/errors'; - -@Injectable() -export class ContentModerationRequestRepository extends BaseRepository { - constructor( - private readonly dataSource: DataSource, - public readonly serverConfigService: ServerConfigService, - ) { - super(ContentModerationRequestEntity, dataSource); - } - - /** - * Finds all requests for a given job, ordered by createdAt desc. 
- */ - public async findByJobId( - jobId: number, - ): Promise { - try { - return this.find({ - where: { job: { id: jobId } }, - order: { createdAt: SortDirection.DESC }, - relations: ['job', 'job.contentModerationRequests'], - }); - } catch (error) { - if (error instanceof QueryFailedError) { - throw handleQueryFailedError(error); - } - throw error; - } - } - - /** - * Finds requests matching a jobId & status, in descending order by createdAt. - */ - public async findByJobIdAndStatus( - jobId: number, - status: ContentModerationRequestStatus, - ): Promise { - try { - return this.find({ - where: { job: { id: jobId }, status }, - order: { createdAt: SortDirection.DESC }, - relations: ['job', 'job.contentModerationRequests'], - }); - } catch (error) { - if (error instanceof QueryFailedError) { - throw handleQueryFailedError(error); - } - throw error; - } - } - - /** - * Creates multiple new requests in one call. - */ - public async createRequests( - requests: ContentModerationRequestEntity[], - ): Promise { - try { - return await this.save(requests); - } catch (error) { - if (error instanceof QueryFailedError) { - throw handleQueryFailedError(error); - } - throw error; - } - } - - /** - * Updates the status of a single request. 
- */ - public async updateStatus( - request: ContentModerationRequestEntity, - newStatus: ContentModerationRequestStatus, - ): Promise { - try { - request.status = newStatus; - await this.updateOne(request); - } catch (error) { - if (error instanceof QueryFailedError) { - throw handleQueryFailedError(error); - } - throw error; - } - } -} diff --git a/packages/apps/job-launcher/server/src/modules/content-moderation/content-moderation.dto.ts b/packages/apps/job-launcher/server/src/modules/content-moderation/content-moderation.dto.ts deleted file mode 100644 index eca7f1452a..0000000000 --- a/packages/apps/job-launcher/server/src/modules/content-moderation/content-moderation.dto.ts +++ /dev/null @@ -1,17 +0,0 @@ -export class ModerationResultDto { - adult: string; - violence: string; - racy: string; - spoof: string; - medical: string; -} - -export class ImageModerationResultDto { - imageUrl: string; - moderationResult: ModerationResultDto; -} - -export class DataModerationResultDto { - positiveAbuseResults: ImageModerationResultDto[]; - possibleAbuseResults: ImageModerationResultDto[]; -} diff --git a/packages/apps/job-launcher/server/src/modules/content-moderation/content-moderation.interface.ts b/packages/apps/job-launcher/server/src/modules/content-moderation/content-moderation.interface.ts deleted file mode 100644 index 7e4b518ba7..0000000000 --- a/packages/apps/job-launcher/server/src/modules/content-moderation/content-moderation.interface.ts +++ /dev/null @@ -1,5 +0,0 @@ -import { JobEntity } from '../job/job.entity'; - -export interface IContentModeratorService { - moderateJob(jobEntity: JobEntity): Promise; -} diff --git a/packages/apps/job-launcher/server/src/modules/content-moderation/content-moderation.module.ts b/packages/apps/job-launcher/server/src/modules/content-moderation/content-moderation.module.ts deleted file mode 100644 index b7ef50a7af..0000000000 --- a/packages/apps/job-launcher/server/src/modules/content-moderation/content-moderation.module.ts 
+++ /dev/null @@ -1,27 +0,0 @@ -import { Global, Module } from '@nestjs/common'; -import { ConfigModule } from '@nestjs/config'; -import { TypeOrmModule } from '@nestjs/typeorm'; -import { JobModule } from '../job/job.module'; -import { ContentModerationRequestEntity } from './content-moderation-request.entity'; -import { ContentModerationRequestRepository } from './content-moderation-request.repository'; -import { GCVContentModerationService } from './gcv-content-moderation.service'; -import { JobEntity } from '../job/job.entity'; -import { JobRepository } from '../job/job.repository'; -import { ManifestModule } from '../manifest/manifest.module'; - -@Global() -@Module({ - imports: [ - TypeOrmModule.forFeature([ContentModerationRequestEntity, JobEntity]), - ConfigModule, - JobModule, - ManifestModule, - ], - providers: [ - ContentModerationRequestRepository, - JobRepository, - GCVContentModerationService, - ], - exports: [GCVContentModerationService], -}) -export class ContentModerationModule {} diff --git a/packages/apps/job-launcher/server/src/modules/content-moderation/gcv-content-moderation.service.spec.ts b/packages/apps/job-launcher/server/src/modules/content-moderation/gcv-content-moderation.service.spec.ts deleted file mode 100644 index 84bce6d2ef..0000000000 --- a/packages/apps/job-launcher/server/src/modules/content-moderation/gcv-content-moderation.service.spec.ts +++ /dev/null @@ -1,772 +0,0 @@ -jest.mock('@google-cloud/storage'); -jest.mock('@google-cloud/vision'); -jest.mock('../../common/utils/slack', () => ({ - sendSlackNotification: jest.fn(), -})); -jest.mock('../../common/utils/storage', () => ({ - ...jest.requireActual('../../common/utils/storage'), - listObjectsInBucket: jest.fn(), -})); - -import { faker } from '@faker-js/faker'; -import { Storage } from '@google-cloud/storage'; -import { ImageAnnotatorClient } from '@google-cloud/vision'; -import { Test, TestingModule } from '@nestjs/testing'; - -import { SlackConfigService } from 
'../../common/config/slack-config.service'; -import { VisionConfigService } from '../../common/config/vision-config.service'; -import { ErrorContentModeration } from '../../common/constants/errors'; -import { ContentModerationRequestStatus } from '../../common/enums/content-moderation'; -import { ContentModerationLevel } from '../../common/enums/gcv'; -import { JobStatus } from '../../common/enums/job'; -import { sendSlackNotification } from '../../common/utils/slack'; -import { listObjectsInBucket } from '../../common/utils/storage'; -import { JobEntity } from '../job/job.entity'; -import { JobRepository } from '../job/job.repository'; -import { ManifestService } from '../manifest/manifest.service'; -import { ContentModerationRequestEntity } from './content-moderation-request.entity'; -import { ContentModerationRequestRepository } from './content-moderation-request.repository'; -import { GCVContentModerationService } from './gcv-content-moderation.service'; - -describe('GCVContentModerationService', () => { - let service: GCVContentModerationService; - - let jobRepository: JobRepository; - let contentModerationRequestRepository: ContentModerationRequestRepository; - let slackConfigService: SlackConfigService; - let manifestService: ManifestService; - let jobEntity: JobEntity; - - const mockStorage = { - bucket: jest.fn().mockReturnValue({ - getFiles: jest.fn(), - file: jest.fn().mockReturnValue({ - createWriteStream: jest.fn(() => ({ end: jest.fn() })), - getSignedUrl: jest.fn(), - download: jest.fn(), - }), - }), - }; - const mockVisionClient = { - asyncBatchAnnotateImages: jest.fn(), - }; - - beforeAll(async () => { - (Storage as unknown as jest.Mock).mockImplementation(() => mockStorage); - (ImageAnnotatorClient as unknown as jest.Mock).mockImplementation( - () => mockVisionClient, - ); - - const module: TestingModule = await Test.createTestingModule({ - providers: [ - GCVContentModerationService, - { - provide: JobRepository, - useValue: { - updateOne: 
jest.fn(), - }, - }, - { - provide: ContentModerationRequestRepository, - useValue: { - findByJobId: jest.fn(), - findByJobIdAndStatus: jest.fn(), - updateOne: jest.fn(), - }, - }, - { - provide: VisionConfigService, - useValue: { - projectId: faker.string.uuid(), - privateKey: faker.string.alphanumeric(40), - clientEmail: faker.internet.email(), - moderationResultsBucket: faker.word.sample(), - moderationResultsFilesPath: faker.word.sample(), - }, - }, - { - provide: SlackConfigService, - useValue: { - abuseNotificationWebhookUrl: faker.internet.url(), - }, - }, - { - provide: ManifestService, - useValue: { - downloadManifest: jest.fn(), - }, - }, - ], - }).compile(); - service = module.get( - GCVContentModerationService, - ); - jobRepository = module.get(JobRepository); - contentModerationRequestRepository = - module.get( - ContentModerationRequestRepository, - ); - slackConfigService = module.get(SlackConfigService); - manifestService = module.get(ManifestService); - - jobEntity = { - id: faker.number.int(), - status: JobStatus.PAID, - manifestUrl: faker.internet.url(), - } as JobEntity; - }); - - afterEach(() => { - jest.clearAllMocks(); - }); - - describe('moderateJob (public)', () => { - it('should call createModerationRequests, processModerationRequests, parseModerationRequests, finalizeJob in order', async () => { - const createModerationRequestsSpy = jest - .spyOn(service, 'createModerationRequests') - .mockResolvedValueOnce(undefined); - const processModerationRequestsSpy = jest - .spyOn(service, 'processModerationRequests') - .mockResolvedValueOnce(undefined); - const parseModerationRequestsSpy = jest - .spyOn(service, 'parseModerationRequests') - .mockResolvedValueOnce(undefined); - const finalizeJobSpy = jest - .spyOn(service, 'finalizeJob') - .mockResolvedValueOnce(undefined); - - await service.moderateJob(jobEntity); - - expect(createModerationRequestsSpy).toHaveBeenCalledWith(jobEntity); - 
expect(processModerationRequestsSpy).toHaveBeenCalledWith(jobEntity); - expect(parseModerationRequestsSpy).toHaveBeenCalledWith(jobEntity); - expect(finalizeJobSpy).toHaveBeenCalledWith(jobEntity); - }); - - it('should propagate an error if createModerationRequests fails', async () => { - jest - .spyOn(service, 'createModerationRequests') - .mockRejectedValueOnce( - new Error('Simulated createModerationRequests error'), - ); - - await expect(service.moderateJob(jobEntity)).rejects.toThrow( - 'Simulated createModerationRequests error', - ); - }); - }); - - describe('createModerationRequests', () => { - it('should return if job status not PAID or UNDER_MODERATION', async () => { - jobEntity.status = JobStatus.CANCELED; - - await (service as any).createModerationRequests(jobEntity); - expect(jobRepository.updateOne).not.toHaveBeenCalled(); - }); - - it('should set job to MODERATION_PASSED if data_url is missing or invalid', async () => { - jobEntity.status = JobStatus.PAID; - (manifestService.downloadManifest as jest.Mock).mockResolvedValueOnce({ - data: { data_url: null }, - }); - - await (service as any).createModerationRequests(jobEntity); - expect(jobEntity.status).toBe(JobStatus.MODERATION_PASSED); - expect(jobRepository.updateOne).toHaveBeenCalledWith(jobEntity); - }); - - it('should do nothing if no valid files found in GCS', async () => { - jobEntity.status = JobStatus.PAID; - (manifestService.downloadManifest as jest.Mock).mockResolvedValueOnce({ - data: { - data_url: `gs://${faker.word.sample({ length: { min: 5, max: 10 } })}`, - }, - }); - - (listObjectsInBucket as jest.Mock).mockResolvedValueOnce([]); - await (service as any).createModerationRequests(jobEntity); - - expect(jobRepository.updateOne).not.toHaveBeenCalled(); - }); - - it('should create new requests in PENDING and set job to UNDER_MODERATION', async () => { - jobEntity.status = JobStatus.PAID; - (manifestService.downloadManifest as jest.Mock).mockResolvedValueOnce({ - data: { - data_url: 
`gs://${faker.word.sample({ length: { min: 5, max: 10 } })}`, - }, - }); - - (listObjectsInBucket as jest.Mock).mockResolvedValueOnce([ - `${faker.word.sample()}.jpg`, - `${faker.word.sample()}.jpg`, - `${faker.word.sample()}.jpg`, - ]); - ( - contentModerationRequestRepository.findByJobId as jest.Mock - ).mockResolvedValueOnce([]); - - await (service as any).createModerationRequests(jobEntity); - - expect(jobEntity.status).toBe(JobStatus.UNDER_MODERATION); - expect(jobRepository.updateOne).toHaveBeenCalledWith(jobEntity); - }); - - it('should throw if an error occurs in creation logic', async () => { - jobEntity.status = JobStatus.PAID; - (manifestService.downloadManifest as jest.Mock).mockResolvedValueOnce({ - data: { - data_url: `gs://${faker.word.sample({ length: { min: 5, max: 10 } })}`, - }, - }); - (listObjectsInBucket as jest.Mock).mockResolvedValueOnce([ - `${faker.word.sample()}.jpg`, - `${faker.word.sample()}.jpg`, - `${faker.word.sample()}.jpg`, - ]); - ( - contentModerationRequestRepository.findByJobId as jest.Mock - ).mockRejectedValueOnce(new Error('DB error')); - - await expect( - (service as any).createModerationRequests(jobEntity), - ).rejects.toThrow('DB error'); - }); - }); - - describe('processModerationRequests', () => { - it('should process all PENDING requests (success)', async () => { - const pendingRequest = { - id: faker.number.int(), - } as ContentModerationRequestEntity; - - ( - contentModerationRequestRepository.findByJobIdAndStatus as jest.Mock - ).mockResolvedValueOnce([pendingRequest]); - const processSingleRequestSpy = jest - .spyOn(service, 'processSingleRequest') - .mockResolvedValueOnce(undefined); - - await (service as any).processModerationRequests(jobEntity); - expect(processSingleRequestSpy).toHaveBeenCalledWith(pendingRequest); - }); - - it('should mark request as FAILED if processSingleRequest throws', async () => { - const pendingRequest = { - id: faker.number.int(), - } as ContentModerationRequestEntity; - - ( - 
contentModerationRequestRepository.findByJobIdAndStatus as jest.Mock - ).mockResolvedValueOnce([pendingRequest]); - jest - .spyOn(service, 'processSingleRequest') - .mockRejectedValueOnce(new Error('Processing error')); - - await (service as any).processModerationRequests(jobEntity); - - expect(contentModerationRequestRepository.updateOne).toHaveBeenCalledWith( - expect.objectContaining({ - id: pendingRequest.id, - status: ContentModerationRequestStatus.FAILED, - }), - ); - }); - - it('should throw if findByJobIdAndStatus fails', async () => { - ( - contentModerationRequestRepository.findByJobIdAndStatus as jest.Mock - ).mockRejectedValueOnce(new Error('getRequests error')); - - await expect( - (service as any).processModerationRequests(jobEntity), - ).rejects.toThrow('getRequests error'); - }); - }); - - describe('parseModerationRequests', () => { - it('should parse all PROCESSED requests (success)', async () => { - const processedRequest = { - id: faker.number.int(), - } as ContentModerationRequestEntity; - - ( - contentModerationRequestRepository.findByJobIdAndStatus as jest.Mock - ).mockResolvedValueOnce([processedRequest]); - const parseSingleRequestSpy = jest - .spyOn(service, 'parseSingleRequest') - .mockResolvedValueOnce(undefined); - - await (service as any).parseModerationRequests(jobEntity); - expect(parseSingleRequestSpy).toHaveBeenCalledWith(processedRequest); - }); - - it('should mark request as FAILED if parseSingleRequest throws', async () => { - const processedRequest = { - id: faker.number.int(), - } as ContentModerationRequestEntity; - - ( - contentModerationRequestRepository.findByJobIdAndStatus as jest.Mock - ).mockResolvedValueOnce([processedRequest]); - jest - .spyOn(service, 'parseSingleRequest') - .mockRejectedValueOnce(new Error('Parsing error')); - - await (service as any).parseModerationRequests(jobEntity); - expect(contentModerationRequestRepository.updateOne).toHaveBeenCalledWith( - expect.objectContaining({ - id: processedRequest.id, 
- status: ContentModerationRequestStatus.FAILED, - }), - ); - }); - - it('should throw if findByJobIdAndStatus fails', async () => { - ( - contentModerationRequestRepository.findByJobIdAndStatus as jest.Mock - ).mockRejectedValueOnce(new Error('getRequests error')); - - await expect( - (service as any).parseModerationRequests(jobEntity), - ).rejects.toThrow('getRequests error'); - }); - }); - - describe('finalizeJob', () => { - it('should do nothing if any requests are still PENDING or PROCESSED', async () => { - jobEntity.contentModerationRequests = []; - ( - contentModerationRequestRepository.findByJobId as jest.Mock - ).mockResolvedValueOnce([ - { status: ContentModerationRequestStatus.PROCESSED }, - ]); - - await (service as any).finalizeJob(jobEntity); - expect(jobRepository.updateOne).not.toHaveBeenCalled(); - }); - - it('should set job to MODERATION_PASSED if all requests passed', async () => { - jobEntity.contentModerationRequests = []; - ( - contentModerationRequestRepository.findByJobId as jest.Mock - ).mockResolvedValueOnce([ - { status: ContentModerationRequestStatus.PASSED }, - { status: ContentModerationRequestStatus.PASSED }, - ]); - - await (service as any).finalizeJob(jobEntity); - expect(jobEntity.status).toBe(JobStatus.MODERATION_PASSED); - expect(jobRepository.updateOne).toHaveBeenCalledWith(jobEntity); - }); - - it('should set job to POSSIBLE_ABUSE_IN_REVIEW if any request is flagged', async () => { - jobEntity.contentModerationRequests = []; - ( - contentModerationRequestRepository.findByJobId as jest.Mock - ).mockResolvedValueOnce([ - { status: ContentModerationRequestStatus.POSITIVE_ABUSE }, - ]); - - await (service as any).finalizeJob(jobEntity); - expect(jobEntity.status).toBe(JobStatus.POSSIBLE_ABUSE_IN_REVIEW); - expect(jobRepository.updateOne).toHaveBeenCalledWith(jobEntity); - }); - - it('should throw if DB call fails', async () => { - jobEntity.contentModerationRequests = []; - ( - contentModerationRequestRepository.findByJobId as 
jest.Mock - ).mockRejectedValueOnce(new Error('DB error')); - - await expect((service as any).finalizeJob(jobEntity)).rejects.toThrow( - 'DB error', - ); - }); - }); - - describe('processSingleRequest', () => { - it('should slice valid files, call asyncBatchAnnotateImages, set status PROCESSED', async () => { - const fakerBucket = faker.word.sample({ length: { min: 5, max: 10 } }); - const requestEntity: ContentModerationRequestEntity = { - id: faker.number.int(), - dataUrl: `https://${fakerBucket}.storage.googleapis.com`, - from: 1, - to: 2, - job: jobEntity, - } as any; - - const file1 = `${faker.word.sample()}.jpg`; - const file2 = `${faker.word.sample()}.jpg`; - const file3 = `${faker.word.sample()}.jpg`; - jest - .spyOn(service, 'getValidFiles') - .mockResolvedValueOnce([file1, file2, file3]); - const asyncBatchSpy = jest - .spyOn(service, 'asyncBatchAnnotateImages') - .mockResolvedValueOnce(undefined); - - await (service as any).processSingleRequest(requestEntity); - - expect(asyncBatchSpy).toHaveBeenCalledWith( - [`gs://${fakerBucket}/${file1}`, `gs://${fakerBucket}/${file2}`], - `moderation-results-${requestEntity.job.id}-${requestEntity.id}`, - ); - expect(contentModerationRequestRepository.updateOne).toHaveBeenCalledWith( - expect.objectContaining({ - id: requestEntity.id, - status: ContentModerationRequestStatus.PROCESSED, - }), - ); - }); - - it('should throw if asyncBatchAnnotateImages fails', async () => { - const requestEntity: ContentModerationRequestEntity = { - id: faker.number.int(), - dataUrl: `https://${faker.word.sample({ length: { min: 5, max: 10 } })}.storage.googleapis.com`, - from: 1, - to: 2, - job: jobEntity, - } as any; - - jest - .spyOn(service, 'getValidFiles') - .mockResolvedValueOnce([`${faker.word.sample()}.jpg`]); - jest - .spyOn(service, 'asyncBatchAnnotateImages') - .mockRejectedValueOnce(new Error('Vision error')); - - await expect( - (service as any).processSingleRequest(requestEntity), - ).rejects.toThrow('Vision error'); - 
}); - }); - - describe('asyncBatchAnnotateImages', () => { - it('should call visionClient.asyncBatchAnnotateImages successfully', async () => { - const mockOperation = { - promise: jest.fn().mockResolvedValueOnce([ - { - outputConfig: { gcsDestination: { uri: faker.internet.url() } }, - }, - ]), - }; - mockVisionClient.asyncBatchAnnotateImages.mockResolvedValueOnce([ - mockOperation, - ]); - - await (service as any).asyncBatchAnnotateImages( - ['img1', 'img2'], - 'my-file', - ); - expect(mockVisionClient.asyncBatchAnnotateImages).toHaveBeenCalledWith( - expect.objectContaining({ requests: expect.any(Array) }), - ); - }); - - it('should throw Error if vision call fails', async () => { - mockVisionClient.asyncBatchAnnotateImages.mockRejectedValueOnce( - new Error('Vision failure'), - ); - - await expect( - (service as any).asyncBatchAnnotateImages([], 'my-file'), - ).rejects.toThrow(Error); - }); - }); - - describe('parseSingleRequest', () => { - it('should set POSITIVE_ABUSE if positiveAbuseResults found', async () => { - const requestEntity: ContentModerationRequestEntity = { - id: faker.number.int(), - job: jobEntity, - } as any; - jest - .spyOn(service, 'collectModerationResults') - .mockResolvedValueOnce([ - { imageUrl: 'abuse.jpg', moderationResult: 'adult' }, - ]); - jest - .spyOn(service, 'handleAbuseLinks') - .mockResolvedValueOnce(undefined); - - await (service as any).parseSingleRequest(requestEntity); - expect(service['handleAbuseLinks']).toHaveBeenCalled(); - expect(requestEntity.status).toBe( - ContentModerationRequestStatus.POSITIVE_ABUSE, - ); - expect(contentModerationRequestRepository.updateOne).toHaveBeenCalledWith( - expect.objectContaining({ - status: ContentModerationRequestStatus.POSITIVE_ABUSE, - }), - ); - }); - - it('should set PASSED if no abuse found', async () => { - const requestEntity = { - id: faker.number.int(), - job: jobEntity, - } as ContentModerationRequestEntity; - jest - .spyOn(service, 'collectModerationResults') - 
.mockResolvedValueOnce({ - positiveAbuseResults: [], - possibleAbuseResults: [], - }); - - await (service as any).parseSingleRequest(requestEntity); - expect(requestEntity.status).toBe(ContentModerationRequestStatus.PASSED); - expect(contentModerationRequestRepository.updateOne).toHaveBeenCalledWith( - expect.objectContaining({ - status: ContentModerationRequestStatus.PASSED, - }), - ); - }); - - it('should set FAILED if collectModerationResults throws', async () => { - const requestEntity = { - id: faker.number.int(), - job: jobEntity, - } as ContentModerationRequestEntity; - jest - .spyOn(service, 'collectModerationResults') - .mockRejectedValueOnce(new Error('Collect error')); - - await expect( - (service as any).parseSingleRequest(requestEntity), - ).rejects.toThrow('Collect error'); - expect(requestEntity.status).toBe(ContentModerationRequestStatus.FAILED); - }); - }); - - describe('collectModerationResults', () => { - it('should throw ControlledError if no GCS files found', async () => { - (mockStorage.bucket as any).mockReturnValueOnce({ - getFiles: jest.fn().mockResolvedValueOnce([]), - }); - - await expect( - (service as any).collectModerationResults('some-file'), - ).rejects.toThrow(ErrorContentModeration.NoResultsFound); - }); - - it('should parse each file and accumulate responses, then categorize', async () => { - (mockStorage.bucket as any).mockReturnValueOnce({ - getFiles: jest.fn().mockResolvedValueOnce([ - [ - { - name: `${faker.word.sample()}.json`, - download: jest.fn().mockResolvedValueOnce([ - Buffer.from( - JSON.stringify({ - responses: [ - { - safeSearchAnnotation: { - adult: ContentModerationLevel.LIKELY, - }, - }, - ], - }), - ), - ]), - }, - { - name: `${faker.word.sample()}.json`, - download: jest.fn().mockResolvedValueOnce([ - Buffer.from( - JSON.stringify({ - responses: [ - { - safeSearchAnnotation: { - violence: ContentModerationLevel.POSSIBLE, - }, - }, - ], - }), - ), - ]), - }, - ], - ]), - }); - - jest - .spyOn(service, 
'categorizeModerationResults') - .mockReturnValueOnce({ - positiveAbuseResults: [], - possibleAbuseResults: [], - }); - - const result = await (service as any).collectModerationResults( - faker.word.sample(), - ); - expect((service as any).categorizeModerationResults).toHaveBeenCalledWith( - expect.arrayContaining([ - { safeSearchAnnotation: { adult: ContentModerationLevel.LIKELY } }, - { - safeSearchAnnotation: { violence: ContentModerationLevel.POSSIBLE }, - }, - ]), - ); - expect(result).toHaveProperty('positiveAbuseResults'); - expect(result).toHaveProperty('possibleAbuseResults'); - }); - - it('should throw ControlledError if an error occurs', async () => { - (mockStorage.bucket as any).mockReturnValueOnce({ - getFiles: jest.fn().mockRejectedValueOnce(new Error('GCS error')), - }); - - await expect( - (service as any).collectModerationResults(faker.word.sample()), - ).rejects.toThrow(ErrorContentModeration.ResultsParsingFailed); - }); - }); - - describe('categorizeModerationResults', () => { - it('should split results into positiveAbuse and possibleAbuse', () => { - const responses = [ - { - safeSearchAnnotation: { adult: ContentModerationLevel.LIKELY }, - context: { - uri: `gs://${faker.word.sample({ length: { min: 5, max: 10 } })}/${faker.word.sample()}`, - }, - }, - { - safeSearchAnnotation: { violence: ContentModerationLevel.POSSIBLE }, - context: { - uri: `gs://${faker.word.sample({ length: { min: 5, max: 10 } })}/${faker.word.sample()}`, - }, - }, - ]; - const results = (service as any).categorizeModerationResults(responses); - expect(results).toHaveLength(2); - expect(results[0]).toHaveProperty('imageUrl'); - expect(results[0]).toHaveProperty('moderationResult'); - expect(results[1]).toHaveProperty('imageUrl'); - expect(results[1]).toHaveProperty('moderationResult'); - expect(results[0].moderationResult).toBe('adult'); - expect(results[1].moderationResult).toBe('violence'); - }); - - it('should ignore entries with no safeSearchAnnotation', () => { - 
const responses = [ - { - safeSearchAnnotation: null, - context: { - uri: `gs://${faker.word.sample({ length: { min: 5, max: 10 } })}/${faker.word.sample()}`, - }, - }, - ]; - const results = (service as any).categorizeModerationResults(responses); - expect(results).toHaveLength(0); - }); - }); - - describe('handleAbuseLinks', () => { - it('should upload text file and send Slack message for confirmed abuse', async () => { - const mockSignedUrl = faker.internet.url(); - (mockStorage.bucket as any).mockReturnValueOnce({ - file: jest.fn().mockReturnValueOnce({ - createWriteStream: jest.fn(() => ({ end: jest.fn() })), - getSignedUrl: jest.fn().mockResolvedValueOnce([mockSignedUrl]), - }), - }); - - await (service as any).handleAbuseLinks( - [faker.internet.url()], - faker.word.sample(), - faker.number.int(), - faker.number.int(), - true, - ); - expect(sendSlackNotification).toHaveBeenCalledWith( - slackConfigService.abuseNotificationWebhookUrl, - expect.stringContaining(mockSignedUrl), - ); - }); - - it('should handle possible abuse similarly', async () => { - const mockSignedUrl = faker.internet.url(); - (mockStorage.bucket as any).mockReturnValueOnce({ - file: jest.fn().mockReturnValueOnce({ - createWriteStream: jest.fn(() => ({ end: jest.fn() })), - getSignedUrl: jest.fn().mockResolvedValueOnce([mockSignedUrl]), - }), - }); - - await (service as any).handleAbuseLinks( - [faker.internet.url()], - faker.word.sample(), - faker.number.int(), - faker.number.int(), - false, - ); - expect(sendSlackNotification).toHaveBeenCalledWith( - slackConfigService.abuseNotificationWebhookUrl, - expect.stringContaining(mockSignedUrl), - ); - }); - - it('should throw if getSignedUrl fails', async () => { - (mockStorage.bucket as any).mockReturnValueOnce({ - file: jest.fn().mockReturnValueOnce({ - createWriteStream: jest.fn(() => ({ end: jest.fn() })), - getSignedUrl: jest - .fn() - .mockRejectedValueOnce(new Error('Signed URL error')), - }), - }); - - await expect( - (service as 
any).handleAbuseLinks( - [], - faker.word.sample(), - faker.number.int(), - faker.number.int(), - true, - ), - ).rejects.toThrow('Signed URL error'); - }); - }); - - describe('getValidFiles', () => { - it('should return cached files if present', async () => { - const dataUrl = `gs://${faker.word.sample({ length: { min: 5, max: 10 } })}/data`; - const file1 = `${faker.word.sample()}.jpg`; - const file2 = `${faker.word.sample()}.png`; - (service as any).bucketListCache.set(dataUrl, [file1, file2]); - - const result = await (service as any).getValidFiles(dataUrl); - expect(result).toEqual([file1, file2]); - expect(listObjectsInBucket).not.toHaveBeenCalled(); - }); - - it('should fetch from GCS if not cached, filter out directories, and cache', async () => { - const dataUrl = `gs://${faker.word.sample({ length: { min: 5, max: 10 } })}/data`; - const file1 = `${faker.word.sample()}.jpg`; - const file2 = `${faker.word.sample()}.png`; - (listObjectsInBucket as jest.Mock).mockResolvedValueOnce([ - file1, - 'subdir/', - file2, - ]); - - const result = await (service as any).getValidFiles(dataUrl); - expect(result).toEqual([file1, file2]); - - expect((service as any).bucketListCache.get(dataUrl)).toEqual(result); - }); - - it('should throw if listObjectsInBucket fails', async () => { - const dataUrl = `gs://${faker.word.sample({ length: { min: 5, max: 10 } })}/fail`; - (listObjectsInBucket as jest.Mock).mockRejectedValueOnce( - new Error('List objects error'), - ); - - await expect((service as any).getValidFiles(dataUrl)).rejects.toThrow( - 'List objects error', - ); - }); - }); -}); diff --git a/packages/apps/job-launcher/server/src/modules/content-moderation/gcv-content-moderation.service.ts b/packages/apps/job-launcher/server/src/modules/content-moderation/gcv-content-moderation.service.ts deleted file mode 100644 index f1a422c51d..0000000000 --- a/packages/apps/job-launcher/server/src/modules/content-moderation/gcv-content-moderation.service.ts +++ /dev/null @@ -1,507 
+0,0 @@ -import { Storage } from '@google-cloud/storage'; -import { ImageAnnotatorClient, protos } from '@google-cloud/vision'; -import { Injectable } from '@nestjs/common'; -import NodeCache from 'node-cache'; -import { SlackConfigService } from '../../common/config/slack-config.service'; -import { VisionConfigService } from '../../common/config/vision-config.service'; -import { - GCV_CONTENT_MODERATION_ASYNC_BATCH_SIZE, - GCV_CONTENT_MODERATION_BATCH_SIZE_PER_TASK, -} from '../../common/constants'; -import { ErrorContentModeration } from '../../common/constants/errors'; -import { ContentModerationRequestStatus } from '../../common/enums/content-moderation'; -import { - ContentModerationFeature, - ContentModerationLevel, -} from '../../common/enums/gcv'; -import { JobStatus } from '../../common/enums/job'; -import { - constructGcsPath, - convertToGCSPath, - convertToHttpUrl, - isGCSBucketUrl, -} from '../../common/utils/gcstorage'; -import { sendSlackNotification } from '../../common/utils/slack'; -import { listObjectsInBucket } from '../../common/utils/storage'; -import { JobEntity } from '../job/job.entity'; -import { JobRepository } from '../job/job.repository'; -import { CvatManifestDto } from '../manifest/manifest.dto'; -import { ManifestService } from '../manifest/manifest.service'; -import { ContentModerationRequestEntity } from './content-moderation-request.entity'; -import { ContentModerationRequestRepository } from './content-moderation-request.repository'; -import { ModerationResultDto } from './content-moderation.dto'; -import { IContentModeratorService } from './content-moderation.interface'; -import logger from '../../logger'; - -@Injectable() -export class GCVContentModerationService implements IContentModeratorService { - private readonly logger = logger.child({ - context: GCVContentModerationService.name, - }); - - private visionClient: ImageAnnotatorClient; - private storage: Storage; - - /** - * Cache of GCS object listings by dataUrl - * Key: 
dataUrl string, Value: array of valid file names - */ - private bucketListCache: NodeCache; - - constructor( - private readonly jobRepository: JobRepository, - private readonly contentModerationRequestRepository: ContentModerationRequestRepository, - private readonly visionConfigService: VisionConfigService, - private readonly slackConfigService: SlackConfigService, - private readonly manifestService: ManifestService, - ) { - this.visionClient = new ImageAnnotatorClient({ - projectId: this.visionConfigService.projectId, - credentials: { - private_key: this.visionConfigService.privateKey, - client_email: this.visionConfigService.clientEmail, - }, - }); - - this.storage = new Storage({ - projectId: this.visionConfigService.projectId, - credentials: { - private_key: this.visionConfigService.privateKey, - client_email: this.visionConfigService.clientEmail, - }, - }); - - // Initialize cache with expiration time of 60 minutes and check period of 15 minutes - this.bucketListCache = new NodeCache({ - stdTTL: 30 * 60, - checkperiod: 15 * 60, - }); - } - - /** - * Single public method orchestrating all steps in order - */ - public async moderateJob(jobEntity: JobEntity): Promise { - await this.createModerationRequests(jobEntity); - await this.processModerationRequests(jobEntity); - await this.parseModerationRequests(jobEntity); - await this.finalizeJob(jobEntity); - } - - /** - * 1) If no requests exist for this job, create them in PENDING. 
- */ - private async createModerationRequests(jobEntity: JobEntity): Promise { - if ( - jobEntity.status !== JobStatus.PAID && - jobEntity.status !== JobStatus.UNDER_MODERATION - ) { - return; - } - - try { - const manifest = (await this.manifestService.downloadManifest( - jobEntity.manifestUrl, - jobEntity.requestType, - )) as CvatManifestDto; - const dataUrl = manifest?.data?.data_url; - - if (!dataUrl || !isGCSBucketUrl(dataUrl)) { - jobEntity.status = JobStatus.MODERATION_PASSED; - await this.jobRepository.updateOne(jobEntity); - return; - } - - const validFiles = await this.getValidFiles(dataUrl); - if (validFiles.length === 0) return; - - const existingRequests = - await this.contentModerationRequestRepository.findByJobId(jobEntity.id); - - const newRequests: ContentModerationRequestEntity[] = []; - - for ( - let i = 0; - i < validFiles.length; - i += GCV_CONTENT_MODERATION_BATCH_SIZE_PER_TASK - ) { - const from = i + 1; - const to = Math.min( - i + GCV_CONTENT_MODERATION_BATCH_SIZE_PER_TASK, - validFiles.length, - ); - - const request = existingRequests.some( - (req) => req.from === from && req.to === to, - ); - - if (!request) { - newRequests.push( - Object.assign(new ContentModerationRequestEntity(), { - dataUrl, - from, - to, - status: ContentModerationRequestStatus.PENDING, - job: jobEntity, - }), - ); - } - } - - if (newRequests.length > 0) { - jobEntity.contentModerationRequests = [ - ...(jobEntity.contentModerationRequests || []), - ...newRequests, - ]; - jobEntity.status = JobStatus.UNDER_MODERATION; - await this.jobRepository.updateOne(jobEntity); - } - } catch (error) { - this.logger.error('Error creating requests for job', { - error, - jobId: jobEntity.id, - }); - throw error; - } - } - - /** - * 2) Process all PENDING requests -> call GCV. Mark them PROCESSED if success. - * Parallelized with Promise.all for performance. 
- */ - private async processModerationRequests(jobEntity: JobEntity): Promise { - try { - const requests = - await this.contentModerationRequestRepository.findByJobIdAndStatus( - jobEntity.id, - ContentModerationRequestStatus.PENDING, - ); - await Promise.all( - requests.map(async (requestEntity) => { - try { - await this.processSingleRequest(requestEntity); - } catch (error) { - this.logger.error('Error processing moderation request', { - moderationRequestId: requestEntity.id, - jobId: jobEntity.id, - error, - }); - - requestEntity.status = ContentModerationRequestStatus.FAILED; - await this.contentModerationRequestRepository.updateOne( - requestEntity, - ); - } - }), - ); - } catch (error) { - this.logger.error('Error processing moderation requests', { - error, - jobId: jobEntity.id, - }); - - throw error; - } - } - - /** - * 3) Parse results for requests in PROCESSED -> set to PASSED, POSSIBLE_ABUSE, or POSITIVE_ABUSE - * Also parallelized with Promise.all. - */ - private async parseModerationRequests(jobEntity: JobEntity): Promise { - try { - const requests = - await this.contentModerationRequestRepository.findByJobIdAndStatus( - jobEntity.id, - ContentModerationRequestStatus.PROCESSED, - ); - - await Promise.all( - requests.map(async (requestEntity) => { - try { - await this.parseSingleRequest(requestEntity); - } catch (error) { - this.logger.error('Error parsing moderation request', { - moderationRequestId: requestEntity.id, - jobId: jobEntity.id, - error, - }); - - requestEntity.status = ContentModerationRequestStatus.FAILED; - await this.contentModerationRequestRepository.updateOne( - requestEntity, - ); - } - }), - ); - } catch (error) { - this.logger.error('Error parsing moderation results', { - jobId: jobEntity.id, - error, - }); - throw error; - } - } - - /** - * 4) If all requests are done, set job to MODERATION_PASSED or POSSIBLE_ABUSE_IN_REVIEW - */ - private async finalizeJob(jobEntity: JobEntity): Promise { - try { - // We'll try to use the 
jobEntity if it has requests loaded. Otherwise, fallback to DB. - const allRequests = jobEntity.contentModerationRequests?.length - ? jobEntity.contentModerationRequests - : await this.contentModerationRequestRepository.findByJobId( - jobEntity.id, - ); - - const incomplete = allRequests.some( - (r) => - r.status === ContentModerationRequestStatus.PENDING || - r.status === ContentModerationRequestStatus.PROCESSED, - ); - if (incomplete) return; - - let allPassed = true; - for (const req of allRequests) { - if ( - req.status === ContentModerationRequestStatus.FAILED || - req.status === ContentModerationRequestStatus.POSITIVE_ABUSE - ) { - allPassed = false; - } - } - - if (allPassed) { - jobEntity.status = JobStatus.MODERATION_PASSED; - await this.jobRepository.updateOne(jobEntity); - } else { - jobEntity.status = JobStatus.POSSIBLE_ABUSE_IN_REVIEW; - await this.jobRepository.updateOne(jobEntity); - } - } catch (error) { - this.logger.error('Error finalizing moderation job', { - jobId: jobEntity.id, - error, - }); - throw error; - } - } - - /** - * Actually calls GCV. Mark requestEntity => PROCESSED on success. 
- */ - private async processSingleRequest( - requestEntity: ContentModerationRequestEntity, - ): Promise { - const validFiles = await this.getValidFiles(requestEntity.dataUrl); - const filesToProcess = validFiles.slice( - requestEntity.from - 1, - requestEntity.to, - ); - const gcDataUrl = convertToGCSPath(requestEntity.dataUrl); - const imageUrls = filesToProcess.map( - (fileName) => `${gcDataUrl}/${fileName.split('/').pop()}`, - ); - - const fileName = `moderation-results-${requestEntity.job.id}-${requestEntity.id}`; - - await this.asyncBatchAnnotateImages(imageUrls, fileName); - - requestEntity.status = ContentModerationRequestStatus.PROCESSED; - await this.contentModerationRequestRepository.updateOne(requestEntity); - } - - /** - * Calls GCV's asyncBatchAnnotateImages with SAFE_SEARCH_DETECTION - */ - private async asyncBatchAnnotateImages( - imageUrls: string[], - fileName: string, - ): Promise { - const request = imageUrls.map((url) => ({ - image: { source: { imageUri: url } }, - features: [{ type: ContentModerationFeature.SAFE_SEARCH_DETECTION }], - })); - - const outputUri = constructGcsPath( - this.visionConfigService.moderationResultsBucket, - this.visionConfigService.moderationResultsFilesPath, - fileName + '-', - ); - - const requestPayload: protos.google.cloud.vision.v1.IAsyncBatchAnnotateImagesRequest = - { - requests: request, - outputConfig: { - gcsDestination: { uri: outputUri }, - batchSize: GCV_CONTENT_MODERATION_ASYNC_BATCH_SIZE, - }, - }; - - try { - const [operation] = - await this.visionClient.asyncBatchAnnotateImages(requestPayload); - const [filesResponse] = await operation.promise(); - this.logger.debug('Output written to GCS', { - url: filesResponse?.outputConfig?.gcsDestination?.uri, - }); - } catch (error) { - this.logger.error('Error analyzing images', error); - throw new Error(ErrorContentModeration.ContentModerationFailed); - } - } - - /** - * Parse a single PROCESSED request => sets it to PASSED or POSITIVE_ABUSE - */ - private 
async parseSingleRequest( - requestEntity: ContentModerationRequestEntity, - ): Promise { - try { - const fileName = `moderation-results-${requestEntity.job.id}-${requestEntity.id}`; - const moderationResults = await this.collectModerationResults(fileName); - - if (moderationResults.length > 0) { - await this.handleAbuseLinks( - moderationResults, - fileName, - requestEntity.id, - requestEntity.job.id, - ); - requestEntity.status = ContentModerationRequestStatus.POSITIVE_ABUSE; - } else { - requestEntity.status = ContentModerationRequestStatus.PASSED; - } - } catch (err) { - requestEntity.status = ContentModerationRequestStatus.FAILED; - throw err; - } - await this.contentModerationRequestRepository.updateOne(requestEntity); - } - - /** - * Downloads GCS results, categorizes them into positiveAbuse / possibleAbuse - */ - private async collectModerationResults(fileName: string) { - try { - const bucketPrefix = `${this.visionConfigService.moderationResultsFilesPath}/${fileName}`; - const bucketName = this.visionConfigService.moderationResultsBucket; - const bucket = this.storage.bucket(bucketName); - - const [files] = await bucket.getFiles({ prefix: bucketPrefix }); - if (!files || files.length === 0) { - throw new Error(ErrorContentModeration.NoResultsFound); - } - - const allResponses = []; - for (const file of files) { - const [content] = await file.download(); - const jsonString = content.toString('utf-8'); - const parsed = JSON.parse(jsonString); - - if (Array.isArray(parsed.responses)) { - allResponses.push(...parsed.responses); - } - } - return this.categorizeModerationResults(allResponses); - } catch (error) { - if (error.message === ErrorContentModeration.NoResultsFound) { - throw error; - } - this.logger.error('Error collecting moderation results', error); - throw new Error(ErrorContentModeration.ResultsParsingFailed); - } - } - - /** - * Processes the results from the Google Cloud Vision API and categorizes them based on moderation levels - */ - private 
categorizeModerationResults( - results: protos.google.cloud.vision.v1.IAnnotateImageResponse[], - ) { - const relevantLevels = [ - ContentModerationLevel.VERY_LIKELY, - ContentModerationLevel.LIKELY, - ContentModerationLevel.POSSIBLE, - ]; - - return results - .map((response) => { - const safeSearch = response.safeSearchAnnotation as ModerationResultDto; - if (!safeSearch) return null; - - const imageUrl = convertToHttpUrl(response.context?.uri ?? ''); - - const flaggedCategory = Object.keys(new ModerationResultDto()).find( - (field) => - relevantLevels.includes( - safeSearch[ - field as keyof ModerationResultDto - ] as ContentModerationLevel, - ), - ); - - if (!flaggedCategory) { - return null; - } - - return { - imageUrl, - moderationResult: flaggedCategory, - }; - }) - - .filter( - (item): item is { imageUrl: string; moderationResult: string } => - !!item, - ); - } - - /** - * Uploads a small text file listing the abuse-related images, then sends Slack notification - */ - private async handleAbuseLinks( - images: { - imageUrl: string; - moderationResult: string; - }[], - fileName: string, - requestId: number, - jobId: number, - ): Promise { - const bucketName = this.visionConfigService.moderationResultsBucket; - const resultsFileName = `${fileName}.txt`; - const file = this.storage.bucket(bucketName).file(resultsFileName); - const stream = file.createWriteStream({ resumable: false }); - stream.end(JSON.stringify(images)); - - const [signedUrl] = await file.getSignedUrl({ - action: 'read', - expires: Date.now() + 60 * 60 * 24 * 1000, - }); - const consoleUrl = `https://console.cloud.google.com/storage/browser/${bucketName}?prefix=${resultsFileName}`; - const message = `Images may contain abusive content. 
Request ${requestId}, job ${jobId}.\n\n**Results File:** <${signedUrl}|Download Here>\n**Google Cloud Console:** <${consoleUrl}|View in Console>\n\nEnsure you download the file before the link expires, or access it directly via GCS.`; - - await sendSlackNotification( - this.slackConfigService.abuseNotificationWebhookUrl, - message, - ); - } - - /** - * Caches GCS object listings so we don't repeatedly call listObjectsInBucket for the same dataUrl - */ - private async getValidFiles(dataUrl: string): Promise { - const cacheEntry = this.bucketListCache.get(dataUrl); - if (cacheEntry) { - return cacheEntry; - } - - const allFiles = await listObjectsInBucket(new URL(dataUrl)); - const validFiles = allFiles.filter((f) => f && !f.endsWith('/')); - this.bucketListCache.set(dataUrl, validFiles); - - return validFiles; - } -} diff --git a/packages/apps/job-launcher/server/src/modules/cron-job/cron-job.module.ts b/packages/apps/job-launcher/server/src/modules/cron-job/cron-job.module.ts index 97c54ae941..f51bd37967 100644 --- a/packages/apps/job-launcher/server/src/modules/cron-job/cron-job.module.ts +++ b/packages/apps/job-launcher/server/src/modules/cron-job/cron-job.module.ts @@ -12,14 +12,12 @@ import { WebhookRepository } from '../webhook/webhook.repository'; import { JobEntity } from '../job/job.entity'; import { JobRepository } from '../job/job.repository'; import { ConfigModule } from '@nestjs/config'; -import { ContentModerationModule } from '../content-moderation/content-moderation.module'; @Global() @Module({ imports: [ TypeOrmModule.forFeature([CronJobEntity, JobEntity]), ConfigModule, - ContentModerationModule, JobModule, PaymentModule, Web3Module, diff --git a/packages/apps/job-launcher/server/src/modules/cron-job/cron-job.service.spec.ts b/packages/apps/job-launcher/server/src/modules/cron-job/cron-job.service.spec.ts index 660fab11e1..530b86e3e5 100644 --- a/packages/apps/job-launcher/server/src/modules/cron-job/cron-job.service.spec.ts +++ 
b/packages/apps/job-launcher/server/src/modules/cron-job/cron-job.service.spec.ts @@ -27,13 +27,8 @@ import { } from '../../../test/constants'; import { NetworkConfigService } from '../../common/config/network-config.service'; import { ServerConfigService } from '../../common/config/server-config.service'; -import { SlackConfigService } from '../../common/config/slack-config.service'; -import { VisionConfigService } from '../../common/config/vision-config.service'; import { Web3ConfigService } from '../../common/config/web3-config.service'; -import { - ErrorContentModeration, - ErrorCronJob, -} from '../../common/constants/errors'; +import { ErrorCronJob } from '../../common/constants/errors'; import { CronJobType } from '../../common/enums/cron-job'; import { CvatJobType, @@ -44,8 +39,6 @@ import { import { WebhookStatus } from '../../common/enums/webhook'; import { ConflictError } from '../../common/errors'; import logger from '../../logger'; -import { ContentModerationRequestRepository } from '../content-moderation/content-moderation-request.repository'; -import { GCVContentModerationService } from '../content-moderation/gcv-content-moderation.service'; import { JobEntity } from '../job/job.entity'; import { JobRepository } from '../job/job.repository'; import { JobService } from '../job/job.service'; @@ -77,7 +70,6 @@ describe('CronJobService', () => { storageService: StorageService, jobService: JobService, paymentService: PaymentService, - contentModerationService: GCVContentModerationService, jobRepository: JobRepository; const signerMock = { @@ -111,23 +103,11 @@ describe('CronJobService', () => { }, }, JobService, - GCVContentModerationService, WebhookService, Encryption, ServerConfigService, Web3ConfigService, NetworkConfigService, - { - provide: VisionConfigService, - useValue: { - projectId: 'test-project-id', - privateKey: 'test-private-key', - clientEmail: 'test-client-email', - tempAsyncResultsBucket: 'test-temp-bucket', - moderationResultsBucket: 
'test-moderation-results-bucket', - }, - }, - SlackConfigService, QualificationService, { provide: NetworkConfigService, @@ -136,10 +116,6 @@ describe('CronJobService', () => { }, }, { provide: JobRepository, useValue: createMock() }, - { - provide: ContentModerationRequestRepository, - useValue: createMock(), - }, { provide: PaymentRepository, useValue: createMock(), @@ -170,9 +146,6 @@ describe('CronJobService', () => { service = module.get(CronJobService); // paymentService = module.get(PaymentService); - contentModerationService = module.get( - GCVContentModerationService, - ); jobService = module.get(JobService); jobRepository = module.get(JobRepository); paymentService = module.get(PaymentService); @@ -758,112 +731,6 @@ describe('CronJobService', () => { }); }); - describe('moderateContentCronJob', () => { - let contentModerationMock: any; - let cronJobEntityMock: Partial; - let jobEntity1: Partial, jobEntity2: Partial; - - beforeEach(() => { - cronJobEntityMock = { - cronJobType: CronJobType.ContentModeration, - startedAt: new Date(), - }; - - jobEntity1 = { - id: 1, - status: JobStatus.PAID, - }; - - jobEntity2 = { - id: 2, - status: JobStatus.PAID, - }; - - jest - .spyOn(jobRepository, 'findByStatus') - .mockResolvedValue([jobEntity1 as any, jobEntity2 as any]); - - contentModerationMock = jest.spyOn( - contentModerationService, - 'moderateJob', - ); - contentModerationMock.mockResolvedValue(true); - - jest.spyOn(service, 'isCronJobRunning').mockResolvedValue(false); - - jest.spyOn(repository, 'findOneByType').mockResolvedValue(null); - jest - .spyOn(repository, 'createUnique') - .mockResolvedValue(cronJobEntityMock as any); - }); - - afterEach(() => { - jest.restoreAllMocks(); - }); - - it('should not run if cron job is already running', async () => { - jest.spyOn(service, 'isCronJobRunning').mockResolvedValueOnce(true); - - const startCronJobMock = jest.spyOn(service, 'startCronJob'); - - await service.moderateContentCronJob(); - - 
expect(startCronJobMock).not.toHaveBeenCalled(); - }); - - it('should create a cron job entity to lock the process', async () => { - jest - .spyOn(service, 'startCronJob') - .mockResolvedValueOnce(cronJobEntityMock as any); - - await service.moderateContentCronJob(); - - expect(service.startCronJob).toHaveBeenCalledWith( - CronJobType.ContentModeration, - ); - }); - - it('should process all jobs with status PAID', async () => { - await service.moderateContentCronJob(); - - expect(contentModerationMock).toHaveBeenCalledTimes(2); - expect(contentModerationMock).toHaveBeenCalledWith(jobEntity1); - expect(contentModerationMock).toHaveBeenCalledWith(jobEntity2); - }); - - it('should handle failed moderation attempts', async () => { - const error = new Error('Moderation failed'); - contentModerationMock.mockRejectedValueOnce(error); - - const handleFailureMock = jest.spyOn( - jobService, - 'handleProcessJobFailure', - ); - - await service.moderateContentCronJob(); - - expect(handleFailureMock).toHaveBeenCalledTimes(1); - expect(handleFailureMock).toHaveBeenCalledWith( - jobEntity1, - expect.stringContaining(ErrorContentModeration.ResultsParsingFailed), - ); - expect(handleFailureMock).not.toHaveBeenCalledWith( - jobEntity2, - expect.anything(), - ); - }); - - it('should complete the cron job entity to unlock', async () => { - jest - .spyOn(service, 'completeCronJob') - .mockResolvedValueOnce(cronJobEntityMock as any); - - await service.moderateContentCronJob(); - - expect(service.completeCronJob).toHaveBeenCalledWith(cronJobEntityMock); - }); - }); - describe('syncJobStatuses Cron Job', () => { let cronJobEntityMock: Partial; let jobEntityMock: Partial; diff --git a/packages/apps/job-launcher/server/src/modules/cron-job/cron-job.service.ts b/packages/apps/job-launcher/server/src/modules/cron-job/cron-job.service.ts index f41f759733..8034f103b1 100644 --- a/packages/apps/job-launcher/server/src/modules/cron-job/cron-job.service.ts +++ 
b/packages/apps/job-launcher/server/src/modules/cron-job/cron-job.service.ts @@ -1,6 +1,5 @@ import { Injectable } from '@nestjs/common'; import { - ErrorContentModeration, ErrorCronJob, ErrorEscrow, ErrorJob, @@ -19,7 +18,6 @@ import { } from '../../common/enums/webhook'; import { ConflictError, NotFoundError } from '../../common/errors'; import logger from '../../logger'; -import { GCVContentModerationService } from '../content-moderation/gcv-content-moderation.service'; import { JobEntity } from '../job/job.entity'; import { JobRepository } from '../job/job.repository'; import { JobService } from '../job/job.service'; @@ -39,7 +37,6 @@ export class CronJobService { private readonly cronJobRepository: CronJobRepository, private readonly jobService: JobService, private readonly jobRepository: JobRepository, - private readonly contentModerationService: GCVContentModerationService, private readonly webhookService: WebhookService, private readonly web3Service: Web3Service, private readonly paymentService: PaymentService, @@ -82,45 +79,6 @@ export class CronJobService { return this.cronJobRepository.updateOne(cronJobEntity); } - @Cron('*/2 * * * *') - public async moderateContentCronJob() { - if (await this.isCronJobRunning(CronJobType.ContentModeration)) { - return; - } - - const cronJobEntity = await this.startCronJob( - CronJobType.ContentModeration, - ); - - try { - const jobs = await this.jobRepository.findByStatus([ - JobStatus.PAID, - JobStatus.UNDER_MODERATION, - ]); - - await Promise.all( - jobs.map(async (jobEntity) => { - try { - await this.contentModerationService.moderateJob(jobEntity); - } catch (error) { - this.logger.error('Error parse job moderation results job', { - jobId: jobEntity.id, - error, - }); - await this.jobService.handleProcessJobFailure( - jobEntity, - ErrorContentModeration.ResultsParsingFailed, - ); - } - }), - ); - } catch (error) { - this.logger.error('Error in moderateContentCronJob', error); - } - - await 
this.completeCronJob(cronJobEntity); - } - @Cron('*/2 * * * *') public async createEscrowCronJob() { const isCronJobRunning = await this.isCronJobRunning( @@ -135,9 +93,7 @@ export class CronJobService { const cronJob = await this.startCronJob(CronJobType.CreateEscrow); try { - const jobEntities = await this.jobRepository.findByStatus( - JobStatus.MODERATION_PASSED, - ); + const jobEntities = await this.jobRepository.findByStatus(JobStatus.PAID); for (const jobEntity of jobEntities) { try { await this.jobService.createEscrow(jobEntity); diff --git a/packages/apps/job-launcher/server/src/modules/job/fixtures.ts b/packages/apps/job-launcher/server/src/modules/job/fixtures.ts index cf8f0ddf88..e2f9686b1b 100644 --- a/packages/apps/job-launcher/server/src/modules/job/fixtures.ts +++ b/packages/apps/job-launcher/server/src/modules/job/fixtures.ts @@ -1,16 +1,8 @@ import { faker } from '@faker-js/faker'; import { ChainId } from '@human-protocol/sdk'; -import { - getMockedProvider, - getMockedRegion, -} from '../../../test/fixtures/storage'; -import { - CvatJobType, - EscrowFundToken, - FortuneJobType, -} from '../../common/enums/job'; +import { EscrowFundToken, FortuneJobType } from '../../common/enums/job'; import { PaymentCurrency } from '../../common/enums/payment'; -import { JobCvatDto, JobFortuneDto } from './job.dto'; +import { JobFortuneDto } from './job.dto'; import { JobEntity } from './job.entity'; import { JobStatus } from '../../common/enums/job'; @@ -36,36 +28,6 @@ export const createFortuneJobDto = (overrides = {}): JobFortuneDto => ({ ...overrides, }); -export const createCvatJobDto = (overrides = {}): JobCvatDto => ({ - chainId: ChainId.POLYGON_AMOY, - data: { - dataset: { - provider: getMockedProvider(), - region: getMockedRegion(), - bucketName: faker.lorem.word(), - path: faker.system.filePath(), - }, - }, - labels: [{ name: faker.lorem.word(), nodes: [faker.string.uuid()] }], - requesterDescription: faker.lorem.sentence(), - userGuide: 
faker.internet.url(), - minQuality: faker.number.float({ min: 0.1, max: 1 }), - groundTruth: { - provider: getMockedProvider(), - region: getMockedRegion(), - bucketName: faker.lorem.word(), - path: faker.system.filePath(), - }, - type: faker.helpers.arrayElement(Object.values(CvatJobType)), - paymentCurrency: faker.helpers.arrayElement(paymentCurrencies), - paymentAmount: faker.number.int({ min: 1, max: 1000 }), - escrowFundToken: faker.helpers.arrayElement(escrowFundTokens), - exchangeOracle: faker.finance.ethereumAddress(), - recordingOracle: faker.finance.ethereumAddress(), - reputationOracle: faker.finance.ethereumAddress(), - ...overrides, -}); - export const createJobEntity = ( overrides: Partial = {}, ): JobEntity => { @@ -93,7 +55,6 @@ export const createJobEntity = ( entity.status = faker.helpers.arrayElement(Object.values(JobStatus)); entity.userId = faker.number.int(); entity.payments = []; - entity.contentModerationRequests = []; entity.retriesCount = faker.number.int({ min: 0, max: 4 }); entity.waitUntil = faker.date.future(); Object.assign(entity, overrides); diff --git a/packages/apps/job-launcher/server/src/modules/job/job.controller.ts b/packages/apps/job-launcher/server/src/modules/job/job.controller.ts index bc7ae37fb9..e6f2c0a6c9 100644 --- a/packages/apps/job-launcher/server/src/modules/job/job.controller.ts +++ b/packages/apps/job-launcher/server/src/modules/job/job.controller.ts @@ -32,7 +32,6 @@ import { FortuneFinalResultDto, GetJobsDto, JobCancelDto, - JobCvatDto, JobDetailsDto, JobFortuneDto, JobIdDto, @@ -138,43 +137,6 @@ export class JobController { ); } - @ApiOperation({ - summary: 'Create a CVAT job', - description: 'Endpoint to create a new CVAT job.', - }) - @ApiBody({ type: JobCvatDto }) - @ApiResponse({ - status: 201, - description: 'ID of the created CVAT job.', - type: Number, - }) - @ApiResponse({ - status: 400, - description: 'Bad Request. 
Invalid input parameters.', - }) - @ApiResponse({ - status: 401, - description: 'Unauthorized. Missing or invalid credentials.', - }) - @ApiResponse({ - status: 409, - description: 'Conflict. Conflict with the current state of the server.', - }) - @Post('/cvat') - public async createCvatJob( - @Body() data: JobCvatDto, - @Request() req: RequestWithUser, - ): Promise { - throw new ForbiddenError('Disabled'); - return await this.mutexManagerService.runExclusive( - `user${req.user.id}`, - MUTEX_TIMEOUT, - async () => { - return await this.jobService.createJob(req.user, data.type, data); - }, - ); - } - @ApiOperation({ summary: 'Get a list of jobs', description: diff --git a/packages/apps/job-launcher/server/src/modules/job/job.dto.ts b/packages/apps/job-launcher/server/src/modules/job/job.dto.ts index 729fcb3ccb..919396f901 100644 --- a/packages/apps/job-launcher/server/src/modules/job/job.dto.ts +++ b/packages/apps/job-launcher/server/src/modules/job/job.dto.ts @@ -2,25 +2,21 @@ import { ChainId } from '@human-protocol/sdk'; import { ApiProperty, ApiPropertyOptional } from '@nestjs/swagger'; import { Transform, Type } from 'class-transformer'; import { - ArrayMinSize, IsArray, IsEthereumAddress, IsIn, IsNotEmpty, IsNumber, IsNumberString, - IsObject, IsOptional, IsPositive, IsString, - IsUrl, - Max, Min, + IsUrl, ValidateNested, } from 'class-validator'; import { IsEnumCaseInsensitive } from '../../common/decorators'; import { - CvatJobType, EscrowFundToken, JobRequestType, JobSortField, @@ -33,7 +29,7 @@ import { AWSRegions, StorageProviders } from '../../common/enums/storage'; import { PageOptionsDto } from '../../common/pagination/pagination.dto'; import { IsValidTokenDecimals } from '../../common/validators/token-decimals'; import { IsValidToken } from '../../common/validators/tokens'; -import { Label, ManifestDetails } from '../manifest/manifest.dto'; +import { ManifestDetails } from '../manifest/manifest.dto'; export class JobDto { @ApiProperty({ enum: ChainId, 
required: false, name: 'chain_id' }) @@ -145,68 +141,6 @@ export class StorageDataDto { public path?: string; } -export class CvatDataDto { - @ApiProperty() - @IsObject() - @ValidateNested() - @Type(() => StorageDataDto) - public dataset: StorageDataDto; - - @ApiPropertyOptional() - @IsObject() - @IsOptional() - @ValidateNested() - @Type(() => StorageDataDto) - public points?: StorageDataDto; - - @ApiPropertyOptional() - @IsObject() - @IsOptional() - @ValidateNested() - @Type(() => StorageDataDto) - public boxes?: StorageDataDto; -} - -export class JobCvatDto extends JobDto { - @ApiProperty({ name: 'requester_description' }) - @IsString() - @IsNotEmpty() - public requesterDescription: string; - - @ApiProperty() - @IsObject() - @ValidateNested() - @Type(() => CvatDataDto) - public data: CvatDataDto; - - @ApiProperty({ type: [Label] }) - @IsArray() - @ArrayMinSize(1) - @ValidateNested({ each: true }) - @Type(() => Label) - public labels: Label[]; - - @ApiProperty({ name: 'min_quality' }) - @IsNumber() - @IsPositive() - @Max(1) - public minQuality: number; - - @ApiProperty({ name: 'ground_truth' }) - @IsObject() - @ValidateNested() - @Type(() => StorageDataDto) - public groundTruth: StorageDataDto; - - @ApiProperty({ name: 'user_guide' }) - @IsUrl() - public userGuide: string; - - @ApiProperty({ enum: CvatJobType }) - @IsEnumCaseInsensitive(CvatJobType) - public type: CvatJobType; -} - export class JobCancelDto { @ApiProperty() @IsNumberString() @@ -364,4 +298,4 @@ export class GetJobsDto extends PageOptionsDto { status?: JobStatusFilter; } -export type CreateJob = JobQuickLaunchDto | JobFortuneDto | JobCvatDto; +export type CreateJob = JobQuickLaunchDto | JobFortuneDto; diff --git a/packages/apps/job-launcher/server/src/modules/job/job.entity.ts b/packages/apps/job-launcher/server/src/modules/job/job.entity.ts index 3887a7670b..56f5cbc38c 100644 --- a/packages/apps/job-launcher/server/src/modules/job/job.entity.ts +++ 
b/packages/apps/job-launcher/server/src/modules/job/job.entity.ts @@ -6,7 +6,6 @@ import { JobRequestType, JobStatus, JobType } from '../../common/enums/job'; import { BaseEntity } from '../../database/base.entity'; import { UserEntity } from '../user/user.entity'; import { PaymentEntity } from '../payment/payment.entity'; -import { ContentModerationRequestEntity } from '../content-moderation/content-moderation-request.entity'; @Entity({ schema: NS, name: 'jobs' }) @Index(['chainId', 'escrowAddress'], { unique: true }) @@ -65,13 +64,6 @@ export class JobEntity extends BaseEntity implements IJob { @OneToMany(() => PaymentEntity, (payment) => payment.job) public payments: PaymentEntity[]; - @OneToMany( - () => ContentModerationRequestEntity, - (contentModerationRequest) => contentModerationRequest.job, - { cascade: ['insert'] }, - ) - public contentModerationRequests: ContentModerationRequestEntity[]; - @Column({ type: 'int', default: 0 }) public retriesCount: number; diff --git a/packages/apps/job-launcher/server/src/modules/job/job.interface.ts b/packages/apps/job-launcher/server/src/modules/job/job.interface.ts index b4ad55106a..c1aa8b2b75 100644 --- a/packages/apps/job-launcher/server/src/modules/job/job.interface.ts +++ b/packages/apps/job-launcher/server/src/modules/job/job.interface.ts @@ -1,86 +1,5 @@ -import { CvatJobType, JobRequestType } from '../../common/enums/job'; -import { - CvatDataDto, - JobCvatDto, - JobFortuneDto, - StorageDataDto, -} from './job.dto'; import { JobEntity } from './job.entity'; -export interface RequestAction { - createManifest: ( - dto: JobFortuneDto | JobCvatDto, - requestType: JobRequestType, - fundAmount: number, - decimals: number, - ) => Promise; -} - -export interface ManifestAction { - getElementsCount: (urls: GenerateUrls) => Promise; - generateUrls: ( - data: CvatDataDto, - groundTruth: StorageDataDto, - ) => GenerateUrls; -} - -export interface EscrowAction { - getTrustedHandlers: () => string[]; -} - -export interface 
OracleAction { - getOracleAddresses: () => OracleAddresses; -} - -export interface OracleAddresses { - exchangeOracle: string; - recordingOracle: string; - reputationOracle: string; -} - -export interface CvatCalculateJobBounty { - requestType: CvatJobType; - fundAmount: number; - decimals: number; - urls: GenerateUrls; - nodesTotal?: number; -} - -export interface GenerateUrls { - dataUrl: URL; - gtUrl: URL; - pointsUrl?: URL; - boxesUrl?: URL; -} - -export interface CvatImageData { - id: number; - width: number; - height: number; - file_name: string; - license: number; - flickr_url: string; - coco_url: string; - date_captured: number; -} - -export interface CvatAnnotationData { - id: number; - image_id: number; - category_id: number; - segmentation: number[]; - area: number; - bbox: [number, number, number, number]; - iscrowd: number; - attributes: { - scale: number; - x: number; - y: number; - }; - keypoints: [number, number, number]; - num_keypoints: number; -} - export interface ListResult { entities: JobEntity[]; itemCount: number; diff --git a/packages/apps/job-launcher/server/src/modules/job/job.repository.ts b/packages/apps/job-launcher/server/src/modules/job/job.repository.ts index 31a644e310..9b7708b87e 100644 --- a/packages/apps/job-launcher/server/src/modules/job/job.repository.ts +++ b/packages/apps/job-launcher/server/src/modules/job/job.repository.ts @@ -81,7 +81,6 @@ export class JobRepository extends BaseRepository { waitUntil: SortDirection.ASC, }, ...(take && { take }), - relations: ['contentModerationRequests'], }); } @@ -108,12 +107,7 @@ export class JobRepository extends BaseRepository { switch (data.status) { case JobStatusFilter.PENDING: - statusFilter = [ - JobStatus.PAID, - JobStatus.UNDER_MODERATION, - JobStatus.MODERATION_PASSED, - JobStatus.POSSIBLE_ABUSE_IN_REVIEW, - ]; + statusFilter = [JobStatus.PAID]; break; case JobStatusFilter.CANCELED: statusFilter = [ diff --git 
a/packages/apps/job-launcher/server/src/modules/job/job.service.spec.ts b/packages/apps/job-launcher/server/src/modules/job/job.service.spec.ts index 98da9f3fe2..3d69a1c594 100644 --- a/packages/apps/job-launcher/server/src/modules/job/job.service.spec.ts +++ b/packages/apps/job-launcher/server/src/modules/job/job.service.spec.ts @@ -39,10 +39,7 @@ import { } from '../../common/errors'; import { div, max, mul } from '../../common/utils/decimal'; import { getTokenDecimals } from '../../common/utils/tokens'; -import { - createMockCvatManifest, - createMockFortuneManifest, -} from '../manifest/fixtures'; +import { createMockFortuneManifest } from '../manifest/fixtures'; import { ManifestService } from '../manifest/manifest.service'; import { PaymentRepository } from '../payment/payment.repository'; import { PaymentService } from '../payment/payment.service'; @@ -55,11 +52,7 @@ import { Web3Service } from '../web3/web3.service'; import { WebhookRepository } from '../webhook/webhook.repository'; import { WhitelistEntity } from '../whitelist/whitelist.entity'; import { WhitelistService } from '../whitelist/whitelist.service'; -import { - createCvatJobDto, - createFortuneJobDto, - createJobEntity, -} from './fixtures'; +import { createFortuneJobDto, createJobEntity } from './fixtures'; import { FortuneFinalResultDto, GetJobsDto, @@ -207,7 +200,6 @@ describe('JobService', () => { fortuneJobDto, FortuneJobType.FORTUNE, fortuneJobDto.paymentAmount, - fundTokenDecimals, ); expect(mockManifestService.uploadManifest).toHaveBeenCalledWith( fortuneJobDto.chainId, @@ -237,7 +229,7 @@ describe('JobService', () => { ).toFixed(fundTokenDecimals), ), fundAmount: fortuneJobDto.paymentAmount, - status: JobStatus.MODERATION_PASSED, + status: JobStatus.PAID, waitUntil: expect.any(Date), token: fortuneJobDto.escrowFundToken, exchangeOracle: fortuneJobDto.exchangeOracle, @@ -308,7 +300,6 @@ describe('JobService', () => { fortuneJobDto, FortuneJobType.FORTUNE, 
Number(fortuneJobDto.paymentAmount.toFixed(6)), - fundTokenDecimals, ); expect(mockManifestService.uploadManifest).toHaveBeenCalledWith( fortuneJobDto.chainId, @@ -343,7 +334,7 @@ describe('JobService', () => { usdToTokenRate, ).toFixed(6), ), - status: JobStatus.MODERATION_PASSED, + status: JobStatus.PAID, waitUntil: expect.any(Date), token: fortuneJobDto.escrowFundToken, exchangeOracle: fortuneJobDto.exchangeOracle, @@ -425,7 +416,6 @@ describe('JobService', () => { fortuneJobDto, FortuneJobType.FORTUNE, fortuneJobDto.paymentAmount, - fundTokenDecimals, ); expect(mockManifestService.uploadManifest).toHaveBeenCalledWith( fortuneJobDto.chainId, @@ -455,7 +445,7 @@ describe('JobService', () => { ).toFixed(fundTokenDecimals), ), fundAmount: fortuneJobDto.paymentAmount, - status: JobStatus.MODERATION_PASSED, + status: JobStatus.PAID, waitUntil: expect.any(Date), token: fortuneJobDto.escrowFundToken, exchangeOracle: mockOracles.exchangeOracle, @@ -489,88 +479,6 @@ describe('JobService', () => { }); }); - describe('CVAT', () => { - it('should create a CVAT job', async () => { - const cvatJobDto = createCvatJobDto(); - const fundTokenDecimals = getTokenDecimals( - cvatJobDto.chainId!, - cvatJobDto.escrowFundToken, - ); - - const mockManifest = createMockCvatManifest(); - mockManifestService.createManifest.mockResolvedValueOnce(mockManifest); - const mockUrl = faker.internet.url(); - const mockHash = faker.string.uuid(); - mockManifestService.uploadManifest.mockResolvedValueOnce({ - url: mockUrl, - hash: mockHash, - }); - const jobEntityMock = createJobEntity(); - mockJobRepository.createUnique = jest - .fn() - .mockResolvedValueOnce(jobEntityMock); - mockRateService.getRate - .mockResolvedValueOnce(tokenToUsdRate) - .mockResolvedValueOnce(usdToTokenRate); - - await jobService.createJob(userMock, cvatJobDto.type, cvatJobDto); - - expect(mockWeb3Service.validateChainId).toHaveBeenCalledWith( - cvatJobDto.chainId, - ); - 
expect(mockRoutingProtocolService.selectOracles).not.toHaveBeenCalled(); - expect(mockRoutingProtocolService.validateOracles).toHaveBeenCalledWith( - cvatJobDto.chainId, - cvatJobDto.type, - cvatJobDto.reputationOracle, - cvatJobDto.exchangeOracle, - cvatJobDto.recordingOracle, - ); - expect(mockManifestService.createManifest).toHaveBeenCalledWith( - cvatJobDto, - cvatJobDto.type, - cvatJobDto.paymentAmount, - fundTokenDecimals, - ); - expect(mockManifestService.uploadManifest).toHaveBeenCalledWith( - cvatJobDto.chainId, - mockManifest, - [ - cvatJobDto.exchangeOracle, - cvatJobDto.reputationOracle, - cvatJobDto.recordingOracle, - ], - ); - expect(mockPaymentService.createWithdrawalPayment).toHaveBeenCalledWith( - userMock.id, - expect.any(Number), - cvatJobDto.paymentCurrency, - tokenToUsdRate, - ); - expect(mockJobRepository.updateOne).toHaveBeenCalledWith({ - chainId: cvatJobDto.chainId, - userId: userMock.id, - manifestUrl: mockUrl, - manifestHash: mockHash, - requestType: cvatJobDto.type, - fee: expect.any(Number), - fundAmount: Number( - mul( - mul(cvatJobDto.paymentAmount, tokenToUsdRate), - usdToTokenRate, - ).toFixed(6), - ), - status: JobStatus.MODERATION_PASSED, - waitUntil: expect.any(Date), - token: cvatJobDto.escrowFundToken, - exchangeOracle: cvatJobDto.exchangeOracle, - recordingOracle: cvatJobDto.recordingOracle, - reputationOracle: cvatJobDto.reputationOracle, - payments: expect.any(Array), - }); - }); - }); - describe('JobQuickLaunchDto', () => { it('should create a job with quick launch dto', async () => { const jobQuickLaunchDto = new JobQuickLaunchDto(); @@ -635,7 +543,7 @@ describe('JobService', () => { usdToTokenRate, ).toFixed(6), ), - status: JobStatus.MODERATION_PASSED, + status: JobStatus.PAID, waitUntil: expect.any(Date), token: jobQuickLaunchDto.escrowFundToken, exchangeOracle: jobQuickLaunchDto.exchangeOracle, @@ -650,7 +558,7 @@ describe('JobService', () => { describe('createEscrow', () => { it('should create an escrow and update job 
entity', async () => { const jobEntity = createJobEntity({ - status: JobStatus.MODERATION_PASSED, + status: JobStatus.PAID, token: EscrowFundToken.HMT, escrowAddress: null, }); @@ -738,7 +646,7 @@ describe('JobService', () => { it('should throw if escrow address is not returned', async () => { const jobEntity = createJobEntity({ - status: JobStatus.MODERATION_PASSED, + status: JobStatus.PAID, token: EscrowFundToken.HMT, escrowAddress: null, }); diff --git a/packages/apps/job-launcher/server/src/modules/job/job.service.ts b/packages/apps/job-launcher/server/src/modules/job/job.service.ts index 271743182f..385dd3961a 100644 --- a/packages/apps/job-launcher/server/src/modules/job/job.service.ts +++ b/packages/apps/job-launcher/server/src/modules/job/job.service.ts @@ -254,7 +254,6 @@ export class JobService { dto, requestType, fundTokenAmount, - fundTokenDecimals, ); const { url, hash } = await this.manifestService.uploadManifest( @@ -286,16 +285,7 @@ export class JobService { jobEntity.token = dto.escrowFundToken; jobEntity.waitUntil = new Date(); - if ( - user.whitelist || - ( - [FortuneJobType.FORTUNE, HCaptchaJobType.HCAPTCHA] as JobRequestType[] - ).includes(requestType) - ) { - jobEntity.status = JobStatus.MODERATION_PASSED; - } else { - jobEntity.status = JobStatus.PAID; - } + jobEntity.status = JobStatus.PAID; jobEntity = await this.jobRepository.updateOne(jobEntity); diff --git a/packages/apps/job-launcher/server/src/modules/manifest/fixtures.ts b/packages/apps/job-launcher/server/src/modules/manifest/fixtures.ts index ba2b12bb13..4448f700c2 100644 --- a/packages/apps/job-launcher/server/src/modules/manifest/fixtures.ts +++ b/packages/apps/job-launcher/server/src/modules/manifest/fixtures.ts @@ -1,53 +1,6 @@ import { faker } from '@faker-js/faker'; -import { ChainId } from '@human-protocol/sdk'; -import { CvatConfigService } from '../../common/config/cvat-config.service'; -import { CvatJobType, EscrowFundToken } from '../../common/enums/job'; -import { 
PaymentCurrency } from '../../common/enums/payment'; -import { JobCvatDto } from '../job/job.dto'; -import { - getMockedProvider, - getMockedRegion, -} from '../../../test/fixtures/storage'; +import { CvatJobType, FortuneJobType } from '../../common/enums/job'; import { CvatManifestDto, FortuneManifestDto } from './manifest.dto'; -import { FortuneJobType } from '../../common/enums/job'; - -export const mockCvatConfigService: Omit = { - jobSize: faker.number.int({ min: 1, max: 1000 }), - maxTime: faker.number.int({ min: 1, max: 1000 }), - valSize: faker.number.int({ min: 1, max: 1000 }), - skeletonsJobSizeMultiplier: faker.number.int({ min: 1, max: 1000 }), -}; - -export function createJobCvatDto( - overrides: Partial = {}, -): JobCvatDto { - return { - data: { - dataset: { - provider: getMockedProvider(), - region: getMockedRegion(), - bucketName: faker.lorem.word(), - path: faker.system.filePath(), - }, - }, - labels: [{ name: faker.lorem.word(), nodes: [faker.string.uuid()] }], - requesterDescription: faker.lorem.sentence(), - userGuide: faker.internet.url(), - minQuality: faker.number.float({ min: 0.1, max: 1 }), - groundTruth: { - provider: getMockedProvider(), - region: getMockedRegion(), - bucketName: faker.lorem.word(), - path: faker.system.filePath(), - }, - type: CvatJobType.IMAGE_BOXES, - chainId: faker.helpers.arrayElement(Object.values(ChainId)) as ChainId, - paymentCurrency: faker.helpers.arrayElement(Object.values(PaymentCurrency)), - paymentAmount: faker.number.int({ min: 1, max: 1000 }), - escrowFundToken: faker.helpers.arrayElement(Object.values(EscrowFundToken)), - ...overrides, - }; -} export function createMockFortuneManifest( overrides: Partial = {}, diff --git a/packages/apps/job-launcher/server/src/modules/manifest/manifest.module.ts b/packages/apps/job-launcher/server/src/modules/manifest/manifest.module.ts index ab52fd4050..48d4742160 100644 --- a/packages/apps/job-launcher/server/src/modules/manifest/manifest.module.ts +++ 
b/packages/apps/job-launcher/server/src/modules/manifest/manifest.module.ts @@ -3,19 +3,9 @@ import { ManifestService } from './manifest.service'; import { StorageModule } from '../storage/storage.module'; import { Web3Module } from '../web3/web3.module'; import { EncryptionModule } from '../encryption/encryption.module'; -import { RoutingProtocolModule } from '../routing-protocol/routing-protocol.module'; -import { RateModule } from '../rate/rate.module'; -import { QualificationModule } from '../qualification/qualification.module'; @Module({ - imports: [ - StorageModule, - Web3Module, - EncryptionModule, - RoutingProtocolModule, - RateModule, - QualificationModule, - ], + imports: [StorageModule, Web3Module, EncryptionModule], providers: [ManifestService], exports: [ManifestService], }) diff --git a/packages/apps/job-launcher/server/src/modules/manifest/manifest.service.spec.ts b/packages/apps/job-launcher/server/src/modules/manifest/manifest.service.spec.ts index 2f9363f42e..21e93dfa4e 100644 --- a/packages/apps/job-launcher/server/src/modules/manifest/manifest.service.spec.ts +++ b/packages/apps/job-launcher/server/src/modules/manifest/manifest.service.spec.ts @@ -1,31 +1,16 @@ -jest.mock('../../common/utils/storage', () => ({ - ...jest.requireActual('../../common/utils/storage'), - listObjectsInBucket: jest.fn(), -})); - import { faker } from '@faker-js/faker'; import { createMock } from '@golevelup/ts-jest'; import { Encryption } from '@human-protocol/sdk'; import { Test } from '@nestjs/testing'; -import { CvatConfigService } from '../../common/config/cvat-config.service'; import { PGPConfigService } from '../../common/config/pgp-config.service'; import { ErrorJob } from '../../common/constants/errors'; import { CvatJobType, FortuneJobType } from '../../common/enums/job'; -import { - ConflictError, - ServerError, - ValidationError, -} from '../../common/errors'; -import { generateBucketUrl } from '../../common/utils/storage'; +import { ServerError, 
ValidationError } from '../../common/errors'; +import { JobFortuneDto } from '../job/job.dto'; import { StorageService } from '../storage/storage.service'; import { Web3Service } from '../web3/web3.service'; -import { createJobCvatDto, mockCvatConfigService } from './fixtures'; -import { FortuneManifestDto } from './manifest.dto'; import { ManifestService } from './manifest.service'; -import { - getMockedProvider, - getMockedRegion, -} from '../../../test/fixtures/storage'; +import { ManifestDto } from './manifest.dto'; describe('ManifestService', () => { let manifestService: ManifestService; @@ -40,10 +25,6 @@ describe('ManifestService', () => { ManifestService, { provide: Web3Service, useValue: createMock() }, { provide: StorageService, useValue: mockStorageService }, - { - provide: CvatConfigService, - useValue: mockCvatConfigService, - }, { provide: PGPConfigService, useValue: { encrypt: false } }, { provide: Encryption, useValue: createMock() }, ], @@ -57,205 +38,37 @@ describe('ManifestService', () => { }); describe('createManifest', () => { - describe('createCvatManifest', () => { - const tokenFundAmount = faker.number.int({ min: 1, max: 1000 }); - const tokenFundDecimals = faker.number.int({ min: 1, max: 18 }); - let jobBounty: string; - - beforeAll(() => { - jobBounty = faker.number.int({ min: 1, max: 1000 }).toString(); - manifestService['calculateCvatJobBounty'] = jest - .fn() - .mockResolvedValue(jobBounty); - }); - - it('should create a valid CVAT manifest for image boxes job type', async () => { - const dto = createJobCvatDto({ type: CvatJobType.IMAGE_BOXES }); - const requestType = CvatJobType.IMAGE_BOXES; - - const result = await manifestService.createManifest( - dto, - requestType, - tokenFundAmount, - tokenFundDecimals, - ); - - expect(result).toEqual({ - data: { - data_url: generateBucketUrl(dto.data.dataset, requestType).href, - }, - annotation: { - labels: dto.labels, - description: dto.requesterDescription, - user_guide: dto.userGuide, - type: 
requestType, - job_size: mockCvatConfigService.jobSize, - }, - validation: { - min_quality: dto.minQuality, - val_size: mockCvatConfigService.valSize, - gt_url: generateBucketUrl(dto.groundTruth, requestType).href, - }, - job_bounty: jobBounty, - }); - }); - - it('should create a valid CVAT manifest for image polygons job type', async () => { - const dto = createJobCvatDto({ type: CvatJobType.IMAGE_POLYGONS }); - const requestType = CvatJobType.IMAGE_POLYGONS; - - const result = await manifestService.createManifest( - dto, - requestType, - tokenFundAmount, - tokenFundDecimals, - ); - - expect(result).toEqual({ - data: { - data_url: generateBucketUrl(dto.data.dataset, requestType).href, - }, - annotation: { - labels: dto.labels, - description: dto.requesterDescription, - user_guide: dto.userGuide, - type: requestType, - job_size: mockCvatConfigService.jobSize, - }, - validation: { - min_quality: dto.minQuality, - val_size: mockCvatConfigService.valSize, - gt_url: generateBucketUrl(dto.groundTruth, requestType).href, - }, - job_bounty: jobBounty, - }); - }); - - it('should create a valid CVAT manifest for image boxes from points job type', async () => { - const dto = createJobCvatDto({ - data: { - dataset: { - provider: getMockedProvider(), - region: getMockedRegion(), - bucketName: faker.lorem.word(), - path: faker.system.filePath(), - }, - points: { - provider: getMockedProvider(), - region: getMockedRegion(), - bucketName: faker.lorem.word(), - path: faker.system.filePath(), - }, - }, - type: CvatJobType.IMAGE_BOXES_FROM_POINTS, - }); - const requestType = CvatJobType.IMAGE_BOXES_FROM_POINTS; - - const result = await manifestService.createManifest( - dto, - requestType, - tokenFundAmount, - tokenFundDecimals, - ); - - expect(result).toEqual({ - data: { - data_url: generateBucketUrl(dto.data.dataset, requestType).href, - points_url: generateBucketUrl(dto.data.points!, requestType).href, - }, - annotation: { - labels: dto.labels, - description: 
dto.requesterDescription, - user_guide: dto.userGuide, - type: requestType, - job_size: mockCvatConfigService.jobSize, - }, - validation: { - min_quality: dto.minQuality, - val_size: mockCvatConfigService.valSize, - gt_url: generateBucketUrl(dto.groundTruth, requestType).href, - }, - job_bounty: jobBounty, - }); - }); - - it('should create a valid CVAT manifest for image skeletons from boxes job type', async () => { - const dto = createJobCvatDto({ - data: { - dataset: { - provider: getMockedProvider(), - region: getMockedRegion(), - bucketName: faker.lorem.word(), - path: faker.system.filePath(), - }, - boxes: { - provider: getMockedProvider(), - region: getMockedRegion(), - bucketName: faker.lorem.word(), - path: faker.system.filePath(), - }, - }, - type: CvatJobType.IMAGE_SKELETONS_FROM_BOXES, - }); - const requestType = CvatJobType.IMAGE_SKELETONS_FROM_BOXES; + it('should create a fortune manifest', async () => { + const dto: JobFortuneDto = { + requesterTitle: faker.lorem.sentence(), + requesterDescription: faker.lorem.sentence(), + submissionsRequired: faker.number.int({ min: 1, max: 100 }), + paymentCurrency: faker.helpers.arrayElement([0, 1]) as any, + paymentAmount: faker.number.int({ min: 1, max: 1000 }), + escrowFundToken: faker.helpers.arrayElement(['HMT', 'USDC']) as any, + }; - const result = await manifestService.createManifest( + await expect( + manifestService.createManifest( dto, - requestType, - tokenFundAmount, - tokenFundDecimals, - ); - - expect(result).toEqual({ - data: { - data_url: generateBucketUrl(dto.data.dataset, requestType).href, - boxes_url: generateBucketUrl(dto.data.boxes!, requestType).href, - }, - annotation: { - labels: dto.labels, - description: dto.requesterDescription, - user_guide: dto.userGuide, - type: requestType, - job_size: mockCvatConfigService.jobSize, - }, - validation: { - min_quality: dto.minQuality, - val_size: mockCvatConfigService.valSize, - gt_url: generateBucketUrl(dto.groundTruth, requestType).href, - }, - 
job_bounty: jobBounty, - }); - }); - - it('should throw an error if data does not exist for image boxes from points job type', async () => { - const requestType = CvatJobType.IMAGE_BOXES_FROM_POINTS; - - const dto = createJobCvatDto({ type: requestType }); - - await expect( - manifestService.createManifest( - dto, - requestType, - tokenFundAmount, - tokenFundDecimals, - ), - ).rejects.toThrow(new ConflictError(ErrorJob.DataNotExist)); + FortuneJobType.FORTUNE, + dto.paymentAmount, + ), + ).resolves.toEqual({ + ...dto, + requestType: FortuneJobType.FORTUNE, + fundAmount: dto.paymentAmount, }); + }); - it('should throw an error if data does not exist for image skeletons from boxes job type', async () => { - const requestType = CvatJobType.IMAGE_SKELETONS_FROM_BOXES; - - const dto = createJobCvatDto({ type: requestType }); - - await expect( - manifestService.createManifest( - dto, - requestType, - tokenFundAmount, - tokenFundDecimals, - ), - ).rejects.toThrow(new ConflictError(ErrorJob.DataNotExist)); - }); + it('should reject non-fortune request types', async () => { + await expect( + manifestService.createManifest( + {} as JobFortuneDto, + CvatJobType.IMAGE_BOXES, + 1, + ), + ).rejects.toThrow(new ValidationError(ErrorJob.InvalidRequestType)); }); }); @@ -310,7 +123,7 @@ describe('ManifestService', () => { it('should download and validate a manifest successfully', async () => { const mockManifestUrl = faker.internet.url(); const mockRequestType = FortuneJobType.FORTUNE; - const mockManifest: FortuneManifestDto = { + const mockManifest: ManifestDto = { submissionsRequired: faker.number.int({ min: 1, max: 100 }), requesterTitle: faker.lorem.words(3), requesterDescription: faker.lorem.sentence(), @@ -331,7 +144,7 @@ describe('ManifestService', () => { it('should throw an error if validation fails', async () => { const mockManifestUrl = faker.internet.url(); const mockRequestType = CvatJobType.IMAGE_BOXES; - const mockManifest: FortuneManifestDto = { + const 
mockManifest: ManifestDto = { submissionsRequired: faker.number.int({ min: 1, max: 100 }), requesterTitle: faker.lorem.words(3), requesterDescription: faker.lorem.sentence(), diff --git a/packages/apps/job-launcher/server/src/modules/manifest/manifest.service.ts b/packages/apps/job-launcher/server/src/modules/manifest/manifest.service.ts index 93a47b8481..490aac01d2 100644 --- a/packages/apps/job-launcher/server/src/modules/manifest/manifest.service.ts +++ b/packages/apps/job-launcher/server/src/modules/manifest/manifest.service.ts @@ -4,28 +4,15 @@ import { Injectable, } from '@nestjs/common'; import { validate } from 'class-validator'; -import { ethers } from 'ethers'; -import { CvatConfigService } from '../../common/config/cvat-config.service'; import { PGPConfigService } from '../../common/config/pgp-config.service'; import { ErrorJob } from '../../common/constants/errors'; import { - CvatJobType, FortuneJobType, HCaptchaJobType, JobRequestType, } from '../../common/enums/job'; -import { ConflictError, ValidationError } from '../../common/errors'; -import { - generateBucketUrl, - listObjectsInBucket, -} from '../../common/utils/storage'; -import { CreateJob, JobCvatDto } from '../job/job.dto'; -import { - CvatAnnotationData, - CvatCalculateJobBounty, - CvatImageData, - GenerateUrls, -} from '../job/job.interface'; +import { ValidationError } from '../../common/errors'; +import { JobFortuneDto } from '../job/job.dto'; import { StorageService } from '../storage/storage.service'; import { Web3Service } from '../web3/web3.service'; import { @@ -41,241 +28,24 @@ export class ManifestService { constructor( private readonly web3Service: Web3Service, - private readonly cvatConfigService: CvatConfigService, private readonly pgpConfigService: PGPConfigService, private readonly storageService: StorageService, private readonly encryption: Encryption, ) {} async createManifest( - dto: CreateJob, + dto: JobFortuneDto, requestType: JobRequestType, fundAmount: number, - 
decimals: number, - ): Promise { - switch (requestType) { - case FortuneJobType.FORTUNE: - return { - ...dto, - requestType, - fundAmount, - }; - - case CvatJobType.IMAGE_POLYGONS: - case CvatJobType.IMAGE_BOXES: - case CvatJobType.IMAGE_POINTS: - case CvatJobType.IMAGE_BOXES_FROM_POINTS: - case CvatJobType.IMAGE_SKELETONS_FROM_BOXES: - return this.createCvatManifest( - dto as JobCvatDto, - requestType, - fundAmount, - decimals, - ); - - default: - throw new ValidationError(ErrorJob.InvalidRequestType); - } - } - - private async getCvatElementsCount( - urls: GenerateUrls, - requestType: CvatJobType, - ): Promise { - let gt: any, gtEntries: number; - switch (requestType) { - case CvatJobType.IMAGE_POLYGONS: - case CvatJobType.IMAGE_BOXES: - case CvatJobType.IMAGE_POINTS: { - const data = await listObjectsInBucket(urls.dataUrl); - if (!data || data.length === 0 || !data[0]) - throw new ValidationError(ErrorJob.DatasetValidationFailed); - gt = (await this.storageService.downloadJsonLikeData( - `${urls.gtUrl.protocol}//${urls.gtUrl.host}${urls.gtUrl.pathname}`, - )) as any; - if (!gt || !gt.images || gt.images.length === 0) - throw new ValidationError(ErrorJob.GroundThuthValidationFailed); - - await this.checkImageConsistency(gt.images, data); - - return data.length - gt.images.length; - } - - case CvatJobType.IMAGE_BOXES_FROM_POINTS: { - const points = (await this.storageService.downloadJsonLikeData( - urls.pointsUrl!.href, - )) as any; - gt = (await this.storageService.downloadJsonLikeData( - urls.gtUrl.href, - )) as any; - - if (!gt || !gt.images || gt.images.length === 0) { - throw new ValidationError(ErrorJob.GroundThuthValidationFailed); - } - - gtEntries = 0; - gt.images.forEach((gtImage: CvatImageData) => { - const { id } = points.images.find( - (dataImage: CvatImageData) => - dataImage.file_name === gtImage.file_name, - ); - - if (id) { - const matchingAnnotations = points.annotations.filter( - (dataAnnotation: CvatAnnotationData) => - dataAnnotation.image_id 
=== id, - ); - gtEntries += matchingAnnotations.length; - } - }); - - return points.annotations.length - gtEntries; - } - - case CvatJobType.IMAGE_SKELETONS_FROM_BOXES: { - const boxes = (await this.storageService.downloadJsonLikeData( - urls.boxesUrl!.href, - )) as any; - gt = (await this.storageService.downloadJsonLikeData( - urls.gtUrl.href, - )) as any; - - if (!gt || !gt.images || gt.images.length === 0) { - throw new ValidationError(ErrorJob.GroundThuthValidationFailed); - } - - gtEntries = 0; - gt.images.forEach((gtImage: CvatImageData) => { - const { id } = boxes.images.find( - (dataImage: CvatImageData) => - dataImage.file_name === gtImage.file_name, - ); - - if (id) { - const matchingAnnotations = boxes.annotations.filter( - (dataAnnotation: CvatAnnotationData) => - dataAnnotation.image_id === id, - ); - gtEntries += matchingAnnotations.length; - } - }); - - return boxes.annotations.length - gtEntries; - } - - default: - throw new ValidationError(ErrorJob.InvalidRequestType); - } - } - - private async checkImageConsistency( - gtImages: any[], - dataFiles: string[], - ): Promise { - const gtFileNames = gtImages.map((image: any) => image.file_name); - const baseFileNames = dataFiles.map((fileName) => - fileName.split('/').pop(), - ); - const missingFileNames = gtFileNames.filter( - (fileName: any) => !baseFileNames.includes(fileName), - ); - - if (missingFileNames.length !== 0) { - throw new ConflictError(ErrorJob.ImageConsistency); + ): Promise { + if (requestType !== FortuneJobType.FORTUNE) { + throw new ValidationError(ErrorJob.InvalidRequestType); } - } - - private async calculateCvatJobBounty( - params: CvatCalculateJobBounty, - ): Promise { - const { requestType, fundAmount, urls, nodesTotal } = params; - - const elementsCount = await this.getCvatElementsCount(urls, requestType); - - let jobSize = Number(this.cvatConfigService.jobSize); - - if (requestType === CvatJobType.IMAGE_SKELETONS_FROM_BOXES) { - const jobSizeMultiplier = Number( - 
this.cvatConfigService.skeletonsJobSizeMultiplier, - ); - jobSize *= jobSizeMultiplier; - } - - let totalJobs: number; - - // For each skeleton node CVAT creates a separate project thus increasing the number of jobs - if (requestType === CvatJobType.IMAGE_SKELETONS_FROM_BOXES && nodesTotal) { - totalJobs = Math.ceil(elementsCount / jobSize) * nodesTotal; - } else { - totalJobs = Math.ceil(elementsCount / jobSize); - } - - const jobBounty = - ethers.parseUnits(fundAmount.toString(), params.decimals) / - BigInt(totalJobs); - - return ethers.formatUnits(jobBounty, params.decimals); - } - - private async createCvatManifest( - dto: JobCvatDto, - requestType: CvatJobType, - tokenFundAmount: number, - decimals: number, - ): Promise { - if ( - (requestType === CvatJobType.IMAGE_SKELETONS_FROM_BOXES && - !dto.data.boxes) || - (requestType === CvatJobType.IMAGE_BOXES_FROM_POINTS && !dto.data.points) - ) { - throw new ConflictError(ErrorJob.DataNotExist); - } - - const urls = { - dataUrl: generateBucketUrl(dto.data.dataset, requestType), - gtUrl: generateBucketUrl(dto.groundTruth, requestType), - boxesUrl: dto.data.boxes - ? generateBucketUrl(dto.data.boxes, requestType) - : undefined, - pointsUrl: dto.data.points - ? 
generateBucketUrl(dto.data.points, requestType) - : undefined, - }; - - const jobBounty = await this.calculateCvatJobBounty({ - requestType, - fundAmount: tokenFundAmount, - decimals, - urls, - nodesTotal: dto.labels[0]?.nodes?.length, - }); return { - data: { - data_url: urls.dataUrl.href, - ...(urls.pointsUrl && { - points_url: urls.pointsUrl?.href, - }), - ...(urls.boxesUrl && { - boxes_url: urls.boxesUrl?.href, - }), - }, - annotation: { - labels: dto.labels, - description: dto.requesterDescription, - user_guide: dto.userGuide, - type: requestType as CvatJobType, - job_size: this.cvatConfigService.jobSize, - ...(dto.qualifications && { - qualifications: dto.qualifications, - }), - }, - validation: { - min_quality: dto.minQuality, - val_size: this.cvatConfigService.valSize, - gt_url: urls.gtUrl.href, - }, - job_bounty: jobBounty, + ...dto, + requestType, + fundAmount, }; } @@ -296,11 +66,10 @@ export class ManifestService { const publicKey = await KVStoreUtils.getPublicKey(chainId, address); if (publicKey) publicKeys.push(publicKey); } - const encryptedManifest = await this.encryption.signAndEncrypt( + manifestFile = await this.encryption.signAndEncrypt( JSON.stringify(data), publicKeys, ); - manifestFile = encryptedManifest; } return this.storageService.uploadJsonLikeData(manifestFile); @@ -308,7 +77,7 @@ export class ManifestService { private async validateManifest( requestType: JobRequestType, - manifest: FortuneManifestDto | CvatManifestDto | HCaptchaManifestDto, + manifest: ManifestDto, ): Promise { let dtoCheck; @@ -332,7 +101,7 @@ export class ManifestService { async downloadManifest( manifestUrl: string, requestType: JobRequestType, - ): Promise { + ): Promise { const manifest = (await this.storageService.downloadJsonLikeData( manifestUrl, )) as ManifestDto; diff --git a/packages/apps/job-launcher/server/src/modules/webhook/webhook.controller.spec.ts b/packages/apps/job-launcher/server/src/modules/webhook/webhook.controller.spec.ts index 
22305582c9..cec113cd8e 100644 --- a/packages/apps/job-launcher/server/src/modules/webhook/webhook.controller.spec.ts +++ b/packages/apps/job-launcher/server/src/modules/webhook/webhook.controller.spec.ts @@ -8,10 +8,6 @@ import { ConfigService } from '@nestjs/config'; import { Test, TestingModule } from '@nestjs/testing'; import { MOCK_ADDRESS, - MOCK_CVAT_JOB_SIZE, - MOCK_CVAT_MAX_TIME, - MOCK_CVAT_SKELETONS_JOB_SIZE_MULTIPLIER, - MOCK_CVAT_VAL_SIZE, MOCK_EXPIRES_IN, MOCK_HCAPTCHA_SITE_KEY, MOCK_MAX_RETRY_COUNT, @@ -70,11 +66,6 @@ describe('WebhookController', () => { HCAPTCHA_REPUTATION_ORACLE_URI: MOCK_REPUTATION_ORACLE_URL, HCAPTCHA_SECRET: MOCK_SECRET, JWT_ACCESS_TOKEN_EXPIRES_IN: MOCK_EXPIRES_IN, - CVAT_JOB_SIZE: MOCK_CVAT_JOB_SIZE, - CVAT_MAX_TIME: MOCK_CVAT_MAX_TIME, - CVAT_VAL_SIZE: MOCK_CVAT_VAL_SIZE, - CVAT_SKELETONS_JOB_SIZE_MULTIPLIER: - MOCK_CVAT_SKELETONS_JOB_SIZE_MULTIPLIER, }; const module: TestingModule = await Test.createTestingModule({ diff --git a/packages/apps/job-launcher/server/test/constants.ts b/packages/apps/job-launcher/server/test/constants.ts index d28777eef2..438e96be3b 100644 --- a/packages/apps/job-launcher/server/test/constants.ts +++ b/packages/apps/job-launcher/server/test/constants.ts @@ -1,51 +1,26 @@ import { FortuneJobType } from '../src/common/enums/job'; -import { AWSRegions, StorageProviders } from '../src/common/enums/storage'; import { Web3Env } from '../src/common/enums/web3'; -import { CvatDataDto, StorageDataDto } from '../src/modules/job/job.dto'; -import { - FortuneManifestDto, - Label, -} from '../src/modules/manifest/manifest.dto'; +import { FortuneManifestDto } from '../src/modules/manifest/manifest.dto'; export const MOCK_REQUESTER_TITLE = 'Mock job title'; export const MOCK_REQUESTER_DESCRIPTION = 'Mock job description'; -export const MOCK_SUBMISSION_REQUIRED = 5; -export const MOCK_CHAIN_ID = 1; export const MOCK_ADDRESS = '0xCf88b3f1992458C2f5a229573c768D0E9F70C44e'; export const MOCK_FILE_URL = 
'http://mockedFileUrl.test/bucket/file.json'; export const MOCK_FILE_HASH = 'mockedFileHash'; -export const MOCK_FILE_KEY = 'manifest.json'; -export const MOCK_BUCKET_FILES = [ - 'file0', - 'file1', - 'file2', - 'file3', - 'file4', - 'file5', -]; export const MOCK_PRIVATE_KEY = 'd334daf65a631f40549cc7de126d5a0016f32a2d00c49f94563f9737f7135e55'; -export const MOCK_GAS_PRICE_MULTIPLIER = 1; export const MOCK_REPUTATION_ORACLES = '0x0000000000000000000000000000000000000001,0x0000000000000000000000000000000000000002,0x0000000000000000000000000000000000000003'; export const MOCK_REPUTATION_ORACLE_1 = '0x0000000000000000000000000000000000000001'; export const MOCK_WEB3_RPC_URL = 'http://localhost:8545'; -export const MOCK_WEB3_NODE_HOST = 'localhost'; -export const MOCK_BUCKET_NAME = 'bucket-name'; export const MOCK_EXCHANGE_ORACLE_ADDRESS = '0xCf88b3f1992458C2f5a229573c768D0E9F70C44e'; -export const MOCK_RECORDING_ORACLE_ADDRESS = - '0xCf88b3f1992458C2f5a229573c768D0E9F70C44e'; -export const MOCK_REPUTATION_ORACLE_ADDRESS = - '0x2E04d5D6cE3fF2261D0Cb04d41Fb4Cd67362A473'; export const MOCK_EXCHANGE_ORACLE_WEBHOOK_URL = 'http://localhost:3000'; export const MOCK_REPUTATION_ORACLE_URL = 'http://reporacle:3000'; export const MOCK_RECORDING_ORACLE_URL = 'http://recoracle:3000'; export const MOCK_EXCHANGE_ORACLE_URL = 'http://exoracle:3000'; export const MOCK_SECRET = 'secrete'; -export const MOCK_JOB_LAUNCHER_FEE = 5; -export const MOCK_ORACLE_FEE = 5; export const MOCK_TRANSACTION_HASH = '0xd28e4c40571530afcb25ea1890e77b2d18c35f06049980ca4fb71829f64d89dc'; export const MOCK_SIGNATURE = @@ -53,22 +28,15 @@ export const MOCK_SIGNATURE = export const MOCK_EMAIL = 'test@example.com'; export const MOCK_PASSWORD = 'password123'; export const MOCK_HASHED_PASSWORD = 'hashedPassword'; -export const MOCK_CUSTOMER_ID = 'customer123'; export const MOCK_PAYMENT_ID = 'payment123'; export const MOCK_ACCESS_TOKEN = 'access_token'; export const MOCK_REFRESH_TOKEN = 'refresh_token'; -export 
const MOCK_ACCESS_TOKEN_HASHED = 'access_token_hashed'; -export const MOCK_REFRESH_TOKEN_HASHED = 'refresh_token_hashed'; export const MOCK_EXPIRES_IN = 1000000000000000; -export const MOCK_USER_ID = 1; -export const MOCK_JOB_ID = 1; export const MOCK_SENDGRID_API_KEY = 'SG.xxxxxxxxxxxxxxxxxxxxxx.xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'; export const MOCK_PAYMENT_PROVIDER_SECRET_KEY = 'xxxxxxxxxxxxxxxxxxxxxx'; -export const MOCK_COINGECKO_API_KEY = 'xxxxxxxxxxxxxxxxxxxxxx'; export const MOCK_PAYMENT_PROVIDER_API_VERSION = '2022-11-15'; -export const MOCK_PAYMENT_PROVIDER_APP_NAME = 'Name'; export const MOCK_PAYMENT_PROVIDER_APP_INFO_URL = 'https://test-app-url.com'; export const MOCK_SENDGRID_FROM_EMAIL = 'info@hmt.ai'; export const MOCK_SENDGRID_FROM_NAME = 'John Doe'; @@ -117,124 +85,12 @@ Fx3dwWk9YaZ4lQD+MHnMYu48TwdE4ZKNcNUaOmWLBbZTgedqqHGLXbiyZAg= =IMAe -----END PGP PUBLIC KEY BLOCK-----`; export const MOCK_PGP_PASSPHRASE = ''; -export const MOCK_HCAPTCHA_ORACLE_ADDRESS = - '0xa62a1c18571b869e43eeabd217e233e7f0275af3'; -export const MOCK_CVAT_JOB_SIZE = '1'; -export const MOCK_CVAT_MAX_TIME = '300'; -export const MOCK_CVAT_VAL_SIZE = '2'; -export const MOCK_CVAT_SKELETONS_JOB_SIZE_MULTIPLIER = '6'; export const MOCK_HCAPTCHA_SITE_KEY = '1234'; -export const MOCK_HCAPTCHA_IMAGE_URL = - 'http://mockedFileUrl.test/bucket/img_1.jpg'; -export const MOCK_HCAPTCHA_IMAGE_LABEL = 'cat'; -export const MOCK_HCAPTCHA_REPO_URI = 'http://recoracle:3000'; -export const MOCK_HCAPTCHA_RO_URI = 'http://recoracle:3000'; export const MOCK_MAX_RETRY_COUNT = 5; -export const MOCK_STORAGE_DATA: StorageDataDto = { - provider: StorageProviders.AWS, - region: AWSRegions.EU_CENTRAL_1, - bucketName: 'bucket', - path: 'folder/test', -}; -export const MOCK_CVAT_DATA_DATASET: CvatDataDto = { - dataset: MOCK_STORAGE_DATA, -}; - -export const MOCK_CVAT_DATA_POINTS: CvatDataDto = { - dataset: MOCK_STORAGE_DATA, - points: MOCK_STORAGE_DATA, -}; - -export const MOCK_CVAT_DATA_BOXES: 
CvatDataDto = { - dataset: MOCK_STORAGE_DATA, - boxes: MOCK_STORAGE_DATA, -}; - -export const MOCK_CVAT_LABELS: Label[] = [ - { - name: 'label1', - }, - { - name: 'label2', - }, -]; - -export const MOCK_CVAT_LABELS_WITH_NODES: Label[] = [ - { - name: 'label1', - nodes: ['node 1', 'node 2', 'node 3', 'node 4'], - }, - { - name: 'label2', - nodes: ['node 1', 'node 2', 'node 3', 'node 4'], - }, -]; - -export const MOCK_BUCKET_FILE = - 'https://bucket.s3.eu-central-1.amazonaws.com/folder/test'; - -export const MOCK_CVAT_DATA = { - images: [ - { - id: 1, - file_name: '1.jpg', - }, - { - id: 2, - file_name: '2.jpg', - }, - { - id: 3, - file_name: '3.jpg', - }, - { - id: 4, - file_name: '4.jpg', - }, - { - id: 5, - file_name: '5.jpg', - }, - ], - annotations: [ - { - image_id: 1, - }, - { - image_id: 2, - }, - { - image_id: 3, - }, - { - image_id: 4, - }, - { - image_id: 5, - }, - ], -}; - -export const MOCK_CVAT_GT = { - images: [ - { - file_name: '1.jpg', - }, - { - file_name: '2.jpg', - }, - { - file_name: '3.jpg', - }, - ], -}; - -export const MOCK_MINIMUM_FEE_USD = 0.01; export const MOCK_RATE_CACHE_TIME = 30; export const MOCK_FE_URL = 'http://localhost:3001'; export const mockConfig: any = { - MINIMUM_FEE_USD: MOCK_MINIMUM_FEE_USD, RATE_CACHE_TIME: MOCK_RATE_CACHE_TIME, S3_ACCESS_KEY: MOCK_S3_ACCESS_KEY, S3_SECRET_KEY: MOCK_S3_SECRET_KEY, @@ -249,20 +105,13 @@ export const mockConfig: any = { WEB3_PRIVATE_KEY: MOCK_PRIVATE_KEY, PAYMENT_PROVIDER_SECRET_KEY: MOCK_PAYMENT_PROVIDER_SECRET_KEY, PAYMENT_PROVIDER_API_VERSION: MOCK_PAYMENT_PROVIDER_API_VERSION, - PAYMENT_PROVIDER_APP_NAME: MOCK_PAYMENT_PROVIDER_APP_NAME, PAYMENT_PROVIDER_APP_INFO_URL: MOCK_PAYMENT_PROVIDER_APP_INFO_URL, - CVAT_EXCHANGE_ORACLE_ADDRESS: MOCK_ADDRESS, - CVAT_RECORDING_ORACLE_ADDRESS: MOCK_ADDRESS, HCAPTCHA_SITE_KEY: MOCK_HCAPTCHA_SITE_KEY, HCAPTCHA_RECORDING_ORACLE_URI: MOCK_RECORDING_ORACLE_URL, HCAPTCHA_REPUTATION_ORACLE_URI: MOCK_REPUTATION_ORACLE_URL, HCAPTCHA_ORACLE_ADDRESS: 
MOCK_ADDRESS, HCAPTCHA_SECRET: MOCK_SECRET, JWT_ACCESS_TOKEN_EXPIRES_IN: MOCK_EXPIRES_IN, - CVAT_JOB_SIZE: MOCK_CVAT_JOB_SIZE, - CVAT_MAX_TIME: MOCK_CVAT_MAX_TIME, - CVAT_VAL_SIZE: MOCK_CVAT_VAL_SIZE, - CVAT_SKELETONS_JOB_SIZE_MULTIPLIER: MOCK_CVAT_SKELETONS_JOB_SIZE_MULTIPLIER, MAX_RETRY_COUNT: MOCK_MAX_RETRY_COUNT, RPC_URL_POLYGON_AMOY: MOCK_WEB3_RPC_URL, SENDGRID_API_KEY: MOCK_SENDGRID_API_KEY, @@ -270,6 +119,5 @@ export const mockConfig: any = { SENDGRID_FROM_NAME: MOCK_SENDGRID_FROM_NAME, REPUTATION_ORACLES: MOCK_REPUTATION_ORACLES, WEB3_ENV: Web3Env.TESTNET, - COINGECKO_API_KEY: MOCK_COINGECKO_API_KEY, FE_URL: MOCK_FE_URL, };