Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions common/workunit/workunit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2706,17 +2706,17 @@ void gatherSpillSize(const IConstWorkUnit * wu, const char *scope, stat_type & t
filter.addScope("");
filter.setIncludeNesting(2);
filter.addSource("global");
filter.addOutputStatistic(StSizeSpillFile);
filter.addOutputStatistic(StSizePeakSpillFile);
filter.addOutputStatistic(StSizeActiveSpillFile);
filter.addOutputStatistic(StPeakSizeNodeSpillFile);
filter.finishedFilter();
Owned<IConstWUScopeIterator> it = &wu->getScopeIterator(filter);
for (it->first(); it->isValid(); )
{
stat_type value = 0;
if (it->getStat(StSizeSpillFile, value))
if (it->getStat(StSizeActiveSpillFile, value))
{
totalSizeSpill += value;
if (it->getStat(StSizePeakSpillFile, value))
if (it->getStat(StPeakSizeNodeSpillFile, value))
{
if (value>peakSizeSpill)
peakSizeSpill = value;
Expand All @@ -2736,8 +2736,8 @@ void updateSpillSize(IWorkUnit * wu, const char * scope, StatisticScopeType scop
gatherSpillSize(wu, scope, totalSizeSpill, peakSizeSpill);
if (totalSizeSpill)
{
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), scopeType, scope, StSizePeakSpillFile, nullptr, peakSizeSpill, 1, 0, StatsMergeReplace);
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), scopeType, scope, StSizeSpillFile, nullptr, totalSizeSpill, 1, 0, StatsMergeReplace);
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), scopeType, scope, StPeakSizeNodeSpillFile, nullptr, peakSizeSpill, 1, 0, StatsMergeReplace);
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), scopeType, scope, StSizeActiveSpillFile, nullptr, totalSizeSpill, 1, 0, StatsMergeMax);
}
}
//---------------------------------------------------------------------------------------------------------------------
Expand Down
3 changes: 2 additions & 1 deletion initfiles/etc/DIR_NAME/environment.xml.in
Original file line number Diff line number Diff line change
Expand Up @@ -922,6 +922,7 @@
daliServers="mydali"
description="Thor process"
fileCacheLimit="1800"
globalMemorySize="200"
heapRetainMemory="false"
heapUseHugePages="false"
heapUseTransparentHugePages="true"
Expand All @@ -935,7 +936,7 @@
replicateAsync="false"
replicateOutputs="false"
slaveport="20100"
slavesPerNode="1"
slavesPerNode="2"
watchdogEnabled="true"
watchdogProgressEnabled="true">
<SwapNode AutoSwapNode="false"/>
Expand Down
4 changes: 3 additions & 1 deletion system/jlib/jstatcodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ enum StatisticMeasure
SMeasureId, // An Id for an element
SMeasureFilename, // A filename
SMeasureCost, // Used to measure cost
SMeasurePeakSize,
SMeasureMax,
};

Expand Down Expand Up @@ -269,7 +270,6 @@ enum StatisticKind
StCycleLeafFetchCycles,
StTimeBlobFetch,
StCycleBlobFetchCycles,
StSizePeakSpillFile,
StTimeAgentQueue,
StCycleAgentQueueCycles,
StTimeIBYTIDelay,
Expand All @@ -279,6 +279,8 @@ enum StatisticKind
StWhenK8sLaunched,
StWhenK8sStarted,
StWhenK8sReady,
StPeakSizeNodeSpillFile,
StSizeActiveSpillFile,
StMax,

//For any quantity there is potentially the following variants.
Expand Down
12 changes: 10 additions & 2 deletions system/jlib/jstats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ void setStatisticsComponentName(StatisticCreatorType processType, const char * p
//--------------------------------------------------------------------------------------------------------------------

// Textual forms of the different enumerations, first items are for none and all.
static constexpr const char * const measureNames[] = { "", "all", "ns", "ts", "cnt", "sz", "cpu", "skw", "node", "ppm", "ip", "cy", "en", "txt", "bool", "id", "fname", "cost", NULL };
static constexpr const char * const measureNames[] = { "", "all", "ns", "ts", "cnt", "sz", "cpu", "skw", "node", "ppm", "ip", "cy", "en", "txt", "bool", "id", "fname", "cost", "peaksz", NULL };
static constexpr const char * const creatorTypeNames[]= { "", "all", "unknown", "hthor", "roxie", "roxie:s", "thor", "thor:m", "thor:s", "eclcc", "esp", "summary", NULL };
static constexpr const char * const scopeTypeNames[] = { "", "all", "global", "graph", "subgraph", "activity", "allocator", "section", "compile", "dfu", "edge", "function", "workflow", "child", "file", "channel", "unknown", nullptr };

Expand Down Expand Up @@ -406,6 +406,7 @@ StringBuffer & formatStatistic(StringBuffer & out, unsigned __int64 value, Stati
case SMeasureCount:
return out.append(value);
case SMeasureSize:
case SMeasurePeakSize:
return formatSize(out, value);
case SMeasureLoad:
return formatLoad(out, value);
Expand Down Expand Up @@ -488,6 +489,7 @@ stat_type readStatisticValue(const char * cur, const char * * end, StatisticMeas
break;
case SMeasureCount:
case SMeasureSize:
case SMeasurePeakSize:
//Allow K, M, G as scaling suffixes
if (next[0] == 'K')
{
Expand Down Expand Up @@ -650,6 +652,7 @@ const char * queryMeasurePrefix(StatisticMeasure measure)
case SMeasureTimestampUs: return "When";
case SMeasureCount: return "Num";
case SMeasureSize: return "Size";
case SMeasurePeakSize: return "PeakSize";
case SMeasureLoad: return "Load";
case SMeasureSkew: return "Skew";
case SMeasureNode: return "Node";
Expand Down Expand Up @@ -713,6 +716,7 @@ static constexpr StatsMergeAction queryMergeMode(StatisticMeasure measure)
(measure == SMeasureId) ? StatsMergeKeepNonZero :
(measure == SMeasureFilename) ? StatsMergeKeepNonZero :
(measure == SMeasureCost) ? StatsMergeSum :
(measure == SMeasurePeakSize) ? StatsMergeMax :
StatsMergeSum;
}

Expand Down Expand Up @@ -784,6 +788,7 @@ static constexpr StatsMergeAction queryMergeMode(StatisticMeasure measure)
#define CYCLESTAT(y) St##Cycle##y##Cycles, SMeasureCycle, StatsMergeSum, St##Time##y, St##Cycle##y##Cycles, { NAMES(Cycle, y##Cycles) }, { TAGS(Cycle, y##Cycles) }
#define ENUMSTAT(y) STAT(Enum, y, SMeasureEnum)
#define COSTSTAT(y) STAT(Cost, y, SMeasureCost)
#define PEAKSIZESTAT(y) STAT(PeakSize, y, SMeasurePeakSize)
//--------------------------------------------------------------------------------------------------------------------

class StatisticMeta
Expand Down Expand Up @@ -951,7 +956,6 @@ static const constexpr StatisticMeta statsMetaData[StMax] = {
{ CYCLESTAT(LeafFetch) },
{ TIMESTAT(BlobFetch) },
{ CYCLESTAT(BlobFetch) },
{ SIZESTAT(PeakSpillFile) },
{ TIMESTAT(AgentQueue) },
{ CYCLESTAT(AgentQueue) },
{ TIMESTAT(IBYTIDelay) },
Expand All @@ -961,6 +965,8 @@ static const constexpr StatisticMeta statsMetaData[StMax] = {
{ WHENFIRSTSTAT(K8sLaunched) },
{ WHENFIRSTSTAT(K8sStarted) },
{ WHENFIRSTSTAT(K8sReady) },
{ PEAKSIZESTAT(NodeSpillFile) },
{ SIZESTAT(ActiveSpillFile)},
};


Expand Down Expand Up @@ -1211,6 +1217,7 @@ inline void mergeUpdate(StatisticMeasure measure, unsigned __int64 & value, cons
case SMeasureLoad:
case SMeasureSkew:
case SMeasureCycle:
case SMeasurePeakSize:
value += otherValue;
break;
case SMeasureTimestampUs:
Expand Down Expand Up @@ -2835,6 +2842,7 @@ static bool isSignificantRange(StatisticKind kind, unsigned __int64 range, unsig
insignificantDiff = 1000; // Ignore 1us timing difference between nodes
break;
case SMeasureSize:
case SMeasurePeakSize:
insignificantDiff = 1024;
break;
}
Expand Down
20 changes: 20 additions & 0 deletions testing/regress/ecl/graphspill3.ecl
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#option('pickBestEngine', false);

numRecs := 10;
rec := RECORD
unsigned8 id;
unsigned8 other;
string984 rest := 'rest';
string24 to1k := '1k';
END;

ds := DATASET(numRecs, TRANSFORM(rec, SELF.id := HASH(COUNTER), SELF.other := COUNTER), DISTRIBUTED);

sd1 := SORT(NOFOLD(ds), id);
sd2 := SORT(NOFOLD(ds), other);
sd2p := PROJECT(sd2, TRANSFORM({sd2.id}, SELF := LEFT));

PARALLEL(
OUTPUT(DEDUP(sd1, other, HASH));
OUTPUT(DEDUP(sd2p, id, HASH));
);
6 changes: 0 additions & 6 deletions thorlcr/activities/thdiskbase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -441,12 +441,6 @@ void CWriteMasterBase::slaveDone(size32_t slaveIdx, MemoryBuffer &mb)
}
}

void CWriteMasterBase::getActivityStats(IStatisticGatherer & stats)
{
CMasterActivity::getActivityStats(stats);
}


/////////////////
rowcount_t getCount(CActivityBase &activity, unsigned partialResults, rowcount_t limit, mptag_t mpTag)
{
Expand Down
1 change: 0 additions & 1 deletion thorlcr/activities/thdiskbase.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ public:
virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave);
virtual void done();
virtual void slaveDone(size32_t slaveIdx, MemoryBuffer &mb);
virtual void getActivityStats(IStatisticGatherer & stats);
};


Expand Down
2 changes: 2 additions & 0 deletions thorlcr/activities/thdiskbaseslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,8 @@ void CDiskWriteSlaveActivityBase::gatherActiveStats(CRuntimeStatisticCollection
{
PARENT::gatherActiveStats(activeStats);
mergeStats(activeStats, outputIO, diskWriteRemoteStatistics);
if (tmpUsage && outputIO ) // Update tmpUsage file size (Needed to calc inter-graph spill stats)
tmpUsage->setSize(outputIO->getStatistic(StSizeDiskWrite));
activeStats.setStatistic(StPerReplicated, replicateDone);
}

Expand Down
9 changes: 3 additions & 6 deletions thorlcr/graph/thgraph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2375,22 +2375,19 @@ void CGraphTempHandler::clearTemps()
tmpFiles.kill();
}

void CGraphTempHandler::serializeUsageStats(MemoryBuffer &mb, graph_id gid)
offset_t CGraphTempHandler::getUsageStats()
{
CriticalBlock b(crit);
Owned<IFileUsageIterator> iter = getIterator();
offset_t activeSpillSize = 0;
offset_t graphSpillSize = 0;
ForEach(*iter)
{
CFileUsageEntry &entry = iter->query();
if (entry.queryGraphId() == gid)
graphSpillSize += entry.getSize();
activeSpillSize += entry.getSize();
}
mb.append(graphSpillSize);
mb.append(activeSpillSize);
return activeSpillSize;
}

/////

class CGraphExecutor;
Expand Down
9 changes: 2 additions & 7 deletions thorlcr/graph/thgraph.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,12 +221,7 @@ interface IGraphTempHandler : extends IInterface
virtual void deregisterFile(const char *name, bool kept=false) = 0;
virtual void clearTemps() = 0;
virtual IFileUsageIterator *getIterator() = 0;
virtual void serializeUsageStats(MemoryBuffer &mb, graph_id gid) = 0;
static void serializeNullUsageStats(MemoryBuffer &mb)
{
mb.append((offset_t)0);
mb.append((offset_t)0);
}
virtual offset_t getUsageStats() = 0;
};

class CGraphDependency : public CInterface
Expand Down Expand Up @@ -591,7 +586,7 @@ class graph_decl CGraphTempHandler : implements IGraphTempHandler, public CInter
};
return new CIterator(tmpFiles);
}
virtual void serializeUsageStats(MemoryBuffer &mb, graph_id gid) override;
virtual offset_t getUsageStats() override;
};

class graph_decl CGraphStub : public CInterface, implements IThorChildGraph
Expand Down
9 changes: 2 additions & 7 deletions thorlcr/graph/thgraphmaster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2691,7 +2691,7 @@ bool CMasterGraph::preStart(size32_t parentExtractSz, const byte *parentExtract)
sendActivityInitData(); // has to be done at least once
// NB: At this point, on the slaves, the graphs will start
}
totalActiveSpillSize = graphSpillSize = 0;
totalActiveSpillSize = peakNodeSpillFile = 0;
CGraphBase::preStart(parentExtractSz, parentExtract);
if (isGlobal())
{
Expand Down Expand Up @@ -2722,11 +2722,6 @@ void CMasterGraph::handleSlaveDone(unsigned node, MemoryBuffer &mb)
sdMb.setBuffer(len, (void *)d);
act->slaveDone(node, sdMb);
}
offset_t activeSpillSize, nodeGraphSpill;
mb.read(nodeGraphSpill);
mb.read(activeSpillSize);
totalActiveSpillSize += activeSpillSize;
graphSpillSize += nodeGraphSpill;
}

void CMasterGraph::getFinalProgress()
Expand Down Expand Up @@ -2806,7 +2801,7 @@ void CMasterGraph::getFinalProgress()
}
}
}
jobM->updateActiveSpillSize(graphSpillSize, totalActiveSpillSize);
jobM->updateActiveSpillSize(totalActiveSpillSize, peakNodeSpillFile);
}

void CMasterGraph::done()
Expand Down
6 changes: 3 additions & 3 deletions thorlcr/graph/thgraphmaster.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class graphmaster_decl CMasterGraph : public CGraphBase
bool sentGlobalInit = false;
CThorStatsCollection graphStats;
offset_t totalActiveSpillSize = 0; // total inter-graph spill
offset_t graphSpillSize = 0;
offset_t peakNodeSpillFile = 0;

CReplyCancelHandler activityInitMsgHandler, bcastMsgHandler, executeReplyMsgHandler;

Expand Down Expand Up @@ -202,10 +202,10 @@ public:
dirty = true;
}
// Track spills
virtual void updateActiveSpillSize(offset_t graphSpillSize, offset_t activeSpillSize)
virtual void updateActiveSpillSize(offset_t graphSpillSize, offset_t peakNodeSpillSize)
{
totalSpillSize.fetch_add(graphSpillSize);
peakSpillSize.store_max(activeSpillSize);
peakSpillSize.store_max(peakNodeSpillSize);
}
virtual offset_t getTotalSpillSize() const { return totalSpillSize; }
virtual offset_t getPeakSpillSize() const { return peakSpillSize; }
Expand Down
25 changes: 13 additions & 12 deletions thorlcr/graph/thgraphslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1254,10 +1254,22 @@ void CSlaveGraph::done()
bool CSlaveGraph::serializeStats(MemoryBuffer &mb)
{
unsigned beginPos = mb.length();
mb.append(queryGraphId());
graph_id gid = queryGraphId();
mb.append(gid);

CRuntimeStatisticCollection stats(graphStatistics);
stats.setStatistic(StNumExecutions, numExecuted);
offset_t graphSpillSize = 0;
if (!owner)
graphSpillSize = queryJob().queryTempHandler()->getUsageStats();
else
{
IGraphTempHandler *tempHandler = queryTempHandler(false);
if (tempHandler)
graphSpillSize = tempHandler->getUsageStats();
}

stats.mergeStatistic(StPeakSizeNodeSpillFile, graphSpillSize);
stats.serialize(mb);

unsigned cPos = mb.length();
Expand Down Expand Up @@ -1331,17 +1343,6 @@ void CSlaveGraph::serializeDone(MemoryBuffer &mb)
}
}
mb.writeDirect(cPos, sizeof(count), &count);

if (!owner)
queryJob().queryTempHandler()->serializeUsageStats(mb, gid);
else
{
IGraphTempHandler *tempHandler = queryTempHandler(false);
if (tempHandler)
tempHandler->serializeUsageStats(mb, gid);
else
IGraphTempHandler::serializeNullUsageStats(mb);
}
}

void CSlaveGraph::getDone(MemoryBuffer &doneInfoMb)
Expand Down
4 changes: 2 additions & 2 deletions thorlcr/master/thgraphmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1104,9 +1104,9 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName,
offset_t totalSpillSize = job->getTotalSpillSize();
if (totalSpillSize)
{
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, graphScope, StSizeSpillFile, NULL, totalSpillSize, 1, 0, StatsMergeAppend);
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, graphScope, StSizeActiveSpillFile, NULL, totalSpillSize, 1, 0, StatsMergeReplace);
offset_t peakSpillSize = job->getPeakSpillSize();
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, graphScope, StSizePeakSpillFile, NULL, peakSpillSize, 1, 0, StatsMergeAppend);
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, graphScope, StPeakSizeNodeSpillFile, NULL, peakSpillSize, 1, 0, StatsMergeReplace);
}

removeJob(*job);
Expand Down
2 changes: 1 addition & 1 deletion thorlcr/thorutil/thormisc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ const StatisticsMapping joinActivityStatistics({StNumLeftRows, StNumRightRows},
const StatisticsMapping diskReadActivityStatistics({StNumDiskRowsRead}, basicActivityStatistics, diskReadRemoteStatistics);
const StatisticsMapping diskWriteActivityStatistics({StPerReplicated}, basicActivityStatistics, diskWriteRemoteStatistics);
const StatisticsMapping sortActivityStatistics({}, basicActivityStatistics, spillStatistics);
const StatisticsMapping graphStatistics({StNumExecutions, StSizeSpillFile, StSizePeakSpillFile}, basicActivityStatistics);
const StatisticsMapping graphStatistics({StNumExecutions, StSizeSpillFile, StPeakSizeNodeSpillFile, StSizeActiveSpillFile}, basicActivityStatistics);
const StatisticsMapping diskReadPartStatistics({StNumDiskRowsRead}, diskReadRemoteStatistics);


Expand Down