diff --git a/common/workunit/workunit.cpp b/common/workunit/workunit.cpp index 8e3ddff04e0..5bb245bf2d7 100644 --- a/common/workunit/workunit.cpp +++ b/common/workunit/workunit.cpp @@ -2706,17 +2706,17 @@ void gatherSpillSize(const IConstWorkUnit * wu, const char *scope, stat_type & t filter.addScope(""); filter.setIncludeNesting(2); filter.addSource("global"); - filter.addOutputStatistic(StSizeSpillFile); - filter.addOutputStatistic(StSizePeakSpillFile); + filter.addOutputStatistic(StSizeActiveSpillFile); + filter.addOutputStatistic(StPeakSizeNodeSpillFile); filter.finishedFilter(); Owned it = &wu->getScopeIterator(filter); for (it->first(); it->isValid(); ) { stat_type value = 0; - if (it->getStat(StSizeSpillFile, value)) + if (it->getStat(StSizeActiveSpillFile, value)) { totalSizeSpill += value; - if (it->getStat(StSizePeakSpillFile, value)) + if (it->getStat(StPeakSizeNodeSpillFile, value)) { if (value>peakSizeSpill) peakSizeSpill = value; @@ -2736,8 +2736,8 @@ void updateSpillSize(IWorkUnit * wu, const char * scope, StatisticScopeType scop gatherSpillSize(wu, scope, totalSizeSpill, peakSizeSpill); if (totalSizeSpill) { - wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), scopeType, scope, StSizePeakSpillFile, nullptr, peakSizeSpill, 1, 0, StatsMergeReplace); - wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), scopeType, scope, StSizeSpillFile, nullptr, totalSizeSpill, 1, 0, StatsMergeReplace); + wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), scopeType, scope, StPeakSizeNodeSpillFile, nullptr, peakSizeSpill, 1, 0, StatsMergeReplace); + wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), scopeType, scope, StSizeActiveSpillFile, nullptr, totalSizeSpill, 1, 0, StatsMergeMax); } } //--------------------------------------------------------------------------------------------------------------------- diff --git a/initfiles/etc/DIR_NAME/environment.xml.in b/initfiles/etc/DIR_NAME/environment.xml.in index 14a6ada1e06..f3df86d1d22 100644 --- a/initfiles/etc/DIR_NAME/environment.xml.in +++ b/initfiles/etc/DIR_NAME/environment.xml.in @@ -922,6 +922,7 @@ daliServers="mydali" description="Thor process" fileCacheLimit="1800" + globalMemorySize="200" heapRetainMemory="false" heapUseHugePages="false" heapUseTransparentHugePages="true" @@ -935,7 +936,7 @@ replicateAsync="false" replicateOutputs="false" slaveport="20100" - slavesPerNode="1" + slavesPerNode="2" watchdogEnabled="true" watchdogProgressEnabled="true"> diff --git a/system/jlib/jstatcodes.h b/system/jlib/jstatcodes.h index f45a74a72f3..fc444194f1f 100644 --- a/system/jlib/jstatcodes.h +++ b/system/jlib/jstatcodes.h @@ -97,6 +97,7 @@ enum StatisticMeasure SMeasureId, // An Id for an element SMeasureFilename, // A filename SMeasureCost, // Used to measure cost + SMeasurePeakSize, SMeasureMax, }; @@ -269,7 +270,6 @@ enum StatisticKind StCycleLeafFetchCycles, StTimeBlobFetch, StCycleBlobFetchCycles, - StSizePeakSpillFile, StTimeAgentQueue, StCycleAgentQueueCycles, StTimeIBYTIDelay, @@ -279,6 +279,8 @@ enum StatisticKind StWhenK8sLaunched, StWhenK8sStarted, StWhenK8sReady, + StPeakSizeNodeSpillFile, + StSizeActiveSpillFile, StMax, //For any quantity there is potentially the following variants. diff --git a/system/jlib/jstats.cpp b/system/jlib/jstats.cpp index ec92e234aaa..d7cfc4dd152 100644 --- a/system/jlib/jstats.cpp +++ b/system/jlib/jstats.cpp @@ -70,7 +70,7 @@ void setStatisticsComponentName(StatisticCreatorType processType, const char * p //-------------------------------------------------------------------------------------------------------------------- // Textual forms of the different enumerations, first items are for none and all. -static constexpr const char * const measureNames[] = { "", "all", "ns", "ts", "cnt", "sz", "cpu", "skw", "node", "ppm", "ip", "cy", "en", "txt", "bool", "id", "fname", "cost", NULL }; +static constexpr const char * const measureNames[] = { "", "all", "ns", "ts", "cnt", "sz", "cpu", "skw", "node", "ppm", "ip", "cy", "en", "txt", "bool", "id", "fname", "cost", "peaksz", NULL }; static constexpr const char * const creatorTypeNames[]= { "", "all", "unknown", "hthor", "roxie", "roxie:s", "thor", "thor:m", "thor:s", "eclcc", "esp", "summary", NULL }; static constexpr const char * const scopeTypeNames[] = { "", "all", "global", "graph", "subgraph", "activity", "allocator", "section", "compile", "dfu", "edge", "function", "workflow", "child", "file", "channel", "unknown", nullptr }; @@ -406,6 +406,7 @@ StringBuffer & formatStatistic(StringBuffer & out, unsigned __int64 value, Stati case SMeasureCount: return out.append(value); case SMeasureSize: + case SMeasurePeakSize: return formatSize(out, value); case SMeasureLoad: return formatLoad(out, value); @@ -488,6 +489,7 @@ stat_type readStatisticValue(const char * cur, const char * * end, StatisticMeas break; case SMeasureCount: case SMeasureSize: + case SMeasurePeakSize: //Allow K, M, G as scaling suffixes if (next[0] == 'K') { @@ -650,6 +652,7 @@ const char * queryMeasurePrefix(StatisticMeasure measure) case SMeasureTimestampUs: return "When"; case SMeasureCount: return "Num"; case SMeasureSize: return "Size"; + case SMeasurePeakSize: return "PeakSize"; case SMeasureLoad: return "Load"; case SMeasureSkew: return "Skew"; case SMeasureNode: return "Node"; @@ -713,6 +716,7 @@ static constexpr StatsMergeAction queryMergeMode(StatisticMeasure measure) (measure == SMeasureId) ? StatsMergeKeepNonZero : (measure == SMeasureFilename) ? StatsMergeKeepNonZero : (measure == SMeasureCost) ? StatsMergeSum : + (measure == SMeasurePeakSize) ? StatsMergeMax : StatsMergeSum; } @@ -784,6 +788,7 @@ static constexpr StatsMergeAction queryMergeMode(StatisticMeasure measure) #define CYCLESTAT(y) St##Cycle##y##Cycles, SMeasureCycle, StatsMergeSum, St##Time##y, St##Cycle##y##Cycles, { NAMES(Cycle, y##Cycles) }, { TAGS(Cycle, y##Cycles) } #define ENUMSTAT(y) STAT(Enum, y, SMeasureEnum) #define COSTSTAT(y) STAT(Cost, y, SMeasureCost) +#define PEAKSIZESTAT(y) STAT(PeakSize, y, SMeasurePeakSize) //-------------------------------------------------------------------------------------------------------------------- class StatisticMeta @@ -951,7 +956,6 @@ static const constexpr StatisticMeta statsMetaData[StMax] = { { CYCLESTAT(LeafFetch) }, { TIMESTAT(BlobFetch) }, { CYCLESTAT(BlobFetch) }, - { SIZESTAT(PeakSpillFile) }, { TIMESTAT(AgentQueue) }, { CYCLESTAT(AgentQueue) }, { TIMESTAT(IBYTIDelay) }, @@ -961,6 +965,8 @@ static const constexpr StatisticMeta statsMetaData[StMax] = { { WHENFIRSTSTAT(K8sLaunched) }, { WHENFIRSTSTAT(K8sStarted) }, { WHENFIRSTSTAT(K8sReady) }, + { PEAKSIZESTAT(NodeSpillFile) }, + { SIZESTAT(ActiveSpillFile)}, }; @@ -1211,6 +1217,7 @@ inline void mergeUpdate(StatisticMeasure measure, unsigned __int64 & value, cons case SMeasureLoad: case SMeasureSkew: case SMeasureCycle: + case SMeasurePeakSize: value += otherValue; break; case SMeasureTimestampUs: @@ -2835,6 +2842,7 @@ static bool isSignificantRange(StatisticKind kind, unsigned __int64 range, unsig insignificantDiff = 1000; // Ignore 1us timing difference between nodes break; case SMeasureSize: + case SMeasurePeakSize: insignificantDiff = 1024; break; } diff --git a/testing/regress/ecl/graphspill3.ecl b/testing/regress/ecl/graphspill3.ecl new file mode 100755 index 00000000000..4ef23a999e2 --- /dev/null +++ b/testing/regress/ecl/graphspill3.ecl @@ -0,0 +1,20 @@ +#option('pickBestEngine', false); + +numRecs := 10; +rec := RECORD + unsigned8 id; + unsigned8 other; + string984 rest := 'rest'; + string24 to1k := '1k'; +END; + +ds := DATASET(numRecs, TRANSFORM(rec, SELF.id := HASH(COUNTER), SELF.other := COUNTER), DISTRIBUTED); + +sd1 := SORT(NOFOLD(ds), id); +sd2 := SORT(NOFOLD(ds), other); +sd2p := PROJECT(sd2, TRANSFORM({sd2.id}, SELF := LEFT)); + +PARALLEL( + OUTPUT(DEDUP(sd1, other, HASH)); + OUTPUT(DEDUP(sd2p, id, HASH)); +); diff --git a/thorlcr/activities/thdiskbase.cpp b/thorlcr/activities/thdiskbase.cpp index a1e722532dc..dae7d5d870f 100644 --- a/thorlcr/activities/thdiskbase.cpp +++ b/thorlcr/activities/thdiskbase.cpp @@ -441,12 +441,6 @@ void CWriteMasterBase::slaveDone(size32_t slaveIdx, MemoryBuffer &mb) } } -void CWriteMasterBase::getActivityStats(IStatisticGatherer & stats) -{ - CMasterActivity::getActivityStats(stats); -} - - ///////////////// rowcount_t getCount(CActivityBase &activity, unsigned partialResults, rowcount_t limit, mptag_t mpTag) { diff --git a/thorlcr/activities/thdiskbase.ipp b/thorlcr/activities/thdiskbase.ipp index fd7dad626c1..d3dad2be550 100644 --- a/thorlcr/activities/thdiskbase.ipp +++ b/thorlcr/activities/thdiskbase.ipp @@ -64,7 +64,6 @@ public: virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave); virtual void done(); virtual void slaveDone(size32_t slaveIdx, MemoryBuffer &mb); - virtual void getActivityStats(IStatisticGatherer & stats); }; diff --git a/thorlcr/activities/thdiskbaseslave.cpp b/thorlcr/activities/thdiskbaseslave.cpp index 4bd4a3e15e8..86534f6b624 100644 --- a/thorlcr/activities/thdiskbaseslave.cpp +++ b/thorlcr/activities/thdiskbaseslave.cpp @@ -556,6 +556,8 @@ void CDiskWriteSlaveActivityBase::gatherActiveStats(CRuntimeStatisticCollection { PARENT::gatherActiveStats(activeStats); mergeStats(activeStats, outputIO, diskWriteRemoteStatistics); + if (tmpUsage && outputIO ) // Update tmpUsage file size (Needed to calc inter-graph spill stats) + tmpUsage->setSize(outputIO->getStatistic(StSizeDiskWrite)); activeStats.setStatistic(StPerReplicated, replicateDone); } diff --git a/thorlcr/graph/thgraph.cpp b/thorlcr/graph/thgraph.cpp index 9246c04734b..02c08dfc448 100644 --- a/thorlcr/graph/thgraph.cpp +++ b/thorlcr/graph/thgraph.cpp @@ -2375,22 +2375,19 @@ void CGraphTempHandler::clearTemps() tmpFiles.kill(); } -void CGraphTempHandler::serializeUsageStats(MemoryBuffer &mb, graph_id gid) +offset_t CGraphTempHandler::getUsageStats() { CriticalBlock b(crit); Owned iter = getIterator(); offset_t activeSpillSize = 0; - offset_t graphSpillSize = 0; ForEach(*iter) { CFileUsageEntry &entry = iter->query(); - if (entry.queryGraphId() == gid) - graphSpillSize += entry.getSize(); activeSpillSize += entry.getSize(); } - mb.append(graphSpillSize); - mb.append(activeSpillSize); + return activeSpillSize; } + ///// class CGraphExecutor; diff --git a/thorlcr/graph/thgraph.hpp b/thorlcr/graph/thgraph.hpp index 967dcda7a45..fabed36d9a3 100644 --- a/thorlcr/graph/thgraph.hpp +++ b/thorlcr/graph/thgraph.hpp @@ -221,12 +221,7 @@ interface IGraphTempHandler : extends IInterface virtual void deregisterFile(const char *name, bool kept=false) = 0; virtual void clearTemps() = 0; virtual IFileUsageIterator *getIterator() = 0; - virtual void serializeUsageStats(MemoryBuffer &mb, graph_id gid) = 0; - static void serializeNullUsageStats(MemoryBuffer &mb) - { - mb.append((offset_t)0); - mb.append((offset_t)0); - } + virtual offset_t getUsageStats() = 0; }; class CGraphDependency : public CInterface @@ -591,7 +586,7 @@ class graph_decl CGraphTempHandler : implements IGraphTempHandler, public CInter }; return new CIterator(tmpFiles); } - virtual void serializeUsageStats(MemoryBuffer &mb, graph_id gid) override; + virtual offset_t getUsageStats() override; }; class graph_decl CGraphStub : public CInterface, implements IThorChildGraph diff --git a/thorlcr/graph/thgraphmaster.cpp b/thorlcr/graph/thgraphmaster.cpp index 8d98a68b7ab..39e2f795cdd 100644 --- a/thorlcr/graph/thgraphmaster.cpp +++ b/thorlcr/graph/thgraphmaster.cpp @@ -2691,7 +2691,7 @@ bool CMasterGraph::preStart(size32_t parentExtractSz, const byte *parentExtract) sendActivityInitData(); // has to be done at least once // NB: At this point, on the slaves, the graphs will start } - totalActiveSpillSize = graphSpillSize = 0; + totalActiveSpillSize = peakNodeSpillFile = 0; CGraphBase::preStart(parentExtractSz, parentExtract); if (isGlobal()) { @@ -2722,11 +2722,6 @@ void CMasterGraph::handleSlaveDone(unsigned node, MemoryBuffer &mb) sdMb.setBuffer(len, (void *)d); act->slaveDone(node, sdMb); } - offset_t activeSpillSize, nodeGraphSpill; - mb.read(nodeGraphSpill); - mb.read(activeSpillSize); - totalActiveSpillSize += activeSpillSize; - graphSpillSize += nodeGraphSpill; } void CMasterGraph::getFinalProgress() @@ -2806,7 +2801,7 @@ void CMasterGraph::getFinalProgress() } } } - jobM->updateActiveSpillSize(graphSpillSize, totalActiveSpillSize); + jobM->updateActiveSpillSize(totalActiveSpillSize, peakNodeSpillFile); } void CMasterGraph::done() diff --git a/thorlcr/graph/thgraphmaster.ipp b/thorlcr/graph/thgraphmaster.ipp index 3f6fc009cbc..4cdbe94652c 100644 --- a/thorlcr/graph/thgraphmaster.ipp +++ b/thorlcr/graph/thgraphmaster.ipp @@ -94,7 +94,7 @@ class graphmaster_decl CMasterGraph : public CGraphBase bool sentGlobalInit = false; CThorStatsCollection graphStats; offset_t totalActiveSpillSize = 0; // total inter-graph spill - offset_t graphSpillSize = 0; + offset_t peakNodeSpillFile = 0; CReplyCancelHandler activityInitMsgHandler, bcastMsgHandler, executeReplyMsgHandler; @@ -202,10 +202,10 @@ public: dirty = true; } // Track spills - virtual void updateActiveSpillSize(offset_t graphSpillSize, offset_t activeSpillSize) + virtual void updateActiveSpillSize(offset_t graphSpillSize, offset_t peakNodeSpillSize) { totalSpillSize.fetch_add(graphSpillSize); - peakSpillSize.store_max(activeSpillSize); + peakSpillSize.store_max(peakNodeSpillSize); } virtual offset_t getTotalSpillSize() const { return totalSpillSize; } virtual offset_t getPeakSpillSize() const { return peakSpillSize; } diff --git a/thorlcr/graph/thgraphslave.cpp b/thorlcr/graph/thgraphslave.cpp index 97a82881fa8..75e78ceaf52 100644 --- a/thorlcr/graph/thgraphslave.cpp +++ b/thorlcr/graph/thgraphslave.cpp @@ -1254,10 +1254,22 @@ void CSlaveGraph::done() bool CSlaveGraph::serializeStats(MemoryBuffer &mb) { unsigned beginPos = mb.length(); - mb.append(queryGraphId()); + graph_id gid = queryGraphId(); + mb.append(gid); CRuntimeStatisticCollection stats(graphStatistics); stats.setStatistic(StNumExecutions, numExecuted); + offset_t graphSpillSize = 0; + if (!owner) + graphSpillSize = queryJob().queryTempHandler()->getUsageStats(); + else + { + IGraphTempHandler *tempHandler = queryTempHandler(false); + if (tempHandler) + graphSpillSize = tempHandler->getUsageStats(); + } + + stats.mergeStatistic(StPeakSizeNodeSpillFile, graphSpillSize); stats.serialize(mb); unsigned cPos = mb.length(); @@ -1331,17 +1343,6 @@ void CSlaveGraph::serializeDone(MemoryBuffer &mb) } } mb.writeDirect(cPos, sizeof(count), &count); - - if (!owner) - queryJob().queryTempHandler()->serializeUsageStats(mb, gid); - else - { - IGraphTempHandler *tempHandler = queryTempHandler(false); - if (tempHandler) - tempHandler->serializeUsageStats(mb, gid); - else - IGraphTempHandler::serializeNullUsageStats(mb); - } } void CSlaveGraph::getDone(MemoryBuffer &doneInfoMb) diff --git a/thorlcr/master/thgraphmanager.cpp b/thorlcr/master/thgraphmanager.cpp index 4c9238e9d9c..f4556af3ea3 100644 --- a/thorlcr/master/thgraphmanager.cpp +++ b/thorlcr/master/thgraphmanager.cpp @@ -1104,9 +1104,9 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName, offset_t totalSpillSize = job->getTotalSpillSize(); if (totalSpillSize) { - wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, graphScope, StSizeSpillFile, NULL, totalSpillSize, 1, 0, StatsMergeAppend); + wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, graphScope, StSizeActiveSpillFile, NULL, totalSpillSize, 1, 0, StatsMergeReplace); offset_t peakSpillSize = job->getPeakSpillSize(); - wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, graphScope, StSizePeakSpillFile, NULL, peakSpillSize, 1, 0, StatsMergeAppend); + wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, graphScope, StPeakSizeNodeSpillFile, NULL, peakSpillSize, 1, 0, StatsMergeReplace); } removeJob(*job); diff --git a/thorlcr/thorutil/thormisc.cpp b/thorlcr/thorutil/thormisc.cpp index d523779c525..3f9ac9b469a 100644 --- a/thorlcr/thorutil/thormisc.cpp +++ b/thorlcr/thorutil/thormisc.cpp @@ -87,7 +87,7 @@ const StatisticsMapping joinActivityStatistics({StNumLeftRows, StNumRightRows}, const StatisticsMapping diskReadActivityStatistics({StNumDiskRowsRead}, basicActivityStatistics, diskReadRemoteStatistics); const StatisticsMapping diskWriteActivityStatistics({StPerReplicated}, basicActivityStatistics, diskWriteRemoteStatistics); const StatisticsMapping sortActivityStatistics({}, basicActivityStatistics, spillStatistics); -const StatisticsMapping graphStatistics({StNumExecutions, StSizeSpillFile, StSizePeakSpillFile}, basicActivityStatistics); +const StatisticsMapping graphStatistics({StNumExecutions, StSizeSpillFile, StPeakSizeNodeSpillFile, StSizeActiveSpillFile}, basicActivityStatistics); const StatisticsMapping diskReadPartStatistics({StNumDiskRowsRead}, diskReadRemoteStatistics);