diff --git a/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj b/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj
index 43d3b24bad..b1fcff3a53 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU.Examples/Org.Apache.REEF.IMRU.Examples.csproj
@@ -1,4 +1,4 @@
-
+
+
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.IMRU.Tests/ImruDriverCancelTests.cs b/lang/cs/Org.Apache.REEF.IMRU.Tests/ImruDriverCancelTests.cs
index 921c54bbb3..09a2ff685d 100644
--- a/lang/cs/Org.Apache.REEF.IMRU.Tests/ImruDriverCancelTests.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU.Tests/ImruDriverCancelTests.cs
@@ -185,6 +185,8 @@ private IConfiguration GetDriverConfig
+
+
diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUCheckpointHandler.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUCheckpointHandler.cs
new file mode 100644
index 0000000000..3036727cea
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUCheckpointHandler.cs
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.IMRU.OnREEF.CheckpointHandler;
+using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.IMRU.API
+{
+ ///
+ /// It is responsible for save and restore ITaskState object.
+ ///
+ [DefaultImplementation(typeof(IMRUCheckpointHandler))]
+ public interface IIMRUCheckpointHandler
+ {
+ ///
+ /// Persistent ITaskState object with the given ICodec.
+ ///
+ ///
+ void Persist(ITaskState taskState);
+
+ ///
+ /// Restore the data and decode it with the given ICodec.
+ ///
+ ///
+ ITaskState Restore();
+ }
+}
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUCheckpointResultHandler.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUCheckpointResultHandler.cs
new file mode 100644
index 0000000000..b4dcda9a1c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/API/IIMRUCheckpointResultHandler.cs
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.IMRU.OnREEF.CheckpointHandler;
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IMRU.API
+{
+ ///
+ /// Internal interface that defineds methods for checkpoint.
+ ///
+ [DefaultImplementation(typeof(IMRUCheckpointResultHandler))]
+ internal interface IIMRUCheckpointResultHandler
+ {
+ ///
+ /// Set a flag to indicate the result has been handled.
+ ///
+ void MarkResulHandled();
+
+ ///
+ /// Check if the result flag has been set.
+ ///
+ bool IsResultHandled();
+
+ ///
+ /// Clear state files
+ ///
+ void Clear();
+ }
+}
diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
index e2ffc6d63b..f028daff61 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinition.cs
@@ -38,6 +38,7 @@ public sealed class IMRUJobDefinition
private readonly IConfiguration _mapInputPipelineDataConverterConfiguration;
private readonly IConfiguration _partitionedDatasetConfiguration;
private readonly IConfiguration _resultHandlerConfiguration;
+ private readonly IConfiguration _checkpointConfiguration;
private readonly IConfiguration _jobCancelSignalConfiguration;
private readonly int _numberOfMappers;
private readonly int _memoryPerMapper;
@@ -66,6 +67,7 @@ public sealed class IMRUJobDefinition
/// Configuration of partitioned
/// dataset
/// Configuration of result handler
+ /// Configuration of checkpoint
/// Per mapper configuration
/// Number of mappers
/// Per Mapper memory.
@@ -88,6 +90,7 @@ internal IMRUJobDefinition(
IConfiguration mapInputPipelineDataConverterConfiguration,
IConfiguration partitionedDatasetConfiguration,
IConfiguration resultHandlerConfiguration,
+ IConfiguration checkpointConfiguration,
IConfiguration jobCancelSignalConfiguration,
ISet perMapConfigGeneratorConfig,
int numberOfMappers,
@@ -119,6 +122,7 @@ internal IMRUJobDefinition(
_perMapConfigGeneratorConfig = perMapConfigGeneratorConfig;
_invokeGC = invokeGC;
_resultHandlerConfiguration = resultHandlerConfiguration;
+ _checkpointConfiguration = checkpointConfiguration;
_jobCancelSignalConfiguration = jobCancelSignalConfiguration;
}
@@ -287,6 +291,15 @@ internal IConfiguration ResultHandlerConfiguration
get { return _resultHandlerConfiguration; }
}
+ ///
+ /// Configuration of the checkpoint
+ ///
+ ///
+ internal IConfiguration CheckpointConfiguration
+ {
+ get { return _checkpointConfiguration; }
+ }
+
///
/// Configuration for job cancellation signal implementation
///
diff --git a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs
index f3fe89c4ce..b750c47506 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/API/IMRUJobDefinitionBuilder.cs
@@ -50,6 +50,7 @@ public sealed class IMRUJobDefinitionBuilder
private IConfiguration _mapInputPipelineDataConverterConfiguration;
private IConfiguration _partitionedDatasetConfiguration;
private IConfiguration _resultHandlerConfiguration;
+ private IConfiguration _checkPointConfiguration;
private IConfiguration _jobCancellationConfiguration;
private readonly ISet _perMapConfigGeneratorConfig;
private bool _invokeGC;
@@ -68,6 +69,7 @@ public IMRUJobDefinitionBuilder()
_mapOutputPipelineDataConverterConfiguration = EmptyConfiguration;
_partitionedDatasetConfiguration = EmptyConfiguration;
_resultHandlerConfiguration = EmptyConfiguration;
+ _checkPointConfiguration = EmptyConfiguration;
_memoryPerMapper = 512;
_updateTaskMemory = 512;
_coresPerMapper = 1;
@@ -295,6 +297,17 @@ public IMRUJobDefinitionBuilder SetResultHandlerConfiguration(IConfiguration res
return this;
}
+ ///
+ /// Sets checkpoint Configuration
+ ///
+ /// Checkpoint config
+ ///
+ public IMRUJobDefinitionBuilder SetCheckpointConfiguration(IConfiguration checkpointConfig)
+ {
+ _checkPointConfiguration = checkpointConfig;
+ return this;
+ }
+
///
/// Whether to invoke Garbage Collector after each IMRU iteration
///
@@ -368,6 +381,7 @@ public IMRUJobDefinition Build()
_mapInputPipelineDataConverterConfiguration,
_partitionedDatasetConfiguration,
_resultHandlerConfiguration,
+ _checkPointConfiguration,
_jobCancellationConfiguration,
_perMapConfigGeneratorConfig,
_numberOfMappers,
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/CheckpointHandler/CheckpointConfigurationModule.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/CheckpointHandler/CheckpointConfigurationModule.cs
new file mode 100644
index 0000000000..a865b6ba23
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/CheckpointHandler/CheckpointConfigurationModule.cs
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks;
+using Org.Apache.REEF.IMRU.OnREEF.Parameters;
+using Org.Apache.REEF.Tang.Formats;
+using Org.Apache.REEF.Tang.Util;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.CheckpointHandler
+{
+ ///
+ /// Configuration module builder for checkpoint handler
+ ///
+ public sealed class CheckpointConfigurationBuilder : ConfigurationModuleBuilder
+ {
+ ///
+ /// The check point sttate file path. In cluster, should be a path in remote storage.
+ ///
+ public static readonly RequiredParameter CheckpointFilePath = new RequiredParameter();
+
+ ///
+ /// Codec that is to encode and decode state object.
+ ///
+ public static readonly RequiredImpl> TaskStateCodec = new RequiredImpl>();
+
+ ///
+ /// Configuration module for checkpoint handler.
+ ///
+ public static readonly ConfigurationModule ConfigurationModule = new CheckpointConfigurationBuilder()
+ .BindNamedParameter(GenericType.Class, CheckpointFilePath)
+ .BindImplementation(GenericType.Class, GenericType.Class)
+ .BindImplementation(GenericType>.Class, TaskStateCodec)
+ .Build();
+ }
+}
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/CheckpointHandler/IMRUCheckpointHandler.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/CheckpointHandler/IMRUCheckpointHandler.cs
new file mode 100644
index 0000000000..72598a2601
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/CheckpointHandler/IMRUCheckpointHandler.cs
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks;
+using Org.Apache.REEF.IMRU.OnREEF.Parameters;
+using Org.Apache.REEF.IO.FileSystem;
+using Org.Apache.REEF.IO.TempFileCreation;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Tang.Implementations.Tang;
+using Org.Apache.REEF.Utilities.Logging;
+using Org.Apache.REEF.Wake.Remote;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.CheckpointHandler
+{
+ ///
+ /// Default implementation of IIMRUCheckpointHandler
+ ///
+ public sealed class IMRUCheckpointHandler : IIMRUCheckpointHandler
+ {
+ private static readonly Logger Logger = Logger.GetLogger(typeof(IMRUCheckpointHandler));
+
+ private readonly IFileSystem _fileSystem;
+ private readonly ICodec _stateCodec;
+ private readonly string _checkpointFilePath;
+ private readonly Uri _checkpointFileUri;
+ private const string StateDir = "StateDir";
+ private const string StateFile = "StateFile";
+ private const string FlagFile = "FlagFile";
+ public const string StateFileExt = ".bin";
+ public const string FlagFileExt = ".txt";
+
+ ///
+ /// It is for storing and retrieving checkpoint data.
+ ///
+ /// The file path where the checkpoint data will be stored.
+ /// File system to load/upload checkpoint data
+ /// Codec that is used for decoding and encoding for State object.
+ [Inject]
+ private IMRUCheckpointHandler(
+ [Parameter(typeof(CheckpointFilePath))] string checkpointFilePath,
+ ICodec stateCodec,
+ IFileSystem fileSystem)
+ {
+ _fileSystem = fileSystem;
+ _stateCodec = stateCodec;
+ _checkpointFilePath = checkpointFilePath;
+ _checkpointFileUri = _fileSystem.CreateUriForPath(_checkpointFilePath);
+ Logger.Log(Level.Info, "State file path: {0}", checkpointFilePath);
+ }
+
+ ///
+ /// Save serialized checkpoint data to remote checkpoint file.
+ ///
+ ///
+ public void Persist(ITaskState taskState)
+ {
+ var localStateFile = TangFactory.GetTang().NewInjector().GetInstance().GetTempFileName("statefile", string.Empty);
+ var localFlagfile = TangFactory.GetTang().NewInjector().GetInstance().GetTempFileName("flagfile", string.Empty);
+
+ string tick = DateTime.Now.Ticks.ToString();
+ string stateFileDir = Path.Combine(_checkpointFilePath, StateDir + tick);
+ string remoteStateFileName = Path.Combine(stateFileDir, StateFile + tick + StateFileExt);
+ string remoteFlagFileName = Path.Combine(stateFileDir, FlagFile + tick + FlagFileExt);
+
+ var stateFileUri = _fileSystem.CreateUriForPath(remoteStateFileName);
+ var flagFileUri = _fileSystem.CreateUriForPath(remoteFlagFileName);
+
+ var data = _stateCodec.Encode(taskState);
+ File.WriteAllBytes(localStateFile, data);
+ File.WriteAllText(localFlagfile, remoteStateFileName);
+
+ _fileSystem.CopyFromLocal(localStateFile, stateFileUri);
+ _fileSystem.CopyFromLocal(localFlagfile, flagFileUri);
+
+ File.Delete(localStateFile);
+ File.Delete(localFlagfile);
+ }
+
+ ///
+ /// Read checkpoint data and deserialize it into ITaskState object.
+ ///
+ ///
+ public ITaskState Restore()
+ {
+ if (!string.IsNullOrEmpty(_checkpointFilePath))
+ {
+ var files = _fileSystem.GetChildren(_checkpointFileUri);
+ if (files != null)
+ {
+ var flagFiles = files.Where(f => f.AbsolutePath.Contains(FlagFile));
+ var uris = flagFiles.OrderByDescending(ff => _fileSystem.GetFileStatus(ff).ModificationTime).ToList();
+
+ return Restore(uris);
+ }
+ }
+ return null;
+ }
+
+ private ITaskState Restore(IList uris)
+ {
+ Uri latestFlagFile = uris.FirstOrDefault();
+ if (latestFlagFile != null)
+ {
+ var localLatestStatefile = TangFactory.GetTang().NewInjector().GetInstance().GetTempFileName("statefile", string.Empty);
+ var localLatestFlagfile = TangFactory.GetTang().NewInjector().GetInstance().GetTempFileName("flagfile", string.Empty);
+
+ _fileSystem.CopyToLocal(latestFlagFile, localLatestFlagfile);
+
+ try
+ {
+ string latestStateFile = File.ReadAllText(localLatestFlagfile);
+ Logger.Log(Level.Info, "latestStateFile -- : {0}", latestStateFile);
+ var latestStateFileUri = _fileSystem.CreateUriForPath(latestStateFile);
+ _fileSystem.CopyToLocal(latestStateFileUri, localLatestStatefile);
+ var currentState = File.ReadAllBytes(localLatestStatefile);
+ return _stateCodec.Decode(currentState);
+ }
+ catch (Exception e)
+ {
+ Logger.Log(Level.Info, "Exception in restoring from state file. Possible file corruption {0}", e);
+ uris.RemoveAt(0);
+ return Restore(uris);
+ }
+ finally
+ {
+ File.Delete(localLatestFlagfile);
+ File.Delete(localLatestStatefile);
+ }
+ }
+ return null;
+ }
+ }
+}
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/CheckpointHandler/IMRUCheckpointResultHandler.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/CheckpointHandler/IMRUCheckpointResultHandler.cs
new file mode 100644
index 0000000000..c627104e88
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/CheckpointHandler/IMRUCheckpointResultHandler.cs
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using System;
+using System.IO;
+using System.Linq;
+using Org.Apache.REEF.IMRU.API;
+using Org.Apache.REEF.IMRU.OnREEF.Parameters;
+using Org.Apache.REEF.IO.FileSystem;
+using Org.Apache.REEF.Tang.Annotations;
+using Org.Apache.REEF.Utilities.Logging;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.CheckpointHandler
+{
+ ///
+ /// Internal class that handles checkpoint for result flag.
+ ///
+ internal class IMRUCheckpointResultHandler : IIMRUCheckpointResultHandler
+ {
+ private static readonly Logger Logger = Logger.GetLogger(typeof(IIMRUCheckpointResultHandler));
+
+ private const string RresultDir = "RresultDir";
+ private const string RresultFile = "result.txt";
+ private const string Done = "done";
+
+ private readonly IFileSystem _fileSystem;
+ private readonly Uri _resultFileUrl;
+ private readonly string _checkpointFilePath;
+
+ ///
+ /// It is for storing and retrieving checkpoint result flag.
+ ///
+ /// The file path where the checkpoint data will be stored.
+ /// File system to load/upload checkpoint data
+ [Inject]
+ private IMRUCheckpointResultHandler(
+ [Parameter(typeof(CheckpointFilePath))] string checkpointFilePath,
+ IFileSystem fileSystem)
+ {
+ _fileSystem = fileSystem;
+ _checkpointFilePath = checkpointFilePath;
+
+ if (!string.IsNullOrEmpty(_checkpointFilePath))
+ {
+ string resultFile = Path.Combine(_checkpointFilePath, RresultDir, RresultFile);
+ _resultFileUrl = _fileSystem.CreateUriForPath(resultFile);
+ }
+ Logger.Log(Level.Info, "State file path: {0}", checkpointFilePath);
+ }
+
+ ///
+ /// Set flag to show result is already written.
+ ///
+ public void MarkResulHandled()
+ {
+ if (!string.IsNullOrEmpty(_checkpointFilePath) && !_fileSystem.Exists(_resultFileUrl))
+ {
+ var resultLocalFile = Path.Combine(Path.GetTempPath(), Guid.NewGuid().ToString("N").Substring(0, 4));
+ File.WriteAllText(resultLocalFile, Done);
+ _fileSystem.CopyFromLocal(resultLocalFile, _resultFileUrl);
+ File.Delete(resultLocalFile);
+ }
+ }
+
+ ///
+ /// Retrieve the result flag.
+ ///
+ ///
+ public bool IsResultHandled()
+ {
+ if (!string.IsNullOrEmpty(_checkpointFilePath) && _fileSystem.Exists(_resultFileUrl))
+ {
+ var resultLocalFile = Path.Combine(Path.GetTempPath(), Guid.NewGuid().ToString("N").Substring(0, 4));
+ _fileSystem.CopyToLocal(_resultFileUrl, resultLocalFile);
+ var result = File.ReadAllText(resultLocalFile);
+ Logger.Log(Level.Info, "GetResult: {0}", result);
+ return Done.Equals(result);
+ }
+ return false;
+ }
+
+ ///
+ /// Clear checkpoint files in checkpointFilePath. It should be only called once for the entire job.
+ ///
+ public void Clear()
+ {
+ if (!string.IsNullOrEmpty(_checkpointFilePath))
+ {
+ var filesToRemove = _fileSystem.GetChildren(_fileSystem.CreateUriForPath(_checkpointFilePath));
+ var streamToRemove = filesToRemove.Where(f => IsStateFile(f.AbsolutePath));
+ foreach (var stream in streamToRemove)
+ {
+ _fileSystem.Delete(stream);
+ }
+ }
+ }
+
+ private bool IsStateFile(string path)
+ {
+ return path.EndsWith(IMRUCheckpointHandler.StateFileExt) || path.EndsWith(IMRUCheckpointHandler.FlagFileExt);
+ }
+ }
+}
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
index 44e3f55c4e..9b7834ed03 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Client/REEFIMRUClient.cs
@@ -144,6 +144,8 @@ IEnumerable IIMRUClient.Submit
@@ -234,5 +239,13 @@ internal IConfiguration ResultHandlerConfiguration
{
get { return _resultHandlerConfiguration; }
}
+
+ ///
+ /// Configuration of checkpoint
+ ///
+ internal IConfiguration CheckpointConfiguration
+ {
+ get { return _checkpointConfiguration; }
+ }
}
}
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
index 3db1bcd4c1..674fbaa1cf 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Driver/IMRUDriver.cs
@@ -1054,6 +1054,7 @@ private IConfiguration GetMasterTaskConfiguration(string taskId)
.Build(),
_configurationManager.UpdateFunctionConfiguration,
_configurationManager.ResultHandlerConfiguration,
+ _configurationManager.CheckpointConfiguration,
GetGroupCommConfiguration())
.BindNamedParameter(GenericType.Class, _invokeGC.ToString())
.Build();
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
index 6c32c68d81..ec01d7e49a 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/IMRUTasks/UpdateTaskHost.cs
@@ -46,6 +46,7 @@ internal sealed class UpdateTaskHost : TaskHostB
private readonly IBroadcastSender> _dataAndControlMessageSender;
private readonly IUpdateFunction _updateTask;
private readonly IIMRUResultHandler _resultHandler;
+ private readonly IIMRUCheckpointResultHandler _checkpointResultHandler;
///
/// It indicates if the update task has completed and result has been written.
@@ -57,6 +58,7 @@ internal sealed class UpdateTaskHost : TaskHostB
/// The UpdateTask hosted in this REEF Task.
/// Used to setup the communications.
/// Result handler
+ /// Checkpoint handler
/// Task close Coordinator
/// Whether to call Garbage Collector after each iteration or not
/// task id
@@ -67,6 +69,7 @@ private UpdateTaskHost(
IIMRUResultHandler resultHandler,
TaskCloseCoordinator taskCloseCoordinator,
[Parameter(typeof(InvokeGC))] bool invokeGc,
+ IIMRUCheckpointResultHandler checkpointResultHandler,
[Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId) :
base(groupCommunicationsClient, taskCloseCoordinator, invokeGc)
{
@@ -76,7 +79,14 @@ private UpdateTaskHost(
_communicationGroupClient.GetBroadcastSender>(IMRUConstants.BroadcastOperatorName);
_dataReceiver = _communicationGroupClient.GetReduceReceiver(IMRUConstants.ReduceOperatorName);
_resultHandler = resultHandler;
- Logger.Log(Level.Info, "$$$$_resultHandler." + _resultHandler.GetType().AssemblyQualifiedName);
+ _checkpointResultHandler = checkpointResultHandler;
+
+ var taskIdSplit = taskId.Split('-');
+ var retryIndex = int.Parse(taskIdSplit[taskIdSplit.Length - 1]);
+ if (retryIndex == 0)
+ {
+ _checkpointResultHandler.Clear();
+ }
Logger.Log(Level.Info, "UpdateTaskHost initialized.");
}
@@ -96,37 +106,49 @@ protected override byte[] TaskBody(byte[] memento)
HandleTaskAppException(e);
}
- while (!_cancellationSource.IsCancellationRequested && updateResult.HasMapInput)
+ if (updateResult.HasResult)
{
- using (
- var message = new MapInputWithControlMessage(updateResult.MapInput,
- MapControlMessage.AnotherRound))
+ if (!_checkpointResultHandler.IsResultHandled())
{
- _dataAndControlMessageSender.Send(message);
+ _resultHandler.HandleResult(updateResult.Result);
+ _checkpointResultHandler.MarkResulHandled();
}
-
- if (_invokeGc)
+ _done = true;
+ }
+ else
+ {
+ while (!_cancellationSource.IsCancellationRequested && updateResult.HasMapInput)
{
- Logger.Log(Level.Verbose, "Calling Garbage Collector");
- GC.Collect();
- GC.WaitForPendingFinalizers();
- }
+ using (
+ var message = new MapInputWithControlMessage(updateResult.MapInput,
+ MapControlMessage.AnotherRound))
+ {
+ _dataAndControlMessageSender.Send(message);
+ }
- var input = _dataReceiver.Reduce(_cancellationSource);
+ if (_invokeGc)
+ {
+ Logger.Log(Level.Verbose, "Calling Garbage Collector");
+ GC.Collect();
+ GC.WaitForPendingFinalizers();
+ }
+ var input = _dataReceiver.Reduce(_cancellationSource);
+ try
+ {
+ updateResult = _updateTask.Update(input);
- try
- {
- updateResult = _updateTask.Update(input);
- if (updateResult.HasResult)
+ if (updateResult.HasResult)
+ {
+ _resultHandler.HandleResult(updateResult.Result);
+ _done = true;
+ _checkpointResultHandler.MarkResulHandled();
+ }
+ }
+ catch (Exception e)
{
- _resultHandler.HandleResult(updateResult.Result);
- _done = true;
+ HandleTaskAppException(e);
}
}
- catch (Exception e)
- {
- HandleTaskAppException(e);
- }
}
if (!_cancellationSource.IsCancellationRequested)
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/CheckpointFilePath.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/CheckpointFilePath.cs
new file mode 100644
index 0000000000..1c786d34ac
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/CheckpointFilePath.cs
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Parameters
+{
+ [NamedParameter("Checkpoint file name", "CheckpointFilePath", "")]
+ public sealed class CheckpointFilePath : Name
+ {
+ }
+}
diff --git a/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedCheckpointConfiguration.cs b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedCheckpointConfiguration.cs
new file mode 100644
index 0000000000..e06363306c
--- /dev/null
+++ b/lang/cs/Org.Apache.REEF.IMRU/OnREEF/Parameters/SerializedCheckpointConfiguration.cs
@@ -0,0 +1,26 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+using Org.Apache.REEF.Tang.Annotations;
+
+namespace Org.Apache.REEF.IMRU.OnREEF.Parameters
+{
+ [NamedParameter("The serialized configuration for checkpoint.")]
+ internal sealed class SerializedCheckpointConfiguration : Name
+ {
+ }
+}
\ No newline at end of file
diff --git a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
index 99d2665006..f2f57d3bcd 100644
--- a/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
+++ b/lang/cs/Org.Apache.REEF.IMRU/Org.Apache.REEF.IMRU.csproj
@@ -1,4 +1,4 @@
-
+