Skip to content
103 changes: 83 additions & 20 deletions apps/cpu-profile-summarizer/src/worker.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,34 @@
// Copyright (c) Microsoft Corporation. All rights reserved. Licensed under the MIT license.
// See LICENSE in the project root for license information.

import fs from 'node:fs';
import worker_threads from 'node:worker_threads';
import * as fs from 'fs';
import { parentPort, type MessagePort } from 'worker_threads';

import type { ICallFrame, ICpuProfile, INodeSummary, IProfileSummary } from './types';
import type { IMessageToWorker } from './protocol';

/**
* Tracks the time spent in a local node.
*/
/**
* Tracks the time spent in a local node.
*/
interface ILocalTimeInfo {
/** Time spent exclusively in this node (excluding children). */
/** Time spent exclusively in this node (excluding children). */
self: number;
contributors: Set<number> | undefined;
}

/**
* Computes an identifier to use for summarizing call frames.
* @param callFrame - The call frame to compute the ID for
* @returns A portable string identifying the call frame
*
* @param callFrame - The call frame to compute the ID for.
* @returns A portable string uniquely identifying the call frame.
*
* @param callFrame - The call frame to compute the ID for.
* @returns A portable string uniquely identifying the call frame.
*/
function computeCallFrameId(callFrame: ICallFrame): string {
const { url, lineNumber, columnNumber, functionName } = callFrame;
Expand All @@ -26,6 +39,14 @@ function computeCallFrameId(callFrame: ICallFrame): string {
* Adds the contents of a .cpuprofile file to a summary.
* @param filePath - The path to the .cpuprofile file to read
* @param accumulator - The summary to add the profile to
* Reads and parses a `.cpuprofile` file from disk, then adds its data to a profile summary.
*
* @param filePath - The path to the `.cpuprofile` file to read.
* @param accumulator - The summary to add the parsed profile data to.
* Reads and parses a `.cpuprofile` file from disk, then adds its data to a profile summary.
*
* @param filePath - The path to the `.cpuprofile` file to read.
* @param accumulator - The summary to add the parsed profile data to.
*/
function addFileToSummary(filePath: string, accumulator: IProfileSummary): void {
const profile: ICpuProfile = JSON.parse(fs.readFileSync(filePath, 'utf8'));
Expand All @@ -37,6 +58,20 @@ function addFileToSummary(filePath: string, accumulator: IProfileSummary): void
* @param profile - The profile to add
* @param accumulator - The summary to add the profile to
* @returns
* Aggregates CPU profile data into a summary map.
* Handles recursive frames by ensuring totalTime is computed
* via traversal instead of naive summation.
*
* @param profile - The parsed `.cpuprofile` data.
* @param accumulator - A Map keyed by callFrameId with summary info.
* @returns The updated accumulator with the new profile included.
* Aggregates CPU profile data into a summary map.
* Handles recursive frames by ensuring totalTime is computed
* via traversal instead of naive summation.
*
* @param profile - The parsed `.cpuprofile` data.
* @param accumulator - A Map keyed by callFrameId with summary info.
* @returns The updated accumulator with the new profile included.
*/
function addProfileToSummary(profile: ICpuProfile, accumulator: IProfileSummary): IProfileSummary {
const { nodes, samples, timeDeltas, startTime, endTime }: ICpuProfile = profile;
Expand All @@ -53,17 +88,20 @@ function addProfileToSummary(profile: ICpuProfile, accumulator: IProfileSummary)
return index;
}

// Initialize local time info for all nodes and establish nodeId -> index mapping
for (let i: number = 0; i < nodes.length; i++) {
localTimes.push({
self: 0,
contributors: undefined
});
localTimes.push({ self: 0 });
// Initialize local time info for all nodes
for (let i = 0; i < nodes.length; i++) {
localTimes.push({ self: 0 });

const { id } = nodes[i];
// Ensure that the mapping entry has been created.
getIndexFromNodeId(id);
}

// Distribute time samples across nodes
// Distribute time samples across nodes
const duration: number = endTime - startTime;
let lastNodeTime: number = duration - timeDeltas[0];
for (let i: number = 0; i < timeDeltas.length - 1; i++) {
Expand All @@ -73,11 +111,14 @@ function addProfileToSummary(profile: ICpuProfile, accumulator: IProfileSummary)
lastNodeTime -= sampleDuration;
}

// Add remaining time to the last sample
// Add remaining time to the last sample
localTimes[getIndexFromNodeId(samples[samples.length - 1])].self += lastNodeTime;

// Have to pick the maximum totalTime for a given frame,
// Group nodes by frameId
// Group nodes by frameId
const nodesByFrame: Map<string, Set<number>> = new Map();

for (let i: number = 0; i < nodes.length; i++) {
const { callFrame } = nodes[i];
const frameId: string = computeCallFrameId(callFrame);
Expand All @@ -90,9 +131,9 @@ function addProfileToSummary(profile: ICpuProfile, accumulator: IProfileSummary)
}
}

// Summarize per-frame data
// Summarize per-frame data
for (const [frameId, contributors] of nodesByFrame) {
// To compute the total time spent in a node, we need to sum the self time of all contributors.
// We can't simply add up total times because a frame can recurse.
let selfTime: number = 0;
let totalTime: number = 0;

Expand All @@ -107,6 +148,8 @@ function addProfileToSummary(profile: ICpuProfile, accumulator: IProfileSummary)
selfTime += localTime.self;
}

// Traverse children to compute total time
// Traverse children to compute total time
const queue: Set<number> = new Set(contributors);
for (const nodeIndex of queue) {
totalTime += localTimes[nodeIndex].self;
Expand Down Expand Up @@ -134,7 +177,6 @@ function addProfileToSummary(profile: ICpuProfile, accumulator: IProfileSummary)
url,
lineNumber,
columnNumber,

selfTime,
totalTime
});
Expand All @@ -147,27 +189,48 @@ function addProfileToSummary(profile: ICpuProfile, accumulator: IProfileSummary)
return accumulator;
}

const { parentPort } = worker_threads;
if (parentPort) {
const pp: MessagePort = parentPort;
const messageHandler = (message: IMessageToWorker): void => {
if (message === false) {
// Shutdown signal.
parentPort.removeListener('message', messageHandler);
parentPort.close();
pp.removeListener('message', messageHandler);
pp.close();
return;
}

try {
const summary: IProfileSummary = new Map();
addFileToSummary(message, summary);
pp.postMessage({ file: message, data: summary });
} catch (error: unknown) {
if (error instanceof Error) {
pp.postMessage({
file: message,
data: error.stack ?? error.message
});
} else {
pp.postMessage({
file: message,
data: String(error)
});
}
parentPort.postMessage({ file: message, data: summary });
} catch (error) {
parentPort.postMessage({
file: message,
data: error.stack || error.message
});
} catch (error: unknown) {
if (error instanceof Error) {
parentPort.postMessage({
file: message,
data: error.stack ?? error.message
});
} else {
parentPort.postMessage({
file: message,
data: String(error)
});
}
}
};

parentPort.on('message', messageHandler);
pp.on('message', messageHandler);
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@rushstack/cpu-profile-summarizer",
"comment": "Improve error handling in the worker in cases when the error thrown is not an instance of `Error`.",
"type": "none"
}
],
"packageName": "@rushstack/cpu-profile-summarizer"
}
Loading