diff --git a/common/thorhelper/thorwrite.cpp b/common/thorhelper/thorwrite.cpp index bf7d684b1db..155bd590a87 100644 --- a/common/thorhelper/thorwrite.cpp +++ b/common/thorhelper/thorwrite.cpp @@ -15,6 +15,7 @@ limitations under the License. ############################################################################## */ #include "jliball.hpp" +#include "jptree.hpp" #include "thorfile.hpp" @@ -176,7 +177,14 @@ void getDefaultWritePlane(StringBuffer & plane, unsigned helperFlags) { //NB: This can only access TDX flags because it is called from readers and writers if (helperFlags & TDXjobtemp) - getDefaultJobTempPlane(plane); + { + // Check if delayJobTempPublish is enabled - if so, use spill plane instead of data plane + bool delayJobTempPublish = getComponentConfigSP()->getPropBool("@delayJobTempPublish", false); + if (delayJobTempPublish) + getDefaultSpillPlane(plane); + else + getDefaultJobTempPlane(plane); + } else if (helperFlags & TDXtemporary) getDefaultSpillPlane(plane); else diff --git a/thorlcr/mfilemanager/thmfilemanager.cpp b/thorlcr/mfilemanager/thmfilemanager.cpp index eb7b4d9f74b..80cdd5e90b0 100644 --- a/thorlcr/mfilemanager/thmfilemanager.cpp +++ b/thorlcr/mfilemanager/thmfilemanager.cpp @@ -20,6 +20,7 @@ #include "jfile.hpp" #include "jiface.hpp" #include "jprop.hpp" +#include "jptree.hpp" #include "jutil.hpp" @@ -48,10 +49,28 @@ static IThorFileManager *fileManager = NULL; typedef OwningStringHTMapping CIDistributeFileMapping; + +// Structure to track stowed jobtemps +struct StowedJobTemp +{ + StringAttr logicalName; + StringAttr wuid; + Linked file; + + StowedJobTemp(const char *_logicalName, const char *_wuid, IDistributedFile *_file) + : logicalName(_logicalName), wuid(_wuid), file(_file) + { + } +}; + class CFileManager : public CSimpleInterface, implements IThorFileManager { OwningStringSuperHashTableOf fileMap; bool replicateOutputs; + bool delayJobTempPublish; // Cached configuration option + CriticalSection stowedJobTempsCrit; + CIArrayOf stowedJobTemps; + StringAttr currentWuid; // Track the current workunit StringBuffer &_getPublishPhysicalName(CJobBase &job, const char *logicalName, unsigned partno, const char *groupName, IGroup *group, StringBuffer &res) @@ -225,12 +244,58 @@ class CFileManager : public CSimpleInterface, implements IThorFileManager } } + // Helper method to publish all stowed jobtemps (called when transitioning between workunits) + void publishStowedJobTemps(IUserDescriptor *userDesc) + { + // Must be called with stowedJobTempsCrit held + ForEachItemIn(i, stowedJobTemps) + { + StowedJobTemp &stowed = stowedJobTemps.item(i); + try + { + // Publish the stowed file to Dali + IDistributedFile *file = stowed.file; + if (file) + { + const char *logicalName = stowed.logicalName; + + // Remove any existing entry first + queryDistributedFileDirectory().removeEntry(logicalName, userDesc); + + // Attach the file to make it published + file->attach(logicalName, userDesc); + LOG(MCdebugInfo, "Published stowed jobtemp: %s (wuid: %s)", logicalName, stowed.wuid.get()); + } + } + catch (IException *e) + { + StringBuffer msg; + e->errorMessage(msg); + LOG(MCwarning, "Failed to publish stowed jobtemp %s: %s", stowed.logicalName.get(), msg.str()); + e->Release(); + } + } + stowedJobTemps.kill(); + } + public: IMPLEMENT_IINTERFACE_USING(CSimpleInterface); CFileManager() { replicateOutputs = globals->getPropBool("@replicateOutputs"); + delayJobTempPublish = getComponentConfigSP()->getPropBool("@delayJobTempPublish", false); + } + + ~CFileManager() + { + // Clean up any remaining stowed jobtemps on shutdown + CriticalBlock block(stowedJobTempsCrit); + if (stowedJobTemps.ordinality() > 0) + { + LOG(MCdebugInfo, "Clearing %d stowed jobtemps on shutdown (not publishing as workunit context is unavailable)", stowedJobTemps.ordinality()); + stowedJobTemps.kill(); + } } StringBuffer &mangleLFN(CJobBase &job, const char *lfn, StringBuffer &out) { @@ -335,6 +400,39 @@ class CFileManager : public CSimpleInterface, implements IThorFileManager if (fileMapping) return &fileMapping->get(); + // Check if this is a delayed jobtemp in the stowed set + if (delayJobTempPublish && temporary) + { + CriticalBlock block(stowedJobTempsCrit); + const char *jobWuid = job.queryWuid(); + + // Initialize currentWuid on first use + if (currentWuid.isEmpty()) + { + currentWuid.set(jobWuid); + } + // Check if we've transitioned to a new workunit + else if (!streq(currentWuid, jobWuid)) + { + // Publish all stowed jobtemps from the previous workunit + publishStowedJobTemps(job.queryUserDescriptor()); + currentWuid.set(jobWuid); + } + + // Look for the file in the stowed set + ForEachItemIn(i, stowedJobTemps) + { + StowedJobTemp &stowed = stowedJobTemps.item(i); + if (streq(stowed.logicalName, scopedName.str()) && streq(stowed.wuid, jobWuid)) + { + // Found in stowed set, remove and return + Owned file = stowed.file.getLink(); + stowedJobTemps.remove(i); + return file.getClear(); + } + } + } + Owned file = timedLookup(job, scopedName.str(), mode, privilegedUser, job.queryMaxLfnBlockTimeMins() * 60000); if (file && 0 == file->numParts()) { @@ -451,6 +549,8 @@ class CFileManager : public CSimpleInterface, implements IThorFileManager desc.setown(createFileDescriptor()); if (temporary) desc->queryProperties().setPropBool("@temporary", temporary); + if (jobTemp) + desc->queryProperties().setPropBool("@jobTemp", jobTemp); else desc->setTraceName(logicalName); if (persistent) @@ -532,6 +632,35 @@ class CFileManager : public CSimpleInterface, implements IThorFileManager { IPropertyTree &props = fileDesc.queryProperties(); bool temporary = props.getPropBool("@temporary"); + bool jobTemp = props.getPropBool("@jobTemp", false); + + // If delayJobTempPublish is enabled and this is a jobtemp, stow it instead of publishing + if (delayJobTempPublish && jobTemp && !job.queryUseCheckpoints()) + { + Owned file = queryDistributedFileDirectory().createNew(&fileDesc); + + CriticalBlock block(stowedJobTempsCrit); + const char *jobWuid = job.queryWuid(); + + // Initialize currentWuid on first use + if (currentWuid.isEmpty()) + { + currentWuid.set(jobWuid); + } + // Check if we've transitioned to a new workunit + else if (!streq(currentWuid, jobWuid)) + { + // Publish all stowed jobtemps from the previous workunit + publishStowedJobTemps(job.queryUserDescriptor()); + currentWuid.set(jobWuid); + } + + // Stow the jobtemp + stowedJobTemps.append(*new StowedJobTemp(logicalName, jobWuid, file)); + fileMap.replace(*new CIDistributeFileMapping(logicalName, *LINK(file))); // cache for immediate access + return; + } + if (!temporary || job.queryUseCheckpoints()) queryDistributedFileDirectory().removeEntry(logicalName, job.queryUserDescriptor()); // thor clusters are backed up so if replicateOutputs set *always* assume a replicate