Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "origintrail_node",
"version": "8.2.2",
"version": "8.2.5",
"description": "OTNode V8",
"main": "index.js",
"type": "module",
Expand Down
24 changes: 22 additions & 2 deletions src/commands/cleaners/operation-id-cleaner-command.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
BYTES_IN_KILOBYTE,
OPERATION_ID_FILES_FOR_REMOVAL_MAX_NUMBER,
OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS,
OPERATION_ID_MEMORY_CLEANUP_TIME_MILLS,
OPERATION_ID_STATUS,
COMMAND_PRIORITY,
} from '../../constants/constants.js';
Expand All @@ -23,14 +24,33 @@ class OperationIdCleanerCommand extends Command {
* @param command
*/
async execute() {
let memoryBytes = 0;
let fileBytes = 0;
try {
memoryBytes = this.operationIdService.getOperationIdMemoryCacheSizeBytes();
} catch (error) {
this.logger.warn(`Unable to read memory cache footprint: ${error.message}`);
}
try {
fileBytes = await this.operationIdService.getOperationIdFileCacheSizeBytes();
} catch (error) {
this.logger.warn(`Unable to read file cache footprint: ${error.message}`);
}
const bytesInMegabyte = 1024 * 1024;
this.logger.debug(
`Operation cache footprint before cleanup: memory=${(
memoryBytes / bytesInMegabyte
).toFixed(2)}MB, files=${(fileBytes / bytesInMegabyte).toFixed(2)}MB`,
);

this.logger.debug('Starting command for removal of expired cache files');
const timeToBeDeleted = Date.now() - OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS;
await this.repositoryModuleManager.removeOperationIdRecord(timeToBeDeleted, [
OPERATION_ID_STATUS.COMPLETED,
OPERATION_ID_STATUS.FAILED,
]);
let removed = await this.operationIdService.removeExpiredOperationIdMemoryCache(
OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS,
OPERATION_ID_MEMORY_CLEANUP_TIME_MILLS,
);
if (removed) {
this.logger.debug(
Expand Down Expand Up @@ -68,7 +88,7 @@ class OperationIdCleanerCommand extends Command {
default(map) {
const command = {
name: 'operationIdCleanerCommand',
period: OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS,
period: OPERATION_ID_MEMORY_CLEANUP_TIME_MILLS,
data: {},
transactional: false,
priority: COMMAND_PRIORITY.LOWEST,
Expand Down
17 changes: 10 additions & 7 deletions src/commands/protocols/publish/publish-finalization-command.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ class PublishFinalizationCommand extends Command {
const { id, publishOperationId, merkleRoot, byteSize } = eventData;
const { blockchain, contractAddress } = event;
const operationId = this.operationIdService.generateId();
const ual = this.ualService.deriveUAL(blockchain, contractAddress, id);

this.operationIdService.emitChangeEvent(
OPERATION_ID_STATUS.PUBLISH_FINALIZATION.PUBLISH_FINALIZATION_START,
operationId,
Expand Down Expand Up @@ -70,8 +72,10 @@ class PublishFinalizationCommand extends Command {
cachedMerkleRoot = result.merkleRoot;
assertion = result.assertion;
publisherPeerId = result.remotePeerId;
} catch (error) {
this.logger.error(`Failed to read cached publish data: ${error.message}`); // TODO: Make this log more descriptive
} catch (_error) {
this.logger.warn(
`[Cache] Failed to read cached publish data for UAL ${ual} (publishOperationId: ${publishOperationId}, txHash: ${txHash}, operationId: ${operationId})`,
);
this.operationIdService.emitChangeEvent(
OPERATION_ID_STATUS.FAILED,
operationId,
Expand All @@ -81,8 +85,6 @@ class PublishFinalizationCommand extends Command {
return Command.empty();
}

const ual = this.ualService.deriveUAL(blockchain, contractAddress, id);

try {
await this.validatePublishData(merkleRoot, cachedMerkleRoot, byteSize, assertion, ual);
} catch (e) {
Expand Down Expand Up @@ -185,23 +187,24 @@ class PublishFinalizationCommand extends Command {

async readWithRetries(publishOperationId) {
let attempt = 0;
const datasetPath = this.fileService.getPendingStorageDocumentPath(publishOperationId);

while (attempt < MAX_RETRIES_READ_CACHED_PUBLISH_DATA) {
try {
const datasetPath =
this.fileService.getPendingStorageDocumentPath(publishOperationId);
// eslint-disable-next-line no-await-in-loop
const cachedData = await this.fileService.readFile(datasetPath, true);
return cachedData;
} catch (error) {
attempt += 1;

// eslint-disable-next-line no-await-in-loop
await new Promise((resolve) => {
setTimeout(resolve, RETRY_DELAY_READ_CACHED_PUBLISH_DATA);
});
}
}
this.logger.warn(
`[Cache] Exhausted retries reading cached publish data (publishOperationId: ${publishOperationId}, path: ${datasetPath}).`,
);
// TODO: Mark this operation as failed
throw new Error('Failed to read cached publish data');
}
Expand Down
5 changes: 5 additions & 0 deletions src/constants/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,11 @@ export const EXPECTED_TRANSACTION_ERRORS = {
* operation id command cleanup interval time 24h
*/
export const OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS = 24 * 60 * 60 * 1000;
/**
 * @constant {number} OPERATION_ID_MEMORY_CLEANUP_TIME_MILLS -
 * Interval (1 hour, in milliseconds) after which cached operation-id
 * data is evicted from the in-memory cache.
 */
export const OPERATION_ID_MEMORY_CLEANUP_TIME_MILLS = 1000 * 60 * 60;
/**
* @constant {number} FINALIZED_COMMAND_CLEANUP_TIME_MILLS - Command cleanup interval time
* finalized commands command cleanup interval time 24h
Expand Down
35 changes: 34 additions & 1 deletion src/controllers/http-api/v1/publish-http-api-controller-v1.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
OPERATION_STATUS,
LOCAL_STORE_TYPES,
COMMAND_PRIORITY,
PUBLISH_MIN_NUM_OF_NODE_REPLICATIONS,
} from '../../../constants/constants.js';

class PublishController extends BaseController {
Expand All @@ -16,6 +17,7 @@ class PublishController extends BaseController {
this.repositoryModuleManager = ctx.repositoryModuleManager;
this.pendingStorageService = ctx.pendingStorageService;
this.networkModuleManager = ctx.networkModuleManager;
this.blockchainModuleManager = ctx.blockchainModuleManager;
}

async handleRequest(req, res) {
Expand Down Expand Up @@ -62,6 +64,37 @@ class PublishController extends BaseController {
datasetRoot,
});

let effectiveMinReplications = minimumNumberOfNodeReplications;
let chainMinNumber = null;
try {
const chainMin = await this.blockchainModuleManager.getMinimumRequiredSignatures(
blockchain,
);
chainMinNumber = Number(chainMin);
} catch (err) {
this.logger.warn(
`Failed to fetch on-chain minimumRequiredSignatures for ${blockchain}: ${err.message}`,
);
}

const userMinNumber = Number(effectiveMinReplications);
const resolvedUserMin =
!Number.isNaN(userMinNumber) && userMinNumber > 0
? userMinNumber
: PUBLISH_MIN_NUM_OF_NODE_REPLICATIONS;

if (!Number.isNaN(chainMinNumber) && chainMinNumber > 0) {
effectiveMinReplications = Math.max(chainMinNumber, resolvedUserMin);
} else {
effectiveMinReplications = resolvedUserMin;
}

if (effectiveMinReplications === 0) {
this.logger.error(
`Effective minimum replications resolved to 0 for operationId: ${operationId}, blockchain: ${blockchain}. This should never happen.`,
);
}

const publisherNodePeerId = this.networkModuleManager.getPeerId().toB58String();
await this.pendingStorageService.cacheDataset(
operationId,
Expand All @@ -80,7 +113,7 @@ class PublishController extends BaseController {
blockchain,
operationId,
storeType: LOCAL_STORE_TYPES.TRIPLE,
minimumNumberOfNodeReplications,
minimumNumberOfNodeReplications: effectiveMinReplications,
},
transactional: false,
priority: COMMAND_PRIORITY.HIGHEST,
Expand Down
4 changes: 4 additions & 0 deletions src/modules/blockchain/blockchain-module-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,10 @@ class BlockchainModuleManager extends BaseModuleManager {
return this.callImplementationFunction(blockchain, 'getMaximumStake');
}

async getMinimumRequiredSignatures(blockchain) {
return this.callImplementationFunction(blockchain, 'getMinimumRequiredSignatures');
}

async getLatestBlock(blockchain) {
return this.callImplementationFunction(blockchain, 'getLatestBlock');
}
Expand Down
9 changes: 9 additions & 0 deletions src/modules/blockchain/implementation/web3-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -1024,6 +1024,15 @@ class Web3Service {
return Number(ethers.utils.formatEther(maximumStake));
}

async getMinimumRequiredSignatures() {
return this.callContractFunction(
this.contracts.ParametersStorage,
'minimumRequiredSignatures',
[],
CONTRACTS.PARAMETERS_STORAGE,
);
}

async getShardingTableHead() {
return this.callContractFunction(this.contracts.ShardingTableStorage, 'head', []);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,24 @@ class OtBlazegraph extends OtTripleStore {
}

async queryVoid(repository, query, timeout) {
return axios.post(this.repositories[repository].sparqlEndpoint, query, {
headers: {
'Content-Type': 'application/sparql-update; charset=UTF-8',
'X-BIGDATA-MAX-QUERY-MILLIS': timeout,
},
});
try {
return await axios.post(this.repositories[repository].sparqlEndpoint, query, {
headers: {
'Content-Type': 'application/sparql-update; charset=UTF-8',
'X-BIGDATA-MAX-QUERY-MILLIS': timeout,
},
});
} catch (error) {
const status = error?.response?.status;
const dataSnippet =
typeof error?.response?.data === 'string' ? error.response.data.slice(0, 200) : '';
this.logger.error(
`[OtBlazegraph.queryVoid] Update failed for ${repository} (status: ${status}): ${
error.message
}${dataSnippet ? ` | data: ${dataSnippet}` : ''}`,
);
throw error;
}
}

async deleteRepository(repository) {
Expand Down
27 changes: 27 additions & 0 deletions src/service/operation-id-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,33 @@ class OperationIdService {
delete this.memoryCachedHandlersData[operationId];
}

getOperationIdMemoryCacheSizeBytes() {
let total = 0;
for (const operationId in this.memoryCachedHandlersData) {
const { data } = this.memoryCachedHandlersData[operationId];
total += Buffer.from(JSON.stringify(data)).byteLength;
}
return total;
}

async getOperationIdFileCacheSizeBytes() {
const cacheFolderPath = this.fileService.getOperationIdCachePath();
const cacheFolderExists = await this.fileService.pathExists(cacheFolderPath);
if (!cacheFolderExists) return 0;

const fileList = await this.fileService.readDirectory(cacheFolderPath);
const sizeResults = await Promise.allSettled(
fileList.map((fileName) =>
this.fileService
.stat(path.join(cacheFolderPath, fileName))
.then((stats) => stats.size),
),
);
return sizeResults
.filter((res) => res.status === 'fulfilled')
.reduce((acc, res) => acc + res.value, 0);
}

async removeExpiredOperationIdMemoryCache(expiredTimeout) {
const now = Date.now();
let deleted = 0;
Expand Down
24 changes: 17 additions & 7 deletions src/service/operation-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,21 +58,31 @@ class OperationService {
return operationIdStatuses;
}

async markOperationAsCompleted(operationId, blockchain, responseData, endStatuses) {
async markOperationAsCompleted(
operationId,
blockchain,
responseData,
endStatuses,
options = {},
) {
const { reuseExistingCache = false } = options;
this.logger.info(`Finalizing ${this.operationName} for operationId: ${operationId}`);

await this.repositoryModuleManager.updateOperationStatus(
this.operationName,
operationId,
OPERATION_STATUS.COMPLETED,
);

if (responseData === null) {
await this.operationIdService.removeOperationIdCache(operationId);
} else {
await this.operationIdService.cacheOperationIdDataToMemory(operationId, responseData);
await this.operationIdService.cacheOperationIdDataToFile(operationId, responseData);
if (!reuseExistingCache) {
await this.operationIdService.cacheOperationIdDataToFile(operationId, responseData);
}
}

await this.repositoryModuleManager.updateOperationStatus(
this.operationName,
operationId,
OPERATION_STATUS.COMPLETED,
);
for (let i = 0; i < endStatuses.length; i += 1) {
const status = endStatuses[i];
const response = {
Expand Down
Loading
Loading