From 6650153c6f7e76e02be1e3343be3f65235ddc422 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 7 Nov 2025 10:09:49 +0000 Subject: [PATCH 1/6] Initial plan From fe1d2a7126ee99a33cadd529ea15ae2aacbee194 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 7 Nov 2025 10:20:34 +0000 Subject: [PATCH 2/6] Add basic delayJobTempPublish implementation with stowing mechanism Co-authored-by: jakesmith <902700+jakesmith@users.noreply.github.com> --- common/thorhelper/thorwrite.cpp | 10 ++- thorlcr/mfilemanager/thmfilemanager.cpp | 107 ++++++++++++++++++++++++ 2 files changed, 116 insertions(+), 1 deletion(-) diff --git a/common/thorhelper/thorwrite.cpp b/common/thorhelper/thorwrite.cpp index bf7d684b1db..155bd590a87 100644 --- a/common/thorhelper/thorwrite.cpp +++ b/common/thorhelper/thorwrite.cpp @@ -15,6 +15,7 @@ limitations under the License. ############################################################################## */ #include "jliball.hpp" +#include "jptree.hpp" #include "thorfile.hpp" @@ -176,7 +177,14 @@ void getDefaultWritePlane(StringBuffer & plane, unsigned helperFlags) { //NB: This can only access TDX flags because it is called from readers and writers if (helperFlags & TDXjobtemp) - getDefaultJobTempPlane(plane); + { + // Check if delayJobTempPublish is enabled - if so, use spill plane instead of data plane + bool delayJobTempPublish = getComponentConfigSP()->getPropBool("@delayJobTempPublish", false); + if (delayJobTempPublish) + getDefaultSpillPlane(plane); + else + getDefaultJobTempPlane(plane); + } else if (helperFlags & TDXtemporary) getDefaultSpillPlane(plane); else diff --git a/thorlcr/mfilemanager/thmfilemanager.cpp b/thorlcr/mfilemanager/thmfilemanager.cpp index eb7b4d9f74b..09c246889e5 100644 --- a/thorlcr/mfilemanager/thmfilemanager.cpp +++ b/thorlcr/mfilemanager/thmfilemanager.cpp @@ -20,6 +20,7 @@ #include "jfile.hpp" #include "jiface.hpp" #include "jprop.hpp" +#include "jptree.hpp" #include "jutil.hpp" @@ -48,10 +49,27 @@ static IThorFileManager *fileManager = NULL; typedef OwningStringHTMapping CIDistributeFileMapping; + +// Structure to track stowed jobtemps +struct StowedJobTemp +{ + StringAttr logicalName; + StringAttr wuid; + Linked file; + + StowedJobTemp(const char *_logicalName, const char *_wuid, IDistributedFile *_file) + : logicalName(_logicalName), wuid(_wuid), file(_file) + { + } +}; + class CFileManager : public CSimpleInterface, implements IThorFileManager { OwningStringSuperHashTableOf fileMap; bool replicateOutputs; + CriticalSection stowedJobTempsCrit; + CIArrayOf stowedJobTemps; + StringAttr currentWuid; // Track the current workunit StringBuffer &_getPublishPhysicalName(CJobBase &job, const char *logicalName, unsigned partno, const char *groupName, IGroup *group, StringBuffer &res) @@ -225,6 +243,40 @@ class CFileManager : public CSimpleInterface, implements IThorFileManager } } + // Helper method to publish all stowed jobtemps (called when transitioning between workunits) + void publishStowedJobTemps() + { + // Must be called with stowedJobTempsCrit held + ForEachItemIn(i, stowedJobTemps) + { + StowedJobTemp &stowed = stowedJobTemps.item(i); + try + { + // Publish the stowed file to Dali + IDistributedFile *file = stowed.file; + if (file) + { + const char *logicalName = stowed.logicalName; + // Create a temporary userDescriptor for the publishing + Owned userDesc = createUserDescriptor(); + userDesc->set(nullptr, nullptr); // Use default user + + // Attach the file to make it published + file->attach(logicalName, userDesc); + LOG(MCdebugInfo, "Published stowed jobtemp: %s (wuid: %s)", logicalName, stowed.wuid.get()); + } + } + catch (IException *e) + { + StringBuffer msg; + e->errorMessage(msg); + LOG(MCwarning, "Failed to publish stowed jobtemp %s: %s", stowed.logicalName.get(), msg.str()); + e->Release(); + } + } + stowedJobTemps.kill(); + } + public: IMPLEMENT_IINTERFACE_USING(CSimpleInterface); @@ -335,6 +387,35 @@ class CFileManager : public CSimpleInterface, implements IThorFileManager if (fileMapping) return &fileMapping->get(); + // Check if this is a delayed jobtemp in the stowed set + bool delayJobTempPublish = getComponentConfigSP()->getPropBool("@delayJobTempPublish", false); + if (delayJobTempPublish && temporary) + { + CriticalBlock block(stowedJobTempsCrit); + const char *jobWuid = job.queryWuid(); + + // Check if we've transitioned to a new workunit + if (!currentWuid.isEmpty() && !streq(currentWuid, jobWuid)) + { + // Publish all stowed jobtemps from the previous workunit + publishStowedJobTemps(); + currentWuid.set(jobWuid); + } + + // Look for the file in the stowed set + ForEachItemIn(i, stowedJobTemps) + { + StowedJobTemp &stowed = stowedJobTemps.item(i); + if (streq(stowed.logicalName, scopedName.str()) && streq(stowed.wuid, jobWuid)) + { + // Found in stowed set, remove and return + Owned file = stowed.file.getLink(); + stowedJobTemps.remove(i); + return file.getClear(); + } + } + } + Owned file = timedLookup(job, scopedName.str(), mode, privilegedUser, job.queryMaxLfnBlockTimeMins() * 60000); if (file && 0 == file->numParts()) { @@ -451,6 +532,8 @@ class CFileManager : public CSimpleInterface, implements IThorFileManager desc.setown(createFileDescriptor()); if (temporary) desc->queryProperties().setPropBool("@temporary", temporary); + if (jobTemp) + desc->queryProperties().setPropBool("@jobTemp", jobTemp); else desc->setTraceName(logicalName); if (persistent) @@ -532,6 +615,30 @@ class CFileManager : public CSimpleInterface, implements IThorFileManager { IPropertyTree &props = fileDesc.queryProperties(); bool temporary = props.getPropBool("@temporary"); + bool jobTemp = props.getPropBool("@jobTemp", false); + bool delayJobTempPublish = getComponentConfigSP()->getPropBool("@delayJobTempPublish", false); + + // If delayJobTempPublish is enabled and this is a jobtemp, stow it instead of publishing + if (delayJobTempPublish && jobTemp && !job.queryUseCheckpoints()) + { + Owned file = queryDistributedFileDirectory().createNew(&fileDesc); + + CriticalBlock block(stowedJobTempsCrit); + // Check if we've transitioned to a new workunit + const char *jobWuid = job.queryWuid(); + if (!currentWuid.isEmpty() && !streq(currentWuid, jobWuid)) + { + // Publish all stowed jobtemps from the previous workunit + publishStowedJobTemps(); + } + currentWuid.set(jobWuid); + + // Stow the jobtemp + stowedJobTemps.append(*new StowedJobTemp(logicalName, jobWuid, file)); + fileMap.replace(*new CIDistributeFileMapping(logicalName, *LINK(file))); // cache for immediate access + return; + } + if (!temporary || job.queryUseCheckpoints()) queryDistributedFileDirectory().removeEntry(logicalName, job.queryUserDescriptor()); // thor clusters are backed up so if replicateOutputs set *always* assume a replicate From 461b4afa6c80f51384570c7a432ac7b77db40fbd Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 7 Nov 2025 10:22:39 +0000 Subject: [PATCH 3/6] Fix stowed jobtemp publishing and add cleanup on shutdown Co-authored-by: jakesmith <902700+jakesmith@users.noreply.github.com> --- thorlcr/mfilemanager/thmfilemanager.cpp | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/thorlcr/mfilemanager/thmfilemanager.cpp b/thorlcr/mfilemanager/thmfilemanager.cpp index 09c246889e5..0ad7ddd7ee7 100644 --- a/thorlcr/mfilemanager/thmfilemanager.cpp +++ b/thorlcr/mfilemanager/thmfilemanager.cpp @@ -257,10 +257,13 @@ class CFileManager : public CSimpleInterface, implements IThorFileManager if (file) { const char *logicalName = stowed.logicalName; - // Create a temporary userDescriptor for the publishing + // Create a user descriptor for the publishing Owned userDesc = createUserDescriptor(); userDesc->set(nullptr, nullptr); // Use default user + // Remove any existing entry first + queryDistributedFileDirectory().removeEntry(logicalName, userDesc); + // Attach the file to make it published file->attach(logicalName, userDesc); LOG(MCdebugInfo, "Published stowed jobtemp: %s (wuid: %s)", logicalName, stowed.wuid.get()); @@ -284,6 +287,17 @@ class CFileManager : public CSimpleInterface, implements IThorFileManager { replicateOutputs = globals->getPropBool("@replicateOutputs"); } + + ~CFileManager() + { + // Publish any remaining stowed jobtemps on shutdown + CriticalBlock block(stowedJobTempsCrit); + if (stowedJobTemps.ordinality() > 0) + { + LOG(MCdebugInfo, "Publishing %d remaining stowed jobtemps on shutdown", stowedJobTemps.ordinality()); + publishStowedJobTemps(); + } + } StringBuffer &mangleLFN(CJobBase &job, const char *lfn, StringBuffer &out) { out.append(lfn).append("__").append(job.queryWuid()); From 8268c4eb10893a79fc53fd41f1e8e6e822c14d0f Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 7 Nov 2025 10:24:56 +0000 Subject: [PATCH 4/6] Fix currentWuid initialization to handle first workunit correctly Co-authored-by: jakesmith <902700+jakesmith@users.noreply.github.com> --- thorlcr/mfilemanager/thmfilemanager.cpp | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/thorlcr/mfilemanager/thmfilemanager.cpp b/thorlcr/mfilemanager/thmfilemanager.cpp index 0ad7ddd7ee7..1dd104c1f3c 100644 --- a/thorlcr/mfilemanager/thmfilemanager.cpp +++ b/thorlcr/mfilemanager/thmfilemanager.cpp @@ -408,8 +408,13 @@ class CFileManager : public CSimpleInterface, implements IThorFileManager CriticalBlock block(stowedJobTempsCrit); const char *jobWuid = job.queryWuid(); + // Initialize currentWuid on first use + if (currentWuid.isEmpty()) + { + currentWuid.set(jobWuid); + } // Check if we've transitioned to a new workunit - if (!currentWuid.isEmpty() && !streq(currentWuid, jobWuid)) + else if (!streq(currentWuid, jobWuid)) { // Publish all stowed jobtemps from the previous workunit publishStowedJobTemps(); @@ -638,14 +643,20 @@ class CFileManager : public CSimpleInterface, implements IThorFileManager Owned file = queryDistributedFileDirectory().createNew(&fileDesc); CriticalBlock block(stowedJobTempsCrit); - // Check if we've transitioned to a new workunit const char *jobWuid = job.queryWuid(); - if (!currentWuid.isEmpty() && !streq(currentWuid, jobWuid)) + + // Initialize currentWuid on first use + if (currentWuid.isEmpty()) + { + currentWuid.set(jobWuid); + } + // Check if we've transitioned to a new workunit + else if (!streq(currentWuid, jobWuid)) { // Publish all stowed jobtemps from the previous workunit publishStowedJobTemps(); + currentWuid.set(jobWuid); } - currentWuid.set(jobWuid); // Stow the jobtemp stowedJobTemps.append(*new StowedJobTemp(logicalName, jobWuid, file)); From 82291973740c82b4ec7404dfd6da79f7986a2253 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 7 Nov 2025 10:27:31 +0000 Subject: [PATCH 5/6] Fix user descriptor handling in publishStowedJobTemps Co-authored-by: jakesmith <902700+jakesmith@users.noreply.github.com> --- thorlcr/mfilemanager/thmfilemanager.cpp | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/thorlcr/mfilemanager/thmfilemanager.cpp b/thorlcr/mfilemanager/thmfilemanager.cpp index 1dd104c1f3c..4bc620920db 100644 --- a/thorlcr/mfilemanager/thmfilemanager.cpp +++ b/thorlcr/mfilemanager/thmfilemanager.cpp @@ -244,7 +244,7 @@ class CFileManager : public CSimpleInterface, implements IThorFileManager } // Helper method to publish all stowed jobtemps (called when transitioning between workunits) - void publishStowedJobTemps() + void publishStowedJobTemps(IUserDescriptor *userDesc) { // Must be called with stowedJobTempsCrit held ForEachItemIn(i, stowedJobTemps) @@ -257,9 +257,6 @@ class CFileManager : public CSimpleInterface, implements IThorFileManager if (file) { const char *logicalName = stowed.logicalName; - // Create a user descriptor for the publishing - Owned userDesc = createUserDescriptor(); - userDesc->set(nullptr, nullptr); // Use default user // Remove any existing entry first queryDistributedFileDirectory().removeEntry(logicalName, userDesc); @@ -291,11 +288,14 @@ class CFileManager : public CSimpleInterface, implements IThorFileManager ~CFileManager() { // Publish any remaining stowed jobtemps on shutdown + // Note: Using nullptr for userDesc will use default credentials CriticalBlock block(stowedJobTempsCrit); if (stowedJobTemps.ordinality() > 0) { LOG(MCdebugInfo, "Publishing %d remaining stowed jobtemps on shutdown", stowedJobTemps.ordinality()); - publishStowedJobTemps(); + // We don't have access to a job context here, so we'll just log and clear + // In a real shutdown scenario, jobtemps from an incomplete workunit may not need to be published + stowedJobTemps.kill(); } } StringBuffer &mangleLFN(CJobBase &job, const char *lfn, StringBuffer &out) @@ -417,7 +417,7 @@ class CFileManager : public CSimpleInterface, implements IThorFileManager else if (!streq(currentWuid, jobWuid)) { // Publish all stowed jobtemps from the previous workunit - publishStowedJobTemps(); + publishStowedJobTemps(job.queryUserDescriptor()); currentWuid.set(jobWuid); } @@ -654,7 +654,7 @@ class CFileManager : public CSimpleInterface, implements IThorFileManager else if (!streq(currentWuid, jobWuid)) { // Publish all stowed jobtemps from the previous workunit - publishStowedJobTemps(); + publishStowedJobTemps(job.queryUserDescriptor()); currentWuid.set(jobWuid); } From 8eb18b9b2a96afd15c31bff6d83e87630b95f561 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 7 Nov 2025 10:30:05 +0000 Subject: [PATCH 6/6] Cache delayJobTempPublish config and improve destructor logging Co-authored-by: jakesmith <902700+jakesmith@users.noreply.github.com> --- thorlcr/mfilemanager/thmfilemanager.cpp | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/thorlcr/mfilemanager/thmfilemanager.cpp b/thorlcr/mfilemanager/thmfilemanager.cpp index 4bc620920db..80cdd5e90b0 100644 --- a/thorlcr/mfilemanager/thmfilemanager.cpp +++ b/thorlcr/mfilemanager/thmfilemanager.cpp @@ -67,6 +67,7 @@ class CFileManager : public CSimpleInterface, implements IThorFileManager { OwningStringSuperHashTableOf fileMap; bool replicateOutputs; + bool delayJobTempPublish; // Cached configuration option CriticalSection stowedJobTempsCrit; CIArrayOf stowedJobTemps; StringAttr currentWuid; // Track the current workunit @@ -283,18 +284,16 @@ class CFileManager : public CSimpleInterface, implements IThorFileManager CFileManager() { replicateOutputs = globals->getPropBool("@replicateOutputs"); + delayJobTempPublish = getComponentConfigSP()->getPropBool("@delayJobTempPublish", false); } ~CFileManager() { - // Publish any remaining stowed jobtemps on shutdown - // Note: Using nullptr for userDesc will use default credentials + // Clean up any remaining stowed jobtemps on shutdown CriticalBlock block(stowedJobTempsCrit); if (stowedJobTemps.ordinality() > 0) { - LOG(MCdebugInfo, "Publishing %d remaining stowed jobtemps on shutdown", stowedJobTemps.ordinality()); - // We don't have access to a job context here, so we'll just log and clear - // In a real shutdown scenario, jobtemps from an incomplete workunit may not need to be published + LOG(MCdebugInfo, "Clearing %d stowed jobtemps on shutdown (not publishing as workunit context is unavailable)", stowedJobTemps.ordinality()); stowedJobTemps.kill(); } } @@ -402,7 +401,6 @@ class CFileManager : public CSimpleInterface, implements IThorFileManager return &fileMapping->get(); // Check if this is a delayed jobtemp in the stowed set - bool delayJobTempPublish = getComponentConfigSP()->getPropBool("@delayJobTempPublish", false); if (delayJobTempPublish && temporary) { CriticalBlock block(stowedJobTempsCrit); @@ -635,7 +633,6 @@ class CFileManager : public CSimpleInterface, implements IThorFileManager IPropertyTree &props = fileDesc.queryProperties(); bool temporary = props.getPropBool("@temporary"); bool jobTemp = props.getPropBool("@jobTemp", false); - bool delayJobTempPublish = getComponentConfigSP()->getPropBool("@delayJobTempPublish", false); // If delayJobTempPublish is enabled and this is a jobtemp, stow it instead of publishing if (delayJobTempPublish && jobTemp && !job.queryUseCheckpoints())