From 854134733feb3133e5a48162e394e539bf10fb5e Mon Sep 17 00:00:00 2001 From: Shamser Ahmed Date: Tue, 16 May 2023 16:26:56 +0100 Subject: [PATCH 1/2] HPCC-28959 Update spill stats whilst graph active & on failure Signed-off-by: Shamser Ahmed --- common/workunit/workunit.cpp | 12 ++++++------ system/jlib/jstatcodes.h | 4 +++- system/jlib/jstats.cpp | 12 ++++++++++-- thorlcr/activities/thdiskbase.cpp | 6 ------ thorlcr/activities/thdiskbase.ipp | 1 - thorlcr/activities/thdiskbaseslave.cpp | 2 ++ thorlcr/graph/thgraph.cpp | 21 ++++++++++++++++----- thorlcr/graph/thgraph.hpp | 4 +++- thorlcr/graph/thgraphmaster.cpp | 12 ++++++------ thorlcr/graph/thgraphmaster.ipp | 6 +++--- thorlcr/graph/thgraphslave.cpp | 11 ++++++++++- thorlcr/master/thgraphmanager.cpp | 4 ++-- thorlcr/thorutil/thormisc.cpp | 2 +- 13 files changed, 62 insertions(+), 35 deletions(-) diff --git a/common/workunit/workunit.cpp b/common/workunit/workunit.cpp index 8e3ddff04e0..5bb245bf2d7 100644 --- a/common/workunit/workunit.cpp +++ b/common/workunit/workunit.cpp @@ -2706,17 +2706,17 @@ void gatherSpillSize(const IConstWorkUnit * wu, const char *scope, stat_type & t filter.addScope(""); filter.setIncludeNesting(2); filter.addSource("global"); - filter.addOutputStatistic(StSizeSpillFile); - filter.addOutputStatistic(StSizePeakSpillFile); + filter.addOutputStatistic(StSizeActiveSpillFile); + filter.addOutputStatistic(StPeakSizeNodeSpillFile); filter.finishedFilter(); Owned it = &wu->getScopeIterator(filter); for (it->first(); it->isValid(); ) { stat_type value = 0; - if (it->getStat(StSizeSpillFile, value)) + if (it->getStat(StSizeActiveSpillFile, value)) { totalSizeSpill += value; - if (it->getStat(StSizePeakSpillFile, value)) + if (it->getStat(StPeakSizeNodeSpillFile, value)) { if (value>peakSizeSpill) peakSizeSpill = value; @@ -2736,8 +2736,8 @@ void updateSpillSize(IWorkUnit * wu, const char * scope, StatisticScopeType scop gatherSpillSize(wu, scope, totalSizeSpill, peakSizeSpill); if (totalSizeSpill) { - wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), scopeType, scope, StSizePeakSpillFile, nullptr, peakSizeSpill, 1, 0, StatsMergeReplace); - wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), scopeType, scope, StSizeSpillFile, nullptr, totalSizeSpill, 1, 0, StatsMergeReplace); + wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), scopeType, scope, StPeakSizeNodeSpillFile, nullptr, peakSizeSpill, 1, 0, StatsMergeReplace); + wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), scopeType, scope, StSizeActiveSpillFile, nullptr, totalSizeSpill, 1, 0, StatsMergeMax); } } //--------------------------------------------------------------------------------------------------------------------- diff --git a/system/jlib/jstatcodes.h b/system/jlib/jstatcodes.h index f45a74a72f3..fc444194f1f 100644 --- a/system/jlib/jstatcodes.h +++ b/system/jlib/jstatcodes.h @@ -97,6 +97,7 @@ enum StatisticMeasure SMeasureId, // An Id for an element SMeasureFilename, // A filename SMeasureCost, // Used to measure cost + SMeasurePeakSize, SMeasureMax, }; @@ -269,7 +270,6 @@ enum StatisticKind StCycleLeafFetchCycles, StTimeBlobFetch, StCycleBlobFetchCycles, - StSizePeakSpillFile, StTimeAgentQueue, StCycleAgentQueueCycles, StTimeIBYTIDelay, @@ -279,6 +279,8 @@ enum StatisticKind StWhenK8sLaunched, StWhenK8sStarted, StWhenK8sReady, + StPeakSizeNodeSpillFile, + StSizeActiveSpillFile, StMax, //For any quantity there is potentially the following variants. diff --git a/system/jlib/jstats.cpp b/system/jlib/jstats.cpp index ec92e234aaa..d7cfc4dd152 100644 --- a/system/jlib/jstats.cpp +++ b/system/jlib/jstats.cpp @@ -70,7 +70,7 @@ void setStatisticsComponentName(StatisticCreatorType processType, const char * p //-------------------------------------------------------------------------------------------------------------------- // Textual forms of the different enumerations, first items are for none and all. -static constexpr const char * const measureNames[] = { "", "all", "ns", "ts", "cnt", "sz", "cpu", "skw", "node", "ppm", "ip", "cy", "en", "txt", "bool", "id", "fname", "cost", NULL }; +static constexpr const char * const measureNames[] = { "", "all", "ns", "ts", "cnt", "sz", "cpu", "skw", "node", "ppm", "ip", "cy", "en", "txt", "bool", "id", "fname", "cost", "peaksz", NULL }; static constexpr const char * const creatorTypeNames[]= { "", "all", "unknown", "hthor", "roxie", "roxie:s", "thor", "thor:m", "thor:s", "eclcc", "esp", "summary", NULL }; static constexpr const char * const scopeTypeNames[] = { "", "all", "global", "graph", "subgraph", "activity", "allocator", "section", "compile", "dfu", "edge", "function", "workflow", "child", "file", "channel", "unknown", nullptr }; @@ -406,6 +406,7 @@ StringBuffer & formatStatistic(StringBuffer & out, unsigned __int64 value, Stati case SMeasureCount: return out.append(value); case SMeasureSize: + case SMeasurePeakSize: return formatSize(out, value); case SMeasureLoad: return formatLoad(out, value); @@ -488,6 +489,7 @@ stat_type readStatisticValue(const char * cur, const char * * end, StatisticMeas break; case SMeasureCount: case SMeasureSize: + case SMeasurePeakSize: //Allow K, M, G as scaling suffixes if (next[0] == 'K') { @@ -650,6 +652,7 @@ const char * queryMeasurePrefix(StatisticMeasure measure) case SMeasureTimestampUs: return "When"; case SMeasureCount: return "Num"; case SMeasureSize: return "Size"; + case SMeasurePeakSize: return "PeakSize"; case SMeasureLoad: return "Load"; case SMeasureSkew: return "Skew"; case SMeasureNode: return "Node"; @@ -713,6 +716,7 @@ static constexpr StatsMergeAction queryMergeMode(StatisticMeasure measure) (measure == SMeasureId) ? StatsMergeKeepNonZero : (measure == SMeasureFilename) ? StatsMergeKeepNonZero : (measure == SMeasureCost) ? StatsMergeSum : + (measure == SMeasurePeakSize) ? StatsMergeMax : StatsMergeSum; } @@ -784,6 +788,7 @@ static constexpr StatsMergeAction queryMergeMode(StatisticMeasure measure) #define CYCLESTAT(y) St##Cycle##y##Cycles, SMeasureCycle, StatsMergeSum, St##Time##y, St##Cycle##y##Cycles, { NAMES(Cycle, y##Cycles) }, { TAGS(Cycle, y##Cycles) } #define ENUMSTAT(y) STAT(Enum, y, SMeasureEnum) #define COSTSTAT(y) STAT(Cost, y, SMeasureCost) +#define PEAKSIZESTAT(y) STAT(PeakSize, y, SMeasurePeakSize) //-------------------------------------------------------------------------------------------------------------------- class StatisticMeta @@ -951,7 +956,6 @@ static const constexpr StatisticMeta statsMetaData[StMax] = { { CYCLESTAT(LeafFetch) }, { TIMESTAT(BlobFetch) }, { CYCLESTAT(BlobFetch) }, - { SIZESTAT(PeakSpillFile) }, { TIMESTAT(AgentQueue) }, { CYCLESTAT(AgentQueue) }, { TIMESTAT(IBYTIDelay) }, @@ -961,6 +965,8 @@ static const constexpr StatisticMeta statsMetaData[StMax] = { { WHENFIRSTSTAT(K8sLaunched) }, { WHENFIRSTSTAT(K8sStarted) }, { WHENFIRSTSTAT(K8sReady) }, + { PEAKSIZESTAT(NodeSpillFile) }, + { SIZESTAT(ActiveSpillFile)}, }; @@ -1211,6 +1217,7 @@ inline void mergeUpdate(StatisticMeasure measure, unsigned __int64 & value, cons case SMeasureLoad: case SMeasureSkew: case SMeasureCycle: + case SMeasurePeakSize: value += otherValue; break; case SMeasureTimestampUs: @@ -2835,6 +2842,7 @@ static bool isSignificantRange(StatisticKind kind, unsigned __int64 range, unsig insignificantDiff = 1000; // Ignore 1us timing difference between nodes break; case SMeasureSize: + case SMeasurePeakSize: insignificantDiff = 1024; break; } diff --git a/thorlcr/activities/thdiskbase.cpp b/thorlcr/activities/thdiskbase.cpp index a1e722532dc..dae7d5d870f 100644 --- a/thorlcr/activities/thdiskbase.cpp +++ b/thorlcr/activities/thdiskbase.cpp @@ -441,12 +441,6 @@ void CWriteMasterBase::slaveDone(size32_t slaveIdx, MemoryBuffer &mb) } } -void CWriteMasterBase::getActivityStats(IStatisticGatherer & stats) -{ - CMasterActivity::getActivityStats(stats); -} - - ///////////////// rowcount_t getCount(CActivityBase &activity, unsigned partialResults, rowcount_t limit, mptag_t mpTag) { diff --git a/thorlcr/activities/thdiskbase.ipp b/thorlcr/activities/thdiskbase.ipp index fd7dad626c1..d3dad2be550 100644 --- a/thorlcr/activities/thdiskbase.ipp +++ b/thorlcr/activities/thdiskbase.ipp @@ -64,7 +64,6 @@ public: virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave); virtual void done(); virtual void slaveDone(size32_t slaveIdx, MemoryBuffer &mb); - virtual void getActivityStats(IStatisticGatherer & stats); }; diff --git a/thorlcr/activities/thdiskbaseslave.cpp b/thorlcr/activities/thdiskbaseslave.cpp index 4bd4a3e15e8..86534f6b624 100644 --- a/thorlcr/activities/thdiskbaseslave.cpp +++ b/thorlcr/activities/thdiskbaseslave.cpp @@ -556,6 +556,8 @@ void CDiskWriteSlaveActivityBase::gatherActiveStats(CRuntimeStatisticCollection { PARENT::gatherActiveStats(activeStats); mergeStats(activeStats, outputIO, diskWriteRemoteStatistics); + if (tmpUsage && outputIO ) // Update tmpUsage file size (Needed to calc inter-graph spill stats) + tmpUsage->setSize(outputIO->getStatistic(StSizeDiskWrite)); activeStats.setStatistic(StPerReplicated, replicateDone); } diff --git a/thorlcr/graph/thgraph.cpp b/thorlcr/graph/thgraph.cpp index 9246c04734b..f1b0999e4f3 100644 --- a/thorlcr/graph/thgraph.cpp +++ b/thorlcr/graph/thgraph.cpp @@ -2375,21 +2375,32 @@ void CGraphTempHandler::clearTemps() tmpFiles.kill(); } -void CGraphTempHandler::serializeUsageStats(MemoryBuffer &mb, graph_id gid) +void CGraphTempHandler::getUsageStats(graph_id gid, offset_t & graphSpillSize) { CriticalBlock b(crit); Owned iter = getIterator(); - offset_t activeSpillSize = 0; - offset_t graphSpillSize = 0; + graphSpillSize = 0; ForEach(*iter) { CFileUsageEntry &entry = iter->query(); if (entry.queryGraphId() == gid) graphSpillSize += entry.getSize(); - activeSpillSize += entry.getSize(); } +} + +void CGraphTempHandler::serializeUsageStats(MemoryBuffer &mb, graph_id gid) +{ + offset_t graphSpillSize; + getUsageStats(gid, graphSpillSize); mb.append(graphSpillSize); - mb.append(activeSpillSize); +} + +void CGraphTempHandler::setUsageStats(CRuntimeStatisticCollection &rsc, graph_id gid) +{ + offset_t graphSpillSize; + getUsageStats(gid, graphSpillSize); + rsc.setStatistic(StSizeActiveSpillFile, graphSpillSize); + rsc.setStatistic(StPeakSizeNodeSpillFile, graphSpillSize); // StatsMergeMax should track the peak } ///// diff --git a/thorlcr/graph/thgraph.hpp b/thorlcr/graph/thgraph.hpp index 967dcda7a45..cce9224ea71 100644 --- a/thorlcr/graph/thgraph.hpp +++ b/thorlcr/graph/thgraph.hpp @@ -225,8 +225,8 @@ interface IGraphTempHandler : extends IInterface static void serializeNullUsageStats(MemoryBuffer &mb) { mb.append((offset_t)0); - mb.append((offset_t)0); } + virtual void setUsageStats(CRuntimeStatisticCollection &rsc, graph_id gid) = 0; }; class CGraphDependency : public CInterface @@ -560,6 +560,7 @@ class graph_decl CGraphTempHandler : implements IGraphTempHandler, public CInter mutable CriticalSection crit; bool errorOnMissing; + void getUsageStats(graph_id gid, offset_t & graphSpillSize); public: IMPLEMENT_IINTERFACE; @@ -592,6 +593,7 @@ class graph_decl CGraphTempHandler : implements IGraphTempHandler, public CInter return new CIterator(tmpFiles); } virtual void serializeUsageStats(MemoryBuffer &mb, graph_id gid) override; + virtual void setUsageStats(CRuntimeStatisticCollection &rsc, graph_id gid) override; }; class graph_decl CGraphStub : public CInterface, implements IThorChildGraph diff --git a/thorlcr/graph/thgraphmaster.cpp b/thorlcr/graph/thgraphmaster.cpp index 8d98a68b7ab..639b8b5f741 100644 --- a/thorlcr/graph/thgraphmaster.cpp +++ b/thorlcr/graph/thgraphmaster.cpp @@ -2691,7 +2691,7 @@ bool CMasterGraph::preStart(size32_t parentExtractSz, const byte *parentExtract) sendActivityInitData(); // has to be done at least once // NB: At this point, on the slaves, the graphs will start } - totalActiveSpillSize = graphSpillSize = 0; + totalActiveSpillSize = peakNodeSpillFile = 0; CGraphBase::preStart(parentExtractSz, parentExtract); if (isGlobal()) { @@ -2722,11 +2722,11 @@ void CMasterGraph::handleSlaveDone(unsigned node, MemoryBuffer &mb) sdMb.setBuffer(len, (void *)d); act->slaveDone(node, sdMb); } - offset_t activeSpillSize, nodeGraphSpill; + offset_t nodeGraphSpill; mb.read(nodeGraphSpill); - mb.read(activeSpillSize); - totalActiveSpillSize += activeSpillSize; - graphSpillSize += nodeGraphSpill; + totalActiveSpillSize += nodeGraphSpill; + if (nodeGraphSpill>peakNodeSpillFile) + peakNodeSpillFile = nodeGraphSpill; } void CMasterGraph::getFinalProgress() @@ -2806,7 +2806,7 @@ void CMasterGraph::getFinalProgress() } } } - jobM->updateActiveSpillSize(graphSpillSize, totalActiveSpillSize); + jobM->updateActiveSpillSize(totalActiveSpillSize, peakNodeSpillFile); } void CMasterGraph::done() diff --git a/thorlcr/graph/thgraphmaster.ipp b/thorlcr/graph/thgraphmaster.ipp index 3f6fc009cbc..4cdbe94652c 100644 --- a/thorlcr/graph/thgraphmaster.ipp +++ b/thorlcr/graph/thgraphmaster.ipp @@ -94,7 +94,7 @@ class graphmaster_decl CMasterGraph : public CGraphBase bool sentGlobalInit = false; CThorStatsCollection graphStats; offset_t totalActiveSpillSize = 0; // total inter-graph spill - offset_t graphSpillSize = 0; + offset_t peakNodeSpillFile = 0; CReplyCancelHandler activityInitMsgHandler, bcastMsgHandler, executeReplyMsgHandler; @@ -202,10 +202,10 @@ public: dirty = true; } // Track spills - virtual void updateActiveSpillSize(offset_t graphSpillSize, offset_t activeSpillSize) + virtual void updateActiveSpillSize(offset_t graphSpillSize, offset_t peakNodeSpillSize) { totalSpillSize.fetch_add(graphSpillSize); - peakSpillSize.store_max(activeSpillSize); + peakSpillSize.store_max(peakNodeSpillSize); } virtual offset_t getTotalSpillSize() const { return totalSpillSize; } virtual offset_t getPeakSpillSize() const { return peakSpillSize; } diff --git a/thorlcr/graph/thgraphslave.cpp b/thorlcr/graph/thgraphslave.cpp index 97a82881fa8..a37ec6130b5 100644 --- a/thorlcr/graph/thgraphslave.cpp +++ b/thorlcr/graph/thgraphslave.cpp @@ -1254,10 +1254,19 @@ void CSlaveGraph::done() bool CSlaveGraph::serializeStats(MemoryBuffer &mb) { unsigned beginPos = mb.length(); - mb.append(queryGraphId()); + graph_id gid = queryGraphId(); + mb.append(gid); CRuntimeStatisticCollection stats(graphStatistics); stats.setStatistic(StNumExecutions, numExecuted); + if (!owner) + queryJob().queryTempHandler()->setUsageStats(stats, gid); + else + { + IGraphTempHandler *tempHandler = queryTempHandler(false); + if (tempHandler) + tempHandler->setUsageStats(stats, gid); + } stats.serialize(mb); unsigned cPos = mb.length(); diff --git a/thorlcr/master/thgraphmanager.cpp b/thorlcr/master/thgraphmanager.cpp index 4c9238e9d9c..f4556af3ea3 100644 --- a/thorlcr/master/thgraphmanager.cpp +++ b/thorlcr/master/thgraphmanager.cpp @@ -1104,9 +1104,9 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName, offset_t totalSpillSize = job->getTotalSpillSize(); if (totalSpillSize) { - wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, graphScope, StSizeSpillFile, NULL, totalSpillSize, 1, 0, StatsMergeAppend); + wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, graphScope, StSizeActiveSpillFile, NULL, totalSpillSize, 1, 0, StatsMergeReplace); offset_t peakSpillSize = job->getPeakSpillSize(); - wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, graphScope, StSizePeakSpillFile, NULL, peakSpillSize, 1, 0, StatsMergeAppend); + wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, graphScope, StPeakSizeNodeSpillFile, NULL, peakSpillSize, 1, 0, StatsMergeReplace); } removeJob(*job); diff --git a/thorlcr/thorutil/thormisc.cpp b/thorlcr/thorutil/thormisc.cpp index d523779c525..3f9ac9b469a 100644 --- a/thorlcr/thorutil/thormisc.cpp +++ b/thorlcr/thorutil/thormisc.cpp @@ -87,7 +87,7 @@ const StatisticsMapping joinActivityStatistics({StNumLeftRows, StNumRightRows}, const StatisticsMapping diskReadActivityStatistics({StNumDiskRowsRead}, basicActivityStatistics, diskReadRemoteStatistics); const StatisticsMapping diskWriteActivityStatistics({StPerReplicated}, basicActivityStatistics, diskWriteRemoteStatistics); const StatisticsMapping sortActivityStatistics({}, basicActivityStatistics, spillStatistics); -const StatisticsMapping graphStatistics({StNumExecutions, StSizeSpillFile, StSizePeakSpillFile}, basicActivityStatistics); +const StatisticsMapping graphStatistics({StNumExecutions, StSizeSpillFile, StPeakSizeNodeSpillFile, StSizeActiveSpillFile}, basicActivityStatistics); const StatisticsMapping diskReadPartStatistics({StNumDiskRowsRead}, diskReadRemoteStatistics); From 2bb0202b291f70990c25fa0c87bba6d5e721ccb6 Mon Sep 17 00:00:00 2001 From: Jake Smith Date: Tue, 23 May 2023 12:58:36 +0100 Subject: [PATCH 2/2] wip --- initfiles/etc/DIR_NAME/environment.xml.in | 3 ++- testing/regress/ecl/graphspill3.ecl | 20 ++++++++++++++++++++ thorlcr/graph/thgraph.cpp | 22 ++++------------------ thorlcr/graph/thgraph.hpp | 11 ++--------- thorlcr/graph/thgraphmaster.cpp | 5 ----- thorlcr/graph/thgraphslave.cpp | 18 +++++------------- 6 files changed, 33 insertions(+), 46 deletions(-) create mode 100755 testing/regress/ecl/graphspill3.ecl diff --git a/initfiles/etc/DIR_NAME/environment.xml.in b/initfiles/etc/DIR_NAME/environment.xml.in index 14a6ada1e06..f3df86d1d22 100644 --- a/initfiles/etc/DIR_NAME/environment.xml.in +++ b/initfiles/etc/DIR_NAME/environment.xml.in @@ -922,6 +922,7 @@ daliServers="mydali" description="Thor process" fileCacheLimit="1800" + globalMemorySize="200" heapRetainMemory="false" heapUseHugePages="false" heapUseTransparentHugePages="true" @@ -935,7 +936,7 @@ replicateAsync="false" replicateOutputs="false" slaveport="20100" - slavesPerNode="1" + slavesPerNode="2" watchdogEnabled="true" watchdogProgressEnabled="true"> diff --git a/testing/regress/ecl/graphspill3.ecl b/testing/regress/ecl/graphspill3.ecl new file mode 100755 index 00000000000..4ef23a999e2 --- /dev/null +++ b/testing/regress/ecl/graphspill3.ecl @@ -0,0 +1,20 @@ +#option('pickBestEngine', false); + +numRecs := 10; +rec := RECORD + unsigned8 id; + unsigned8 other; + string984 rest := 'rest'; + string24 to1k := '1k'; +END; + +ds := DATASET(numRecs, TRANSFORM(rec, SELF.id := HASH(COUNTER), SELF.other := COUNTER), DISTRIBUTED); + +sd1 := SORT(NOFOLD(ds), id); +sd2 := SORT(NOFOLD(ds), other); +sd2p := PROJECT(sd2, TRANSFORM({sd2.id}, SELF := LEFT)); + +PARALLEL( + OUTPUT(DEDUP(sd1, other, HASH)); + OUTPUT(DEDUP(sd2p, id, HASH)); +); diff --git a/thorlcr/graph/thgraph.cpp b/thorlcr/graph/thgraph.cpp index f1b0999e4f3..02c08dfc448 100644 --- a/thorlcr/graph/thgraph.cpp +++ b/thorlcr/graph/thgraph.cpp @@ -2375,33 +2375,19 @@ void CGraphTempHandler::clearTemps() tmpFiles.kill(); } -void CGraphTempHandler::getUsageStats(graph_id gid, offset_t & graphSpillSize) +offset_t CGraphTempHandler::getUsageStats() { CriticalBlock b(crit); Owned iter = getIterator(); - graphSpillSize = 0; + offset_t activeSpillSize = 0; ForEach(*iter) { CFileUsageEntry &entry = iter->query(); - if (entry.queryGraphId() == gid) - graphSpillSize += entry.getSize(); + activeSpillSize += entry.getSize(); } + return activeSpillSize; } -void CGraphTempHandler::serializeUsageStats(MemoryBuffer &mb, graph_id gid) -{ - offset_t graphSpillSize; - getUsageStats(gid, graphSpillSize); - mb.append(graphSpillSize); -} - -void CGraphTempHandler::setUsageStats(CRuntimeStatisticCollection &rsc, graph_id gid) -{ - offset_t graphSpillSize; - getUsageStats(gid, graphSpillSize); - rsc.setStatistic(StSizeActiveSpillFile, graphSpillSize); - rsc.setStatistic(StPeakSizeNodeSpillFile, graphSpillSize); // StatsMergeMax should track the peak -} ///// class CGraphExecutor; diff --git a/thorlcr/graph/thgraph.hpp b/thorlcr/graph/thgraph.hpp index cce9224ea71..fabed36d9a3 100644 --- a/thorlcr/graph/thgraph.hpp +++ b/thorlcr/graph/thgraph.hpp @@ -221,12 +221,7 @@ interface IGraphTempHandler : extends IInterface virtual void deregisterFile(const char *name, bool kept=false) = 0; virtual void clearTemps() = 0; virtual IFileUsageIterator *getIterator() = 0; - virtual void serializeUsageStats(MemoryBuffer &mb, graph_id gid) = 0; - static void serializeNullUsageStats(MemoryBuffer &mb) - { - mb.append((offset_t)0); - } - virtual void setUsageStats(CRuntimeStatisticCollection &rsc, graph_id gid) = 0; + virtual offset_t getUsageStats() = 0; }; class CGraphDependency : public CInterface @@ -560,7 +555,6 @@ class graph_decl CGraphTempHandler : implements IGraphTempHandler, public CInter mutable CriticalSection crit; bool errorOnMissing; - void getUsageStats(graph_id gid, offset_t & graphSpillSize); public: IMPLEMENT_IINTERFACE; @@ -592,8 +586,7 @@ class graph_decl CGraphTempHandler : implements IGraphTempHandler, public CInter }; return new CIterator(tmpFiles); } - virtual void serializeUsageStats(MemoryBuffer &mb, graph_id gid) override; - virtual void setUsageStats(CRuntimeStatisticCollection &rsc, graph_id gid) override; + virtual offset_t getUsageStats() override; }; class graph_decl CGraphStub : public CInterface, implements IThorChildGraph diff --git a/thorlcr/graph/thgraphmaster.cpp b/thorlcr/graph/thgraphmaster.cpp index 639b8b5f741..39e2f795cdd 100644 --- a/thorlcr/graph/thgraphmaster.cpp +++ b/thorlcr/graph/thgraphmaster.cpp @@ -2722,11 +2722,6 @@ void CMasterGraph::handleSlaveDone(unsigned node, MemoryBuffer &mb) sdMb.setBuffer(len, (void *)d); act->slaveDone(node, sdMb); } - offset_t nodeGraphSpill; - mb.read(nodeGraphSpill); - totalActiveSpillSize += nodeGraphSpill; - if (nodeGraphSpill>peakNodeSpillFile) - peakNodeSpillFile = nodeGraphSpill; } void CMasterGraph::getFinalProgress() diff --git a/thorlcr/graph/thgraphslave.cpp b/thorlcr/graph/thgraphslave.cpp index a37ec6130b5..75e78ceaf52 100644 --- a/thorlcr/graph/thgraphslave.cpp +++ b/thorlcr/graph/thgraphslave.cpp @@ -1259,14 +1259,17 @@ bool CSlaveGraph::serializeStats(MemoryBuffer &mb) CRuntimeStatisticCollection stats(graphStatistics); stats.setStatistic(StNumExecutions, numExecuted); + offset_t graphSpillSize = 0; if (!owner) - queryJob().queryTempHandler()->setUsageStats(stats, gid); + graphSpillSize = queryJob().queryTempHandler()->getUsageStats(); else { IGraphTempHandler *tempHandler = queryTempHandler(false); if (tempHandler) - tempHandler->setUsageStats(stats, gid); + graphSpillSize = tempHandler->getUsageStats(); } + + stats.mergeStatistic(StPeakSizeNodeSpillFile, graphSpillSize); stats.serialize(mb); unsigned cPos = mb.length(); @@ -1340,17 +1343,6 @@ void CSlaveGraph::serializeDone(MemoryBuffer &mb) } } mb.writeDirect(cPos, sizeof(count), &count); - - if (!owner) - queryJob().queryTempHandler()->serializeUsageStats(mb, gid); - else - { - IGraphTempHandler *tempHandler = queryTempHandler(false); - if (tempHandler) - tempHandler->serializeUsageStats(mb, gid); - else - IGraphTempHandler::serializeNullUsageStats(mb); - } } void CSlaveGraph::getDone(MemoryBuffer &doneInfoMb)