From 29a12fb34a6d6bf6fa60a69728c6c74cabefc5b3 Mon Sep 17 00:00:00 2001 From: Lezek123 Date: Mon, 25 Nov 2024 19:43:45 +0100 Subject: [PATCH 1/9] Sync and cleanup rework --- .../src/services/archive/ArchiveService.ts | 71 +++--- storage-node/src/services/queryNode/api.ts | 129 ++++++---- .../queryNode/queries/queries.graphql | 76 +++++- .../src/services/sync/acceptPendingObjects.ts | 2 +- .../src/services/sync/cleanupService.ts | 114 +++++---- .../src/services/sync/storageObligations.ts | 230 ++++++++++++------ .../src/services/sync/synchronizer.ts | 113 +++------ .../services/webApi/controllers/stateApi.ts | 3 +- 8 files changed, 439 insertions(+), 299 deletions(-) diff --git a/storage-node/src/services/archive/ArchiveService.ts b/storage-node/src/services/archive/ArchiveService.ts index 9ed7dd1bd6..e3da432c7c 100644 --- a/storage-node/src/services/archive/ArchiveService.ts +++ b/storage-node/src/services/archive/ArchiveService.ts @@ -13,7 +13,7 @@ import { OBJECTS_TRACKING_FILENAME, } from './tracking' import { QueryNodeApi } from '../queryNode/api' -import { getStorageObligationsFromRuntime } from '../sync/storageObligations' +import { DataObjectDetailsLoader, getStorageObligationsFromRuntime } from '../sync/storageObligations' import { getDownloadTasks } from '../sync/synchronizer' import sleep from 'sleep-promise' import { Logger } from 'winston' @@ -369,40 +369,49 @@ export class ArchiveService { public async performSync(): Promise { const model = await getStorageObligationsFromRuntime(this.queryNodeApi) - const assignedObjects = model.dataObjects - const added = assignedObjects.filter((obj) => !this.objectTrackingService.isTracked(obj.id)) - added.sort((a, b) => parseInt(b.id) - parseInt(a.id)) + const assignedObjectsIds = await model.createAssignedObjectsIdsLoader(true).getAll() + const unsyncedIds = assignedObjectsIds + .filter((id) => !this.objectTrackingService.isTracked(id)) + .map((id) => parseInt(id)) + .sort((a, b) => a - b) - this.logger.info(`Sync - new objects: ${added.length}`) + this.logger.info(`Sync - new objects: ${unsyncedIds.length}`) - // Add new download tasks while the upload dir size limit allows - while (added.length) { - const uploadDirectorySize = await this.getUploadDirSize() - while (true) { - const object = added.pop() - if (!object) { - break - } - if (object.size + uploadDirectorySize + this.syncQueueObjectsSize > this.uploadDirSizeLimit) { - this.logger.debug( - `Waiting for some disk space to free ` + - `(upload_dir: ${uploadDirectorySize} / ${this.uploadDirSizeLimit}, ` + - `sync_q=${this.syncQueueObjectsSize}, obj_size=${object.size})... ` + // Sync objects in batches of 10_000 + for (const unsyncedIdsBatch of _.chunk(unsyncedIds, 10_000)) { + const objectsBatchLoader = new DataObjectDetailsLoader(this.queryNodeApi, { + by: 'ids', + ids: unsyncedIdsBatch.map((id) => id.toString()), + }) + const objectsBatch = await objectsBatchLoader.getAll() + // Add new download tasks while the upload dir size limit allows + while (objectsBatch.length) { + const uploadDirectorySize = await this.getUploadDirSize() + while (true) { + const object = objectsBatch.pop() + if (!object) { + break + } + if (object.size + uploadDirectorySize + this.syncQueueObjectsSize > this.uploadDirSizeLimit) { + this.logger.debug( + `Waiting for some disk space to free ` + + `(upload_dir: ${uploadDirectorySize} / ${this.uploadDirSizeLimit}, ` + + `sync_q=${this.syncQueueObjectsSize}, obj_size=${object.size})... 
` + ) + objectsBatch.push(object) + await sleep(60_000) + break + } + const [downloadTask] = await getDownloadTasks( + model, + [object], + this.uploadQueueDir, + this.tmpDownloadDir, + this.syncWorkersTimeout, + this.hostId ) - added.push(object) - await sleep(60_000) - break + await this.addDownloadTask(downloadTask, object.size) } - const [downloadTask] = await getDownloadTasks( - model, - [], - [object], - this.uploadQueueDir, - this.tmpDownloadDir, - this.syncWorkersTimeout, - this.hostId - ) - await this.addDownloadTask(downloadTask, object.size) } } } diff --git a/storage-node/src/services/queryNode/api.ts b/storage-node/src/services/queryNode/api.ts index 8fb896dafa..afafd65b2b 100644 --- a/storage-node/src/services/queryNode/api.ts +++ b/storage-node/src/services/queryNode/api.ts @@ -4,20 +4,26 @@ import fetch from 'cross-fetch' import stringify from 'fast-safe-stringify' import logger from '../logger' import { - DataObjectByBagIdsDetailsFragment, DataObjectDetailsFragment, + DataObjectIdsByBagId, + DataObjectIdsByBagIdQuery, + DataObjectIdsByBagIdQueryVariables, + DataObjectsByBagsConnection, + DataObjectsByBagsConnectionQuery, + DataObjectsByBagsConnectionQueryVariables, + DataObjectsByIdsConnection, + DataObjectsByIdsConnectionQuery, + DataObjectsByIdsConnectionQueryVariables, + DataObjectsWithBagDetailsByIds, + DataObjectsWithBagDetailsByIdsQuery, + DataObjectsWithBagDetailsByIdsQueryVariables, + DataObjectWithBagDetailsFragment, GetAllStorageBagDetails, GetAllStorageBagDetailsQuery, GetAllStorageBagDetailsQueryVariables, - GetDataObjects, - GetDataObjectsByBagIds, - GetDataObjectsByBagIdsQuery, - GetDataObjectsByBagIdsQueryVariables, GetDataObjectsDeletedEvents, GetDataObjectsDeletedEventsQuery, GetDataObjectsDeletedEventsQueryVariables, - GetDataObjectsQuery, - GetDataObjectsQueryVariables, GetSquidVersion, GetSquidVersionQuery, GetSquidVersionQueryVariables, @@ -41,7 +47,7 @@ import { StorageBucketDetailsFragment, StorageBucketIdsFragment, } from './generated/queries' -import { Maybe, StorageBagWhereInput } from './generated/schema' +import { Maybe } from './generated/schema' /** * Defines query paging limits. @@ -53,7 +59,7 @@ type PaginationQueryVariables = { lastCursor?: Maybe } -type PaginationQueryResult = { +export type PaginationQueryResult = { edges: { node: T }[] pageInfo: { hasNextPage: boolean @@ -249,50 +255,87 @@ export class QueryNodeApi { } /** - * Returns data objects info by pages for the given bags. + * Gets a page of data objects belonging to specified bags. * * @param bagIds - query filter: bag IDs */ - public async getDataObjectsByBagIds(bagIds: string[]): Promise> { - const allBagIds = [...bagIds] // Copy to avoid modifying the original array - let fullResult: DataObjectByBagIdsDetailsFragment[] = [] - while (allBagIds.length) { - const bagIdsBatch = allBagIds.splice(0, 1000) - const input: StorageBagWhereInput = { id_in: bagIdsBatch } - fullResult = [ - ...fullResult, - ...(await this.multipleEntitiesQuery( - GetDataObjectsByBagIds, - { bagIds: input }, - 'storageDataObjects' - )), - ] - } + public async getDataObjectsByBagsPage( + bagIds: string[], + limit: number, + after: string | undefined, + includeDetails: IncludeDetails, + isAccepted?: boolean + ): Promise< + IncludeDetails extends true + ? 
PaginationQueryResult | null + : PaginationQueryResult<{ id: string }> | null + > { + return this.uniqueEntityQuery( + DataObjectsByBagsConnection, + { + bagIds: [...bagIds], + isAccepted, + limit, + after, + includeDetails: includeDetails, + }, + 'storageDataObjectsConnection' + ) + } - return fullResult + /** + * Gets a page of data objects by the given list of dataObject IDs. + * + * @param ids - query filter: data object ids + */ + public async getDataObjectsByIdsPage( + ids: string[], + limit: number, + after: string | undefined, + includeDetails: IncludeDetails, + isAccepted?: boolean + ): Promise< + IncludeDetails extends true + ? PaginationQueryResult | null + : PaginationQueryResult<{ id: string }> | null + > { + return this.uniqueEntityQuery( + DataObjectsByIdsConnection, + { + ids: [...ids], + isAccepted, + limit, + after, + includeDetails: includeDetails, + }, + 'storageDataObjectsConnection' + ) } /** - * Returns data objects info by pages for the given dataObject IDs. + * Returns a list of data objects by ids, with their corresponding bag details * - * @param dataObjectIds - query filter: dataObject IDs + * @param ids - query filter: data object ids */ - public async getDataObjectDetails(dataObjectIds: string[]): Promise> { - const allDataObjectIds = [...dataObjectIds] // Copy to avoid modifying the original array - let fullResult: DataObjectDetailsFragment[] = [] - while (allDataObjectIds.length) { - const dataObjectIdsBatch = allDataObjectIds.splice(0, 1000) - fullResult = [ - ...fullResult, - ...(await this.multipleEntitiesQuery( - GetDataObjects, - { dataObjectIds: dataObjectIdsBatch }, - 'storageDataObjects' - )), - ] - } + public async getDataObjectsWithBagDetails(ids: string[]): Promise { + return this.multipleEntitiesQuery< + DataObjectsWithBagDetailsByIdsQuery, + DataObjectsWithBagDetailsByIdsQueryVariables + >(DataObjectsWithBagDetailsByIds, { ids: [...ids] }, 'storageDataObjects') + } - return fullResult + /** + * Returns a list of data object ids that belong to a given bag. 
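+   * (Replaces the `getDataObjectIDsByBagId` helper previously exported from
+   * `sync/storageObligations.ts`; used e.g. by the state API bag-obligations cache.)
+   *
+   * @example
+   * // Illustrative sketch only (assumes `qnApi` is an initialized QueryNodeApi
+   * // instance; the bag id below is hypothetical):
+   * const objectIds = await qnApi.getDataObjectIdsByBagId('dynamic:channel:1')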
+ * + * @param bagId - query filter: bag ID + */ + public async getDataObjectIdsByBagId(bagId: string): Promise { + const result = await this.multipleEntitiesQuery( + DataObjectIdsByBagId, + { bagId }, + 'storageDataObjects' + ) + return result.map((o) => o.id) } /** diff --git a/storage-node/src/services/queryNode/queries/queries.graphql b/storage-node/src/services/queryNode/queries/queries.graphql index b29c1c51df..32851a9051 100644 --- a/storage-node/src/services/queryNode/queries/queries.graphql +++ b/storage-node/src/services/queryNode/queries/queries.graphql @@ -59,7 +59,13 @@ query getAllStorageBagDetails { } } -fragment DataObjectByBagIdsDetails on StorageDataObject { +query dataObjectIdsByBagId($bagId: String) { + storageDataObjects(where: { storageBag: { id_eq: $bagId } }) { + id + } +} + +fragment DataObjectDetails on StorageDataObject { id size ipfsHash @@ -68,13 +74,7 @@ fragment DataObjectByBagIdsDetails on StorageDataObject { } } -query getDataObjectsByBagIds($bagIds: StorageBagWhereInput) { - storageDataObjects(where: { storageBag: $bagIds, isAccepted_eq: true }) { - ...DataObjectByBagIdsDetails - } -} - -fragment DataObjectDetails on StorageDataObject { +fragment DataObjectWithBagDetails on StorageDataObject { id isAccepted ipfsHash @@ -83,9 +83,63 @@ fragment DataObjectDetails on StorageDataObject { } } -query getDataObjects($dataObjectIds: [String!]) { - storageDataObjects(where: { id_in: $dataObjectIds }) { - ...DataObjectDetails +query dataObjectsByBagsConnection( + $bagIds: [String!] + $limit: Int + $after: String + $includeDetails: Boolean! + $isAccepted: Boolean +) { + storageDataObjectsConnection( + where: { storageBag: { id_in: $bagIds }, isAccepted_eq: $isAccepted } + first: $limit + after: $after + orderBy: id_ASC + ) { + edges { + node { + id + ...DataObjectDetails @include(if: $includeDetails) + } + } + pageInfo { + startCursor + endCursor + hasNextPage + } + } +} + +query dataObjectsByIdsConnection( + $ids: [String!] + $limit: Int + $after: String + $includeDetails: Boolean! 
+ $isAccepted: Boolean +) { + storageDataObjectsConnection( + where: { id_in: $ids, isAccepted_eq: $isAccepted } + first: $limit + after: $after + orderBy: id_ASC + ) { + edges { + node { + id + ...DataObjectDetails @include(if: $includeDetails) + } + } + pageInfo { + startCursor + endCursor + hasNextPage + } + } +} + +query dataObjectsWithBagDetailsByIds($ids: [String!]) { + storageDataObjects(where: { id_in: $ids }) { + ...DataObjectWithBagDetails } } diff --git a/storage-node/src/services/sync/acceptPendingObjects.ts b/storage-node/src/services/sync/acceptPendingObjects.ts index 170498688d..7b8f1db4b8 100644 --- a/storage-node/src/services/sync/acceptPendingObjects.ts +++ b/storage-node/src/services/sync/acceptPendingObjects.ts @@ -91,7 +91,7 @@ export class AcceptPendingObjectsService { } private async processPendingObjects(pendingIds: string[]): Promise { - const pendingDataObjects = await this.qnApi.getDataObjectDetails(pendingIds) + const pendingDataObjects = await this.qnApi.getDataObjectsWithBagDetails(pendingIds) // objects not found in the query node const maybeDeletedObjectIds = pendingIds.filter( diff --git a/storage-node/src/services/sync/cleanupService.ts b/storage-node/src/services/sync/cleanupService.ts index bfc99e54e6..fdaf4f5f2f 100644 --- a/storage-node/src/services/sync/cleanupService.ts +++ b/storage-node/src/services/sync/cleanupService.ts @@ -3,12 +3,13 @@ import _ from 'lodash' import superagent from 'superagent' import urljoin from 'url-join' import { getDataObjectIDs } from '../../services/caching/localDataObjects' -import logger from '../../services/logger' +import rootLogger from '../../services/logger' import { QueryNodeApi } from '../queryNode/api' -import { DataObjectDetailsFragment } from '../queryNode/generated/queries' -import { DataObligations, getDataObjectsByIDs, getStorageObligationsFromRuntime } from './storageObligations' +import { DataObjectIdsLoader, DataObligations, getStorageObligationsFromRuntime } from './storageObligations' import { DeleteLocalFileTask } from './tasks' import { TaskProcessorSpawner, WorkingStack } from '../processing/workingProcess' +import { DataObjectWithBagDetailsFragment } from '../queryNode/generated/queries' +import { Logger } from 'winston' /** * The maximum allowed threshold by which the QN processor can lag behind @@ -43,7 +44,7 @@ export const MINIMUM_REPLICATION_THRESHOLD = parseInt(process.env.CLEANUP_MIN_RE * @param api - (optional) runtime API promise * @param workerId - current storage provider ID * @param buckets - Selected storage buckets - * @param asyncWorkersNumber - maximum parallel downloads number + * @param asyncWorkersNumber - maximum parallel cleanups number * @param asyncWorkersTimeout - downloading asset timeout * @param qnApi - Query Node API * @param uploadDirectory - local directory to get file names from @@ -57,6 +58,7 @@ export async function performCleanup( uploadDirectory: string, hostId: string ): Promise { + const logger = rootLogger.child({ label: 'Cleanup' }) logger.info('Started cleanup service...') const squidStatus = await qnApi.getState() if (!squidStatus || !squidStatus.height) { @@ -77,89 +79,93 @@ export async function performCleanup( const model = await getStorageObligationsFromRuntime(qnApi, buckets) const storedObjectsIds = getDataObjectIDs() - const assignedObjectsIds = model.dataObjects.map((obj) => obj.id) - const removedIds = _.difference(storedObjectsIds, assignedObjectsIds) - const removedObjects = await getDataObjectsByIDs(qnApi, removedIds) + const 
assignedObjectsLoader = model.createAssignedObjectsIdsLoader() + const assignedObjectIds = new Set(await assignedObjectsLoader.getAll()) + const obsoleteObjectIds = new Set(storedObjectsIds.filter((id) => !assignedObjectIds.has(id))) - logger.debug(`Cleanup - stored objects: ${storedObjectsIds.length}, assigned objects: ${assignedObjectsIds.length}`) - logger.debug(`Cleanup - pruning ${removedIds.length} obsolete objects`) + // If objects are obsolete but still exist: They are "moved" objects + const movedObjectsLoader = new DataObjectIdsLoader(qnApi, { by: 'ids', ids: Array.from(obsoleteObjectIds) }) + const movedObjectIds = new Set(await movedObjectsLoader.getAll()) - // Data objects permanently deleted from the runtime - const deletedDataObjects = removedIds.filter( - (removedId) => !removedObjects.some((removedObject) => removedObject.id === removedId) - ) + // If objects are obsolete and don't exist: They are "deleted objects" + const deletedDataObjectIds = new Set([...obsoleteObjectIds].filter((id) => !movedObjectIds.has(id))) - // Data objects no-longer assigned to current storage-node - // operated buckets, and have been moved to other buckets - const movedDataObjects = removedObjects + logger.info(`stored objects: ${storedObjectsIds.length}, assigned objects: ${assignedObjectIds.size}`) + logger.info( + `pruning ${obsoleteObjectIds.size} obsolete objects ` + + `(${movedObjectIds.size} moved, ${deletedDataObjectIds.size} deleted)` + ) const workingStack = new WorkingStack() const processSpawner = new TaskProcessorSpawner(workingStack, asyncWorkersNumber) - const deletionTasksOfDeletedDataObjects = await Promise.all( - deletedDataObjects.map((dataObject) => new DeleteLocalFileTask(uploadDirectory, dataObject)) - ) - const deletionTasksOfMovedDataObjects = await getDeletionTasksFromMovedDataObjects( - buckets, - uploadDirectory, - model, - movedDataObjects, - hostId - ) + // Execute deleted objects removal tasks in batches of 10_000 + let deletedProcessed = 0 + logger.info(`removing ${deletedDataObjectIds.size} deleted objects...`) + for (const deletedObjectsIdsBatch of _.chunk([...deletedDataObjectIds], 10_000)) { + const deletionTasks = deletedObjectsIdsBatch.map((id) => new DeleteLocalFileTask(uploadDirectory, id)) + await workingStack.add(deletionTasks) + await processSpawner.process() + deletedProcessed += deletedObjectsIdsBatch.length + logger.debug(`${deletedProcessed} / ${deletedDataObjectIds.size} deleted objects processed...`) + } + + // Execute moved objects removal tasks in batches of 10_000 + let movedProcessed = 0 + logger.info(`removing ${movedObjectIds.size} moved objects...`) + for (const movedObjectsIdsBatch of _.chunk([...movedObjectIds], 10_000)) { + const movedDataObjectsBatch = await qnApi.getDataObjectsWithBagDetails(movedObjectsIdsBatch) + const deletionTasksOfMovedDataObjects = await getDeletionTasksFromMovedDataObjects( + logger, + uploadDirectory, + model, + movedDataObjectsBatch, + hostId + ) + await workingStack.add(deletionTasksOfMovedDataObjects) + await processSpawner.process() + movedProcessed += movedDataObjectsBatch.length + logger.debug(`${movedProcessed} / ${movedObjectIds.size} moved objects processed...`) + } - await workingStack.add(deletionTasksOfDeletedDataObjects) - await workingStack.add(deletionTasksOfMovedDataObjects) - await processSpawner.process() logger.info('Cleanup ended.') } /** * Creates the local file deletion tasks. 
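+ * A deletion task is only created for a moved object when at least
+ * `MINIMUM_REPLICATION_THRESHOLD` other bucket operators holding the object's bag
+ * respond to a HEAD request for the file; otherwise the local copy is kept.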
* - * @param ownBuckets - list of bucket ids operated by this node + * @param logger - cleanup service logger * @param uploadDirectory - local directory for data uploading * @param dataObligations - defines the current data obligations for the node * @param movedDataObjects- obsolete (no longer assigned) data objects that has been moved to other buckets + * @param hostId - host id of the current node */ async function getDeletionTasksFromMovedDataObjects( - ownBuckets: string[], + logger: Logger, uploadDirectory: string, dataObligations: DataObligations, - movedDataObjects: DataObjectDetailsFragment[], + movedDataObjects: DataObjectWithBagDetailsFragment[], hostId: string ): Promise { - const ownOperatorUrls: string[] = [] - for (const entry of dataObligations.storageBuckets) { - if (ownBuckets.includes(entry.id)) { - ownOperatorUrls.push(entry.operatorUrl) - } - } - - const bucketOperatorUrlById = new Map() - for (const entry of dataObligations.storageBuckets) { - if (!ownBuckets.includes(entry.id)) { - if (ownOperatorUrls.includes(entry.operatorUrl)) { - logger.warn(`(cleanup) Skipping remote bucket ${entry.id} - ${entry.operatorUrl}`) - } else { - bucketOperatorUrlById.set(entry.id, entry.operatorUrl) - } - } - } - const timeoutMs = 60 * 1000 // 1 minute since it's only a HEAD request const deletionTasks: DeleteLocalFileTask[] = [] + + const { bucketOperatorUrlById } = dataObligations await Promise.allSettled( movedDataObjects.map(async (movedDataObject) => { let dataObjectReplicationCount = 0 for (const { storageBucket } of movedDataObject.storageBag.storageBuckets) { - const url = urljoin(bucketOperatorUrlById.get(storageBucket.id), 'api/v1/files', movedDataObject.id) - await superagent.head(url).timeout(timeoutMs).set('X-COLOSSUS-HOST-ID', hostId) - dataObjectReplicationCount++ + const nodeUrl = bucketOperatorUrlById.get(storageBucket.id) + if (nodeUrl) { + const fileUrl = urljoin(nodeUrl, 'api/v1/files', movedDataObject.id) + await superagent.head(fileUrl).timeout(timeoutMs).set('X-COLOSSUS-HOST-ID', hostId) + dataObjectReplicationCount++ + } } if (dataObjectReplicationCount < MINIMUM_REPLICATION_THRESHOLD) { - logger.warn(`Cleanup - data object replication threshold unmet - file deletion canceled: ${movedDataObject.id}`) + logger.warn(`data object replication threshold unmet - file deletion canceled: ${movedDataObject.id}`) return } diff --git a/storage-node/src/services/sync/storageObligations.ts b/storage-node/src/services/sync/storageObligations.ts index 58f6b75ac6..c9369b9d42 100644 --- a/storage-node/src/services/sync/storageObligations.ts +++ b/storage-node/src/services/sync/storageObligations.ts @@ -1,12 +1,7 @@ import _ from 'lodash' import logger from '../logger' -import { MAX_RESULTS_PER_QUERY, QueryNodeApi } from '../queryNode/api' -import { - DataObjectByBagIdsDetailsFragment, - DataObjectDetailsFragment, - StorageBagDetailsFragment, - StorageBucketDetailsFragment, -} from '../queryNode/generated/queries' +import { MAX_RESULTS_PER_QUERY, PaginationQueryResult, QueryNodeApi } from '../queryNode/api' +import { DataObjectDetailsFragment, StorageBucketDetailsFragment } from '../queryNode/generated/queries' import { ApiPromise } from '@polkadot/api' import { PalletStorageStorageBucketRecord } from '@polkadot/types/lookup' @@ -25,9 +20,19 @@ export type DataObligations = { bags: Bag[] /** - * Assigned data objects for the storage provider. + * Map from bucket id to storage node url, without own buckets. 
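+   * Buckets operated by this node (or sharing its operator URL) are excluded.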
*/ - dataObjects: DataObject[] + bucketOperatorUrlById: Map + + /** + * Map from assigned bag ids to storage node urls. + */ + bagOperatorsUrlsById: Map + + /** + * A function that returns a loader of all assigned data object ids + */ + createAssignedObjectsIdsLoader(isAccepted?: boolean): DataObjectIdsLoader } /** @@ -90,6 +95,93 @@ export type DataObject = { size: number } +export abstract class LazyBatchLoader, MappedEntity> { + private endCursor: string | undefined + private _hasNextPage: boolean + private queryFn: (limit: number, after?: string) => Promise + + constructor(queryFn: (limit: number, after?: string) => Promise) { + this.queryFn = queryFn + this._hasNextPage = true + } + + public get hasNextPage(): boolean { + return this._hasNextPage + } + + abstract mapResults(results: QueryResult['edges'][number]['node'][]): Promise + + async nextBatch(size = 10_000): Promise { + if (!this._hasNextPage) { + return null + } + const result = await this.queryFn(size, this.endCursor) + if (!result) { + throw new Error('Connection query returned empty result') + } + + this.endCursor = result.pageInfo.endCursor || undefined + this._hasNextPage = result.pageInfo.hasNextPage + const mapped = await this.mapResults(result.edges.map((e) => e.node)) + return mapped + } + + async getAll(): Promise { + const results: MappedEntity[] = [] + while (this._hasNextPage) { + const batch = await this.nextBatch() + if (!batch) { + break + } + results.push(...batch) + } + + return results + } +} + +type DataObjectsLoadBy = { by: 'bagIds' | 'ids'; ids: string[]; isAccepted?: boolean } + +export class DataObjectDetailsLoader extends LazyBatchLoader< + PaginationQueryResult, + DataObject +> { + constructor(qnApi: QueryNodeApi, by: DataObjectsLoadBy) { + if (by.by === 'bagIds') { + super((limit, after) => qnApi.getDataObjectsByBagsPage(by.ids, limit, after, true, by.isAccepted)) + } else if (by.by === 'ids') { + super((limit, after) => qnApi.getDataObjectsByIdsPage(by.ids, limit, after, true, by.isAccepted)) + } else { + throw new Error(`Unknown "by" condition: ${JSON.stringify(by)}`) + } + } + + async mapResults(results: DataObjectDetailsFragment[]): Promise { + return results.map((dataObject) => ({ + id: dataObject.id, + size: parseInt(dataObject.size), + bagId: dataObject.storageBag.id, + ipfsHash: dataObject.ipfsHash, + })) + } +} + +export class DataObjectIdsLoader extends LazyBatchLoader, string> { + constructor(qnApi: QueryNodeApi, by: DataObjectsLoadBy) { + if (by.by === 'bagIds') { + super((limit, after) => qnApi.getDataObjectsByBagsPage(by.ids, limit, after, false, by.isAccepted)) + } else if (by.by === 'ids') { + super((limit, after) => qnApi.getDataObjectsByIdsPage(by.ids, limit, after, false, by.isAccepted)) + } else { + throw new Error(`Unknown "by" condition: ${JSON.stringify(by)}`) + } + } + + async mapResults(results: { id: string }[]): Promise { + return results.map(({ id }) => id) + } +} + /** * Get storage provider obligations like (assigned data objects) from the * runtime (Query Node). @@ -102,30 +194,58 @@ export async function getStorageObligationsFromRuntime( qnApi: QueryNodeApi, bucketIds?: string[] ): Promise { - const allBuckets = await getAllBuckets(qnApi) + const storageBuckets = (await getAllBuckets(qnApi)).map((bucket) => ({ + id: bucket.id, + operatorUrl: bucket.operatorMetadata?.nodeEndpoint ?? '', + workerId: bucket.operatorStatus?.workerId, + })) + + const bags = ( + bucketIds === undefined ? 
await qnApi.getAllStorageBagsDetails() : await qnApi.getStorageBagsDetails(bucketIds) + ).map((bag) => ({ + id: bag.id, + buckets: bag.storageBuckets.map((bucketInBag) => bucketInBag.storageBucket.id), + })) + + const ownBuckets = new Set(bucketIds || []) + const ownOperatorUrls = new Set() + for (const bucket of storageBuckets) { + if (ownBuckets.has(bucket.id)) { + ownOperatorUrls.add(bucket.operatorUrl) + } + } - const assignedBags = - bucketIds === undefined ? await qnApi.getAllStorageBagsDetails() : await getAllAssignedBags(qnApi, bucketIds) + const bucketOperatorUrlById = new Map() + for (const bucket of storageBuckets) { + if (!ownBuckets.has(bucket.id)) { + if (ownOperatorUrls.has(bucket.operatorUrl)) { + logger.warn(`(sync) Skipping remote bucket ${bucket.id} - ${bucket.operatorUrl}`) + } else { + bucketOperatorUrlById.set(bucket.id, bucket.operatorUrl) + } + } + } + + const bagOperatorsUrlsById = new Map() + for (const bag of bags) { + const operatorUrls = [] + for (const bucketId of bag.buckets) { + const operatorUrl = bucketOperatorUrlById.get(bucketId) + if (operatorUrl) { + operatorUrls.push(operatorUrl) + } + } - const bagIds = assignedBags.map((bag) => bag.id) - const assignedDataObjects = await getAllAssignedDataObjects(qnApi, bagIds) + bagOperatorsUrlsById.set(bag.id, operatorUrls) + } const model: DataObligations = { - storageBuckets: allBuckets.map((bucket) => ({ - id: bucket.id, - operatorUrl: bucket.operatorMetadata?.nodeEndpoint ?? '', - workerId: bucket.operatorStatus?.workerId, - })), - bags: assignedBags.map((bag) => ({ - id: bag.id, - buckets: bag.storageBuckets.map((bucketInBag) => bucketInBag.storageBucket.id), - })), - dataObjects: assignedDataObjects.map((dataObject) => ({ - id: dataObject.id, - size: parseInt(dataObject.size), - bagId: dataObject.storageBag.id, - ipfsHash: dataObject.ipfsHash, - })), + storageBuckets, + bags, + bagOperatorsUrlsById, + bucketOperatorUrlById, + createAssignedObjectsIdsLoader: (isAccepted?: boolean) => + new DataObjectIdsLoader(qnApi, { by: 'bagIds', ids: bags.map((b) => b.id), isAccepted }), } return model @@ -145,19 +265,6 @@ export async function getStorageBucketIdsByWorkerId(qnApi: QueryNodeApi, workerI return ids } -/** - * Get IDs of the data objects assigned to the bag ID. - * - * @param qnApi - initialized QueryNodeApi instance - * @param bagId - bag ID - * @returns data object IDs - */ -export async function getDataObjectIDsByBagId(qnApi: QueryNodeApi, bagId: string): Promise { - const dataObjects = await getAllAssignedDataObjects(qnApi, [bagId]) - - return dataObjects.map((obj) => obj.id) -} - /** * Get all storage buckets registered in the runtime (Query Node). * @@ -179,45 +286,6 @@ async function getAllBuckets(api: QueryNodeApi): Promise { - return await api.getDataObjectsByBagIds(bagIds) -} - -/** - * Get details of storage data objects by IDs. - * - * @param api - initialized QueryNodeApi instance - * @param dataObjectIds - data objects' IDs - * @returns storage data objects - */ -export async function getDataObjectsByIDs( - api: QueryNodeApi, - dataObjectIds: string[] -): Promise { - return await api.getDataObjectDetails(dataObjectIds) -} - -/** - * Get all bags assigned to storage provider. 
- * - * @param api - initialiazed QueryNodeApi instance - * @param bucketIds - assigned storage provider buckets' IDs - * @returns storage bag data - */ -async function getAllAssignedBags(api: QueryNodeApi, bucketIds: string[]): Promise { - return await api.getStorageBagsDetails(bucketIds) -} - /** * Abstract object acquiring function for the QueryNode. It uses paging for * queries and gets data using record offset and limit (hardcoded to 1000). diff --git a/storage-node/src/services/sync/synchronizer.ts b/storage-node/src/services/sync/synchronizer.ts index a4a7f0a409..93118036bd 100644 --- a/storage-node/src/services/sync/synchronizer.ts +++ b/storage-node/src/services/sync/synchronizer.ts @@ -1,7 +1,12 @@ import { getDataObjectIDs, isDataObjectIdInCache } from '../../services/caching/localDataObjects' import logger from '../../services/logger' import { QueryNodeApi } from '../queryNode/api' -import { DataObligations, getStorageObligationsFromRuntime } from './storageObligations' +import { + DataObject, + DataObjectDetailsLoader, + DataObligations, + getStorageObligationsFromRuntime, +} from './storageObligations' import { DownloadFileTask } from './tasks' import { TaskProcessorSpawner, WorkingStack } from '../processing/workingProcess' import _ from 'lodash' @@ -48,35 +53,39 @@ export async function performSync( const model = await getStorageObligationsFromRuntime(qnApi, buckets) const storedObjectIds = getDataObjectIDs() - const assignedObjects = model.dataObjects - const assignedObjectIds = assignedObjects.map((obj) => obj.id) + const assignedObjectIdsLoader = model.createAssignedObjectsIdsLoader(true) + const assignedObjectIds = new Set(await assignedObjectIdsLoader.getAll()) - const added = assignedObjects.filter((obj) => !isDataObjectIdInCache(obj.id)) - const removed = _.difference(storedObjectIds, assignedObjectIds) + const unsyncedObjectIds = [...assignedObjectIds].filter((id) => !isDataObjectIdInCache(id)) + const obsoleteObjectsNum = storedObjectIds.reduce((count, id) => (assignedObjectIds.has(id) ? count : count + 1), 0) - logger.debug(`Sync - new objects: ${added.length}`) - logger.debug(`Sync - obsolete objects: ${removed.length}`) + logger.debug(`Sync - new objects: ${unsyncedObjectIds.length}`) + logger.debug(`Sync - obsolete objects: ${obsoleteObjectsNum}`) const workingStack = new WorkingStack() - - const addedTasks = await getDownloadTasks( - model, - buckets, - added, - uploadDirectory, - tempDirectory, - asyncWorkersTimeout, - hostId, - selectedOperatorUrl - ) - - logger.debug(`Sync - started processing...`) - const processSpawner = new TaskProcessorSpawner(workingStack, asyncWorkersNumber) - await workingStack.add(addedTasks) + // Process unsynced objects in batches od 10_000 + logger.debug(`Sync - started processing...`) + let processed = 0 + for (const unsyncedIdsBatch of _.chunk(unsyncedObjectIds, 10_000)) { + const objectsLoader = new DataObjectDetailsLoader(qnApi, { by: 'ids', ids: unsyncedIdsBatch }) + const objectsBatch = await objectsLoader.getAll() + const syncTasks = await getDownloadTasks( + model, + objectsBatch, + uploadDirectory, + tempDirectory, + asyncWorkersTimeout, + hostId, + selectedOperatorUrl + ) + await workingStack.add(syncTasks) + await processSpawner.process() + processed += objectsBatch.length + logger.debug(`Sync - processed ${processed} / ${unsyncedObjectIds.length} objects...`) + } - await processSpawner.process() logger.info('Sync ended.') } @@ -84,8 +93,7 @@ export async function performSync( * Creates the download tasks. 
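+ * Operator URLs for each object are resolved from the pre-computed
+ * `dataObligations.bagOperatorsUrlsById` map (built in `getStorageObligationsFromRuntime`)
+ * rather than being recomputed here; the resulting URL list may be empty.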
* * @param dataObligations - defines the current data obligations for the node - * @param ownBuckets - list of bucket ids operated this node - * @param addedIds - data object IDs to download + * @param dataObjects - list of data objects to download * @param uploadDirectory - local directory for data uploading * @param tempDirectory - local directory for temporary data uploading * @param taskSink - a destination for the newly created tasks @@ -95,65 +103,18 @@ export async function performSync( */ export async function getDownloadTasks( dataObligations: DataObligations, - ownBuckets: string[], - added: DataObligations['dataObjects'], + dataObjects: DataObject[], uploadDirectory: string, tempDirectory: string, asyncWorkersTimeout: number, hostId: string, selectedOperatorUrl?: string ): Promise { - const bagIdByDataObjectId = new Map() - for (const entry of dataObligations.dataObjects) { - bagIdByDataObjectId.set(entry.id, entry.bagId) - } - - const ownOperatorUrls: string[] = [] - for (const entry of dataObligations.storageBuckets) { - if (ownBuckets.includes(entry.id)) { - ownOperatorUrls.push(entry.operatorUrl) - } - } - - const bucketOperatorUrlById = new Map() - for (const entry of dataObligations.storageBuckets) { - if (!ownBuckets.includes(entry.id)) { - if (ownOperatorUrls.includes(entry.operatorUrl)) { - logger.warn(`(sync) Skipping remote bucket ${entry.id} - ${entry.operatorUrl}`) - } else { - bucketOperatorUrlById.set(entry.id, entry.operatorUrl) - } - } - } - - const bagOperatorsUrlsById = new Map() - for (const entry of dataObligations.bags) { - const operatorUrls = [] - - for (const bucket of entry.buckets) { - if (bucketOperatorUrlById.has(bucket)) { - const operatorUrl = bucketOperatorUrlById.get(bucket) - if (operatorUrl) { - operatorUrls.push(operatorUrl) - } - } - } - - bagOperatorsUrlsById.set(entry.id, operatorUrls) - } - - const tasks = added.map((dataObject) => { - let operatorUrls: string[] = [] // can be empty after look up - let bagId = null - if (bagIdByDataObjectId.has(dataObject.id)) { - bagId = bagIdByDataObjectId.get(dataObject.id) - if (bagOperatorsUrlsById.has(bagId)) { - operatorUrls = bagOperatorsUrlsById.get(bagId) - } - } + const { bagOperatorsUrlsById } = dataObligations + const tasks = dataObjects.map((dataObject) => { return new DownloadFileTask( - selectedOperatorUrl ? [selectedOperatorUrl] : operatorUrls, + selectedOperatorUrl ? 
[selectedOperatorUrl] : bagOperatorsUrlsById.get(dataObject.bagId) || [], dataObject.id, dataObject.ipfsHash, dataObject.size, diff --git a/storage-node/src/services/webApi/controllers/stateApi.ts b/storage-node/src/services/webApi/controllers/stateApi.ts index 10273719f4..ea49a6d58c 100644 --- a/storage-node/src/services/webApi/controllers/stateApi.ts +++ b/storage-node/src/services/webApi/controllers/stateApi.ts @@ -8,7 +8,6 @@ import { promisify } from 'util' import { getDataObjectIDs } from '../../../services/caching/localDataObjects' import logger from '../../logger' import { QueryNodeApi } from '../../queryNode/api' -import { getDataObjectIDsByBagId } from '../../sync/storageObligations' import { DataObjectResponse, DataStatsResponse, @@ -168,7 +167,7 @@ async function getCachedDataObjectsObligations(qnApi: QueryNodeApi, bagId: strin const entryName = `data_object_obligations_${bagId}` if (!dataCache.has(entryName)) { - const data = await getDataObjectIDsByBagId(qnApi, bagId) + const data = await qnApi.getDataObjectIdsByBagId(bagId) dataCache.set(entryName, data) } From 0094a618f60afc2588c0e252a583c468e9b40a53 Mon Sep 17 00:00:00 2001 From: Lezek123 Date: Tue, 26 Nov 2024 09:46:22 +0100 Subject: [PATCH 2/9] Update changelog --- storage-node/CHANGELOG.md | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/storage-node/CHANGELOG.md b/storage-node/CHANGELOG.md index da23df9c92..a3cf96681f 100644 --- a/storage-node/CHANGELOG.md +++ b/storage-node/CHANGELOG.md @@ -1,6 +1,12 @@ ### 4.3.0 -- Adds `archive` mode / command, which allows downloading, compressing and uploading all data objects to an external S3 bucket that can be used as a backup. +- **New feature:** `archive` mode / command, which allows downloading, compressing and uploading all data objects to an external S3 bucket that can be used as a backup. +- **Optimizations:** The way data objects / data object ids are queried and processed during sync and cleanup has been optimized: + - `DataObjectDetailsLoader` and `DataObjectIdsLoader` were implemented. They allow loading data objects / data object ids in batches using a connection query and avoid fetching redundant data from the GraphQL server. + - Sync and cleanup services now process tasks in batches of `10_000` to avoid overflowing the memory. + - Synchronous operations like `sort` or `filter` on larger arrays of data objects have been optimized (for example, by replacing `.filter(Array.includes(...))` with `.filter(Set.has(...))`) +- Improved logging during cleanup + ### 4.2.0 From 2520e15017ca4ba5030d93bbf1c1497744900def Mon Sep 17 00:00:00 2001 From: Lezek123 Date: Tue, 26 Nov 2024 10:16:41 +0100 Subject: [PATCH 3/9] Colossus cleanup: Additional safety mechanism --- storage-node/CHANGELOG.md | 5 +- .../src/services/sync/cleanupService.ts | 87 +++++++++++-------- 2 files changed, 56 insertions(+), 36 deletions(-) diff --git a/storage-node/CHANGELOG.md b/storage-node/CHANGELOG.md index a3cf96681f..8982fb9db2 100644 --- a/storage-node/CHANGELOG.md +++ b/storage-node/CHANGELOG.md @@ -4,8 +4,9 @@ - **Optimizations:** The way data objects / data object ids are queried and processed during sync and cleanup has been optimized: - `DataObjectDetailsLoader` and `DataObjectIdsLoader` were implemented. They allow loading data objects / data object ids in batches using a connection query and avoid fetching redundant data from the GraphQL server. - Sync and cleanup services now process tasks in batches of `10_000` to avoid overflowing the memory. 
- - Synchronous operations like `sort` or `filter` on larger arrays of data objects have been optimized (for example, by replacing `.filter(Array.includes(...))` with `.filter(Set.has(...))`) -- Improved logging during cleanup + - Synchronous operations like `sort` or `filter` on larger arrays of data objects have been optimized (for example, by replacing `.filter(Array.includes(...))` with `.filter(Set.has(...))`). +- A safety mechanism was added to avoid removing "deleted" objects for which a related `DataObjectDeleted` event cannot be found in storage squid. +- Improved logging during cleanup. ### 4.2.0 diff --git a/storage-node/src/services/sync/cleanupService.ts b/storage-node/src/services/sync/cleanupService.ts index fdaf4f5f2f..8f1eaa1d40 100644 --- a/storage-node/src/services/sync/cleanupService.ts +++ b/storage-node/src/services/sync/cleanupService.ts @@ -91,41 +91,60 @@ export async function performCleanup( const deletedDataObjectIds = new Set([...obsoleteObjectIds].filter((id) => !movedObjectIds.has(id))) logger.info(`stored objects: ${storedObjectsIds.length}, assigned objects: ${assignedObjectIds.size}`) - logger.info( - `pruning ${obsoleteObjectIds.size} obsolete objects ` + - `(${movedObjectIds.size} moved, ${deletedDataObjectIds.size} deleted)` - ) - - const workingStack = new WorkingStack() - const processSpawner = new TaskProcessorSpawner(workingStack, asyncWorkersNumber) - - // Execute deleted objects removal tasks in batches of 10_000 - let deletedProcessed = 0 - logger.info(`removing ${deletedDataObjectIds.size} deleted objects...`) - for (const deletedObjectsIdsBatch of _.chunk([...deletedDataObjectIds], 10_000)) { - const deletionTasks = deletedObjectsIdsBatch.map((id) => new DeleteLocalFileTask(uploadDirectory, id)) - await workingStack.add(deletionTasks) - await processSpawner.process() - deletedProcessed += deletedObjectsIdsBatch.length - logger.debug(`${deletedProcessed} / ${deletedDataObjectIds.size} deleted objects processed...`) - } - - // Execute moved objects removal tasks in batches of 10_000 - let movedProcessed = 0 - logger.info(`removing ${movedObjectIds.size} moved objects...`) - for (const movedObjectsIdsBatch of _.chunk([...movedObjectIds], 10_000)) { - const movedDataObjectsBatch = await qnApi.getDataObjectsWithBagDetails(movedObjectsIdsBatch) - const deletionTasksOfMovedDataObjects = await getDeletionTasksFromMovedDataObjects( - logger, - uploadDirectory, - model, - movedDataObjectsBatch, - hostId + if (obsoleteObjectIds.size) { + logger.info( + `pruning ${obsoleteObjectIds.size} obsolete objects ` + + `(${movedObjectIds.size} moved, ${deletedDataObjectIds.size} deleted)` ) - await workingStack.add(deletionTasksOfMovedDataObjects) - await processSpawner.process() - movedProcessed += movedDataObjectsBatch.length - logger.debug(`${movedProcessed} / ${movedObjectIds.size} moved objects processed...`) + + const workingStack = new WorkingStack() + const processSpawner = new TaskProcessorSpawner(workingStack, asyncWorkersNumber) + + // Execute deleted objects removal tasks in batches of 10_000 + if (deletedDataObjectIds.size) { + let deletedProcessed = 0 + logger.info(`removing ${deletedDataObjectIds.size} deleted objects...`) + for (let deletedObjectsIdsBatch of _.chunk([...deletedDataObjectIds], 10_000)) { + // Confirm whether the objects were actually deleted by fetching the related deletion events + const dataObjectDeletedEvents = await qnApi.getDataObjectDeletedEvents(deletedObjectsIdsBatch) + const confirmedIds = new 
Set(dataObjectDeletedEvents.map((e) => e.data.dataObjectId)) + deletedObjectsIdsBatch = deletedObjectsIdsBatch.filter((id) => { + if (confirmedIds.has(id)) { + return true + } else { + logger.warn(`Could not find DataObjectDeleted event for object ${id}, skipping from cleanup...`) + return false + } + }) + const deletionTasks = deletedObjectsIdsBatch.map((id) => new DeleteLocalFileTask(uploadDirectory, id)) + await workingStack.add(deletionTasks) + await processSpawner.process() + deletedProcessed += deletedObjectsIdsBatch.length + logger.debug(`${deletedProcessed} / ${deletedDataObjectIds.size} deleted objects processed...`) + } + } + + // Execute moved objects removal tasks in batches of 10_000 + if (movedObjectIds.size) { + let movedProcessed = 0 + logger.info(`removing ${movedObjectIds.size} moved objects...`) + for (const movedObjectsIdsBatch of _.chunk([...movedObjectIds], 10_000)) { + const movedDataObjectsBatch = await qnApi.getDataObjectsWithBagDetails(movedObjectsIdsBatch) + const deletionTasksOfMovedDataObjects = await getDeletionTasksFromMovedDataObjects( + logger, + uploadDirectory, + model, + movedDataObjectsBatch, + hostId + ) + await workingStack.add(deletionTasksOfMovedDataObjects) + await processSpawner.process() + movedProcessed += movedDataObjectsBatch.length + logger.debug(`${movedProcessed} / ${movedObjectIds.size} moved objects processed...`) + } + } + } else { + logger.info('No objects to prune, skipping...') } logger.info('Cleanup ended.') From 276cd31bc6bc988710263b639068f07a6a3fab78 Mon Sep 17 00:00:00 2001 From: Lezek123 Date: Thu, 12 Dec 2024 11:03:15 +0100 Subject: [PATCH 4/9] Update CHANGELOG and package.json --- storage-node/CHANGELOG.md | 6 ++++-- storage-node/package.json | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/storage-node/CHANGELOG.md b/storage-node/CHANGELOG.md index 8982fb9db2..48c10aa1b5 100644 --- a/storage-node/CHANGELOG.md +++ b/storage-node/CHANGELOG.md @@ -1,6 +1,5 @@ -### 4.3.0 +### 4.4.0 -- **New feature:** `archive` mode / command, which allows downloading, compressing and uploading all data objects to an external S3 bucket that can be used as a backup. - **Optimizations:** The way data objects / data object ids are queried and processed during sync and cleanup has been optimized: - `DataObjectDetailsLoader` and `DataObjectIdsLoader` were implemented. They allow loading data objects / data object ids in batches using a connection query and avoid fetching redundant data from the GraphQL server. - Sync and cleanup services now process tasks in batches of `10_000` to avoid overflowing the memory. @@ -8,6 +7,9 @@ - A safety mechanism was added to avoid removing "deleted" objects for which a related `DataObjectDeleted` event cannot be found in storage squid. - Improved logging during cleanup. +### 4.3.0 + +- Adds `archive` mode / command, which allows downloading, compressing and uploading all data objects to an external S3 bucket that can be used as a backup. 
### 4.2.0 diff --git a/storage-node/package.json b/storage-node/package.json index a7e782ad41..cf13a67859 100644 --- a/storage-node/package.json +++ b/storage-node/package.json @@ -1,7 +1,7 @@ { "name": "storage-node", "description": "Joystream storage subsystem.", - "version": "4.3.0", + "version": "4.4.0", "author": "Joystream contributors", "bin": { "storage-node": "./bin/run" From 7c534e8b86640ac246edc394d9f55c816cdae486 Mon Sep 17 00:00:00 2001 From: Lezek123 Date: Thu, 12 Dec 2024 11:04:41 +0100 Subject: [PATCH 5/9] Update README --- storage-node/README.md | 552 +++++++++++++++++++---------------------- 1 file changed, 260 insertions(+), 292 deletions(-) diff --git a/storage-node/README.md b/storage-node/README.md index 52fa902d43..e7228f6c62 100644 --- a/storage-node/README.md +++ b/storage-node/README.md @@ -170,6 +170,7 @@ There is also an option to run Colossus as [Docker container](../colossus.Docker * [`storage-node util:cleanup`](#storage-node-utilcleanup) * [`storage-node util:fetch-bucket`](#storage-node-utilfetch-bucket) * [`storage-node util:multihash`](#storage-node-utilmultihash) +* [`storage-node util:search-archives`](#storage-node-utilsearch-archives) * [`storage-node util:verify-bag-id`](#storage-node-utilverify-bag-id) ## `storage-node archive` @@ -181,11 +182,6 @@ USAGE $ storage-node archive OPTIONS - -b, --buckets=buckets - [default: 1] Comma separated list of bucket IDs to sync. Buckets that are not assigned to - worker are ignored. - If not specified all buckets will be synced. - -e, --elasticSearchEndpoint=elasticSearchEndpoint Elasticsearch endpoint (e.g.: http://some.com:8081). Log level could be set using the ELASTIC_LOG_LEVEL environment variable. @@ -210,12 +206,12 @@ OPTIONS [default: 7] Maximum rolling log files number. -p, --password=password - Password to unlock keyfiles. Multiple passwords can be passed, to try against all files. If - not specified a single password can be set in ACCOUNT_PWD environment variable. + Password to unlock keyfiles. Multiple passwords can be passed, to try against all files. If not specified a single + password can be set in ACCOUNT_PWD environment variable. -q, --storageSquidEndpoint=storageSquidEndpoint - (required) [default: http://localhost:4352/graphql] Storage Squid graphql server endpoint - (e.g.: http://some.com:4352/graphql) + (required) [default: http://localhost:4352/graphql] Storage Squid graphql server endpoint (e.g.: + http://some.com:4352/graphql) -r, --syncWorkersNumber=syncWorkersNumber [default: 8] Sync workers number (max async operations in progress). @@ -233,19 +229,17 @@ OPTIONS [default: 50000000] Maximum rolling log files size in bytes. -y, --accountUri=accountUri - Account URI (optional). If not specified a single key can be set in ACCOUNT_URI environment - variable. + Account URI (optional). If not specified a single key can be set in ACCOUNT_URI environment variable. -z, --logFileChangeFrequency=(yearly|monthly|daily|hourly|none) [default: daily] Log files update frequency. --archiveFileSizeLimitMB=archiveFileSizeLimitMB - [default: 1000] Try to avoid creating archive files larger than this size limit (in MB) - unless necessary + [default: 1000] Try to avoid creating archive files larger than this size limit (in MB) unless unaviodable. --archiveTrackfileBackupFreqMinutes=archiveTrackfileBackupFreqMinutes - [default: 60] Determines how frequently the archive tracking file (containing information - about .7z files content) should be uploaded to S3 in case a change is detected. 
+ [default: 60] Specifies how frequently the archive tracking file (containing information about archive files + content) should be uploaded to S3 (in case it's changed). --awsS3BucketName=awsS3BucketName (required) Name of the AWS S3 bucket where the files will be stored. @@ -253,52 +247,65 @@ OPTIONS --awsS3BucketRegion=awsS3BucketRegion (required) AWS region of the AWS S3 bucket where the files will be stored. + --awsStorageClass=(DEEP_ARCHIVE|EXPRESS_ONEZONE|GLACIER|GLACIER_IR|INTELLIGENT_TIERING|ONEZONE_IA|OUTPOSTS|REDUCED_RED + UNDANCY|SNOW|STANDARD|STANDARD_IA) + (required) [default: DEEP_ARCHIVE] AWS S3 storage class to use when uploading the archives to S3. + + --compressionAlgorithm=(7zip|zstd|none) + (required) [default: zstd] Compression algorithm to use for archive files + + --compressionLevel=(low|medium|high) + (required) [default: medium] Compression level to use for archive files (lower is faster, but provides lower storage + savings) + + --compressionThreads=compressionThreads + (required) [default: 1] Number of threads to use for compression. Note that {uploadWorkersNumber} upload tasks may + be running at once and each of them can spawn a separate compression task which uses {compressionThreads} threads! + --elasticSearchIndexPrefix=elasticSearchIndexPrefix - [default: logs-colossus] Elasticsearch index prefix. Node ID will be appended to the prefix. - Default: logs-colossus. Can be passed through ELASTIC_INDEX_PREFIX environment variable. + [default: logs-colossus] Elasticsearch index prefix. Node ID will be appended to the prefix. Default: logs-colossus. + Can be passed through ELASTIC_INDEX_PREFIX environment variable. --elasticSearchPassword=elasticSearchPassword - Elasticsearch password for basic authentication. Can be passed through ELASTIC_PASSWORD - environment variable. + Elasticsearch password for basic authentication. Can be passed through ELASTIC_PASSWORD environment variable. --elasticSearchUser=elasticSearchUser - Elasticsearch user for basic authentication. Can be passed through ELASTIC_USER environment - variable. + Elasticsearch user for basic authentication. Can be passed through ELASTIC_USER environment variable. --keyStore=keyStore Path to a folder with multiple key files to load into keystore. --localAgeTriggerThresholdMinutes=localAgeTriggerThresholdMinutes - [default: 1440] Compress and upload local data objects to S3 if the oldest of them was - downloaded more than X minutes ago + [default: 1440] Compress and upload all local data objects to S3 if the oldest of them was downloaded more than X + minutes ago --localCountTriggerThreshold=localCountTriggerThreshold - Compress and upload local data objects to S3 if the number of them reaches this threshold. + Compress and upload all local data objects to S3 if the number of them reaches this threshold. --localSizeTriggerThresholdMB=localSizeTriggerThresholdMB - [default: 10000] Compress and upload local data objects to S3 if the combined size of them - reaches this threshold (in MB) + [default: 10000] Compress and upload all local data objects to S3 if the combined size of them reaches this + threshold (in MB) + + --statsLoggingInterval=statsLoggingInterval + (required) [default: 60] How often the upload/download/compression statistics summary will be logged (in minutes). --tmpDownloadDir=tmpDownloadDir - (required) Directory to store tempory files during sync (absolute path). + (required) Directory to store temporary data (downloads in progress) during sync (absolute path). 
--uploadQueueDir=uploadQueueDir - (required) Directory to store fully downloaded data objects before compressing them and - uploading to S3 (absolute path). + (required) Directory to store fully downloaded data objects before compressing them and uploading to S3 (absolute + path). --uploadQueueDirSizeLimitMB=uploadQueueDirSizeLimitMB - (required) [default: 20000] Limits the total size of files stored in upload queue directory - (in MB). Download of the new objects will be limitted in order to prevent exceeding this - limit. To leave a safe margin of error, it should be set to ~50% of available disk space. - - --uploadRetryInterval=uploadRetryInterval - [default: 3] Interval before retrying failed upload (in minutes) + (required) [default: 20000] Limits the total size of files stored in upload queue directory (in MB). Download of the + new objects may be slowed down in order to try to prevent exceeding this limit. WARNING: To leave a safe margin of + error (for compression etc.), it should be set to ~50% of available disk space. --uploadWorkersNumber=uploadWorkersNumber [default: 4] Upload workers number (max async operations in progress). ``` -_See code: [src/commands/archive.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/archive.ts)_ +_See code: [src/commands/archive.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/archive.ts)_ ## `storage-node help [COMMAND]` @@ -331,20 +338,18 @@ OPTIONS -k, --keyFile=keyFile Path to key file to add to the keyring. -m, --dev Use development mode - -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to - try against all files. If not specified a single password can be - set in ACCOUNT_PWD environment variable. + -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to try against all files. + If not specified a single password can be set in ACCOUNT_PWD environment variable. - -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in - non-dev environment. + -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in non-dev environment. - -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set - in ACCOUNT_URI environment variable. + -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set in ACCOUNT_URI + environment variable. --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/leader/cancel-invite.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/leader/cancel-invite.ts)_ +_See code: [src/commands/leader/cancel-invite.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/cancel-invite.ts)_ ## `storage-node leader:create-bucket` @@ -362,22 +367,20 @@ OPTIONS -m, --dev Use development mode -n, --number=number Storage bucket max total objects number - -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to - try against all files. If not specified a single password can be - set in ACCOUNT_PWD environment variable. + -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to try against all files. + If not specified a single password can be set in ACCOUNT_PWD environment variable. -s, --size=size Storage bucket max total objects size - -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in - non-dev environment. 
+ -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in non-dev environment. - -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set - in ACCOUNT_URI environment variable. + -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set in ACCOUNT_URI + environment variable. --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/leader/create-bucket.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/leader/create-bucket.ts)_ +_See code: [src/commands/leader/create-bucket.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/create-bucket.ts)_ ## `storage-node leader:delete-bucket` @@ -393,20 +396,18 @@ OPTIONS -k, --keyFile=keyFile Path to key file to add to the keyring. -m, --dev Use development mode - -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to - try against all files. If not specified a single password can be - set in ACCOUNT_PWD environment variable. + -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to try against all files. + If not specified a single password can be set in ACCOUNT_PWD environment variable. - -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in - non-dev environment. + -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in non-dev environment. - -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set - in ACCOUNT_URI environment variable. + -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set in ACCOUNT_URI + environment variable. --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/leader/delete-bucket.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/leader/delete-bucket.ts)_ +_See code: [src/commands/leader/delete-bucket.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/delete-bucket.ts)_ ## `storage-node leader:invite-operator` @@ -422,22 +423,20 @@ OPTIONS -k, --keyFile=keyFile Path to key file to add to the keyring. -m, --dev Use development mode - -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to - try against all files. If not specified a single password can be - set in ACCOUNT_PWD environment variable. + -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to try against all files. + If not specified a single password can be set in ACCOUNT_PWD environment variable. - -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in - non-dev environment. + -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in non-dev environment. -w, --operatorId=operatorId (required) Storage bucket operator ID (storage group worker ID) - -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set - in ACCOUNT_URI environment variable. + -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set in ACCOUNT_URI + environment variable. --keyStore=keyStore Path to a folder with multiple key files to load into keystore. 
``` -_See code: [src/commands/leader/invite-operator.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/leader/invite-operator.ts)_ +_See code: [src/commands/leader/invite-operator.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/invite-operator.ts)_ ## `storage-node leader:remove-operator` @@ -453,20 +452,18 @@ OPTIONS -k, --keyFile=keyFile Path to key file to add to the keyring. -m, --dev Use development mode - -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to - try against all files. If not specified a single password can be - set in ACCOUNT_PWD environment variable. + -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to try against all files. + If not specified a single password can be set in ACCOUNT_PWD environment variable. - -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in - non-dev environment. + -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in non-dev environment. - -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set - in ACCOUNT_URI environment variable. + -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set in ACCOUNT_URI + environment variable. --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/leader/remove-operator.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/leader/remove-operator.ts)_ +_See code: [src/commands/leader/remove-operator.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/remove-operator.ts)_ ## `storage-node leader:set-bucket-limits` @@ -483,22 +480,20 @@ OPTIONS -m, --dev Use development mode -o, --objects=objects (required) New 'voucher object number limit' value - -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to - try against all files. If not specified a single password can be - set in ACCOUNT_PWD environment variable. + -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to try against all files. + If not specified a single password can be set in ACCOUNT_PWD environment variable. -s, --size=size (required) New 'voucher object size limit' value - -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in - non-dev environment. + -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in non-dev environment. - -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set - in ACCOUNT_URI environment variable. + -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set in ACCOUNT_URI + environment variable. --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/leader/set-bucket-limits.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/leader/set-bucket-limits.ts)_ +_See code: [src/commands/leader/set-bucket-limits.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/set-bucket-limits.ts)_ ## `storage-node leader:set-global-uploading-status` @@ -513,22 +508,20 @@ OPTIONS -k, --keyFile=keyFile Path to key file to add to the keyring. -m, --dev Use development mode - -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to - try against all files. 
If not specified a single password can be - set in ACCOUNT_PWD environment variable. + -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to try against all files. + If not specified a single password can be set in ACCOUNT_PWD environment variable. -s, --set=(on|off) (required) Sets global uploading block (on/off). - -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in - non-dev environment. + -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in non-dev environment. - -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set - in ACCOUNT_URI environment variable. + -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set in ACCOUNT_URI + environment variable. --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/leader/set-global-uploading-status.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/leader/set-global-uploading-status.ts)_ +_See code: [src/commands/leader/set-global-uploading-status.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/set-global-uploading-status.ts)_ ## `storage-node leader:update-bag-limit` @@ -544,20 +537,18 @@ OPTIONS -l, --limit=limit (required) New StorageBucketsPerBagLimit value -m, --dev Use development mode - -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to - try against all files. If not specified a single password can be - set in ACCOUNT_PWD environment variable. + -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to try against all files. + If not specified a single password can be set in ACCOUNT_PWD environment variable. - -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in - non-dev environment. + -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in non-dev environment. - -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set - in ACCOUNT_URI environment variable. + -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set in ACCOUNT_URI + environment variable. --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/leader/update-bag-limit.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/leader/update-bag-limit.ts)_ +_See code: [src/commands/leader/update-bag-limit.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/update-bag-limit.ts)_ ## `storage-node leader:update-bags` @@ -594,8 +585,8 @@ OPTIONS Use development mode -p, --password=password - Password to unlock keyfiles. Multiple passwords can be passed, to try against all files. If - not specified a single password can be set in ACCOUNT_PWD environment variable. + Password to unlock keyfiles. Multiple passwords can be passed, to try against all files. If not specified a single + password can be set in ACCOUNT_PWD environment variable. -r, --remove=remove [default: ] Comma separated list of bucket IDs to remove from all bag/s @@ -607,14 +598,13 @@ OPTIONS [default: ws://localhost:9944] Runtime API URL. Mandatory in non-dev environment. -y, --accountUri=accountUri - Account URI (optional). If not specified a single key can be set in ACCOUNT_URI environment - variable. + Account URI (optional). 
If not specified a single key can be set in ACCOUNT_URI environment variable. --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/leader/update-bags.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/leader/update-bags.ts)_ +_See code: [src/commands/leader/update-bags.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/update-bags.ts)_ ## `storage-node leader:update-blacklist` @@ -630,22 +620,20 @@ OPTIONS -k, --keyFile=keyFile Path to key file to add to the keyring. -m, --dev Use development mode - -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to - try against all files. If not specified a single password can be - set in ACCOUNT_PWD environment variable. + -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to try against all files. + If not specified a single password can be set in ACCOUNT_PWD environment variable. -r, --remove=remove [default: ] Content ID to remove - -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in - non-dev environment. + -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in non-dev environment. - -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set - in ACCOUNT_URI environment variable. + -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set in ACCOUNT_URI + environment variable. --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/leader/update-blacklist.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/leader/update-blacklist.ts)_ +_See code: [src/commands/leader/update-blacklist.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/update-blacklist.ts)_ ## `storage-node leader:update-bucket-status` @@ -661,23 +649,20 @@ OPTIONS -k, --keyFile=keyFile Path to key file to add to the keyring. -m, --dev Use development mode - -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to - try against all files. If not specified a single password can be - set in ACCOUNT_PWD environment variable. + -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to try against all files. + If not specified a single password can be set in ACCOUNT_PWD environment variable. - -s, --set=(on|off) (required) Sets 'accepting new bags' parameter for the bucket - (on/off). + -s, --set=(on|off) (required) Sets 'accepting new bags' parameter for the bucket (on/off). - -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in - non-dev environment. + -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in non-dev environment. - -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set - in ACCOUNT_URI environment variable. + -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set in ACCOUNT_URI + environment variable. --keyStore=keyStore Path to a folder with multiple key files to load into keystore. 
``` -_See code: [src/commands/leader/update-bucket-status.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/leader/update-bucket-status.ts)_ +_See code: [src/commands/leader/update-bucket-status.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/update-bucket-status.ts)_ ## `storage-node leader:update-data-fee` @@ -693,20 +678,18 @@ OPTIONS -k, --keyFile=keyFile Path to key file to add to the keyring. -m, --dev Use development mode - -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to - try against all files. If not specified a single password can be - set in ACCOUNT_PWD environment variable. + -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to try against all files. + If not specified a single password can be set in ACCOUNT_PWD environment variable. - -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in - non-dev environment. + -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in non-dev environment. - -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set - in ACCOUNT_URI environment variable. + -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set in ACCOUNT_URI + environment variable. --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/leader/update-data-fee.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/leader/update-data-fee.ts)_ +_See code: [src/commands/leader/update-data-fee.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/update-data-fee.ts)_ ## `storage-node leader:update-data-object-bloat-bond` @@ -721,22 +704,20 @@ OPTIONS -k, --keyFile=keyFile Path to key file to add to the keyring. -m, --dev Use development mode - -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to - try against all files. If not specified a single password can be - set in ACCOUNT_PWD environment variable. + -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to try against all files. + If not specified a single password can be set in ACCOUNT_PWD environment variable. - -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in - non-dev environment. + -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in non-dev environment. -v, --value=value (required) New data object bloat bond value - -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set - in ACCOUNT_URI environment variable. + -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set in ACCOUNT_URI + environment variable. --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/leader/update-data-object-bloat-bond.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/leader/update-data-object-bloat-bond.ts)_ +_See code: [src/commands/leader/update-data-object-bloat-bond.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/update-data-object-bloat-bond.ts)_ ## `storage-node leader:update-dynamic-bag-policy` @@ -752,23 +733,21 @@ OPTIONS -m, --dev Use development mode -n, --number=number (required) New storage buckets number - -p, --password=password Password to unlock keyfiles. 
Multiple passwords can be passed, - to try against all files. If not specified a single password - can be set in ACCOUNT_PWD environment variable. + -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to try against all + files. If not specified a single password can be set in ACCOUNT_PWD environment + variable. -t, --bagType=(Channel|Member) (required) Dynamic bag type (Channel, Member). - -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in - non-dev environment. + -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in non-dev environment. - -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be - set in ACCOUNT_URI environment variable. + -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set in ACCOUNT_URI + environment variable. - --keyStore=keyStore Path to a folder with multiple key files to load into - keystore. + --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/leader/update-dynamic-bag-policy.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/leader/update-dynamic-bag-policy.ts)_ +_See code: [src/commands/leader/update-dynamic-bag-policy.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/update-dynamic-bag-policy.ts)_ ## `storage-node leader:update-voucher-limits` @@ -784,22 +763,20 @@ OPTIONS -m, --dev Use development mode -o, --objects=objects (required) New 'max voucher object number limit' value - -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to - try against all files. If not specified a single password can be - set in ACCOUNT_PWD environment variable. + -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to try against all files. + If not specified a single password can be set in ACCOUNT_PWD environment variable. -s, --size=size (required) New 'max voucher object size limit' value - -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in - non-dev environment. + -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in non-dev environment. - -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set - in ACCOUNT_URI environment variable. + -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set in ACCOUNT_URI + environment variable. --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/leader/update-voucher-limits.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/leader/update-voucher-limits.ts)_ +_See code: [src/commands/leader/update-voucher-limits.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/leader/update-voucher-limits.ts)_ ## `storage-node operator:accept-invitation` @@ -815,27 +792,24 @@ OPTIONS -k, --keyFile=keyFile Path to key file to add to the keyring. -m, --dev Use development mode - -p, --password=password Password to unlock keyfiles. Multiple passwords - can be passed, to try against all files. If not - specified a single password can be set in + -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to try + against all files. If not specified a single password can be set in ACCOUNT_PWD environment variable. 
-t, --transactorAccountId=transactorAccountId (required) Transactor account ID (public key) - -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. - Mandatory in non-dev environment. + -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in non-dev + environment. -w, --workerId=workerId (required) Storage operator worker ID - -y, --accountUri=accountUri Account URI (optional). If not specified a - single key can be set in ACCOUNT_URI - environment variable. + -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set in + ACCOUNT_URI environment variable. - --keyStore=keyStore Path to a folder with multiple key files to - load into keystore. + --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/operator/accept-invitation.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/operator/accept-invitation.ts)_ +_See code: [src/commands/operator/accept-invitation.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/operator/accept-invitation.ts)_ ## `storage-node operator:set-metadata` @@ -853,22 +827,20 @@ OPTIONS -k, --keyFile=keyFile Path to key file to add to the keyring. -m, --dev Use development mode - -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to - try against all files. If not specified a single password can be - set in ACCOUNT_PWD environment variable. + -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to try against all files. + If not specified a single password can be set in ACCOUNT_PWD environment variable. - -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in - non-dev environment. + -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in non-dev environment. -w, --workerId=workerId (required) Storage operator worker ID - -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set - in ACCOUNT_URI environment variable. + -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set in ACCOUNT_URI + environment variable. --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/operator/set-metadata.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/operator/set-metadata.ts)_ +_See code: [src/commands/operator/set-metadata.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/operator/set-metadata.ts)_ ## `storage-node server` @@ -879,109 +851,102 @@ USAGE $ storage-node server OPTIONS - -b, --buckets=buckets - [default: ] Comma separated list of bucket IDs to service. Buckets that are not assigned to - worker are ignored. If not specified all buckets will be serviced. + -b, --buckets=buckets [default: ] Comma separated list of bucket IDs to + service. Buckets that are not assigned to worker are + ignored. If not specified all buckets will be + serviced. - -c, --cleanup - Enable cleanup/pruning of no-longer assigned assets. + -c, --cleanup Enable cleanup/pruning of no-longer assigned assets. - -d, --uploads=uploads - (required) Data uploading directory (absolute path). + -d, --uploads=uploads (required) Data uploading directory (absolute path). - -e, --elasticSearchEndpoint=elasticSearchEndpoint - Elasticsearch endpoint (e.g.: http://some.com:8081). - Log level could be set using the ELASTIC_LOG_LEVEL environment variable. 
- Supported values: warn, error, debug, info. Default:debug + -e, --elasticSearchEndpoint=elasticSearchEndpoint Elasticsearch endpoint (e.g.: http://some.com:8081). + Log level could be set using the ELASTIC_LOG_LEVEL + environment variable. + Supported values: warn, error, debug, info. + Default:debug - -h, --help - show CLI help + -h, --help show CLI help - -i, --cleanupInterval=cleanupInterval - [default: 360] Interval between periodic cleanup actions (in minutes) + -i, --cleanupInterval=cleanupInterval [default: 360] Interval between periodic cleanup + actions (in minutes) - -i, --syncInterval=syncInterval - [default: 20] Interval between synchronizations (in minutes) + -i, --syncInterval=syncInterval [default: 20] Interval between synchronizations (in + minutes) - -k, --keyFile=keyFile - Path to key file to add to the keyring. + -k, --keyFile=keyFile Path to key file to add to the keyring. - -l, --logFilePath=logFilePath - Absolute path to the rolling log files. + -l, --logFilePath=logFilePath Absolute path to the rolling log files. - -m, --dev - Use development mode + -m, --dev Use development mode - -n, --logMaxFileNumber=logMaxFileNumber - [default: 7] Maximum rolling log files number. + -n, --logMaxFileNumber=logMaxFileNumber [default: 7] Maximum rolling log files number. - -o, --port=port - (required) Server port. + -o, --port=port (required) Server port. - -p, --password=password - Password to unlock keyfiles. Multiple passwords can be passed, to try against all files. If - not specified a single password can be set in ACCOUNT_PWD environment variable. + -p, --password=password Password to unlock keyfiles. Multiple passwords can + be passed, to try against all files. If not specified + a single password can be set in ACCOUNT_PWD + environment variable. - -q, --storageSquidEndpoint=storageSquidEndpoint - (required) [default: http://localhost:4352/graphql] Storage Squid graphql server endpoint - (e.g.: http://some.com:4352/graphql) + -q, --storageSquidEndpoint=storageSquidEndpoint (required) [default: http://localhost:4352/graphql] + Storage Squid graphql server endpoint (e.g.: + http://some.com:4352/graphql) - -r, --syncWorkersNumber=syncWorkersNumber - [default: 20] Sync workers number (max async operations in progress). + -r, --syncWorkersNumber=syncWorkersNumber [default: 20] Sync workers number (max async + operations in progress). - -s, --sync - Enable data synchronization. + -s, --sync Enable data synchronization. - -t, --syncWorkersTimeout=syncWorkersTimeout - [default: 30] Asset downloading timeout for the syncronization (in minutes). + -t, --syncWorkersTimeout=syncWorkersTimeout [default: 30] Asset downloading timeout for the + syncronization (in minutes). - -u, --apiUrl=apiUrl - [default: ws://localhost:9944] Runtime API URL. Mandatory in non-dev environment. + -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. + Mandatory in non-dev environment. - -w, --worker=worker - (required) Storage provider worker ID + -w, --worker=worker (required) Storage provider worker ID - -x, --logMaxFileSize=logMaxFileSize - [default: 50000000] Maximum rolling log files size in bytes. + -x, --logMaxFileSize=logMaxFileSize [default: 50000000] Maximum rolling log files size in + bytes. - -y, --accountUri=accountUri - Account URI (optional). If not specified a single key can be set in ACCOUNT_URI environment - variable. + -y, --accountUri=accountUri Account URI (optional). If not specified a single key + can be set in ACCOUNT_URI environment variable. 
- -z, --logFileChangeFrequency=(yearly|monthly|daily|hourly|none) - [default: daily] Log files update frequency. + -z, --logFileChangeFrequency=(yearly|monthly|daily|hourly|none) [default: daily] Log files update frequency. - --elasticSearchIndexPrefix=elasticSearchIndexPrefix - Elasticsearch index prefix. Node ID will be appended to the prefix. Default: logs-colossus. - Can be passed through ELASTIC_INDEX_PREFIX environment variable. + --elasticSearchIndexPrefix=elasticSearchIndexPrefix Elasticsearch index prefix. Node ID will be appended + to the prefix. Default: logs-colossus. Can be passed + through ELASTIC_INDEX_PREFIX environment variable. - --elasticSearchPassword=elasticSearchPassword - Elasticsearch password for basic authentication. Can be passed through ELASTIC_PASSWORD - environment variable. + --elasticSearchPassword=elasticSearchPassword Elasticsearch password for basic authentication. Can + be passed through ELASTIC_PASSWORD environment + variable. - --elasticSearchUser=elasticSearchUser - Elasticsearch user for basic authentication. Can be passed through ELASTIC_USER environment - variable. + --elasticSearchUser=elasticSearchUser Elasticsearch user for basic authentication. Can be + passed through ELASTIC_USER environment variable. - --keyStore=keyStore - Path to a folder with multiple key files to load into keystore. + --keyStore=keyStore Path to a folder with multiple key files to load into + keystore. - --maxBatchTxSize=maxBatchTxSize - [default: 20] Maximum number of `accept_pending_data_objects` in a batch transactions. + --maxBatchTxSize=maxBatchTxSize [default: 20] Maximum number of + `accept_pending_data_objects` in a batch + transactions. - --pendingFolder=pendingFolder - Directory to store pending files which are being uploaded (absolute path). - If not specified a subfolder under the uploads directory will be used. + --pendingFolder=pendingFolder Directory to store pending files which are being + uploaded (absolute path). + If not specified a subfolder under the uploads + directory will be used. - --syncRetryInterval=syncRetryInterval - [default: 3] Interval before retrying failed synchronization run (in minutes) + --syncRetryInterval=syncRetryInterval [default: 3] Interval before retrying failed + synchronization run (in minutes) - --tempFolder=tempFolder - Directory to store tempory files during sync (absolute path). - If not specified a subfolder under the uploads directory will be used. + --tempFolder=tempFolder Directory to store tempory files during sync + (absolute path). + If not specified a subfolder under the uploads + directory will be used. ``` -_See code: [src/commands/server.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/server.ts)_ +_See code: [src/commands/server.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/server.ts)_ ## `storage-node util:cleanup` @@ -993,42 +958,33 @@ USAGE OPTIONS -b, --bucketId=bucketId (required) The buckerId to sync prune/cleanup - - -d, --uploads=uploads (required) Data uploading directory (absolute - path). - + -d, --uploads=uploads (required) Data uploading directory (absolute path). -h, --help show CLI help - -k, --keyFile=keyFile Path to key file to add to the keyring. - -m, --dev Use development mode - -p, --cleanupWorkersNumber=cleanupWorkersNumber [default: 20] Cleanup/Pruning workers number - (max async operations in progress). + -p, --cleanupWorkersNumber=cleanupWorkersNumber [default: 20] Cleanup/Pruning workers number (max async operations in + progress). 
- -p, --password=password Password to unlock keyfiles. Multiple - passwords can be passed, to try against all - files. If not specified a single password can - be set in ACCOUNT_PWD environment variable. + -p, --password=password Password to unlock keyfiles. Multiple passwords can be passed, to try + against all files. If not specified a single password can be set in + ACCOUNT_PWD environment variable. - -q, --queryNodeEndpoint=queryNodeEndpoint [default: http://localhost:4352/graphql] - Storage Squid graphql server endpoint (e.g.: - http://some.com:4352/graphql) + -q, --queryNodeEndpoint=queryNodeEndpoint [default: http://localhost:4352/graphql] Storage Squid graphql server + endpoint (e.g.: http://some.com:4352/graphql) - -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API - URL. Mandatory in non-dev environment. + -u, --apiUrl=apiUrl [default: ws://localhost:9944] Runtime API URL. Mandatory in non-dev + environment. -w, --workerId=workerId (required) Storage node operator worker ID. - -y, --accountUri=accountUri Account URI (optional). If not specified a - single key can be set in ACCOUNT_URI - environment variable. + -y, --accountUri=accountUri Account URI (optional). If not specified a single key can be set in + ACCOUNT_URI environment variable. - --keyStore=keyStore Path to a folder with multiple key files to - load into keystore. + --keyStore=keyStore Path to a folder with multiple key files to load into keystore. ``` -_See code: [src/commands/util/cleanup.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/util/cleanup.ts)_ +_See code: [src/commands/util/cleanup.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/util/cleanup.ts)_ ## `storage-node util:fetch-bucket` @@ -1040,33 +996,28 @@ USAGE OPTIONS -b, --bucketId=bucketId (required) The buckerId to fetch - - -d, --uploads=uploads (required) Data uploading directory - (absolute path). - + -d, --uploads=uploads (required) Data uploading directory (absolute path). -h, --help show CLI help - -n, --syncWorkersNumber=syncWorkersNumber [default: 20] Sync workers number (max - async operations in progress). + -n, --syncWorkersNumber=syncWorkersNumber [default: 20] Sync workers number (max async operations in + progress). - -o, --dataSourceOperatorUrl=dataSourceOperatorUrl Storage node url base (e.g.: - http://some.com:3333) to get data from. + -o, --dataSourceOperatorUrl=dataSourceOperatorUrl Storage node url base (e.g.: http://some.com:3333) to get data + from. - -q, --queryNodeEndpoint=queryNodeEndpoint [default: http://localhost:4352/graphql] - Storage Squid graphql server endpoint - (e.g.: http://some.com:4352/graphql) + -q, --queryNodeEndpoint=queryNodeEndpoint [default: http://localhost:4352/graphql] Storage Squid graphql + server endpoint (e.g.: http://some.com:4352/graphql) - -t, --syncWorkersTimeout=syncWorkersTimeout [default: 30] Asset downloading timeout for - the syncronization (in minutes). + -t, --syncWorkersTimeout=syncWorkersTimeout [default: 30] Asset downloading timeout for the syncronization (in + minutes). - --tempFolder=tempFolder Directory to store tempory files during - sync and upload (absolute path). - ,Temporary directory (absolute path). If - not specified a subfolder under the uploads - directory will be used. + --tempFolder=tempFolder Directory to store tempory files during sync and upload (absolute + path). + ,Temporary directory (absolute path). If not specified a subfolder + under the uploads directory will be used. 
``` -_See code: [src/commands/util/fetch-bucket.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/util/fetch-bucket.ts)_ +_See code: [src/commands/util/fetch-bucket.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/util/fetch-bucket.ts)_ ## `storage-node util:multihash` @@ -1081,7 +1032,24 @@ OPTIONS -h, --help show CLI help ``` -_See code: [src/commands/util/multihash.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/util/multihash.ts)_ +_See code: [src/commands/util/multihash.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/util/multihash.ts)_ + +## `storage-node util:search-archives` + +Searches for the archive file names given an archive trackfile and a list of data objects of interest. + +``` +USAGE + $ storage-node util:search-archives + +OPTIONS + -f, --archiveTrackfile=archiveTrackfile (required) Path to the archive trackfile (jsonl) + -j, --json Output as JSON + -n, --nameOnly Output only the archive names + -o, --dataObjects=dataObjects (required) List of the data object ids to look for (comma-separated) +``` + +_See code: [src/commands/util/search-archives.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/util/search-archives.ts)_ ## `storage-node util:verify-bag-id` @@ -1109,5 +1077,5 @@ OPTIONS - dynamic:member:4 ``` -_See code: [src/commands/util/verify-bag-id.ts](https://github.com/Joystream/joystream/blob/v4.3.0/src/commands/util/verify-bag-id.ts)_ +_See code: [src/commands/util/verify-bag-id.ts](https://github.com/Joystream/joystream/blob/v4.4.0/src/commands/util/verify-bag-id.ts)_ From 5e0e2e7fb9caae2b5bb79bb4d833cc332c5cf264 Mon Sep 17 00:00:00 2001 From: Lezek123 Date: Fri, 3 Jan 2025 12:16:26 +0100 Subject: [PATCH 6/9] ArchiveService: Sort batch by object ids --- storage-node/src/services/archive/ArchiveService.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storage-node/src/services/archive/ArchiveService.ts b/storage-node/src/services/archive/ArchiveService.ts index e3da432c7c..4f3d9b2484 100644 --- a/storage-node/src/services/archive/ArchiveService.ts +++ b/storage-node/src/services/archive/ArchiveService.ts @@ -383,7 +383,7 @@ export class ArchiveService { by: 'ids', ids: unsyncedIdsBatch.map((id) => id.toString()), }) - const objectsBatch = await objectsBatchLoader.getAll() + const objectsBatch = (await objectsBatchLoader.getAll()).sort((a, b) => parseInt(b.id) - parseInt(a.id)) // Add new download tasks while the upload dir size limit allows while (objectsBatch.length) { const uploadDirectorySize = await this.getUploadDirSize() From 1e76ce8afd242894f2fc78cff1eb4814aed19a36 Mon Sep 17 00:00:00 2001 From: Lezek123 Date: Fri, 3 Jan 2025 14:32:15 +0100 Subject: [PATCH 7/9] Simplify querying logic, remove DataObjectsLoader --- storage-node/CHANGELOG.md | 1 - .../src/services/archive/ArchiveService.ts | 16 +- storage-node/src/services/queryNode/api.ts | 153 ++++++++++-------- .../queryNode/queries/queries.graphql | 43 ++--- .../src/services/sync/cleanupService.ts | 8 +- .../src/services/sync/storageObligations.ts | 120 +++----------- .../src/services/sync/synchronizer.ts | 8 +- 7 files changed, 140 insertions(+), 209 deletions(-) diff --git a/storage-node/CHANGELOG.md b/storage-node/CHANGELOG.md index 48c10aa1b5..fb63963500 100644 --- a/storage-node/CHANGELOG.md +++ b/storage-node/CHANGELOG.md @@ -1,7 +1,6 @@ ### 4.4.0 - **Optimizations:** The way data objects / data object ids are queried and processed during sync and cleanup has been optimized: - - 
`DataObjectDetailsLoader` and `DataObjectIdsLoader` were implemented. They allow loading data objects / data object ids in batches using a connection query and avoid fetching redundant data from the GraphQL server. - Sync and cleanup services now process tasks in batches of `10_000` to avoid overflowing the memory. - Synchronous operations like `sort` or `filter` on larger arrays of data objects have been optimized (for example, by replacing `.filter(Array.includes(...))` with `.filter(Set.has(...))`). - A safety mechanism was added to avoid removing "deleted" objects for which a related `DataObjectDeleted` event cannot be found in storage squid. diff --git a/storage-node/src/services/archive/ArchiveService.ts b/storage-node/src/services/archive/ArchiveService.ts index 4f3d9b2484..0d1cdedced 100644 --- a/storage-node/src/services/archive/ArchiveService.ts +++ b/storage-node/src/services/archive/ArchiveService.ts @@ -13,7 +13,7 @@ import { OBJECTS_TRACKING_FILENAME, } from './tracking' import { QueryNodeApi } from '../queryNode/api' -import { DataObjectDetailsLoader, getStorageObligationsFromRuntime } from '../sync/storageObligations' +import { getDataObjectsByIDs, getStorageObligationsFromRuntime } from '../sync/storageObligations' import { getDownloadTasks } from '../sync/synchronizer' import sleep from 'sleep-promise' import { Logger } from 'winston' @@ -369,21 +369,21 @@ export class ArchiveService { public async performSync(): Promise { const model = await getStorageObligationsFromRuntime(this.queryNodeApi) - const assignedObjectsIds = await model.createAssignedObjectsIdsLoader(true).getAll() - const unsyncedIds = assignedObjectsIds + const unsyncedIds = (await model.getAssignedDataObjectIds(true)) .filter((id) => !this.objectTrackingService.isTracked(id)) .map((id) => parseInt(id)) + // Sort unsynced ids in ASCENDING order (oldest first) .sort((a, b) => a - b) this.logger.info(`Sync - new objects: ${unsyncedIds.length}`) // Sync objects in batches of 10_000 for (const unsyncedIdsBatch of _.chunk(unsyncedIds, 10_000)) { - const objectsBatchLoader = new DataObjectDetailsLoader(this.queryNodeApi, { - by: 'ids', - ids: unsyncedIdsBatch.map((id) => id.toString()), - }) - const objectsBatch = (await objectsBatchLoader.getAll()).sort((a, b) => parseInt(b.id) - parseInt(a.id)) + const objectIdsBatch = unsyncedIdsBatch.map((id) => id.toString()) + // Sort objectsBatch by ids in DESCENDING order (because we're using .pop() to get the next object) + const objectsBatch = (await getDataObjectsByIDs(this.queryNodeApi, objectIdsBatch)).sort( + (a, b) => parseInt(b.id) - parseInt(a.id) + ) // Add new download tasks while the upload dir size limit allows while (objectsBatch.length) { const uploadDirectorySize = await this.getUploadDirSize() diff --git a/storage-node/src/services/queryNode/api.ts b/storage-node/src/services/queryNode/api.ts index afafd65b2b..0ab027b609 100644 --- a/storage-node/src/services/queryNode/api.ts +++ b/storage-node/src/services/queryNode/api.ts @@ -8,12 +8,15 @@ import { DataObjectIdsByBagId, DataObjectIdsByBagIdQuery, DataObjectIdsByBagIdQueryVariables, - DataObjectsByBagsConnection, - DataObjectsByBagsConnectionQuery, - DataObjectsByBagsConnectionQueryVariables, - DataObjectsByIdsConnection, - DataObjectsByIdsConnectionQuery, - DataObjectsByIdsConnectionQueryVariables, + DataObjectIdsByBagIdsConnection, + DataObjectIdsByBagIdsConnectionQuery, + DataObjectIdsByBagIdsConnectionQueryVariables, + DataObjectDetailsByIds, + DataObjectDetailsByIdsQuery, + 
DataObjectIdsByIdsQueryVariables, + DataObjectIdsByIds, + DataObjectIdsByIdsQuery, + DataObjectDetailsByIdsQueryVariables, DataObjectsWithBagDetailsByIds, DataObjectsWithBagDetailsByIdsQuery, DataObjectsWithBagDetailsByIdsQueryVariables, @@ -48,18 +51,20 @@ import { StorageBucketIdsFragment, } from './generated/queries' import { Maybe } from './generated/schema' +import _ from 'lodash' /** * Defines query paging limits. */ -export const MAX_RESULTS_PER_QUERY = 1000 +export const MAX_INPUT_ARGS_SIZE = 1_000 +export const MAX_RESULTS_PER_QUERY = 10_000 type PaginationQueryVariables = { - limit: number - lastCursor?: Maybe + limit?: Maybe + after?: Maybe } -export type PaginationQueryResult = { +type PaginationQueryResult = { edges: { node: T }[] pageInfo: { hasNextPage: boolean @@ -134,10 +139,10 @@ export class QueryNodeApi { protected async multipleEntitiesWithPagination< NodeT, QueryT extends { [k: string]: PaginationQueryResult }, - CustomVariablesT extends Record + VariablesT extends PaginationQueryVariables >( query: DocumentNode, - variables: CustomVariablesT, + variables: VariablesT, resultKey: keyof QueryT, itemsPerPage = MAX_RESULTS_PER_QUERY ): Promise { @@ -145,10 +150,9 @@ export class QueryNodeApi { let results: NodeT[] = [] let lastCursor: string | undefined while (hasNextPage) { - const paginationVariables = { limit: itemsPerPage, cursor: lastCursor } + const paginationVariables: PaginationQueryVariables = { limit: itemsPerPage, after: lastCursor } const queryVariables = { ...variables, ...paginationVariables } - logger.debug(`Query - ${String(resultKey)}`) - const result = await this.apolloClient.query({ + const result = await this.apolloClient.query({ query, variables: queryVariables, }) @@ -255,73 +259,94 @@ export class QueryNodeApi { } /** - * Gets a page of data objects belonging to specified bags. + * Gets a list of all data object ids belonging to provided bags. * * @param bagIds - query filter: bag IDs + * @param isAccepted - query filter: value of isAccepted field (any if not specified) + * @param bagIdsBatchSize - max. size of a single batch of bagIds to query */ - public async getDataObjectsByBagsPage( + public async getDataObjectIdsByBagIds( bagIds: string[], - limit: number, - after: string | undefined, - includeDetails: IncludeDetails, - isAccepted?: boolean - ): Promise< - IncludeDetails extends true - ? PaginationQueryResult | null - : PaginationQueryResult<{ id: string }> | null - > { - return this.uniqueEntityQuery( - DataObjectsByBagsConnection, - { - bagIds: [...bagIds], - isAccepted, - limit, - after, - includeDetails: includeDetails, - }, - 'storageDataObjectsConnection' - ) + isAccepted?: boolean, + bagIdsBatchSize = MAX_INPUT_ARGS_SIZE + ): Promise { + let dataObjectIds: string[] = [] + for (const bagIdsBatch of _.chunk(bagIds, bagIdsBatchSize)) { + const dataObjectIdsBatch = await this.multipleEntitiesWithPagination< + { id: string }, + DataObjectIdsByBagIdsConnectionQuery, + DataObjectIdsByBagIdsConnectionQueryVariables + >( + DataObjectIdsByBagIdsConnection, + { + bagIds: bagIdsBatch, + isAccepted, + }, + 'storageDataObjectsConnection' + ) + dataObjectIds = dataObjectIds.concat(dataObjectIdsBatch.map(({ id }) => id)) + } + return dataObjectIds + } + + /** + * Gets a list of existing data object ids by the given list of data object ids. + * + * @param ids - query filter: data object ids + * @param batchSize - max. 
size of a single batch of ids to query + */ + public async getExistingDataObjectsIdsByIds(ids: string[], batchSize = MAX_INPUT_ARGS_SIZE): Promise { + let existingIds: string[] = [] + for (const idsBatch of _.chunk(ids, batchSize)) { + const existingIdsBatch = await this.multipleEntitiesQuery< + DataObjectIdsByIdsQuery, + DataObjectIdsByIdsQueryVariables + >(DataObjectIdsByIds, { ids: idsBatch }, 'storageDataObjects') + existingIds = existingIds.concat(existingIdsBatch.map(({ id }) => id)) + } + return existingIds } /** - * Gets a page of data objects by the given list of dataObject IDs. + * Gets a list of data object details by the given list of dataObject IDs. * * @param ids - query filter: data object ids + * @param batchSize - max. size of a single batch of ids to query */ - public async getDataObjectsByIdsPage( + public async getDataObjectsDetailsByIds( ids: string[], - limit: number, - after: string | undefined, - includeDetails: IncludeDetails, - isAccepted?: boolean - ): Promise< - IncludeDetails extends true - ? PaginationQueryResult | null - : PaginationQueryResult<{ id: string }> | null - > { - return this.uniqueEntityQuery( - DataObjectsByIdsConnection, - { - ids: [...ids], - isAccepted, - limit, - after, - includeDetails: includeDetails, - }, - 'storageDataObjectsConnection' - ) + batchSize = MAX_INPUT_ARGS_SIZE + ): Promise { + let dataObjects: DataObjectDetailsFragment[] = [] + for (const idsBatch of _.chunk(ids, batchSize)) { + const dataObjectsBatch = await this.multipleEntitiesQuery< + DataObjectDetailsByIdsQuery, + DataObjectDetailsByIdsQueryVariables + >(DataObjectDetailsByIds, { ids: idsBatch }, 'storageDataObjects') + dataObjects = dataObjects.concat(dataObjectsBatch) + } + return dataObjects } /** * Returns a list of data objects by ids, with their corresponding bag details * * @param ids - query filter: data object ids + * @param batchSize - max. size of a single batch of ids to query */ - public async getDataObjectsWithBagDetails(ids: string[]): Promise { - return this.multipleEntitiesQuery< - DataObjectsWithBagDetailsByIdsQuery, - DataObjectsWithBagDetailsByIdsQueryVariables - >(DataObjectsWithBagDetailsByIds, { ids: [...ids] }, 'storageDataObjects') + public async getDataObjectsWithBagDetails( + ids: string[], + batchSize = MAX_INPUT_ARGS_SIZE + ): Promise { + let dataObjects: DataObjectWithBagDetailsFragment[] = [] + for (const idsBatch of _.chunk(ids, batchSize)) { + const dataObjectsBatch = await this.multipleEntitiesQuery< + DataObjectsWithBagDetailsByIdsQuery, + DataObjectsWithBagDetailsByIdsQueryVariables + >(DataObjectsWithBagDetailsByIds, { ids: idsBatch }, 'storageDataObjects') + dataObjects = dataObjects.concat(dataObjectsBatch) + } + return dataObjects } /** diff --git a/storage-node/src/services/queryNode/queries/queries.graphql b/storage-node/src/services/queryNode/queries/queries.graphql index 32851a9051..43cd37d62a 100644 --- a/storage-node/src/services/queryNode/queries/queries.graphql +++ b/storage-node/src/services/queryNode/queries/queries.graphql @@ -83,13 +83,7 @@ fragment DataObjectWithBagDetails on StorageDataObject { } } -query dataObjectsByBagsConnection( - $bagIds: [String!] - $limit: Int - $after: String - $includeDetails: Boolean! 
- $isAccepted: Boolean -) { +query dataObjectIdsByBagIdsConnection($bagIds: [String!], $limit: Int, $after: String, $isAccepted: Boolean) { storageDataObjectsConnection( where: { storageBag: { id_in: $bagIds }, isAccepted_eq: $isAccepted } first: $limit @@ -99,7 +93,6 @@ query dataObjectsByBagsConnection( edges { node { id - ...DataObjectDetails @include(if: $includeDetails) } } pageInfo { @@ -110,30 +103,16 @@ query dataObjectsByBagsConnection( } } -query dataObjectsByIdsConnection( - $ids: [String!] - $limit: Int - $after: String - $includeDetails: Boolean! - $isAccepted: Boolean -) { - storageDataObjectsConnection( - where: { id_in: $ids, isAccepted_eq: $isAccepted } - first: $limit - after: $after - orderBy: id_ASC - ) { - edges { - node { - id - ...DataObjectDetails @include(if: $includeDetails) - } - } - pageInfo { - startCursor - endCursor - hasNextPage - } +# For verifying if data objects still exist +query dataObjectIdsByIds($ids: [String!]) { + storageDataObjects(where: { id_in: $ids }) { + id + } +} + +query dataObjectDetailsByIds($ids: [String!]) { + storageDataObjects(where: { id_in: $ids }) { + ...DataObjectDetails } } diff --git a/storage-node/src/services/sync/cleanupService.ts b/storage-node/src/services/sync/cleanupService.ts index 8f1eaa1d40..7494577047 100644 --- a/storage-node/src/services/sync/cleanupService.ts +++ b/storage-node/src/services/sync/cleanupService.ts @@ -5,7 +5,7 @@ import urljoin from 'url-join' import { getDataObjectIDs } from '../../services/caching/localDataObjects' import rootLogger from '../../services/logger' import { QueryNodeApi } from '../queryNode/api' -import { DataObjectIdsLoader, DataObligations, getStorageObligationsFromRuntime } from './storageObligations' +import { DataObligations, getStorageObligationsFromRuntime } from './storageObligations' import { DeleteLocalFileTask } from './tasks' import { TaskProcessorSpawner, WorkingStack } from '../processing/workingProcess' import { DataObjectWithBagDetailsFragment } from '../queryNode/generated/queries' @@ -79,13 +79,11 @@ export async function performCleanup( const model = await getStorageObligationsFromRuntime(qnApi, buckets) const storedObjectsIds = getDataObjectIDs() - const assignedObjectsLoader = model.createAssignedObjectsIdsLoader() - const assignedObjectIds = new Set(await assignedObjectsLoader.getAll()) + const assignedObjectIds = new Set(await model.getAssignedDataObjectIds()) const obsoleteObjectIds = new Set(storedObjectsIds.filter((id) => !assignedObjectIds.has(id))) // If objects are obsolete but still exist: They are "moved" objects - const movedObjectsLoader = new DataObjectIdsLoader(qnApi, { by: 'ids', ids: Array.from(obsoleteObjectIds) }) - const movedObjectIds = new Set(await movedObjectsLoader.getAll()) + const movedObjectIds = new Set(await qnApi.getExistingDataObjectsIdsByIds([...obsoleteObjectIds])) // If objects are obsolete and don't exist: They are "deleted objects" const deletedDataObjectIds = new Set([...obsoleteObjectIds].filter((id) => !movedObjectIds.has(id))) diff --git a/storage-node/src/services/sync/storageObligations.ts b/storage-node/src/services/sync/storageObligations.ts index c9369b9d42..1abd3806ca 100644 --- a/storage-node/src/services/sync/storageObligations.ts +++ b/storage-node/src/services/sync/storageObligations.ts @@ -1,7 +1,7 @@ import _ from 'lodash' import logger from '../logger' -import { MAX_RESULTS_PER_QUERY, PaginationQueryResult, QueryNodeApi } from '../queryNode/api' -import { DataObjectDetailsFragment, StorageBucketDetailsFragment } 
from '../queryNode/generated/queries' +import { MAX_RESULTS_PER_QUERY, QueryNodeApi } from '../queryNode/api' +import { StorageBucketDetailsFragment } from '../queryNode/generated/queries' import { ApiPromise } from '@polkadot/api' import { PalletStorageStorageBucketRecord } from '@polkadot/types/lookup' @@ -30,9 +30,9 @@ export type DataObligations = { bagOperatorsUrlsById: Map /** - * A function that returns a loader of all assigned data object ids + * A function that returns assigned data object ids */ - createAssignedObjectsIdsLoader(isAccepted?: boolean): DataObjectIdsLoader + getAssignedDataObjectIds(isAccepted?: boolean): Promise } /** @@ -95,93 +95,6 @@ export type DataObject = { size: number } -export abstract class LazyBatchLoader, MappedEntity> { - private endCursor: string | undefined - private _hasNextPage: boolean - private queryFn: (limit: number, after?: string) => Promise - - constructor(queryFn: (limit: number, after?: string) => Promise) { - this.queryFn = queryFn - this._hasNextPage = true - } - - public get hasNextPage(): boolean { - return this._hasNextPage - } - - abstract mapResults(results: QueryResult['edges'][number]['node'][]): Promise - - async nextBatch(size = 10_000): Promise { - if (!this._hasNextPage) { - return null - } - const result = await this.queryFn(size, this.endCursor) - if (!result) { - throw new Error('Connection query returned empty result') - } - - this.endCursor = result.pageInfo.endCursor || undefined - this._hasNextPage = result.pageInfo.hasNextPage - const mapped = await this.mapResults(result.edges.map((e) => e.node)) - return mapped - } - - async getAll(): Promise { - const results: MappedEntity[] = [] - while (this._hasNextPage) { - const batch = await this.nextBatch() - if (!batch) { - break - } - results.push(...batch) - } - - return results - } -} - -type DataObjectsLoadBy = { by: 'bagIds' | 'ids'; ids: string[]; isAccepted?: boolean } - -export class DataObjectDetailsLoader extends LazyBatchLoader< - PaginationQueryResult, - DataObject -> { - constructor(qnApi: QueryNodeApi, by: DataObjectsLoadBy) { - if (by.by === 'bagIds') { - super((limit, after) => qnApi.getDataObjectsByBagsPage(by.ids, limit, after, true, by.isAccepted)) - } else if (by.by === 'ids') { - super((limit, after) => qnApi.getDataObjectsByIdsPage(by.ids, limit, after, true, by.isAccepted)) - } else { - throw new Error(`Unknown "by" condition: ${JSON.stringify(by)}`) - } - } - - async mapResults(results: DataObjectDetailsFragment[]): Promise { - return results.map((dataObject) => ({ - id: dataObject.id, - size: parseInt(dataObject.size), - bagId: dataObject.storageBag.id, - ipfsHash: dataObject.ipfsHash, - })) - } -} - -export class DataObjectIdsLoader extends LazyBatchLoader, string> { - constructor(qnApi: QueryNodeApi, by: DataObjectsLoadBy) { - if (by.by === 'bagIds') { - super((limit, after) => qnApi.getDataObjectsByBagsPage(by.ids, limit, after, false, by.isAccepted)) - } else if (by.by === 'ids') { - super((limit, after) => qnApi.getDataObjectsByIdsPage(by.ids, limit, after, false, by.isAccepted)) - } else { - throw new Error(`Unknown "by" condition: ${JSON.stringify(by)}`) - } - } - - async mapResults(results: { id: string }[]): Promise { - return results.map(({ id }) => id) - } -} - /** * Get storage provider obligations like (assigned data objects) from the * runtime (Query Node). 
@@ -219,7 +132,7 @@ export async function getStorageObligationsFromRuntime( for (const bucket of storageBuckets) { if (!ownBuckets.has(bucket.id)) { if (ownOperatorUrls.has(bucket.operatorUrl)) { - logger.warn(`(sync) Skipping remote bucket ${bucket.id} - ${bucket.operatorUrl}`) + logger.warn(`Skipping remote bucket ${bucket.id} - ${bucket.operatorUrl}`) } else { bucketOperatorUrlById.set(bucket.id, bucket.operatorUrl) } @@ -244,8 +157,11 @@ export async function getStorageObligationsFromRuntime( bags, bagOperatorsUrlsById, bucketOperatorUrlById, - createAssignedObjectsIdsLoader: (isAccepted?: boolean) => - new DataObjectIdsLoader(qnApi, { by: 'bagIds', ids: bags.map((b) => b.id), isAccepted }), + getAssignedDataObjectIds: (isAccepted?: boolean) => + qnApi.getDataObjectIdsByBagIds( + bags.map((b) => b.id), + isAccepted + ), } return model @@ -286,6 +202,22 @@ async function getAllBuckets(api: QueryNodeApi): Promise { + return (await api.getDataObjectsDetailsByIds(dataObjectIds)).map((o) => ({ + id: o.id, + size: parseInt(o.size), + bagId: o.storageBag.id, + ipfsHash: o.ipfsHash, + })) +} + /** * Abstract object acquiring function for the QueryNode. It uses paging for * queries and gets data using record offset and limit (hardcoded to 1000). diff --git a/storage-node/src/services/sync/synchronizer.ts b/storage-node/src/services/sync/synchronizer.ts index 93118036bd..e97ef07c8c 100644 --- a/storage-node/src/services/sync/synchronizer.ts +++ b/storage-node/src/services/sync/synchronizer.ts @@ -3,8 +3,8 @@ import logger from '../../services/logger' import { QueryNodeApi } from '../queryNode/api' import { DataObject, - DataObjectDetailsLoader, DataObligations, + getDataObjectsByIDs, getStorageObligationsFromRuntime, } from './storageObligations' import { DownloadFileTask } from './tasks' @@ -53,8 +53,7 @@ export async function performSync( const model = await getStorageObligationsFromRuntime(qnApi, buckets) const storedObjectIds = getDataObjectIDs() - const assignedObjectIdsLoader = model.createAssignedObjectsIdsLoader(true) - const assignedObjectIds = new Set(await assignedObjectIdsLoader.getAll()) + const assignedObjectIds = new Set(await model.getAssignedDataObjectIds(true)) const unsyncedObjectIds = [...assignedObjectIds].filter((id) => !isDataObjectIdInCache(id)) const obsoleteObjectsNum = storedObjectIds.reduce((count, id) => (assignedObjectIds.has(id) ? 
count : count + 1), 0) @@ -69,8 +68,7 @@ export async function performSync( logger.debug(`Sync - started processing...`) let processed = 0 for (const unsyncedIdsBatch of _.chunk(unsyncedObjectIds, 10_000)) { - const objectsLoader = new DataObjectDetailsLoader(qnApi, { by: 'ids', ids: unsyncedIdsBatch }) - const objectsBatch = await objectsLoader.getAll() + const objectsBatch = await getDataObjectsByIDs(qnApi, unsyncedIdsBatch) const syncTasks = await getDownloadTasks( model, objectsBatch, From 6fcd6a739fd3982b6538bb086107a8a949b5a08c Mon Sep 17 00:00:00 2001 From: Lezek123 Date: Wed, 8 Jan 2025 17:14:46 +0100 Subject: [PATCH 8/9] Configurable cleanup/sync batch size and workersNum, limit async requests during cleanup, improve logging --- storage-node/CHANGELOG.md | 6 +- storage-node/package.json | 1 + storage-node/src/commands/server.ts | 32 +++++- storage-node/src/commands/util/cleanup.ts | 14 ++- .../src/commands/util/fetch-bucket.ts | 5 + .../src/services/sync/cleanupService.ts | 101 +++++++++++++----- .../src/services/sync/storageObligations.ts | 2 +- .../src/services/sync/synchronizer.ts | 6 +- yarn.lock | 2 +- 9 files changed, 133 insertions(+), 36 deletions(-) diff --git a/storage-node/CHANGELOG.md b/storage-node/CHANGELOG.md index fb63963500..686d991930 100644 --- a/storage-node/CHANGELOG.md +++ b/storage-node/CHANGELOG.md @@ -1,10 +1,12 @@ ### 4.4.0 - **Optimizations:** The way data objects / data object ids are queried and processed during sync and cleanup has been optimized: - - Sync and cleanup services now process tasks in batches of `10_000` to avoid overflowing the memory. + - Sync and cleanup services now process tasks in batches of configurable size (`--syncBatchSize`, `--cleanupBatchSize`) to avoid overflowing the memory. - Synchronous operations like `sort` or `filter` on larger arrays of data objects have been optimized (for example, by replacing `.filter(Array.includes(...))` with `.filter(Set.has(...))`). + - Enforced a limit of max. results per single GraphQL query to `10,000` and max input arguments per query to `1,000` + - Added `--cleanupWorkersNumber` flag to limit the number of concurrent async requests during cleanup. - A safety mechanism was added to avoid removing "deleted" objects for which a related `DataObjectDeleted` event cannot be found in storage squid. -- Improved logging during cleanup. +- Improved logging during sync and cleanup. 
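
An illustrative sketch of the batching and concurrency-capping pattern the bullets above describe, using the `p-limit` dependency this patch adds; the `processInBatches`, `handler`, `batchSize` and `workersNumber` names are hypothetical and not part of the patch:

```ts
import _ from 'lodash'
import pLimit from 'p-limit'

// Hypothetical helper: process ids in fixed-size batches (cf. --syncBatchSize / --cleanupBatchSize),
// capping the number of concurrent async operations within each batch (cf. --cleanupWorkersNumber).
async function processInBatches(
  ids: string[],
  batchSize: number,
  workersNumber: number,
  handler: (id: string) => Promise<void>
): Promise<void> {
  // At most `workersNumber` handler calls are in flight at any time.
  const limit = pLimit(workersNumber)
  for (const batch of _.chunk(ids, batchSize)) {
    // A batch is fully processed before the next one is loaded, keeping memory usage bounded.
    await Promise.all(batch.map((id) => limit(() => handler(id))))
  }
}
```

In the patch itself the corresponding flags default to batches of `10_000` objects and `100` concurrent cleanup workers.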
 ### 4.3.0
diff --git a/storage-node/package.json b/storage-node/package.json
index cf13a67859..fb5127f42d 100644
--- a/storage-node/package.json
+++ b/storage-node/package.json
@@ -54,6 +54,7 @@
     "multihashes": "^4.0.2",
     "node-cache": "^5.1.2",
     "openapi-editor": "^0.3.0",
+    "p-limit": "^3",
     "promise-timeout": "^1.3.0",
     "proper-lockfile": "^4.1.2",
     "react": "^18.2.0",
diff --git a/storage-node/src/commands/server.ts b/storage-node/src/commands/server.ts
index 6f61b38fc9..62be3c75cc 100644
--- a/storage-node/src/commands/server.ts
+++ b/storage-node/src/commands/server.ts
@@ -78,16 +78,29 @@ export default class Server extends ApiCommandBase {
       description: 'Interval before retrying failed synchronization run (in minutes)',
       default: 3,
     }),
+    syncBatchSize: flags.integer({
+      description: 'Maximum number of objects to process in a single batch during synchronization.',
+      default: 10_000,
+    }),
     cleanup: flags.boolean({
       char: 'c',
       description: 'Enable cleanup/pruning of no-longer assigned assets.',
       default: false,
     }),
+    cleanupBatchSize: flags.integer({
+      description: 'Maximum number of objects to process in a single batch during cleanup.',
+      default: 10_000,
+    }),
     cleanupInterval: flags.integer({
       char: 'i',
       description: 'Interval between periodic cleanup actions (in minutes)',
       default: 360,
     }),
+    cleanupWorkersNumber: flags.integer({
+      required: false,
+      description: 'Cleanup workers number (max async operations in progress).',
+      default: 100,
+    }),
     storageSquidEndpoint: flags.string({
       char: 'q',
       required: true,
@@ -299,6 +312,7 @@ Supported values: warn, error, debug, info. Default:debug`,
           flags.syncWorkersTimeout,
           flags.syncInterval,
           flags.syncRetryInterval,
+          flags.syncBatchSize,
           X_HOST_ID
         ),
         0
@@ -319,8 +333,9 @@ Supported values: warn, error, debug, info. Default:debug`,
           api,
           qnApi,
           flags.uploads,
-          flags.syncWorkersNumber,
+          flags.cleanupWorkersNumber,
           flags.cleanupInterval,
+          flags.cleanupBatchSize,
           X_HOST_ID
         ),
         0
@@ -397,6 +412,7 @@ async function runSyncWithInterval(
   syncWorkersTimeout: number,
   syncIntervalMinutes: number,
   syncRetryIntervalMinutes: number,
+  syncBatchSize: number,
   hostId: string
 ) {
   const sleepInterval = syncIntervalMinutes * 60 * 1000
@@ -404,7 +420,16 @@ async function runSyncWithInterval(
   while (true) {
     try {
       logger.info(`Resume syncing....`)
-      await performSync(buckets, syncWorkersNumber, syncWorkersTimeout, qnApi, uploadsDirectory, tempDirectory, hostId)
+      await performSync(
+        buckets,
+        syncWorkersNumber,
+        syncWorkersTimeout,
+        qnApi,
+        uploadsDirectory,
+        tempDirectory,
+        syncBatchSize,
+        hostId
+      )
       logger.info(`Sync run complete. Next run in ${syncIntervalMinutes} minute(s).`)
       await sleep(sleepInterval)
     } catch (err) {
@@ -434,6 +459,7 @@ async function runCleanupWithInterval(
   uploadsDirectory: string,
   syncWorkersNumber: number,
   cleanupIntervalMinutes: number,
+  cleanupBatchSize: number,
   hostId: string
 ) {
   const sleepInterval = cleanupIntervalMinutes * 60 * 1000
@@ -442,7 +468,7 @@ async function runCleanupWithInterval(
     await sleep(sleepInterval)
     try {
       logger.info(`Resume cleanup....`)
-      await performCleanup(buckets, syncWorkersNumber, api, qnApi, uploadsDirectory, hostId)
+      await performCleanup(buckets, syncWorkersNumber, api, qnApi, uploadsDirectory, cleanupBatchSize, hostId)
     } catch (err) {
       logger.error(`Critical cleanup error: ${err}`)
     }
diff --git a/storage-node/src/commands/util/cleanup.ts b/storage-node/src/commands/util/cleanup.ts
index 6ebde5d051..d1b5cffba7 100644
--- a/storage-node/src/commands/util/cleanup.ts
+++ b/storage-node/src/commands/util/cleanup.ts
@@ -27,6 +27,10 @@ export default class Cleanup extends ApiCommandBase {
       required: true,
       description: 'The buckerId to sync prune/cleanup',
     }),
+    cleanupBatchSize: flags.integer({
+      description: 'Maximum number of objects to process in a single batch during cleanup.',
+      default: 10_000,
+    }),
     cleanupWorkersNumber: flags.integer({
       char: 'p',
       required: false,
@@ -57,7 +61,15 @@ export default class Cleanup extends ApiCommandBase {
     logger.info('Cleanup...')
     try {
-      await performCleanup([bucketId], flags.cleanupWorkersNumber, api, qnApi, flags.uploads, '')
+      await performCleanup(
+        [bucketId],
+        flags.cleanupWorkersNumber,
+        api,
+        qnApi,
+        flags.uploads,
+        flags.cleanupBatchSize,
+        ''
+      )
     } catch (err) {
       logger.error(err)
       logger.error(stringify(err))
diff --git a/storage-node/src/commands/util/fetch-bucket.ts b/storage-node/src/commands/util/fetch-bucket.ts
index 8be873cd7e..1bfc097bb4 100644
--- a/storage-node/src/commands/util/fetch-bucket.ts
+++ b/storage-node/src/commands/util/fetch-bucket.ts
@@ -36,6 +36,10 @@ export default class FetchBucket extends Command {
       description: 'Asset downloading timeout for the syncronization (in minutes).',
       default: 30,
     }),
+    syncBatchSize: flags.integer({
+      description: 'Maximum number of objects to process in a single batch.',
+      default: 10_000,
+    }),
     queryNodeEndpoint: flags.string({
       char: 'q',
       required: false,
@@ -74,6 +78,7 @@ export default class FetchBucket extends Command {
       qnApi,
       flags.uploads,
       flags.tempFolder ? flags.tempFolder : path.join(flags.uploads, 'temp'),
+      flags.syncBatchSize,
       '',
       flags.dataSourceOperatorUrl
     )
diff --git a/storage-node/src/services/sync/cleanupService.ts b/storage-node/src/services/sync/cleanupService.ts
index 7494577047..683f350881 100644
--- a/storage-node/src/services/sync/cleanupService.ts
+++ b/storage-node/src/services/sync/cleanupService.ts
@@ -10,6 +10,7 @@ import { DeleteLocalFileTask } from './tasks'
 import { TaskProcessorSpawner, WorkingStack } from '../processing/workingProcess'
 import { DataObjectWithBagDetailsFragment } from '../queryNode/generated/queries'
 import { Logger } from 'winston'
+import pLimit from 'p-limit'
 
 /**
  * The maximum allowed threshold by which the QN processor can lag behind
@@ -41,14 +42,13 @@ export const MINIMUM_REPLICATION_THRESHOLD = parseInt(process.env.CLEANUP_MIN_RE
  * - If the asset being pruned from this storage-node is currently being downloaded
  *   by some external actors, then the cleanup action for this asset would be postponed
  *
- * @param api - (optional) runtime API promise
- * @param workerId - current storage provider ID
- * @param buckets - Selected storage buckets
+ * @param buckets - selected storage buckets
  * @param asyncWorkersNumber - maximum parallel cleanups number
- * @param asyncWorkersTimeout - downloading asset timeout
+ * @param api - runtime API promise
 * @param qnApi - Query Node API
 * @param uploadDirectory - local directory to get file names from
- * @param tempDirectory - local directory for temporary data uploading
+ * @param batchSize - max. number of data objects to process in a single batch
+ * @param hostId
 */
 export async function performCleanup(
   buckets: string[],
@@ -56,6 +56,7 @@ export async function performCleanup(
   api: ApiPromise,
   qnApi: QueryNodeApi,
   uploadDirectory: string,
+  batchSize: number,
   hostId: string
 ): Promise<void> {
   const logger = rootLogger.child({ label: 'Cleanup' })
@@ -98,11 +99,11 @@ export async function performCleanup(
     const workingStack = new WorkingStack()
     const processSpawner = new TaskProcessorSpawner(workingStack, asyncWorkersNumber)
 
-    // Execute deleted objects removal tasks in batches of 10_000
+    // Execute deleted objects removal tasks in batches
     if (deletedDataObjectIds.size) {
       let deletedProcessed = 0
       logger.info(`removing ${deletedDataObjectIds.size} deleted objects...`)
-      for (let deletedObjectsIdsBatch of _.chunk([...deletedDataObjectIds], 10_000)) {
+      for (let deletedObjectsIdsBatch of _.chunk([...deletedDataObjectIds], batchSize)) {
         // Confirm whether the objects were actually deleted by fetching the related deletion events
         const dataObjectDeletedEvents = await qnApi.getDataObjectDeletedEvents(deletedObjectsIdsBatch)
         const confirmedIds = new Set(dataObjectDeletedEvents.map((e) => e.data.dataObjectId))
@@ -120,26 +121,35 @@ export async function performCleanup(
         deletedProcessed += deletedObjectsIdsBatch.length
         logger.debug(`${deletedProcessed} / ${deletedDataObjectIds.size} deleted objects processed...`)
       }
+      logger.info(`${deletedProcessed}/${deletedDataObjectIds.size} deleted data objects successfully cleared.`)
     }
 
-    // Execute moved objects removal tasks in batches of 10_000
+    // Execute moved objects removal tasks in batches
     if (movedObjectIds.size) {
       let movedProcessed = 0
       logger.info(`removing ${movedObjectIds.size} moved objects...`)
-      for (const movedObjectsIdsBatch of _.chunk([...movedObjectIds], 10_000)) {
+      for (const movedObjectsIdsBatch of _.chunk([...movedObjectIds], batchSize)) {
        const movedDataObjectsBatch = await qnApi.getDataObjectsWithBagDetails(movedObjectsIdsBatch)
         const deletionTasksOfMovedDataObjects = await getDeletionTasksFromMovedDataObjects(
           logger,
           uploadDirectory,
           model,
           movedDataObjectsBatch,
+          asyncWorkersNumber,
           hostId
         )
+        const numberOfTasks = deletionTasksOfMovedDataObjects.length
+        if (numberOfTasks !== movedObjectsIdsBatch.length) {
+          logger.warn(
+            `Only ${numberOfTasks} / ${movedObjectsIdsBatch.length} moved objects will be removed in this batch...`
+          )
+        }
         await workingStack.add(deletionTasksOfMovedDataObjects)
         await processSpawner.process()
-        movedProcessed += movedDataObjectsBatch.length
+        movedProcessed += numberOfTasks
        logger.debug(`${movedProcessed} / ${movedObjectIds.size} moved objects processed...`)
       }
+      logger.info(`${movedProcessed}/${movedObjectIds.size} moved data objects successfully cleared.`)
     }
   } else {
     logger.info('No objects to prune, skipping...')
@@ -155,6 +165,7 @@
 * @param uploadDirectory - local directory for data uploading
 * @param dataObligations - defines the current data obligations for the node
 * @param movedDataObjects- obsolete (no longer assigned) data objects that has been moved to other buckets
+ * @param asyncWorkersNumber - number of async workers assigned for cleanup tasks
 * @param hostId - host id of the current node
 */
 async function getDeletionTasksFromMovedDataObjects(
@@ -162,33 +173,71 @@ async function getDeletionTasksFromMovedDataObjects(
   uploadDirectory: string,
   dataObligations: DataObligations,
   movedDataObjects: DataObjectWithBagDetailsFragment[],
+  asyncWorkersNumber: number,
   hostId: string
 ): Promise<DeleteLocalFileTask[]> {
   const timeoutMs = 60 * 1000 // 1 minute since it's only a HEAD request
   const deletionTasks: DeleteLocalFileTask[] = []
   const { bucketOperatorUrlById } = dataObligations
 
-  await Promise.allSettled(
-    movedDataObjects.map(async (movedDataObject) => {
-      let dataObjectReplicationCount = 0
-
-      for (const { storageBucket } of movedDataObject.storageBag.storageBuckets) {
-        const nodeUrl = bucketOperatorUrlById.get(storageBucket.id)
-        if (nodeUrl) {
-          const fileUrl = urljoin(nodeUrl, 'api/v1/files', movedDataObject.id)
+  const limit = pLimit(asyncWorkersNumber)
+  let checkedObjects = 0
+  const checkReplicationThreshold = async (movedDataObject: DataObjectWithBagDetailsFragment) => {
+    ++checkedObjects
+    if (checkedObjects % asyncWorkersNumber === 0) {
+      logger.debug(
+        `Checking replication: ${checkedObjects}/${movedDataObjects.length} (active: ${limit.activeCount}, pending: ${limit.pendingCount})`
+      )
+    }
+
+    const externaBucketEndpoints = movedDataObject.storageBag.storageBuckets
+      .map(({ storageBucket: { id } }) => {
+        return bucketOperatorUrlById.get(id)
+      })
+      .filter((url): url is string => !!url)
+    let lastErr = ''
+    let successes = 0
+    let failures = 0
+
+    if (externaBucketEndpoints.length >= MINIMUM_REPLICATION_THRESHOLD) {
+      for (const nodeUrl of externaBucketEndpoints) {
+        const fileUrl = urljoin(nodeUrl, 'api/v1/files', movedDataObject.id)
+        try {
          await superagent.head(fileUrl).timeout(timeoutMs).set('X-COLOSSUS-HOST-ID', hostId)
-          dataObjectReplicationCount++
+          ++successes
+        } catch (e) {
+          ++failures
+          lastErr = e instanceof Error ? e.message : e.toString()
+        }
+        if (successes >= MINIMUM_REPLICATION_THRESHOLD) {
+          break
+        }
       }
     }
 
-      if (dataObjectReplicationCount < MINIMUM_REPLICATION_THRESHOLD) {
-        logger.warn(`data object replication threshold unmet - file deletion canceled: ${movedDataObject.id}`)
-        return
-      }
+    if (successes < MINIMUM_REPLICATION_THRESHOLD) {
+      logger.debug(
+        `Replication threshold unmet for object ${movedDataObject.id} ` +
+          `(buckets: ${externaBucketEndpoints.length}, successes: ${successes}, failures: ${failures}). ` +
+          (lastErr ? `Last error: ${lastErr}. ` : '') +
+          `File deletion canceled...`
+      )
+      return
+    }
+
+    deletionTasks.push(new DeleteLocalFileTask(uploadDirectory, movedDataObject.id))
+  }
+
+  await Promise.all(movedDataObjects.map((movedDataObject) => limit(() => checkReplicationThreshold(movedDataObject))))
+
+  const failedCount = movedDataObjects.length - deletionTasks.length
+  if (failedCount > 0) {
+    logger.warn(
+      `Replication threshold was unmet or couldn't be verified for ${failedCount} / ${movedDataObjects.length} objects in the current batch.`
+    )
+  }
-
-      deletionTasks.push(new DeleteLocalFileTask(uploadDirectory, movedDataObject.id))
-    })
-  )
+  logger.debug('Checking replication: Done')
 
   return deletionTasks
 }
diff --git a/storage-node/src/services/sync/storageObligations.ts b/storage-node/src/services/sync/storageObligations.ts
index 1abd3806ca..165f8a30fe 100644
--- a/storage-node/src/services/sync/storageObligations.ts
+++ b/storage-node/src/services/sync/storageObligations.ts
@@ -194,7 +194,7 @@ async function getAllBuckets(api: QueryNodeApi): Promise {
     const idsPart = ids.slice(offset, offset + limit)
     if (!_.isEmpty(idsPart)) {
-      logger.debug(`Sync - getting all storage buckets: offset = ${offset}, limit = ${limit}`)
+      logger.debug(`Getting all storage buckets: offset = ${offset}, limit = ${limit}`)
       return await api.getStorageBucketDetails(idsPart)
     } else {
       return false
diff --git a/storage-node/src/services/sync/synchronizer.ts b/storage-node/src/services/sync/synchronizer.ts
index e97ef07c8c..2637d7acc7 100644
--- a/storage-node/src/services/sync/synchronizer.ts
+++ b/storage-node/src/services/sync/synchronizer.ts
@@ -35,6 +35,7 @@ export const PendingDirName = 'pending'
 * @param qnApi - Query Node API
 * @param uploadDirectory - local directory to get file names from
 * @param tempDirectory - local directory for temporary data uploading
+ * @param batchSize - maximum number of data objects to process in a single batch
 * @param selectedOperatorUrl - (optional) defines the data source URL. If not set
 *   the source URL is resolved for each data object separately using the Query
 *   Node information about the storage providers.
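The docstring above lists `performSync`'s parameters in their new order, with `batchSize` inserted between `tempDirectory` and `hostId`. A hedged sketch of how a caller might wire the new value through follows; the argument values, import paths and the `runSyncOnce` wrapper are illustrative assumptions, not code from the patch:

```ts
import { QueryNodeApi } from '../services/queryNode/api'
import { performSync } from '../services/sync/synchronizer'

// Illustrative only — mirrors the argument order used by the server command above.
async function runSyncOnce(qnApi: QueryNodeApi, hostId: string): Promise<void> {
  await performSync(
    ['1'],            // bucket ids this node is responsible for (assumed string[])
    20,               // syncWorkersNumber
    30,               // syncWorkersTimeout (minutes)
    qnApi,
    './uploads',      // uploadDirectory
    './uploads/temp', // tempDirectory
    10_000,           // batchSize, wired from --syncBatchSize
    hostId
  )
}
```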
@@ -46,6 +47,7 @@ export async function performSync(
   qnApi: QueryNodeApi,
   uploadDirectory: string,
   tempDirectory: string,
+  batchSize: number,
   hostId: string,
   selectedOperatorUrl?: string
 ): Promise<void> {
@@ -64,10 +66,10 @@ export async function performSync(
   const workingStack = new WorkingStack()
   const processSpawner = new TaskProcessorSpawner(workingStack, asyncWorkersNumber)
 
-  // Process unsynced objects in batches od 10_000
+  // Process unsynced objects in batches
   logger.debug(`Sync - started processing...`)
   let processed = 0
-  for (const unsyncedIdsBatch of _.chunk(unsyncedObjectIds, 10_000)) {
+  for (const unsyncedIdsBatch of _.chunk(unsyncedObjectIds, batchSize)) {
     const objectsBatch = await getDataObjectsByIDs(qnApi, unsyncedIdsBatch)
     const syncTasks = await getDownloadTasks(
       model,
diff --git a/yarn.lock b/yarn.lock
index a7d5d565ac..6e9cdd6401 100644
--- a/yarn.lock
+++ b/yarn.lock
@@ -18274,7 +18274,7 @@ p-is-promise@^2.0.0:
   resolved "https://registry.npmjs.org/p-is-promise/-/p-is-promise-2.1.0.tgz"
   integrity sha512-Y3W0wlRPK8ZMRbNq97l4M5otioeA5lm1z7bkNkxCka8HSPjR0xRWmpCmc9utiaLP9Jb1eD8BgeIxTW4AIF45Pg==
 
-p-limit@3.1.0, p-limit@^3.0.2:
+p-limit@3.1.0, p-limit@^3, p-limit@^3.0.2:
   version "3.1.0"
   resolved "https://registry.npmjs.org/p-limit/-/p-limit-3.1.0.tgz"
   integrity sha512-TYOanM3wGwNGsZN2cVTYPArw454xnXj5qmWF1bEoAc4+cU/ol7GVh7odevjp1FNHduHc3KZMcFduxU5Xc6uJRQ==

From 03ff3ba74bdc8e24d13ee4347b1be34a74c0e6ed Mon Sep 17 00:00:00 2001
From: Lezek123
Date: Wed, 8 Jan 2025 17:48:49 +0100
Subject: [PATCH 9/9] Re-trigger github workflows

---
 storage-node/CHANGELOG.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/storage-node/CHANGELOG.md b/storage-node/CHANGELOG.md
index 686d991930..193e9b6a88 100644
--- a/storage-node/CHANGELOG.md
+++ b/storage-node/CHANGELOG.md
@@ -3,7 +3,7 @@
 - **Optimizations:** The way data objects / data object ids are queried and processed during sync and cleanup has been optimized:
   - Sync and cleanup services now process tasks in batches of configurable size (`--syncBatchSize`, `--cleanupBatchSize`) to avoid overflowing the memory.
   - Synchronous operations like `sort` or `filter` on larger arrays of data objects have been optimized (for example, by replacing `.filter(Array.includes(...))` with `.filter(Set.has(...))`).
-  - Enforced a limit of max. results per single GraphQL query to `10,000` and max input arguments per query to `1,000`
+  - Enforced a limit of max. results per single GraphQL query to `10,000` and max input arguments per query to `1,000`.
   - Added `--cleanupWorkersNumber` flag to limit the number of concurrent async requests during cleanup.
 - A safety mechanism was added to avoid removing "deleted" objects for which a related `DataObjectDeleted` event cannot be found in storage squid.
 - Improved logging during sync and cleanup.
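The `p-limit` dependency added in PATCH 8 is what caps the number of concurrent replication checks during cleanup. A minimal sketch of the pattern used in `getDeletionTasksFromMovedDataObjects` above follows; `MovedObject` and `checkOne` are simplified placeholders for `DataObjectWithBagDetailsFragment` and the per-object threshold check, not the real Colossus types:

```ts
import pLimit from 'p-limit'

// Placeholder shape for illustration only.
type MovedObject = { id: string; endpoints: string[] }

async function checkAllWithLimit(
  objects: MovedObject[],
  workersNumber: number,
  checkOne: (obj: MovedObject) => Promise<void>
): Promise<void> {
  // p-limit keeps at most `workersNumber` checks in flight at any time;
  // limit.activeCount / limit.pendingCount can be logged for progress, as the patch does.
  const limit = pLimit(workersNumber)
  await Promise.all(objects.map((obj) => limit(() => checkOne(obj))))
}
```

Compared with the previous unbounded `Promise.allSettled` over the whole batch, this keeps at most `--cleanupWorkersNumber` (default `100`) checks in flight at a time, which is the limit on concurrent async requests that the changelog entry refers to.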