Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "origintrail_node",
"version": "8.2.2",
"version": "8.2.5",
"description": "OTNode V8",
"main": "index.js",
"type": "module",
Expand Down
24 changes: 22 additions & 2 deletions src/commands/cleaners/operation-id-cleaner-command.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {
BYTES_IN_KILOBYTE,
OPERATION_ID_FILES_FOR_REMOVAL_MAX_NUMBER,
OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS,
OPERATION_ID_MEMORY_CLEANUP_TIME_MILLS,
OPERATION_ID_STATUS,
COMMAND_PRIORITY,
} from '../../constants/constants.js';
Expand All @@ -23,14 +24,33 @@ class OperationIdCleanerCommand extends Command {
* @param command
*/
async execute() {
let memoryBytes = 0;
let fileBytes = 0;
try {
memoryBytes = this.operationIdService.getOperationIdMemoryCacheSizeBytes();
} catch (error) {
this.logger.warn(`Unable to read memory cache footprint: ${error.message}`);
}
try {
fileBytes = await this.operationIdService.getOperationIdFileCacheSizeBytes();
} catch (error) {
this.logger.warn(`Unable to read file cache footprint: ${error.message}`);
}
const bytesInMegabyte = 1024 * 1024;
this.logger.debug(
`Operation cache footprint before cleanup: memory=${(
memoryBytes / bytesInMegabyte
).toFixed(2)}MB, files=${(fileBytes / bytesInMegabyte).toFixed(2)}MB`,
);

this.logger.debug('Starting command for removal of expired cache files');
const timeToBeDeleted = Date.now() - OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS;
await this.repositoryModuleManager.removeOperationIdRecord(timeToBeDeleted, [
OPERATION_ID_STATUS.COMPLETED,
OPERATION_ID_STATUS.FAILED,
]);
let removed = await this.operationIdService.removeExpiredOperationIdMemoryCache(
OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS,
OPERATION_ID_MEMORY_CLEANUP_TIME_MILLS,
);
if (removed) {
this.logger.debug(
Expand Down Expand Up @@ -68,7 +88,7 @@ class OperationIdCleanerCommand extends Command {
default(map) {
const command = {
name: 'operationIdCleanerCommand',
period: OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS,
period: OPERATION_ID_MEMORY_CLEANUP_TIME_MILLS,
data: {},
transactional: false,
priority: COMMAND_PRIORITY.LOWEST,
Expand Down
17 changes: 10 additions & 7 deletions src/commands/protocols/publish/publish-finalization-command.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ class PublishFinalizationCommand extends Command {
const { id, publishOperationId, merkleRoot, byteSize } = eventData;
const { blockchain, contractAddress } = event;
const operationId = this.operationIdService.generateId();
const ual = this.ualService.deriveUAL(blockchain, contractAddress, id);

this.operationIdService.emitChangeEvent(
OPERATION_ID_STATUS.PUBLISH_FINALIZATION.PUBLISH_FINALIZATION_START,
operationId,
Expand Down Expand Up @@ -70,8 +72,10 @@ class PublishFinalizationCommand extends Command {
cachedMerkleRoot = result.merkleRoot;
assertion = result.assertion;
publisherPeerId = result.remotePeerId;
} catch (error) {
this.logger.error(`Failed to read cached publish data: ${error.message}`); // TODO: Make this log more descriptive
} catch (_error) {
this.logger.warn(
`[Cache] Failed to read cached publish data for UAL ${ual} (publishOperationId: ${publishOperationId}, txHash: ${txHash}, operationId: ${operationId})`,
);
this.operationIdService.emitChangeEvent(
OPERATION_ID_STATUS.FAILED,
operationId,
Expand All @@ -81,8 +85,6 @@ class PublishFinalizationCommand extends Command {
return Command.empty();
}

const ual = this.ualService.deriveUAL(blockchain, contractAddress, id);

try {
await this.validatePublishData(merkleRoot, cachedMerkleRoot, byteSize, assertion, ual);
} catch (e) {
Expand Down Expand Up @@ -185,23 +187,24 @@ class PublishFinalizationCommand extends Command {

async readWithRetries(publishOperationId) {
let attempt = 0;
const datasetPath = this.fileService.getPendingStorageDocumentPath(publishOperationId);

while (attempt < MAX_RETRIES_READ_CACHED_PUBLISH_DATA) {
try {
const datasetPath =
this.fileService.getPendingStorageDocumentPath(publishOperationId);
// eslint-disable-next-line no-await-in-loop
const cachedData = await this.fileService.readFile(datasetPath, true);
return cachedData;
} catch (error) {
attempt += 1;

// eslint-disable-next-line no-await-in-loop
await new Promise((resolve) => {
setTimeout(resolve, RETRY_DELAY_READ_CACHED_PUBLISH_DATA);
});
}
}
this.logger.warn(
`[Cache] Exhausted retries reading cached publish data (publishOperationId: ${publishOperationId}, path: ${datasetPath}).`,
);
// TODO: Mark this operation as failed
throw new Error('Failed to read cached publish data');
}
Expand Down
5 changes: 5 additions & 0 deletions src/constants/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,11 @@ export const EXPECTED_TRANSACTION_ERRORS = {
* operation id command cleanup interval time 24h
*/
export const OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS = 24 * 60 * 60 * 1000;
/**
* @constant {number} OPERATION_ID_MEMORY_CLEANUP_TIME_MILLS -
* operation id memory cleanup interval time 1h
*/
export const OPERATION_ID_MEMORY_CLEANUP_TIME_MILLS = 60 * 60 * 1000;
/**
* @constant {number} FINALIZED_COMMAND_CLEANUP_TIME_MILLS - Command cleanup interval time
* finalized commands command cleanup interval time 24h
Expand Down
35 changes: 34 additions & 1 deletion src/controllers/http-api/v1/publish-http-api-controller-v1.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
OPERATION_STATUS,
LOCAL_STORE_TYPES,
COMMAND_PRIORITY,
PUBLISH_MIN_NUM_OF_NODE_REPLICATIONS,
} from '../../../constants/constants.js';

class PublishController extends BaseController {
Expand All @@ -16,6 +17,7 @@ class PublishController extends BaseController {
this.repositoryModuleManager = ctx.repositoryModuleManager;
this.pendingStorageService = ctx.pendingStorageService;
this.networkModuleManager = ctx.networkModuleManager;
this.blockchainModuleManager = ctx.blockchainModuleManager;
}

async handleRequest(req, res) {
Expand Down Expand Up @@ -62,6 +64,37 @@ class PublishController extends BaseController {
datasetRoot,
});

let effectiveMinReplications = minimumNumberOfNodeReplications;
let chainMinNumber = null;
try {
const chainMin = await this.blockchainModuleManager.getMinimumRequiredSignatures(
blockchain,
);
chainMinNumber = Number(chainMin);
} catch (err) {
this.logger.warn(
`Failed to fetch on-chain minimumRequiredSignatures for ${blockchain}: ${err.message}`,
);
}

const userMinNumber = Number(effectiveMinReplications);
const resolvedUserMin =
!Number.isNaN(userMinNumber) && userMinNumber > 0
? userMinNumber
: PUBLISH_MIN_NUM_OF_NODE_REPLICATIONS;

if (!Number.isNaN(chainMinNumber) && chainMinNumber > 0) {
effectiveMinReplications = Math.max(chainMinNumber, resolvedUserMin);
} else {
effectiveMinReplications = resolvedUserMin;
}

if (effectiveMinReplications === 0) {
this.logger.error(
`Effective minimum replications resolved to 0 for operationId: ${operationId}, blockchain: ${blockchain}. This should never happen.`,
);
}

const publisherNodePeerId = this.networkModuleManager.getPeerId().toB58String();
await this.pendingStorageService.cacheDataset(
operationId,
Expand All @@ -80,7 +113,7 @@ class PublishController extends BaseController {
blockchain,
operationId,
storeType: LOCAL_STORE_TYPES.TRIPLE,
minimumNumberOfNodeReplications,
minimumNumberOfNodeReplications: effectiveMinReplications,
},
transactional: false,
priority: COMMAND_PRIORITY.HIGHEST,
Expand Down
4 changes: 4 additions & 0 deletions src/modules/blockchain/blockchain-module-manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,10 @@ class BlockchainModuleManager extends BaseModuleManager {
return this.callImplementationFunction(blockchain, 'getMaximumStake');
}

async getMinimumRequiredSignatures(blockchain) {
return this.callImplementationFunction(blockchain, 'getMinimumRequiredSignatures');
}

async getLatestBlock(blockchain) {
return this.callImplementationFunction(blockchain, 'getLatestBlock');
}
Expand Down
9 changes: 9 additions & 0 deletions src/modules/blockchain/implementation/web3-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -1024,6 +1024,15 @@ class Web3Service {
return Number(ethers.utils.formatEther(maximumStake));
}

async getMinimumRequiredSignatures() {
return this.callContractFunction(
this.contracts.ParametersStorage,
'minimumRequiredSignatures',
[],
CONTRACTS.PARAMETERS_STORAGE,
);
}

async getShardingTableHead() {
return this.callContractFunction(this.contracts.ShardingTableStorage, 'head', []);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,24 @@ class OtBlazegraph extends OtTripleStore {
}

async queryVoid(repository, query, timeout) {
return axios.post(this.repositories[repository].sparqlEndpoint, query, {
headers: {
'Content-Type': 'application/sparql-update; charset=UTF-8',
'X-BIGDATA-MAX-QUERY-MILLIS': timeout,
},
});
try {
return await axios.post(this.repositories[repository].sparqlEndpoint, query, {
headers: {
'Content-Type': 'application/sparql-update; charset=UTF-8',
'X-BIGDATA-MAX-QUERY-MILLIS': timeout,
},
});
} catch (error) {
const status = error?.response?.status;
const dataSnippet =
typeof error?.response?.data === 'string' ? error.response.data.slice(0, 200) : '';
this.logger.error(
`[OtBlazegraph.queryVoid] Update failed for ${repository} (status: ${status}): ${
error.message
}${dataSnippet ? ` | data: ${dataSnippet}` : ''}`,
);
throw error;
}
}

async deleteRepository(repository) {
Expand Down
27 changes: 27 additions & 0 deletions src/service/operation-id-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,33 @@ class OperationIdService {
delete this.memoryCachedHandlersData[operationId];
}

getOperationIdMemoryCacheSizeBytes() {
let total = 0;
for (const operationId in this.memoryCachedHandlersData) {
const { data } = this.memoryCachedHandlersData[operationId];
total += Buffer.from(JSON.stringify(data)).byteLength;
}
return total;
}

async getOperationIdFileCacheSizeBytes() {
const cacheFolderPath = this.fileService.getOperationIdCachePath();
const cacheFolderExists = await this.fileService.pathExists(cacheFolderPath);
if (!cacheFolderExists) return 0;

const fileList = await this.fileService.readDirectory(cacheFolderPath);
const sizeResults = await Promise.allSettled(
fileList.map((fileName) =>
this.fileService
.stat(path.join(cacheFolderPath, fileName))
.then((stats) => stats.size),
),
);
return sizeResults
.filter((res) => res.status === 'fulfilled')
.reduce((acc, res) => acc + res.value, 0);
}

async removeExpiredOperationIdMemoryCache(expiredTimeout) {
const now = Date.now();
let deleted = 0;
Expand Down
24 changes: 17 additions & 7 deletions src/service/operation-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,21 +58,31 @@ class OperationService {
return operationIdStatuses;
}

async markOperationAsCompleted(operationId, blockchain, responseData, endStatuses) {
async markOperationAsCompleted(
operationId,
blockchain,
responseData,
endStatuses,
options = {},
) {
const { reuseExistingCache = false } = options;
this.logger.info(`Finalizing ${this.operationName} for operationId: ${operationId}`);

await this.repositoryModuleManager.updateOperationStatus(
this.operationName,
operationId,
OPERATION_STATUS.COMPLETED,
);

if (responseData === null) {
await this.operationIdService.removeOperationIdCache(operationId);
} else {
await this.operationIdService.cacheOperationIdDataToMemory(operationId, responseData);
await this.operationIdService.cacheOperationIdDataToFile(operationId, responseData);
if (!reuseExistingCache) {
await this.operationIdService.cacheOperationIdDataToFile(operationId, responseData);
}
}

await this.repositoryModuleManager.updateOperationStatus(
this.operationName,
operationId,
OPERATION_STATUS.COMPLETED,
);
for (let i = 0; i < endStatuses.length; i += 1) {
const status = endStatuses[i];
const response = {
Expand Down
Loading
Loading