Skip to content
25 changes: 20 additions & 5 deletions src/commands/protocols/publish/publish-finalization-command.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
const { id, publishOperationId, merkleRoot, byteSize } = eventData;
const { blockchain, contractAddress } = event;
const operationId = this.operationIdService.generateId();
const ual = this.ualService.deriveUAL(blockchain, contractAddress, id);

this.operationIdService.emitChangeEvent(
OPERATION_ID_STATUS.PUBLISH_FINALIZATION.PUBLISH_FINALIZATION_START,
operationId,
Expand Down Expand Up @@ -71,7 +73,9 @@
assertion = result.assertion;
publisherPeerId = result.remotePeerId;
} catch (error) {
this.logger.error(`Failed to read cached publish data: ${error.message}`); // TODO: Make this log more descriptive
this.logger.error(
`[Cache] Failed to read cached publish data for UAL ${ual} (publishOperationId: ${publishOperationId}, txHash: ${txHash}, operationId: ${operationId}): ${error.message}`,
);
this.operationIdService.emitChangeEvent(
OPERATION_ID_STATUS.FAILED,
operationId,
Expand All @@ -81,8 +85,6 @@
return Command.empty();
}

const ual = this.ualService.deriveUAL(blockchain, contractAddress, id);

try {
await this.validatePublishData(merkleRoot, cachedMerkleRoot, byteSize, assertion, ual);
} catch (e) {
Expand Down Expand Up @@ -185,23 +187,36 @@

async readWithRetries(publishOperationId) {
let attempt = 0;
const datasetPath = this.fileService.getPendingStorageDocumentPath(publishOperationId);

while (attempt < MAX_RETRIES_READ_CACHED_PUBLISH_DATA) {
try {
const datasetPath =
this.fileService.getPendingStorageDocumentPath(publishOperationId);
const stats = await this.fileService.stat(datasetPath);

Check failure on line 194 in src/commands/protocols/publish/publish-finalization-command.js

View workflow job for this annotation

GitHub Actions / lint

Unexpected `await` inside a loop

Check failure on line 194 in src/commands/protocols/publish/publish-finalization-command.js

View workflow job for this annotation

GitHub Actions / lint

Unexpected `await` inside a loop
this.logger.debug(
`[Cache] Cache file present on attempt ${attempt + 1} (publishOperationId: ${publishOperationId}, path: ${datasetPath}, size: ${stats.size} bytes).`,
);

// eslint-disable-next-line no-await-in-loop
const cachedData = await this.fileService.readFile(datasetPath, true);
this.logger.debug(
`[Cache] Read cached publish data on attempt ${attempt + 1} (publishOperationId: ${publishOperationId}, path: ${datasetPath}).`,
);
return cachedData;
} catch (error) {
attempt += 1;
this.logger.warn(
`[Cache] Attempt ${attempt} to read cached publish data failed (publishOperationId: ${publishOperationId}, path: ${datasetPath}): ${error.message}`,
);

// eslint-disable-next-line no-await-in-loop
await new Promise((resolve) => {
setTimeout(resolve, RETRY_DELAY_READ_CACHED_PUBLISH_DATA);
});
}
}
this.logger.error(
`[Cache] Exhausted retries reading cached publish data (publishOperationId: ${publishOperationId}, path: ${datasetPath}).`,
);
// TODO: Mark this operation as failed
throw new Error('Failed to read cached publish data');
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,87 @@ class HandleStoreRequestCommand extends HandleProtocolMessageCommand {
}

async prepareMessage(commandData) {
const { blockchain, operationId, datasetRoot, remotePeerId, isOperationV0 } = commandData;
const {
blockchain,
operationId,
datasetRoot,
remotePeerId,
isOperationV0,
contract,
tokenId,
} = commandData;

// Derive UAL if possible
const ual =
contract && tokenId
? this.ualService.deriveUAL(blockchain, contract, tokenId)
: `pending:${datasetRoot}`;

this.logger.debug(
`[store-request-debug] Starting prepareMessage. OperationId: ${operationId}, UAL: ${ual}, blockchain: ${blockchain}, datasetRoot: ${datasetRoot}, remotePeerId: ${remotePeerId}, isOperationV0: ${isOperationV0}`,
);

await this.operationIdService.emitChangeEvent(
OPERATION_ID_STATUS.PUBLISH.PUBLISH_VALIDATE_ASSET_REMOTE_START,
operationId,
blockchain,
);

const { dataset } = await this.operationIdService.getCachedOperationIdData(operationId);
this.logger.debug(
`[store-request-debug] Fetching cached operation data. OperationId: ${operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}`,
);

const cachedData = await this.operationIdService.getCachedOperationIdData(operationId);

// Detailed logging of cached data
const hasCachedData = cachedData !== undefined && cachedData !== null;
const cachedDataKeys = hasCachedData ? Object.keys(cachedData) : [];

this.logger.debug(
`[store-request-debug] Cached data retrieved. OperationId: ${operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}, hasCachedData: ${hasCachedData}, cachedDataKeys: [${cachedDataKeys.join(
', ',
)}]`,
);

if (!hasCachedData) {
this.logger.error(
`[store-request-debug] NO CACHED DATA FOUND! OperationId: ${operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}. This is likely the source of the problem.`,
);
}

const { dataset } = cachedData || {};

// Detailed dataset logging
const hasDataset = dataset !== undefined;
const isDatasetNull = dataset === null;
const datasetType = typeof dataset;
const datasetSize = hasDataset && !isDatasetNull ? JSON.stringify(dataset).length : 0;
const isDatasetArray = Array.isArray(dataset);
const datasetLength = isDatasetArray ? dataset.length : 'N/A';

this.logger.debug(
`[store-request-debug] Dataset extracted. OperationId: ${operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}, hasDataset: ${hasDataset}, isNull: ${isDatasetNull}, type: ${datasetType}, isArray: ${isDatasetArray}, length: ${datasetLength}, size: ${datasetSize} bytes`,
);

if (isDatasetNull) {
this.logger.error(
`[store-request-debug] DATASET IS NULL! OperationId: ${operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}, remotePeerId: ${remotePeerId}. Full cachedData keys: [${cachedDataKeys.join(
', ',
)}]`,
);
}

if (!hasDataset) {
this.logger.error(
`[store-request-debug] DATASET IS UNDEFINED! OperationId: ${operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}, remotePeerId: ${remotePeerId}. Full cachedData: ${JSON.stringify(
cachedData,
)?.substring(0, 500)}`,
);
}

this.logger.debug(
`[store-request-debug] Starting validation. OperationId: ${operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}`,
);

const validationResult = await this.validateReceivedData(
operationId,
Expand All @@ -45,13 +117,20 @@ class HandleStoreRequestCommand extends HandleProtocolMessageCommand {
isOperationV0,
);

this.logger.debug(
`[store-request-debug] Validation complete. OperationId: ${operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}, result messageType: ${validationResult.messageType}`,
);

await this.operationIdService.emitChangeEvent(
OPERATION_ID_STATUS.PUBLISH.PUBLISH_VALIDATE_ASSET_REMOTE_END,
operationId,
blockchain,
);

if (validationResult.messageType === NETWORK_MESSAGE_TYPES.RESPONSES.NACK) {
this.logger.warn(
`[store-request-debug] Validation failed, returning NACK. OperationId: ${operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}, error: ${validationResult.messageData?.errorMessage}`,
);
return validationResult;
}

Expand All @@ -60,18 +139,32 @@ class HandleStoreRequestCommand extends HandleProtocolMessageCommand {
operationId,
blockchain,
);

this.logger.debug(
`[store-request-debug] Starting dataset caching. OperationId: ${operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}, isOperationV0: ${isOperationV0}`,
);

if (isOperationV0) {
const { contract, tokenId } = commandData;
const ual = this.ualService.deriveUAL(blockchain, contract, tokenId);
this.logger.debug(
`[store-request-debug] Creating V6 knowledge collection. OperationId: ${operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}`,
);
await this.tripleStoreService.createV6KnowledgeCollection(dataset, ual);
} else {
this.logger.debug(
`[store-request-debug] Caching dataset to pending storage. OperationId: ${operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}, datasetSize: ${datasetSize} bytes`,
);
await this.pendingStorageService.cacheDataset(
operationId,
datasetRoot,
dataset,
remotePeerId,
);
}

this.logger.debug(
`[store-request-debug] Dataset caching complete. OperationId: ${operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}`,
);

await this.operationIdService.emitChangeEvent(
OPERATION_ID_STATUS.PUBLISH.PUBLISH_LOCAL_STORE_REMOTE_CACHE_DATASET_END,
operationId,
Expand All @@ -82,6 +175,10 @@ class HandleStoreRequestCommand extends HandleProtocolMessageCommand {

const { v, r, s, vs } = await this.signatureService.signMessage(blockchain, datasetRoot);

this.logger.debug(
`[store-request-debug] Signed message, returning ACK. OperationId: ${operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}, identityId: ${identityId}`,
);

await this.operationIdService.emitChangeEvent(
OPERATION_ID_STATUS.PUBLISH.PUBLISH_VALIDATE_ASSET_REMOTE_END,
operationId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,50 @@ class PublishReplicationCommand extends Command {
);
return Command.empty();
}
const { dataset } = await this.operationIdService.getCachedOperationIdData(operationId);
const cachedData = await this.operationIdService.getCachedOperationIdData(operationId);

// Log what we retrieved from cache
const hasCachedData = cachedData !== undefined && cachedData !== null;
const hasDataset = cachedData?.dataset !== undefined;
const isDatasetNull = cachedData?.dataset === null;
const hasPublicDataset = cachedData?.dataset?.public !== undefined;
const isPublicDatasetNull = cachedData?.dataset?.public === null;
const publicDatasetSize =
hasPublicDataset && !isPublicDatasetNull
? JSON.stringify(cachedData.dataset.public).length
: 0;

this.logger.debug(
`[publish-sender-debug] Retrieved cached data for sending. OperationId: ${operationId}, hasCachedData: ${hasCachedData}, hasDataset: ${hasDataset}, isDatasetNull: ${isDatasetNull}, hasPublicDataset: ${hasPublicDataset}, isPublicDatasetNull: ${isPublicDatasetNull}, publicDatasetSize: ${publicDatasetSize} bytes`,
);

if (!hasDataset || isDatasetNull) {
this.logger.error(
`[publish-sender-debug] DATASET ISSUE BEFORE SENDING! OperationId: ${operationId}, hasDataset: ${hasDataset}, isDatasetNull: ${isDatasetNull}`,
);
}

if (!hasPublicDataset || isPublicDatasetNull) {
this.logger.error(
`[publish-sender-debug] PUBLIC DATASET ISSUE BEFORE SENDING! OperationId: ${operationId}, hasPublicDataset: ${hasPublicDataset}, isPublicDatasetNull: ${isPublicDatasetNull}. Dataset keys: [${
cachedData?.dataset ? Object.keys(cachedData.dataset).join(', ') : 'N/A'
}]`,
);
}

const { dataset } = cachedData;
const message = {
dataset: dataset.public,
datasetRoot,
blockchain,
};

this.logger.debug(
`[publish-sender-debug] Prepared message for sending. OperationId: ${operationId}, datasetRoot: ${datasetRoot}, blockchain: ${blockchain}, message.dataset size: ${
message.dataset ? JSON.stringify(message.dataset).length : 0
} bytes, sending to ${shardNodes.length} nodes`,
);

// Run all message sending operations in parallel
await Promise.all(
shardNodes.map((node) =>
Expand All @@ -159,15 +196,34 @@ class PublishReplicationCommand extends Command {
}

async sendAndHandleMessage(node, operationId, message, command, blockchain) {
const messageDatasetSize = message.dataset ? JSON.stringify(message.dataset).length : 0;
const { datasetRoot } = message;

this.logger.debug(
`[publish-sender-debug] Sending message to node. OperationId: ${operationId}, datasetRoot: ${datasetRoot}, blockchain: ${blockchain}, targetNode: ${node.id}, protocol: ${node.protocol}, datasetSize: ${messageDatasetSize} bytes`,
);

const sendStartTime = Date.now();
const response = await this.messagingService.sendProtocolMessage(
node,
operationId,
message,
NETWORK_MESSAGE_TYPES.REQUESTS.PROTOCOL_REQUEST,
NETWORK_MESSAGE_TIMEOUT_MILLS.PUBLISH.REQUEST,
);
const sendDuration = Date.now() - sendStartTime;

const responseData = response.data;
const responseType = response.header?.messageType;

this.logger.debug(
`[publish-sender-debug] Received response from node. OperationId: ${operationId}, datasetRoot: ${datasetRoot}, targetNode: ${node.id}, responseType: ${responseType}, duration: ${sendDuration}ms`,
);

if (response.header.messageType === NETWORK_MESSAGE_TYPES.RESPONSES.ACK) {
this.logger.debug(
`[publish-sender-debug] ACK received. OperationId: ${operationId}, datasetRoot: ${datasetRoot}, targetNode: ${node.id}, identityId: ${responseData.identityId}`,
);
// eslint-disable-next-line no-await-in-loop
await this.signatureService.addSignatureToStorage(
NETWORK_SIGNATURES_FOLDER,
Expand All @@ -185,6 +241,9 @@ class PublishReplicationCommand extends Command {
responseData,
);
} else {
this.logger.warn(
`[publish-sender-debug] Non-ACK response received. OperationId: ${operationId}, datasetRoot: ${datasetRoot}, targetNode: ${node.id}, responseType: ${responseType}, errorMessage: ${responseData?.errorMessage}`,
);
// eslint-disable-next-line no-await-in-loop
await this.operationService.processResponse(
command,
Expand Down
Loading
Loading