Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion common/thorhelper/thorwrite.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
limitations under the License.
############################################################################## */
#include "jliball.hpp"
#include "jptree.hpp"

#include "thorfile.hpp"

Expand Down Expand Up @@ -176,7 +177,14 @@ void getDefaultWritePlane(StringBuffer & plane, unsigned helperFlags)
{
//NB: This can only access TDX flags because it is called from readers and writers
if (helperFlags & TDXjobtemp)
getDefaultJobTempPlane(plane);
{
// Check if delayJobTempPublish is enabled - if so, use spill plane instead of data plane
bool delayJobTempPublish = getComponentConfigSP()->getPropBool("@delayJobTempPublish", false);
if (delayJobTempPublish)
getDefaultSpillPlane(plane);
else
getDefaultJobTempPlane(plane);
}
else if (helperFlags & TDXtemporary)
getDefaultSpillPlane(plane);
else
Expand Down
129 changes: 129 additions & 0 deletions thorlcr/mfilemanager/thmfilemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "jfile.hpp"
#include "jiface.hpp"
#include "jprop.hpp"
#include "jptree.hpp"
#include "jutil.hpp"


Expand Down Expand Up @@ -48,10 +49,28 @@
static IThorFileManager *fileManager = NULL;

typedef OwningStringHTMapping<IDistributedFile> CIDistributeFileMapping;

// Structure to track stowed jobtemps
struct StowedJobTemp
{
StringAttr logicalName;
StringAttr wuid;
Linked<IDistributedFile> file;

StowedJobTemp(const char *_logicalName, const char *_wuid, IDistributedFile *_file)
: logicalName(_logicalName), wuid(_wuid), file(_file)
{
}
};

class CFileManager : public CSimpleInterface, implements IThorFileManager
{
OwningStringSuperHashTableOf<CIDistributeFileMapping> fileMap;
bool replicateOutputs;
bool delayJobTempPublish; // Cached configuration option
CriticalSection stowedJobTempsCrit;
CIArrayOf<StowedJobTemp> stowedJobTemps;
StringAttr currentWuid; // Track the current workunit


StringBuffer &_getPublishPhysicalName(CJobBase &job, const char *logicalName, unsigned partno, const char *groupName, IGroup *group, StringBuffer &res)
Expand Down Expand Up @@ -225,12 +244,58 @@ class CFileManager : public CSimpleInterface, implements IThorFileManager
}
}

// Helper method to publish all stowed jobtemps (called when transitioning between workunits)
void publishStowedJobTemps(IUserDescriptor *userDesc)
{
// Must be called with stowedJobTempsCrit held
ForEachItemIn(i, stowedJobTemps)
{
StowedJobTemp &stowed = stowedJobTemps.item(i);
try
{
// Publish the stowed file to Dali
IDistributedFile *file = stowed.file;
if (file)
{
const char *logicalName = stowed.logicalName;

// Remove any existing entry first
queryDistributedFileDirectory().removeEntry(logicalName, userDesc);

// Attach the file to make it published
file->attach(logicalName, userDesc);
LOG(MCdebugInfo, "Published stowed jobtemp: %s (wuid: %s)", logicalName, stowed.wuid.get());
}
}
catch (IException *e)
{
StringBuffer msg;
e->errorMessage(msg);
LOG(MCwarning, "Failed to publish stowed jobtemp %s: %s", stowed.logicalName.get(), msg.str());
e->Release();
}
}
stowedJobTemps.kill();
}

public:
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);

CFileManager()
{
replicateOutputs = globals->getPropBool("@replicateOutputs");
delayJobTempPublish = getComponentConfigSP()->getPropBool("@delayJobTempPublish", false);
}

~CFileManager()
{
// Clean up any remaining stowed jobtemps on shutdown
CriticalBlock block(stowedJobTempsCrit);
if (stowedJobTemps.ordinality() > 0)
{
LOG(MCdebugInfo, "Clearing %d stowed jobtemps on shutdown (not publishing as workunit context is unavailable)", stowedJobTemps.ordinality());
stowedJobTemps.kill();
}
}
StringBuffer &mangleLFN(CJobBase &job, const char *lfn, StringBuffer &out)
{
Expand Down Expand Up @@ -335,6 +400,39 @@ class CFileManager : public CSimpleInterface, implements IThorFileManager
if (fileMapping)
return &fileMapping->get();

// Check if this is a delayed jobtemp in the stowed set
if (delayJobTempPublish && temporary)
{
CriticalBlock block(stowedJobTempsCrit);
const char *jobWuid = job.queryWuid();

// Initialize currentWuid on first use
if (currentWuid.isEmpty())
{
currentWuid.set(jobWuid);
}
// Check if we've transitioned to a new workunit
else if (!streq(currentWuid, jobWuid))
{
// Publish all stowed jobtemps from the previous workunit
publishStowedJobTemps(job.queryUserDescriptor());
currentWuid.set(jobWuid);
}

// Look for the file in the stowed set
ForEachItemIn(i, stowedJobTemps)
{
StowedJobTemp &stowed = stowedJobTemps.item(i);
if (streq(stowed.logicalName, scopedName.str()) && streq(stowed.wuid, jobWuid))
{
// Found in stowed set, remove and return
Owned<IDistributedFile> file = stowed.file.getLink();
stowedJobTemps.remove(i);
return file.getClear();
}
}
}

Owned<IDistributedFile> file = timedLookup(job, scopedName.str(), mode, privilegedUser, job.queryMaxLfnBlockTimeMins() * 60000);
if (file && 0 == file->numParts())
{
Expand Down Expand Up @@ -451,6 +549,8 @@ class CFileManager : public CSimpleInterface, implements IThorFileManager
desc.setown(createFileDescriptor());
if (temporary)
desc->queryProperties().setPropBool("@temporary", temporary);
if (jobTemp)
desc->queryProperties().setPropBool("@jobTemp", jobTemp);
else
desc->setTraceName(logicalName);
if (persistent)
Expand Down Expand Up @@ -532,6 +632,35 @@ class CFileManager : public CSimpleInterface, implements IThorFileManager
{
IPropertyTree &props = fileDesc.queryProperties();
bool temporary = props.getPropBool("@temporary");
bool jobTemp = props.getPropBool("@jobTemp", false);

// If delayJobTempPublish is enabled and this is a jobtemp, stow it instead of publishing
if (delayJobTempPublish && jobTemp && !job.queryUseCheckpoints())
{
Owned<IDistributedFile> file = queryDistributedFileDirectory().createNew(&fileDesc);

CriticalBlock block(stowedJobTempsCrit);
const char *jobWuid = job.queryWuid();

// Initialize currentWuid on first use
if (currentWuid.isEmpty())
{
currentWuid.set(jobWuid);
}
// Check if we've transitioned to a new workunit
else if (!streq(currentWuid, jobWuid))
{
// Publish all stowed jobtemps from the previous workunit
publishStowedJobTemps(job.queryUserDescriptor());
currentWuid.set(jobWuid);
}

// Stow the jobtemp
stowedJobTemps.append(*new StowedJobTemp(logicalName, jobWuid, file));
fileMap.replace(*new CIDistributeFileMapping(logicalName, *LINK(file))); // cache for immediate access
return;
}

if (!temporary || job.queryUseCheckpoints())
queryDistributedFileDirectory().removeEntry(logicalName, job.queryUserDescriptor());
// thor clusters are backed up so if replicateOutputs set *always* assume a replicate
Expand Down