diff --git a/src/commands/protocols/publish/publish-finalization-command.js b/src/commands/protocols/publish/publish-finalization-command.js
index b314b5c53b..22609399c8 100644
--- a/src/commands/protocols/publish/publish-finalization-command.js
+++ b/src/commands/protocols/publish/publish-finalization-command.js
@@ -33,6 +33,8 @@ class PublishFinalizationCommand extends Command {
         const { id, publishOperationId, merkleRoot, byteSize } = eventData;
         const { blockchain, contractAddress } = event;
         const operationId = this.operationIdService.generateId();
+        const ual = this.ualService.deriveUAL(blockchain, contractAddress, id);
+
         this.operationIdService.emitChangeEvent(
             OPERATION_ID_STATUS.PUBLISH_FINALIZATION.PUBLISH_FINALIZATION_START,
             operationId,
@@ -71,7 +73,9 @@ class PublishFinalizationCommand extends Command {
             assertion = result.assertion;
             publisherPeerId = result.remotePeerId;
         } catch (error) {
-            this.logger.error(`Failed to read cached publish data: ${error.message}`); // TODO: Make this log more descriptive
+            this.logger.error(
+                `[Cache] Failed to read cached publish data for UAL ${ual} (publishOperationId: ${publishOperationId}, txHash: ${txHash}, operationId: ${operationId}): ${error.message}`,
+            );
             this.operationIdService.emitChangeEvent(
                 OPERATION_ID_STATUS.FAILED,
                 operationId,
@@ -81,8 +85,6 @@ class PublishFinalizationCommand extends Command {
             return Command.empty();
         }
 
-        const ual = this.ualService.deriveUAL(blockchain, contractAddress, id);
-
         try {
             await this.validatePublishData(merkleRoot, cachedMerkleRoot, byteSize, assertion, ual);
         } catch (e) {
@@ -185,16 +187,26 @@ class PublishFinalizationCommand extends Command {
     async readWithRetries(publishOperationId) {
         let attempt = 0;
+        const datasetPath = this.fileService.getPendingStorageDocumentPath(publishOperationId);
 
         while (attempt < MAX_RETRIES_READ_CACHED_PUBLISH_DATA) {
             try {
-                const datasetPath =
-                    this.fileService.getPendingStorageDocumentPath(publishOperationId);
+                const stats = await this.fileService.stat(datasetPath);
+                this.logger.debug(
+                    `[Cache] Cache file present on attempt ${attempt + 1} (publishOperationId: ${publishOperationId}, path: ${datasetPath}, size: ${stats.size} bytes).`,
+                );
+
                 // eslint-disable-next-line no-await-in-loop
                 const cachedData = await this.fileService.readFile(datasetPath, true);
+                this.logger.debug(
+                    `[Cache] Read cached publish data on attempt ${attempt + 1} (publishOperationId: ${publishOperationId}, path: ${datasetPath}).`,
+                );
                 return cachedData;
             } catch (error) {
                 attempt += 1;
+                this.logger.warn(
+                    `[Cache] Attempt ${attempt} to read cached publish data failed (publishOperationId: ${publishOperationId}, path: ${datasetPath}): ${error.message}`,
+                );
 
                 // eslint-disable-next-line no-await-in-loop
                 await new Promise((resolve) => {
@@ -202,6 +214,9 @@ class PublishFinalizationCommand extends Command {
                 });
             }
         }
+        this.logger.error(
+            `[Cache] Exhausted retries reading cached publish data (publishOperationId: ${publishOperationId}, path: ${datasetPath}).`,
+        );
        // TODO: Mark this operation as failed
        throw new Error('Failed to read cached publish data');
    }
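
A condensed view of the retry pattern introduced in readWithRetries above, for reference: the loop polls the pending-storage file up to MAX_RETRIES_READ_CACHED_PUBLISH_DATA times, sleeping between attempts (the sleep body is elided from the hunk). The sketch below restates the same shape as a standalone function; the delay value and the fileService.readFile(path, true) parse-as-JSON flag are assumptions carried over from the surrounding code, not repository API guarantees.

    // Sketch: poll a cached file until it can be read, or the retry budget is spent.
    async function readWithRetries(fileService, datasetPath, maxRetries, delayMs = 1000) {
        for (let attempt = 1; attempt <= maxRetries; attempt += 1) {
            try {
                // second argument mirrors the readFile(datasetPath, true) call above
                return await fileService.readFile(datasetPath, true);
            } catch (error) {
                if (attempt === maxRetries) break; // budget exhausted, fall through to throw
                await new Promise((resolve) => {
                    setTimeout(resolve, delayMs);
                });
            }
        }
        throw new Error('Failed to read cached publish data');
    }
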
diff --git a/src/commands/protocols/publish/receiver/v1.0.0/v1-0-0-handle-store-request-command.js b/src/commands/protocols/publish/receiver/v1.0.0/v1-0-0-handle-store-request-command.js
index 2f84ce082a..564f070262 100644
--- a/src/commands/protocols/publish/receiver/v1.0.0/v1-0-0-handle-store-request-command.js
+++ b/src/commands/protocols/publish/receiver/v1.0.0/v1-0-0-handle-store-request-command.js
@@ -27,7 +27,25 @@ class HandleStoreRequestCommand extends HandleProtocolMessageCommand {
     }
 
     async prepareMessage(commandData) {
-        const { blockchain, operationId, datasetRoot, remotePeerId, isOperationV0 } = commandData;
+        const {
+            blockchain,
+            operationId,
+            datasetRoot,
+            remotePeerId,
+            isOperationV0,
+            contract,
+            tokenId,
+        } = commandData;
+
+        // Derive UAL if possible
+        const ual =
+            contract && tokenId
+                ? this.ualService.deriveUAL(blockchain, contract, tokenId)
+                : `pending:${datasetRoot}`;
+
+        this.logger.debug(
+            `[store-request-debug] Starting prepareMessage. OperationId: ${operationId}, UAL: ${ual}, blockchain: ${blockchain}, datasetRoot: ${datasetRoot}, remotePeerId: ${remotePeerId}, isOperationV0: ${isOperationV0}`,
+        );
 
         await this.operationIdService.emitChangeEvent(
             OPERATION_ID_STATUS.PUBLISH.PUBLISH_VALIDATE_ASSET_REMOTE_START,
@@ -35,7 +53,61 @@ class HandleStoreRequestCommand extends HandleProtocolMessageCommand {
             blockchain,
         );
 
-        const { dataset } = await this.operationIdService.getCachedOperationIdData(operationId);
+        this.logger.debug(
+            `[store-request-debug] Fetching cached operation data. OperationId: ${operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}`,
+        );
+
+        const cachedData = await this.operationIdService.getCachedOperationIdData(operationId);
+
+        // Detailed logging of cached data
+        const hasCachedData = cachedData !== undefined && cachedData !== null;
+        const cachedDataKeys = hasCachedData ? Object.keys(cachedData) : [];
+
+        this.logger.debug(
+            `[store-request-debug] Cached data retrieved. OperationId: ${operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}, hasCachedData: ${hasCachedData}, cachedDataKeys: [${cachedDataKeys.join(
+                ', ',
+            )}]`,
+        );
+
+        if (!hasCachedData) {
+            this.logger.error(
+                `[store-request-debug] NO CACHED DATA FOUND! OperationId: ${operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}. This is likely the source of the problem.`,
+            );
+        }
+
+        const { dataset } = cachedData || {};
+
+        // Detailed dataset logging
+        const hasDataset = dataset !== undefined;
+        const isDatasetNull = dataset === null;
+        const datasetType = typeof dataset;
+        const datasetSize = hasDataset && !isDatasetNull ? JSON.stringify(dataset).length : 0;
+        const isDatasetArray = Array.isArray(dataset);
+        const datasetLength = isDatasetArray ? dataset.length : 'N/A';
+
+        this.logger.debug(
+            `[store-request-debug] Dataset extracted. OperationId: ${operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}, hasDataset: ${hasDataset}, isNull: ${isDatasetNull}, type: ${datasetType}, isArray: ${isDatasetArray}, length: ${datasetLength}, size: ${datasetSize} bytes`,
+        );
+
+        if (isDatasetNull) {
+            this.logger.error(
+                `[store-request-debug] DATASET IS NULL! OperationId: ${operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}, remotePeerId: ${remotePeerId}. Full cachedData keys: [${cachedDataKeys.join(
+                    ', ',
+                )}]`,
+            );
+        }
+
+        if (!hasDataset) {
+            this.logger.error(
+                `[store-request-debug] DATASET IS UNDEFINED! OperationId: ${operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}, remotePeerId: ${remotePeerId}. Full cachedData: ${JSON.stringify(
+                    cachedData,
+                )?.substring(0, 500)}`,
+            );
+        }
+
+        this.logger.debug(
+            `[store-request-debug] Starting validation. OperationId: ${operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}`,
+        );
 
         const validationResult = await this.validateReceivedData(
             operationId,
@@ -45,6 +117,10 @@ class HandleStoreRequestCommand extends HandleProtocolMessageCommand {
             isOperationV0,
         );
 
+        this.logger.debug(
+            `[store-request-debug] Validation complete. OperationId: ${operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}, result messageType: ${validationResult.messageType}`,
+        );
+
         await this.operationIdService.emitChangeEvent(
             OPERATION_ID_STATUS.PUBLISH.PUBLISH_VALIDATE_ASSET_REMOTE_END,
             operationId,
@@ -52,6 +128,9 @@ class HandleStoreRequestCommand extends HandleProtocolMessageCommand {
         );
 
         if (validationResult.messageType === NETWORK_MESSAGE_TYPES.RESPONSES.NACK) {
+            this.logger.warn(
+                `[store-request-debug] Validation failed, returning NACK. OperationId: ${operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}, error: ${validationResult.messageData?.errorMessage}`,
+            );
             return validationResult;
         }
 
@@ -60,11 +139,20 @@ class HandleStoreRequestCommand extends HandleProtocolMessageCommand {
             operationId,
             blockchain,
         );
+
+        this.logger.debug(
+            `[store-request-debug] Starting dataset caching. OperationId: ${operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}, isOperationV0: ${isOperationV0}`,
+        );
+
         if (isOperationV0) {
-            const { contract, tokenId } = commandData;
-            const ual = this.ualService.deriveUAL(blockchain, contract, tokenId);
+            this.logger.debug(
+                `[store-request-debug] Creating V6 knowledge collection. OperationId: ${operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}`,
+            );
             await this.tripleStoreService.createV6KnowledgeCollection(dataset, ual);
         } else {
+            this.logger.debug(
+                `[store-request-debug] Caching dataset to pending storage. OperationId: ${operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}, datasetSize: ${datasetSize} bytes`,
+            );
             await this.pendingStorageService.cacheDataset(
                 operationId,
                 datasetRoot,
@@ -72,6 +160,11 @@ class HandleStoreRequestCommand extends HandleProtocolMessageCommand {
                 remotePeerId,
             );
         }
+
+        this.logger.debug(
+            `[store-request-debug] Dataset caching complete. OperationId: ${operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}`,
+        );
+
         await this.operationIdService.emitChangeEvent(
             OPERATION_ID_STATUS.PUBLISH.PUBLISH_LOCAL_STORE_REMOTE_CACHE_DATASET_END,
             operationId,
@@ -82,6 +175,10 @@ class HandleStoreRequestCommand extends HandleProtocolMessageCommand {
 
         const { v, r, s, vs } = await this.signatureService.signMessage(blockchain, datasetRoot);
 
+        this.logger.debug(
+            `[store-request-debug] Signed message, returning ACK. OperationId: ${operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}, identityId: ${identityId}`,
+        );
+
         await this.operationIdService.emitChangeEvent(
             OPERATION_ID_STATUS.PUBLISH.PUBLISH_VALIDATE_ASSET_REMOTE_END,
             operationId,
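
One behavioral note on the hunk above: the branch-local deriveUAL call in the isOperationV0 path was removed in favor of the function-level ual, which falls back to the pending:${datasetRoot} placeholder whenever contract or tokenId is absent. If a V0 operation ever arrived without those fields, the placeholder string rather than a real UAL would reach createV6KnowledgeCollection. A guard making that assumption explicit could look like this (hypothetical; not part of the patch):

    if (isOperationV0) {
        // Hypothetical guard: a V6 knowledge collection needs a real UAL,
        // not the `pending:` fallback derived above.
        if (!contract || !tokenId) {
            throw new Error(
                `Cannot create V6 knowledge collection without contract/tokenId (operationId: ${operationId})`,
            );
        }
        await this.tripleStoreService.createV6KnowledgeCollection(dataset, ual);
    }
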
diff --git a/src/commands/protocols/publish/sender/publish-replication-command.js b/src/commands/protocols/publish/sender/publish-replication-command.js
index 1568b1ddfa..f0d89783c8 100644
--- a/src/commands/protocols/publish/sender/publish-replication-command.js
+++ b/src/commands/protocols/publish/sender/publish-replication-command.js
@@ -132,13 +132,50 @@ class PublishReplicationCommand extends Command {
             );
             return Command.empty();
         }
-        const { dataset } = await this.operationIdService.getCachedOperationIdData(operationId);
+        const cachedData = await this.operationIdService.getCachedOperationIdData(operationId);
+
+        // Log what we retrieved from cache
+        const hasCachedData = cachedData !== undefined && cachedData !== null;
+        const hasDataset = cachedData?.dataset !== undefined;
+        const isDatasetNull = cachedData?.dataset === null;
+        const hasPublicDataset = cachedData?.dataset?.public !== undefined;
+        const isPublicDatasetNull = cachedData?.dataset?.public === null;
+        const publicDatasetSize =
+            hasPublicDataset && !isPublicDatasetNull
+                ? JSON.stringify(cachedData.dataset.public).length
+                : 0;
+
+        this.logger.debug(
+            `[publish-sender-debug] Retrieved cached data for sending. OperationId: ${operationId}, hasCachedData: ${hasCachedData}, hasDataset: ${hasDataset}, isDatasetNull: ${isDatasetNull}, hasPublicDataset: ${hasPublicDataset}, isPublicDatasetNull: ${isPublicDatasetNull}, publicDatasetSize: ${publicDatasetSize} bytes`,
+        );
+
+        if (!hasDataset || isDatasetNull) {
+            this.logger.error(
+                `[publish-sender-debug] DATASET ISSUE BEFORE SENDING! OperationId: ${operationId}, hasDataset: ${hasDataset}, isDatasetNull: ${isDatasetNull}`,
+            );
+        }
+
+        if (!hasPublicDataset || isPublicDatasetNull) {
+            this.logger.error(
+                `[publish-sender-debug] PUBLIC DATASET ISSUE BEFORE SENDING! OperationId: ${operationId}, hasPublicDataset: ${hasPublicDataset}, isPublicDatasetNull: ${isPublicDatasetNull}. Dataset keys: [${
+                    cachedData?.dataset ? Object.keys(cachedData.dataset).join(', ') : 'N/A'
+                }]`,
+            );
+        }
+
+        const { dataset } = cachedData;
         const message = {
             dataset: dataset.public,
             datasetRoot,
             blockchain,
         };
 
+        this.logger.debug(
+            `[publish-sender-debug] Prepared message for sending. OperationId: ${operationId}, datasetRoot: ${datasetRoot}, blockchain: ${blockchain}, message.dataset size: ${
+                message.dataset ? JSON.stringify(message.dataset).length : 0
+            } bytes, sending to ${shardNodes.length} nodes`,
+        );
+
         // Run all message sending operations in parallel
         await Promise.all(
             shardNodes.map((node) =>
@@ -159,6 +196,14 @@ class PublishReplicationCommand extends Command {
     }
 
     async sendAndHandleMessage(node, operationId, message, command, blockchain) {
+        const messageDatasetSize = message.dataset ? JSON.stringify(message.dataset).length : 0;
+        const { datasetRoot } = message;
+
+        this.logger.debug(
+            `[publish-sender-debug] Sending message to node. OperationId: ${operationId}, datasetRoot: ${datasetRoot}, blockchain: ${blockchain}, targetNode: ${node.id}, protocol: ${node.protocol}, datasetSize: ${messageDatasetSize} bytes`,
+        );
+
+        const sendStartTime = Date.now();
         const response = await this.messagingService.sendProtocolMessage(
             node,
             operationId,
@@ -166,8 +211,19 @@ class PublishReplicationCommand extends Command {
             NETWORK_MESSAGE_TYPES.REQUESTS.PROTOCOL_REQUEST,
             NETWORK_MESSAGE_TIMEOUT_MILLS.PUBLISH.REQUEST,
         );
+        const sendDuration = Date.now() - sendStartTime;
+        const responseData = response.data;
+        const responseType = response.header?.messageType;
+
+        this.logger.debug(
+            `[publish-sender-debug] Received response from node. OperationId: ${operationId}, datasetRoot: ${datasetRoot}, targetNode: ${node.id}, responseType: ${responseType}, duration: ${sendDuration}ms`,
+        );
+
         if (response.header.messageType === NETWORK_MESSAGE_TYPES.RESPONSES.ACK) {
+            this.logger.debug(
+                `[publish-sender-debug] ACK received. OperationId: ${operationId}, datasetRoot: ${datasetRoot}, targetNode: ${node.id}, identityId: ${responseData.identityId}`,
+            );
             // eslint-disable-next-line no-await-in-loop
             await this.signatureService.addSignatureToStorage(
                 NETWORK_SIGNATURES_FOLDER,
@@ -185,6 +241,9 @@ class PublishReplicationCommand extends Command {
                 responseData,
             );
         } else {
+            this.logger.warn(
+                `[publish-sender-debug] Non-ACK response received. OperationId: ${operationId}, datasetRoot: ${datasetRoot}, targetNode: ${node.id}, responseType: ${responseType}, errorMessage: ${responseData?.errorMessage}`,
+            );
             // eslint-disable-next-line no-await-in-loop
             await this.operationService.processResponse(
                 command,
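
The replication fan-out above now runs through Promise.all, so a single rejected sendAndHandleMessage rejects the whole batch even though the remaining sends keep running; the // eslint-disable-next-line no-await-in-loop comments retained inside sendAndHandleMessage are leftovers from the earlier sequential loop. If per-node failures should be isolated instead, Promise.allSettled is the standard alternative (a sketch of that variant, not the repository's current behavior):

    // Sketch: collect per-node outcomes without letting one rejection abort the batch.
    const results = await Promise.allSettled(
        shardNodes.map((node) =>
            this.sendAndHandleMessage(node, operationId, message, command, blockchain),
        ),
    );
    const rejected = results.filter((result) => result.status === 'rejected');
    if (rejected.length > 0) {
        this.logger.warn(`${rejected.length}/${results.length} replication sends failed`);
    }
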
diff --git a/src/controllers/rpc/publish-rpc-controller.js b/src/controllers/rpc/publish-rpc-controller.js
index 845025cdaa..4f35e6d8dc 100644
--- a/src/controllers/rpc/publish-rpc-controller.js
+++ b/src/controllers/rpc/publish-rpc-controller.js
@@ -11,6 +11,45 @@ class PublishController extends BaseController {
     async v1_0_0HandleRequest(message, remotePeerId, protocol) {
         const { operationId, messageType } = message.header;
+        const { blockchain, contract, tokenId, datasetRoot } = message.data || {};
+
+        // Derive UAL if possible
+        const ual =
+            blockchain && contract && tokenId
+                ? `did:dkg:${blockchain}/${contract}/${tokenId}`
+                : 'N/A';
+
+        this.logger.debug(
+            `[publish-rpc-debug] Received request. OperationId: ${operationId}, UAL: ${ual}, messageType: ${messageType}, remotePeerId: ${remotePeerId}, protocol: ${protocol}, blockchain: ${blockchain}, datasetRoot: ${datasetRoot}`,
+        );
+
+        // Log the incoming message data structure
+        const messageDataKeys = message.data ? Object.keys(message.data) : [];
+        const hasDataset = message.data?.dataset !== undefined;
+        const isDatasetNull = message.data?.dataset === null;
+        const datasetType = typeof message.data?.dataset;
+        const datasetSize =
+            hasDataset && !isDatasetNull ? JSON.stringify(message.data.dataset).length : 0;
+
+        this.logger.debug(
+            `[publish-rpc-debug] Message data. OperationId: ${operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}, keys: [${messageDataKeys.join(
+                ', ',
+            )}], hasDataset: ${hasDataset}, isDatasetNull: ${isDatasetNull}, datasetType: ${datasetType}, datasetSize: ${datasetSize} bytes`,
+        );
+
+        if (isDatasetNull) {
+            this.logger.error(
+                `[publish-rpc-debug] RECEIVED NULL DATASET! OperationId: ${operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}, remotePeerId: ${remotePeerId}. This is likely the root cause of cache issues.`,
+            );
+        }
+
+        if (!hasDataset) {
+            this.logger.error(
+                `[publish-rpc-debug] RECEIVED NO DATASET (undefined)! OperationId: ${operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}, remotePeerId: ${remotePeerId}. Full message.data: ${JSON.stringify(
+                    message.data,
+                )?.substring(0, 1000)}`,
+            );
+        }
 
         const command = { sequence: [], transactional: false, data: {} };
         const [handleRequestCommand] = this.getCommandSequence(protocol);
@@ -20,6 +59,10 @@ class PublishController extends BaseController {
                 priority: COMMAND_PRIORITY.HIGHEST,
             });
 
+            this.logger.debug(
+                `[publish-rpc-debug] Caching operation data. OperationId: ${operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}, datasetSize: ${datasetSize} bytes`,
+            );
+
             await this.operationIdService.cacheOperationIdDataToMemory(operationId, {
                 dataset: message.data.dataset,
                 datasetRoot: message.data.datasetRoot,
@@ -29,7 +72,14 @@ class PublishController extends BaseController {
                 dataset: message.data.dataset,
                 datasetRoot: message.data.datasetRoot,
             });
+
+            this.logger.debug(
+                `[publish-rpc-debug] Operation data cached. OperationId: ${operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}`,
+            );
         } else {
+            this.logger.error(
+                `[publish-rpc-debug] Unknown message type: ${messageType}. OperationId: ${operationId}, UAL: ${ual}`,
+            );
             throw new Error('Unknown message type');
         }
 
@@ -46,6 +96,10 @@ class PublishController extends BaseController {
             tokenId: message.data.tokenId,
         };
 
+        this.logger.debug(
+            `[publish-rpc-debug] Adding command to executor. OperationId: ${operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}, command: ${handleRequestCommand}`,
+        );
+
         await this.commandExecutor.add(command);
     }
 }
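
The blockchain && contract && tokenId ? `did:dkg:...` : 'N/A' ternary above is repeated verbatim in the router and twice more in the libp2p service below. A tiny shared helper would keep the debug-UAL format in one place (hypothetical refactor; the did:dkg prefix is taken from the diff itself):

    // Hypothetical helper collapsing the repeated debug-UAL ternary.
    function deriveDebugUal({ blockchain, contract, tokenId } = {}) {
        return blockchain && contract && tokenId
            ? `did:dkg:${blockchain}/${contract}/${tokenId}`
            : 'N/A';
    }
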
diff --git a/src/controllers/rpc/rpc-router.js b/src/controllers/rpc/rpc-router.js
index 8dbf24e4e8..d71d864981 100644
--- a/src/controllers/rpc/rpc-router.js
+++ b/src/controllers/rpc/rpc-router.js
@@ -29,6 +29,48 @@ class RpcRouter {
         const blockchainImplementations = this.blockchainModuleManager.getImplementationNames();
 
         this.networkModuleManager.handleMessage(protocol, (message, remotePeerId) => {
+            const operationId = message.header?.operationId;
+            const messageType = message.header?.messageType;
+
+            // Extract identifiers for logging
+            const { blockchain, contract, tokenId, datasetRoot } = message.data || {};
+            const ual =
+                blockchain && contract && tokenId
+                    ? `did:dkg:${blockchain}/${contract}/${tokenId}`
+                    : 'N/A';
+
+            // Log incoming message at router level
+            const hasMessageData = message.data !== undefined && message.data !== null;
+            const messageDataKeys = hasMessageData ? Object.keys(message.data) : [];
+            const hasDataset = message.data?.dataset !== undefined;
+            const isDatasetNull = message.data?.dataset === null;
+            const datasetSize =
+                hasDataset && !isDatasetNull ? JSON.stringify(message.data.dataset).length : 0;
+
+            this.logger.debug(
+                `[rpc-router-debug] Message received at router. Protocol: ${protocol}, operation: ${operation}, operationId: ${operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}, messageType: ${messageType}, remotePeerId: ${remotePeerId}`,
+            );
+
+            this.logger.debug(
+                `[rpc-router-debug] Message data inspection. OperationId: ${operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}, hasData: ${hasMessageData}, dataKeys: [${messageDataKeys.join(
+                    ', ',
+                )}], hasDataset: ${hasDataset}, isDatasetNull: ${isDatasetNull}, datasetSize: ${datasetSize} bytes`,
+            );
+
+            if (isDatasetNull) {
+                this.logger.error(
+                    `[rpc-router-debug] DATASET IS NULL AT ROUTER LEVEL! OperationId: ${operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}, remotePeerId: ${remotePeerId}, protocol: ${protocol}. This indicates the sender sent null or data was corrupted in transit.`,
+                );
+            }
+
+            if (hasMessageData && !hasDataset && operation === 'publish') {
+                this.logger.error(
+                    `[rpc-router-debug] DATASET MISSING FROM PUBLISH MESSAGE! OperationId: ${operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}, remotePeerId: ${remotePeerId}. Available keys: [${messageDataKeys.join(
+                        ', ',
+                    )}]`,
+                );
+            }
+
             const modifiedMessage = this.modifyMessage(message, blockchainImplementations);
             this[controller][handleRequest](modifiedMessage, remotePeerId, protocol);
         });
diff --git a/src/modules/network/implementation/libp2p-service.js b/src/modules/network/implementation/libp2p-service.js
index 112ad76ee2..331291dc0f 100644
--- a/src/modules/network/implementation/libp2p-service.js
+++ b/src/modules/network/implementation/libp2p-service.js
@@ -205,15 +205,29 @@ class Libp2pService {
         this.node.handle(protocol, async (handlerProps) => {
             const { stream } = handlerProps;
             const peerIdString = handlerProps.connection.remotePeer.toB58String();
+            const handleStartTime = Date.now();
+
+            this.logger.debug(
+                `[libp2p-debug] Incoming connection from peer: ${peerIdString}, protocol: ${protocol}, awaiting message...`,
+            );
+
             const { message, valid, busy } = await this._readMessageFromStream(
                 stream,
                 this.isRequestValid.bind(this),
                 peerIdString,
             );
+            const readDuration = Date.now() - handleStartTime;
+
+            this.logger.debug(
+                `[libp2p-debug] Message read complete from peer: ${peerIdString}, protocol: ${protocol}, operationId: ${message.header.operationId}, valid: ${valid}, busy: ${busy}, read duration: ${readDuration}ms`,
+            );
 
             this.updateSessionStream(message.header.operationId, peerIdString, stream);
 
             if (!valid) {
+                this.logger.warn(
+                    `[libp2p-debug] Sending NACK for invalid message from peer: ${peerIdString}, protocol: ${protocol}, operationId: ${message.header.operationId}`,
+                );
                 await this.sendMessageResponse(
                     protocol,
                     peerIdString,
@@ -223,6 +237,9 @@
                 );
                 this.removeCachedSession(message.header.operationId, peerIdString);
             } else if (busy) {
+                this.logger.warn(
+                    `[libp2p-debug] Sending BUSY response to peer: ${peerIdString}, protocol: ${protocol}, operationId: ${message.header.operationId}`,
+                );
                 await this.sendMessageResponse(
                     protocol,
                     peerIdString,
@@ -232,9 +249,25 @@
                 );
                 this.removeCachedSession(message.header.operationId, peerIdString);
             } else {
+                // Extract identifiers for logging
+                const { blockchain, contract, tokenId, datasetRoot } = message.data || {};
+                const ual =
+                    blockchain && contract && tokenId
+                        ? `did:dkg:${blockchain}/${contract}/${tokenId}`
+                        : 'N/A';
+
                 this.logger.debug(
-                    `Receiving message from ${peerIdString} to ${this.config.id}: protocol: ${protocol}, messageType: ${message.header.messageType};`,
+                    `Receiving message from ${peerIdString} to ${this.config.id}: protocol: ${protocol}, messageType: ${message.header.messageType}, operationId: ${message.header.operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}`,
                 );
+
+                // Log dataset presence before passing to handler
+                if (message.data?.dataset !== undefined) {
+                    const datasetSize = JSON.stringify(message.data.dataset).length;
+                    this.logger.debug(
+                        `[libp2p-debug] Passing message with dataset to handler. Peer: ${peerIdString}, operationId: ${message.header.operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}, dataset size: ${datasetSize} bytes`,
+                    );
+                }
+
                 await handler(message, peerIdString);
             }
         });
@@ -420,9 +453,35 @@
     }
 
     async _sendMessageToStream(stream, message) {
+        const sendStartTime = Date.now();
         const stringifiedHeader = JSON.stringify(message.header);
         const stringifiedData = JSON.stringify(message.data);
 
+        // Extract identifiers for logging
+        const { blockchain, contract, tokenId, datasetRoot } = message.data || {};
+        const ual =
+            blockchain && contract && tokenId
+                ? `did:dkg:${blockchain}/${contract}/${tokenId}`
+                : 'N/A';
+
+        this.logger.debug(
+            `[libp2p-debug] Preparing to send message. OperationId: ${message.header.operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}, messageType: ${message.header.messageType}, header size: ${stringifiedHeader.length} bytes, data size: ${stringifiedData.length} bytes`,
+        );
+
+        // Log data structure being sent
+        if (message.data?.dataset !== undefined) {
+            const datasetSize = JSON.stringify(message.data.dataset).length;
+            const datasetType = typeof message.data.dataset;
+            const isArray = Array.isArray(message.data.dataset);
+            this.logger.debug(
+                `[libp2p-debug] Sending dataset. OperationId: ${
+                    message.header.operationId
+                }, UAL: ${ual}, datasetRoot: ${datasetRoot}, dataset size: ${datasetSize} bytes, type: ${datasetType}, isArray: ${isArray}, length: ${
+                    isArray ? message.data.dataset.length : 'N/A'
+                }`,
+            );
+        }
+
         const chunks = [stringifiedHeader];
         const chunkSize = BYTES_IN_MEGABYTE; // 1 MB
 
@@ -431,15 +490,39 @@
             chunks.push(stringifiedData.slice(i, i + chunkSize));
         }
 
-        await pipe(
-            chunks,
-            // turn strings into buffers
-            (source) => map(source, (string) => Buffer.from(string)),
-            // Encode with length prefix (so receiving side knows how much data is coming)
-            encode(),
-            // Write to the stream (the sink)
-            stream.sink,
-        );
+        this.logger.debug(
+            `[libp2p-debug] Sending message in ${chunks.length} chunks (1 header + ${
+                chunks.length - 1
+            } data chunks). OperationId: ${
+                message.header.operationId
+            }, UAL: ${ual}, datasetRoot: ${datasetRoot}`,
+        );
+
+        try {
+            await pipe(
+                chunks,
+                // turn strings into buffers
+                (source) => map(source, (string) => Buffer.from(string)),
+                // Encode with length prefix (so receiving side knows how much data is coming)
+                encode(),
+                // Write to the stream (the sink)
+                stream.sink,
+            );
+
+            const sendDuration = Date.now() - sendStartTime;
+            this.logger.debug(
+                `[libp2p-debug] Message sent successfully. OperationId: ${
+                    message.header.operationId
+                }, UAL: ${ual}, datasetRoot: ${datasetRoot}, duration: ${sendDuration}ms, total bytes: ${
+                    stringifiedHeader.length + stringifiedData.length
+                }`,
+            );
+        } catch (error) {
+            this.logger.error(
+                `[libp2p-debug] Failed to send message to stream. OperationId: ${message.header.operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}, error: ${error.message}`,
+            );
+            throw error;
+        }
     }
 
     async _readMessageFromStream(stream, isMessageValid, peerIdString) {
@@ -457,22 +540,46 @@
     async readMessageSink(source, isMessageValid, peerIdString) {
         const message = { header: { operationId: '' }, data: {} };
+        const readStartTime = Date.now();
+
         // we expect first buffer to be header
         const stringifiedHeader = (await source.next()).value;
+
+        this.logger.debug(
+            `[libp2p-debug] Reading message from peer: ${peerIdString}, header raw length: ${
+                stringifiedHeader?.length ?? 0
+            } bytes`,
+        );
+
         if (!stringifiedHeader?.length) {
+            this.logger.warn(
+                `[libp2p-debug] Empty or missing header from peer: ${peerIdString}. Raw value: ${JSON.stringify(
+                    stringifiedHeader,
+                )}`,
+            );
             return { message, valid: false, busy: false };
         }
 
         try {
             message.header = JSON.parse(stringifiedHeader);
+            this.logger.debug(
+                `[libp2p-debug] Parsed header from peer: ${peerIdString}, operationId: ${message.header.operationId}, messageType: ${message.header.messageType}`,
+            );
         } catch (error) {
+            this.logger.error(
+                `[libp2p-debug] Failed to parse header JSON from peer: ${peerIdString}. Error: ${
+                    error.message
+                }. Raw header (first 500 chars): ${stringifiedHeader?.substring(0, 500)}`,
+            );
             // Return the same format as invalid request case
             return { message, valid: false, busy: false };
         }
 
         // validate request / response
         if (!(await isMessageValid(message.header, peerIdString))) {
+            this.logger.warn(
+                `[libp2p-debug] Message validation failed from peer: ${peerIdString}, operationId: ${message.header.operationId}, messageType: ${message.header.messageType}`,
+            );
             return { message, valid: false };
         }
 
@@ -481,18 +588,86 @@
             message.header.messageType === NETWORK_MESSAGE_TYPES.REQUESTS.PROTOCOL_INIT &&
             this.isBusy()
         ) {
+            this.logger.debug(
+                `[libp2p-debug] Node is busy, returning busy response for peer: ${peerIdString}, operationId: ${message.header.operationId}`,
+            );
             return { message, valid: true, busy: true };
         }
 
         let stringifiedData = '';
+        let chunkCount = 0;
+        let totalBytesReceived = 0;
         // read the data
         try {
             for await (const chunk of source) {
+                chunkCount += 1;
+                const chunkLength = chunk?.length ?? 0;
+                totalBytesReceived += chunkLength;
+                this.logger.trace(
+                    `[libp2p-debug] Received chunk ${chunkCount} from peer: ${peerIdString}, operationId: ${message.header.operationId}, chunk size: ${chunkLength} bytes, total so far: ${totalBytesReceived} bytes`,
+                );
                 stringifiedData += chunk;
             }
+
+            this.logger.debug(
+                `[libp2p-debug] Finished receiving data from peer: ${peerIdString}, operationId: ${
+                    message.header.operationId
+                }, total chunks: ${chunkCount}, total bytes: ${totalBytesReceived}, read duration: ${
+                    Date.now() - readStartTime
+                }ms`,
+            );
+
+            const parseStartTime = Date.now();
             message.data = JSON.parse(stringifiedData);
+            const parseDuration = Date.now() - parseStartTime;
+
+            // Extract identifiers for logging
+            const { blockchain, contract, tokenId, datasetRoot } = message.data || {};
+            const ual =
+                blockchain && contract && tokenId
+                    ? `did:dkg:${blockchain}/${contract}/${tokenId}`
+                    : 'N/A';
+
+            // Log data structure info
+            const dataKeys = Object.keys(message.data);
+            const datasetSize = message.data.dataset
+                ? JSON.stringify(message.data.dataset).length
+                : 0;
+            this.logger.debug(
+                `[libp2p-debug] Parsed data from peer: ${peerIdString}, operationId: ${
+                    message.header.operationId
+                }, UAL: ${ual}, datasetRoot: ${datasetRoot}, data keys: [${dataKeys.join(
+                    ', ',
+                )}], dataset size: ${datasetSize} bytes, parse duration: ${parseDuration}ms`,
+            );
+
+            // Validate dataset structure if present
+            if (message.data.dataset !== undefined) {
+                const datasetType = typeof message.data.dataset;
+                const isArray = Array.isArray(message.data.dataset);
+                const datasetLength = isArray ? message.data.dataset.length : 'N/A';
+                this.logger.debug(
+                    `[libp2p-debug] Dataset info from peer: ${peerIdString}, operationId: ${message.header.operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}, type: ${datasetType}, isArray: ${isArray}, length: ${datasetLength}`,
+                );
+
+                if (message.data.dataset === null) {
+                    this.logger.warn(
+                        `[libp2p-debug] Dataset is NULL from peer: ${peerIdString}, operationId: ${message.header.operationId}, UAL: ${ual}, datasetRoot: ${datasetRoot}`,
+                    );
+                }
+            }
         } catch (error) {
+            this.logger.error(
+                `[libp2p-debug] Failed to parse data JSON from peer: ${peerIdString}, operationId: ${
+                    message.header.operationId
+                }. Error: ${
+                    error.message
+                }. Total bytes received: ${totalBytesReceived}, chunks: ${chunkCount}. Raw data (first 1000 chars): ${stringifiedData?.substring(
+                    0,
+                    1000,
+                )}`,
+            );
             // If data parsing fails, return invalid message response
             return { message, valid: false, busy: false };
         }
diff --git a/src/modules/triple-store/implementation/ot-blazegraph/ot-blazegraph.js b/src/modules/triple-store/implementation/ot-blazegraph/ot-blazegraph.js
index 16a4d9af36..57ad667398 100644
--- a/src/modules/triple-store/implementation/ot-blazegraph/ot-blazegraph.js
+++ b/src/modules/triple-store/implementation/ot-blazegraph/ot-blazegraph.js
@@ -1,4 +1,5 @@
 import axios from 'axios';
+import { writeFile } from 'fs/promises';
 import OtTripleStore from '../ot-triple-store.js';
 import { MEDIA_TYPES } from '../../../../constants/constants.js';
 
@@ -177,12 +178,60 @@ class OtBlazegraph extends OtTripleStore {
     }
 
     async queryVoid(repository, query, timeout) {
-        return axios.post(this.repositories[repository].sparqlEndpoint, query, {
-            headers: {
-                'Content-Type': 'application/sparql-update; charset=UTF-8',
-                'X-BIGDATA-MAX-QUERY-MILLIS': timeout,
-            },
-        });
+        const snippet = query?.slice(0, 80)?.replace(/\s+/g, ' ') || '';
+        const label = `[OtBlazegraph.queryVoid] ${repository} ${snippet}`;
+        if (this.logger?.startTimer) this.logger.startTimer(label);
+        try {
+            this.logger.debug(`[OtBlazegraph.queryVoid] Sending update to ${repository}`);
+            const response = await axios.post(this.repositories[repository].sparqlEndpoint, query, {
+                headers: {
+                    'Content-Type': 'application/sparql-update; charset=UTF-8',
+                    'X-BIGDATA-MAX-QUERY-MILLIS': timeout,
+                },
+            });
+            this.logger.debug(
+                `[OtBlazegraph.queryVoid] Update result for ${repository} (status: ${response.status})`,
+            );
+            const responseData = {
+                status: response.status,
+                statusText: response.statusText,
+                headers: response.headers,
+                data: response.data,
+                config: {
+                    url: response.config?.url,
+                    method: response.config?.method,
+                    timeout: response.config?.timeout,
+                },
+            };
+            this.logger.debug(
+                `[OtBlazegraph.queryVoid] Response: ${JSON.stringify(responseData, null, 2)}`,
+            );
+
+            const now = new Date();
+            const dateStr = `${now.getFullYear()}-${String(now.getMonth() + 1).padStart(2, '0')}-${String(now.getDate()).padStart(2, '0')}-${String(now.getHours()).padStart(2, '0')}-${String(now.getMinutes()).padStart(2, '0')}-${String(now.getSeconds()).padStart(2, '0')}`;
+            await writeFile(
+                `response_${response.status}_${dateStr}.txt`,
+                JSON.stringify(responseData, null, 2),
+            );
+
+            if (response.status !== 200) {
+                this.logger.debug(
+                    `[OtBlazegraph.queryVoid] Response not 200 for ${repository} (status: ${response.status}), response: ${response.data}`,
+                );
+            }
+            return response;
+        } catch (error) {
+            this.logger.debug(
+                `[OtBlazegraph.queryVoid] Error: ${JSON.stringify(error, null, 2)}`,
+            );
+            const status = error?.response?.status;
+            const dataSnippet =
+                typeof error?.response?.data === 'string'
+                    ? error.response.data.slice(0, 200)
+                    : '';
+            this.logger.error(
+                `[OtBlazegraph.queryVoid] Update failed for ${repository} (status: ${status}): ${error.message}${
+                    dataSnippet ? ` | data: ${dataSnippet}` : ''
+                } | query: ${snippet || ''}`,
+            );
+            throw error;
+        } finally {
+            if (this.logger?.endTimer) this.logger.endTimer(label);
+        }
     }
 
     async deleteRepository(repository) {
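
A note on the queryVoid instrumentation above: axios rejects by default on any status outside the 2xx range, so the response.status !== 200 branch only ever observes 2xx statuses other than 200, and genuine Blazegraph failures land in the catch block. If the success path should also see 4xx/5xx bodies without throwing, axios accepts a custom validateStatus (a sketch of that variant; not what the patch does):

    // Sketch: resolve on any status so response.status/response.data can be
    // inspected inline instead of in the catch block.
    const response = await axios.post(this.repositories[repository].sparqlEndpoint, query, {
        headers: {
            'Content-Type': 'application/sparql-update; charset=UTF-8',
            'X-BIGDATA-MAX-QUERY-MILLIS': timeout,
        },
        validateStatus: () => true, // axios default: (status) => status >= 200 && status < 300
    });
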
diff --git a/src/service/operation-id-service.js b/src/service/operation-id-service.js
index b959699e21..244ee8b3de 100644
--- a/src/service/operation-id-service.js
+++ b/src/service/operation-id-service.js
@@ -107,6 +107,32 @@ class OperationIdService {
     async cacheOperationIdDataToMemory(operationId, data) {
         this.logger.debug(`Caching data for operation id: ${operationId} in memory`);
 
+        // Log data structure being cached
+        const dataKeys = data ? Object.keys(data) : [];
+        const dataSize = data ? JSON.stringify(data).length : 0;
+        const hasDataset = data?.dataset !== undefined;
+        const datasetSize = hasDataset ? JSON.stringify(data.dataset).length : 0;
+
+        this.logger.debug(
+            `[cache-debug] Caching to memory. OperationId: ${operationId}, data keys: [${dataKeys.join(
+                ', ',
+            )}], total size: ${dataSize} bytes, has dataset: ${hasDataset}, dataset size: ${datasetSize} bytes`,
+        );
+
+        if (hasDataset) {
+            const datasetType = typeof data.dataset;
+            const isDatasetNull = data.dataset === null;
+            const datasetPublicSize = data.dataset?.public
+                ? JSON.stringify(data.dataset.public).length
+                : 0;
+            const datasetPrivateSize = data.dataset?.private
+                ? JSON.stringify(data.dataset.private).length
+                : 0;
+            this.logger.debug(
+                `[cache-debug] Dataset details for operationId: ${operationId}, type: ${datasetType}, isNull: ${isDatasetNull}, public size: ${datasetPublicSize} bytes, private size: ${datasetPrivateSize} bytes`,
+            );
+        }
+
         this.memoryCachedHandlersData[operationId] = { data, timestamp: Date.now() };
     }
 
@@ -114,6 +140,11 @@
         this.logger.debug(`Caching data for operation id: ${operationId} in file`);
         const operationIdCachePath = this.fileService.getOperationIdCachePath();
 
+        const dataSize = data ? JSON.stringify(data).length : 0;
+        this.logger.debug(
+            `[cache-debug] Caching to file. OperationId: ${operationId}, path: ${operationIdCachePath}, size: ${dataSize} bytes`,
+        );
+
         await this.fileService.writeContentsToFile(
             operationIdCachePath,
             operationId,
@@ -124,16 +155,66 @@
     async getCachedOperationIdData(operationId) {
         if (this.memoryCachedHandlersData[operationId]) {
             this.logger.debug(`Reading operation id: ${operationId} cached data from memory`);
-            return this.memoryCachedHandlersData[operationId].data;
+
+            const cachedEntry = this.memoryCachedHandlersData[operationId];
+            const { data, timestamp } = cachedEntry;
+            const cacheAge = Date.now() - timestamp;
+
+            // Log what we're returning from cache
+            const dataKeys = data ? Object.keys(data) : [];
+            const hasDataset = data?.dataset !== undefined;
+            const datasetSize = hasDataset ? JSON.stringify(data.dataset).length : 0;
+            const isDatasetNull = hasDataset && data.dataset === null;
+            const isDatasetPublicNull = hasDataset && data.dataset?.public === null;
+            const isDatasetPublicUndefined = hasDataset && data.dataset?.public === undefined;
+
+            this.logger.debug(
+                `[cache-debug] Memory cache HIT. OperationId: ${operationId}, cache age: ${cacheAge}ms, data keys: [${dataKeys.join(
+                    ', ',
+                )}], has dataset: ${hasDataset}, dataset size: ${datasetSize} bytes`,
+            );
+
+            if (hasDataset) {
+                this.logger.debug(
+                    `[cache-debug] Dataset state in cache. OperationId: ${operationId}, isNull: ${isDatasetNull}, public isNull: ${isDatasetPublicNull}, public isUndefined: ${isDatasetPublicUndefined}`,
+                );
+            }
+
+            return data;
         }
         this.logger.debug(
-            `Didn't manage to get cached ${operationId} data from memory, trying file`,
+            `[cache-debug] Memory cache MISS for operationId: ${operationId}, trying file`,
         );
 
         const documentPath = this.fileService.getOperationIdDocumentPath(operationId);
         let data;
         if (await this.fileService.pathExists(documentPath)) {
-            data = await this.fileService.readFile(documentPath, true);
+            this.logger.debug(
+                `[cache-debug] File cache exists for operationId: ${operationId}, path: ${documentPath}`,
+            );
+            try {
+                data = await this.fileService.readFile(documentPath, true);
+
+                // Log what we read from file
+                const dataKeys = data ? Object.keys(data) : [];
+                const hasDataset = data?.dataset !== undefined;
+                const datasetSize = hasDataset ? JSON.stringify(data.dataset).length : 0;
+
+                this.logger.debug(
+                    `[cache-debug] File cache read success. OperationId: ${operationId}, data keys: [${dataKeys.join(
+                        ', ',
+                    )}], has dataset: ${hasDataset}, dataset size: ${datasetSize} bytes`,
+                );
+            } catch (error) {
+                this.logger.error(
+                    `[cache-debug] File cache read FAILED. OperationId: ${operationId}, path: ${documentPath}, error: ${error.message}`,
+                );
+                throw error;
+            }
+        } else {
+            this.logger.warn(
+                `[cache-debug] File cache MISS (file does not exist). OperationId: ${operationId}, path: ${documentPath}`,
+            );
         }
         return data;
     }
diff --git a/src/service/pending-storage-service.js b/src/service/pending-storage-service.js
index f049ad927a..585f896f49 100644
--- a/src/service/pending-storage-service.js
+++ b/src/service/pending-storage-service.js
@@ -17,15 +17,59 @@ class PendingStorageService {
             `Caching ${datasetRoot} dataset root, operation id: ${operationId} in file in pending storage`,
         );
 
-        await this.fileService.writeContentsToFile(
-            this.fileService.getPendingStorageCachePath(),
-            operationId,
-            JSON.stringify({
-                merkleRoot: datasetRoot,
-                assertion: dataset,
-                remotePeerId,
-            }),
+        // Detailed logging of what's being cached
+        const datasetType = typeof dataset;
+        const isDatasetNull = dataset === null;
+        const isDatasetUndefined = dataset === undefined;
+        const datasetSize = dataset ? JSON.stringify(dataset).length : 0;
+        const isArray = Array.isArray(dataset);
+
+        this.logger.debug(
+            `[pending-storage-debug] Caching dataset. OperationId: ${operationId}, datasetRoot: ${datasetRoot}, remotePeerId: ${remotePeerId}, dataset type: ${datasetType}, isNull: ${isDatasetNull}, isUndefined: ${isDatasetUndefined}, isArray: ${isArray}, size: ${datasetSize} bytes`,
         );
+
+        if (dataset && typeof dataset === 'object') {
+            const datasetKeys = Object.keys(dataset);
+            this.logger.debug(
+                `[pending-storage-debug] Dataset structure. OperationId: ${operationId}, keys: [${datasetKeys.join(
+                    ', ',
+                )}]`,
+            );
+
+            // Log sample of dataset content for debugging
+            if (isArray && dataset.length > 0) {
+                this.logger.debug(
+                    `[pending-storage-debug] Dataset is array with ${
+                        dataset.length
+                    } items. OperationId: ${operationId}, first item type: ${typeof dataset[0]}`,
+                );
+            }
+        }
+
+        const dataToCache = {
+            merkleRoot: datasetRoot,
+            assertion: dataset,
+            remotePeerId,
+        };
+
+        const serializedData = JSON.stringify(dataToCache);
+        const cachePath = this.fileService.getPendingStorageCachePath();
+
+        this.logger.debug(
+            `[pending-storage-debug] Writing to pending storage. OperationId: ${operationId}, path: ${cachePath}, serialized size: ${serializedData.length} bytes`,
+        );
+
+        try {
+            await this.fileService.writeContentsToFile(cachePath, operationId, serializedData);
+            this.logger.debug(
+                `[pending-storage-debug] Successfully cached dataset. OperationId: ${operationId}`,
+            );
+        } catch (error) {
+            this.logger.error(
+                `[pending-storage-debug] Failed to cache dataset. OperationId: ${operationId}, error: ${error.message}`,
+            );
+            throw error;
+        }
     }
 
     async getCachedDataset(operationId) {
@@ -33,12 +77,36 @@
 
         const filePath = this.fileService.getPendingStorageDocumentPath(operationId);
 
+        this.logger.debug(
+            `[pending-storage-debug] Reading cached dataset. OperationId: ${operationId}, path: ${filePath}`,
+        );
+
         try {
             const fileContents = await this.fileService.readFile(filePath, true);
+
+            // Log what we read
+            const hasAssertion = fileContents?.assertion !== undefined;
+            const assertionType = typeof fileContents?.assertion;
+            const assertionSize = hasAssertion ? JSON.stringify(fileContents.assertion).length : 0;
+            const isAssertionNull = fileContents?.assertion === null;
+            const merkleRoot = fileContents?.merkleRoot;
+
+            this.logger.debug(
+                `[pending-storage-debug] Read cached dataset. OperationId: ${operationId}, has assertion: ${hasAssertion}, assertion type: ${assertionType}, isNull: ${isAssertionNull}, assertion size: ${assertionSize} bytes, merkleRoot: ${merkleRoot}`,
+            );
+
+            if (isAssertionNull) {
+                this.logger.warn(
+                    `[pending-storage-debug] Assertion is NULL in cached file! OperationId: ${operationId}, full file contents keys: [${Object.keys(
+                        fileContents || {},
+                    ).join(', ')}]`,
+                );
+            }
+
             return fileContents.assertion;
         } catch (error) {
             this.logger.error(
-                `Failed to retrieve or parse cached dataset for ${operationId}: ${error.message}`,
+                `[pending-storage-debug] Failed to retrieve or parse cached dataset. OperationId: ${operationId}, path: ${filePath}, error: ${error.message}, stack: ${error.stack}`,
            );
            throw error;
        }
diff --git a/src/service/publish-service.js b/src/service/publish-service.js
index 6f043c1a52..f1aaf0cacb 100644
--- a/src/service/publish-service.js
+++ b/src/service/publish-service.js
@@ -69,7 +69,7 @@
         // }
 
         // 2. Check if all responses have been received
-        if (totalResponses === numberOfFoundNodes) {
+        // Gate disabled, completion is now evaluated immediately (was: if (totalResponses === numberOfFoundNodes) {)
             // 2.1 If minimum replication is reached, mark the operation as completed
             if (completedNumber >= minAckResponses) {
                 await this.markOperationAsCompleted(
@@ -82,7 +82,7 @@
                 this.logResponsesSummary(completedNumber, failedNumber);
             }
             // 2.2 Otherwise, mark as failed
-            else {
+            else if (totalResponses === numberOfFoundNodes) {
                 await this.markOperationAsFailed(
                     operationId,
                     blockchain,
@@ -95,7 +95,7 @@
                 );
                 this.logResponsesSummary(completedNumber, failedNumber);
             }
-        }
+        // } (closing brace of the disabled gate)
         // else {
         //     // 3. Not all responses have arrived yet.
         //     const potentialCompletedNumber = completedNumber + leftoverNodes.length;
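
Restated, the publish-service change above makes completion eager: the operation completes as soon as completedNumber reaches minAckResponses, and only falls through to failure once every found node has responded without the quorum being met. As a standalone decision function mirroring the diff's variable names (a sketch, not repository code):

    // Outcome implied by the response counters at a given moment.
    function resolvePublishOutcome({
        completedNumber,
        totalResponses,
        numberOfFoundNodes,
        minAckResponses,
    }) {
        if (completedNumber >= minAckResponses) {
            return 'COMPLETED'; // quorum met, do not wait for stragglers
        }
        if (totalResponses === numberOfFoundNodes) {
            return 'FAILED'; // everyone answered and the quorum was never reached
        }
        return 'PENDING'; // responses still outstanding
    }
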
diff --git a/src/service/triple-store-service.js b/src/service/triple-store-service.js
index 14395ff7c4..3933acddfa 100644
--- a/src/service/triple-store-service.js
+++ b/src/service/triple-store-service.js
@@ -1,5 +1,6 @@
 /* eslint-disable no-await-in-loop */
 import { setTimeout } from 'timers/promises';
+import { writeFile } from 'fs/promises';
 import { kcTools } from 'assertion-tools';
 import {
     BASE_NAMED_GRAPHS,
@@ -57,6 +58,12 @@ class TripleStoreService {
             `to the Triple Store's ${repository} repository.`,
         );
 
+        const totalInsertLabel = `[TripleStoreService.insertKnowledgeCollection TOTAL] ${repository} ${knowledgeCollectionUAL}`;
+        const attemptInsertLabel = (attempt) =>
+            `[TripleStoreService.insertKnowledgeCollection ATTEMPT ${attempt}] ${repository} ${knowledgeCollectionUAL}`;
+
+        this.logger.startTimer(totalInsertLabel);
+
         const publicAssertion = triples.public ?? triples;
 
         const filteredPublic = [];
@@ -272,13 +279,43 @@ class TripleStoreService {
         let success = false;
 
         while (attempts < retries && !success) {
+            const attemptTimerLabel = attemptInsertLabel(attempts + 1);
+            this.logger.debug(
+                `INSERT ATTEMPT: ${attempts + 1} of ${retries}, UAL: ${knowledgeCollectionUAL}`,
+            );
+            this.logger.startTimer(attemptTimerLabel);
             try {
-                await this.tripleStoreModuleManager.queryVoid(
-                    this.repositoryImplementations[repository],
-                    repository,
-                    insertQuery,
-                    this.config.modules.tripleStore.timeout.insert,
-                );
+                const queryLabel = `[TripleStoreService.insertKnowledgeCollection QUERY] ${repository} ${knowledgeCollectionUAL}`;
+                this.logger.debug(queryLabel);
+
+                await writeFile(`insert_query_${Date.now()}_attempt_${attempts}.txt`, insertQuery);
+
+                this.logger.startTimer(queryLabel);
+                try {
+                    await this.tripleStoreModuleManager.queryVoid(
+                        this.repositoryImplementations[repository],
+                        repository,
+                        insertQuery,
+                        this.config.modules.tripleStore.timeout.insert,
+                    );
+                    this.logger.debug(
+                        `queryVoid succeeded for repository: ${repository}, UAL: ${knowledgeCollectionUAL}`,
+                    );
+                } catch (queryError) {
+                    const status = queryError?.response?.status;
+                    const dataSnippet =
+                        typeof queryError?.response?.data === 'string'
+                            ? queryError.response.data.slice(0, 200)
+                            : '';
+                    this.logger.error(
+                        `queryVoid failed for repository: ${repository}, UAL: ${knowledgeCollectionUAL}, status: ${status}. ${queryError.message}${
+                            dataSnippet ? ` | data: ${dataSnippet}` : ''
+                        }`,
+                    );
+                    throw queryError;
+                } finally {
+                    this.logger.endTimer(queryLabel);
+                }
                 if (paranetUAL) {
                     await this.tripleStoreModuleManager.createParanetKnoledgeCollectionConnection(
                         this.repositoryImplementations[repository],
@@ -336,14 +373,18 @@ class TripleStoreService {
                         ),
                     ]);
 
+                    this.logger.endTimer(totalInsertLabel);
                     throw new Error(
                         `Failed to store Knowledge Collection with the UAL: ${knowledgeCollectionUAL} ` +
                             `to the Triple Store's ${repository} repository after maximum retries. Error ${error}`,
                     );
                 }
+            } finally {
+                this.logger.endTimer(attemptTimerLabel);
             }
         }
 
+        this.logger.endTimer(totalInsertLabel);
         return totalNumberOfTriplesInserted;
     }
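
Finally, an assumption worth flagging: triple-store-service.js calls this.logger.startTimer(...) and this.logger.endTimer(...) unguarded, while ot-blazegraph.js guards the same helpers with this.logger?.startTimer. If the configured logger module does not expose them, the insert path would throw before reaching the triple store. A minimal shim in the spirit of the guarded call sites (a sketch assuming a console-like logger with a debug method; startTimer/endTimer are names used by this diff, not a documented logger API):

    // Sketch: attach the startTimer/endTimer helpers expected by the instrumentation.
    function withTimers(logger) {
        const timers = new Map();
        return Object.assign(logger, {
            startTimer(label) {
                timers.set(label, Date.now());
            },
            endTimer(label) {
                const startedAt = timers.get(label);
                if (startedAt !== undefined) {
                    timers.delete(label);
                    logger.debug(`${label} finished in ${Date.now() - startedAt}ms`);
                }
            },
        });
    }

Wrapping the injected logger once at startup (logger = withTimers(logger)) would keep both the guarded and unguarded call sites safe.
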