diff --git a/ThreadPool/ThreadPool/MyThreadPoolTests/MyThreadPoolTests.cs b/ThreadPool/ThreadPool/MyThreadPoolTests/MyThreadPoolTests.cs new file mode 100644 index 0000000..f1553fb --- /dev/null +++ b/ThreadPool/ThreadPool/MyThreadPoolTests/MyThreadPoolTests.cs @@ -0,0 +1,152 @@ +namespace MyThreadPoolTests; + +public class Tests +{ + // + [Test] + public void ShouldOperationCanceledExceptionWhenShutDownIfTaskHasNotStart() + { + var pool = new MyThreadPool.MyThreadPool(1); + var task = pool.Submit(() => { Thread.Sleep(100); return 1; }); + static int ReturnTwo(int lol) => 2; + var continuation = task.ContinueWith(ReturnTwo); + pool.ShutDown(); + int ReturnResult() => continuation.Result; + Assert.Throws(() => ReturnResult()); + } + + [Test] + public void ShouldExpectedIsCancellationRequestedIsEqualTrueWhenShutDown() + { + var pool = new MyThreadPool.MyThreadPool(1); + pool.ShutDown(); + Assert.True(pool.Source.Token.IsCancellationRequested); + } + + private static IEnumerable TestRemoveCaseData() => new TestCaseData[] + { + }; + + private static IEnumerable CaseData() + { + MyThreadPool.MyThreadPool pool = new (10); + var list = new List>(); + var numberOfTasks = 15; + + for (int i = 0; i < numberOfTasks; i++) + { + var locali = i; + list.Add(pool.Submit(() => locali)); + } + + int counter = 0; + foreach (var value in list) + { + yield return new TestCaseData(value.Result, counter, pool); + counter++; + } + } + + // + [TestCaseSource(nameof(CaseData))] + public void ShouldResultIsEqualExpectedValue(int actual, int expected, MyThreadPool.MyThreadPool pool) + { + Assert.That(actual, Is.EqualTo(expected)); + pool.ShutDown(); + } + + // , ( ) + [Test] + public void ShouldNumberOfThreadIsEqualNumberThatWasSentToConstructor() + { + int numberOfThreads = 10; + MyThreadPool.MyThreadPool pool = new(numberOfThreads); + var task = pool.Submit(() => 1); + task = pool.Submit(() => 1); + Assert.That(pool.CountOfThreads, Is.EqualTo(numberOfThreads)); + pool.ShutDown(); + + numberOfThreads = 5; + pool = new(numberOfThreads); + Assert.That(pool.CountOfThreads, Is.EqualTo(numberOfThreads)); + pool.ShutDown(); + + numberOfThreads = 100; + pool = new(numberOfThreads); + task = pool.Submit(() => 1); + int a = task.Result; + Assert.That(pool.CountOfThreads, Is.EqualTo(numberOfThreads)); + pool.ShutDown(); + } + + // , + // 1000 , + // , + /* + [Test] + public void ShouldTheNewTaskWillBeCompletedNoEarlierThanTheOriginalOneIsCompleted() + { + var pool = new MyThreadPool.MyThreadPool(10); + + // > 1000 + var task = pool.Submit(() => { Thread.Sleep(1000); return 1; }); + + // , + + // - , , 100 ( 10 , 1) + static int ReturnTwo(int lol) => 2; + var continuation = task.ContinueWith(returnTwo); + + // 100 ( ShutDown) + // Result + Thread.Sleep(100); + pool.ShutDown(); + + bool isCompleted = false; + + // - . => ( , 100 ) + Assert.That(isCompleted, Is.EqualTo(continuation.IsCompleted)); + }*/ + + // + [Test] + public void ShouldNewTaskIsBeingExecuted() + { + var pool = new MyThreadPool.MyThreadPool(10); + var task = pool.Submit(() => 1); + + static int ReturnTwo(int lol) => 2; + var continuation = task.ContinueWith(ReturnTwo); + int result = 2; + Assert.That(continuation.Result, Is.EqualTo(result)); + + var continuationContinuation = continuation.ContinueWith((x) => x * x); + result = 4; + + Thread.Sleep(10); + Assert.That(continuationContinuation.Result, Is.EqualTo(result)); + pool.ShutDown(); + } + + [Test] + public void ShouldAggregateExceptionWhenDevideByZero() + { + var pool = new MyThreadPool.MyThreadPool(10); + int zero = 0; + var task = pool.Submit(() => 1 / zero); + int ReturnResult() => task.Result; + Assert.Throws(() => ReturnResult()); + pool.ShutDown(); + } + + /* + [Test] + public void ShouldExpectedFalseWhenIsCompletedForCancelledTask() + { + var pool = new MyThreadPool.MyThreadPool(10); + var task = pool.Submit(() => 1); + pool.ShutDown(); + bool result = false; + Assert.That(result, Is.EqualTo(task.IsCompleted)); + }*/ +} diff --git a/ThreadPool/ThreadPool/MyThreadPoolTests/MyThreadPoolTests.csproj b/ThreadPool/ThreadPool/MyThreadPoolTests/MyThreadPoolTests.csproj new file mode 100644 index 0000000..16e571d --- /dev/null +++ b/ThreadPool/ThreadPool/MyThreadPoolTests/MyThreadPoolTests.csproj @@ -0,0 +1,25 @@ + + + + net6.0 + enable + enable + + false + + Library + + + + + + + + + + + + + + + diff --git a/ThreadPool/ThreadPool/MyThreadPoolTests/Usings.cs b/ThreadPool/ThreadPool/MyThreadPoolTests/Usings.cs new file mode 100644 index 0000000..cefced4 --- /dev/null +++ b/ThreadPool/ThreadPool/MyThreadPoolTests/Usings.cs @@ -0,0 +1 @@ +global using NUnit.Framework; \ No newline at end of file diff --git a/ThreadPool/ThreadPool/ThreadPool.sln b/ThreadPool/ThreadPool/ThreadPool.sln new file mode 100644 index 0000000..216fddf --- /dev/null +++ b/ThreadPool/ThreadPool/ThreadPool.sln @@ -0,0 +1,31 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio Version 17 +VisualStudioVersion = 17.2.32505.173 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ThreadPool", "ThreadPool\ThreadPool.csproj", "{2855D0B1-E08B-447E-8F19-04D6B93B842B}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "MyThreadPoolTests", "MyThreadPoolTests\MyThreadPoolTests.csproj", "{1700B832-9222-4906-9C1C-55450298057B}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {2855D0B1-E08B-447E-8F19-04D6B93B842B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {2855D0B1-E08B-447E-8F19-04D6B93B842B}.Debug|Any CPU.Build.0 = Debug|Any CPU + {2855D0B1-E08B-447E-8F19-04D6B93B842B}.Release|Any CPU.ActiveCfg = Release|Any CPU + {2855D0B1-E08B-447E-8F19-04D6B93B842B}.Release|Any CPU.Build.0 = Release|Any CPU + {1700B832-9222-4906-9C1C-55450298057B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {1700B832-9222-4906-9C1C-55450298057B}.Debug|Any CPU.Build.0 = Debug|Any CPU + {1700B832-9222-4906-9C1C-55450298057B}.Release|Any CPU.ActiveCfg = Release|Any CPU + {1700B832-9222-4906-9C1C-55450298057B}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {341821D8-69C8-40C8-8ACF-1DED0D7FD0ED} + EndGlobalSection +EndGlobal diff --git a/ThreadPool/ThreadPool/ThreadPool/IMyTask.cs b/ThreadPool/ThreadPool/ThreadPool/IMyTask.cs new file mode 100644 index 0000000..488c824 --- /dev/null +++ b/ThreadPool/ThreadPool/ThreadPool/IMyTask.cs @@ -0,0 +1,26 @@ +namespace MyThreadPool; + +/// +/// Interface for tasks accepted for execution +/// +/// Type of return value +public interface IMyTask +{ + /// + /// Returns true if the task is completed + /// + public bool IsCompleted { get; } + + /// + /// Returns the result of the task execution + /// + public TResult Result { get; } + + /// + /// + /// + /// + /// + /// + public IMyTask ContinueWith(Func func); +} diff --git a/ThreadPool/ThreadPool/ThreadPool/MyTask.cs b/ThreadPool/ThreadPool/ThreadPool/MyTask.cs new file mode 100644 index 0000000..d28bbc7 --- /dev/null +++ b/ThreadPool/ThreadPool/ThreadPool/MyTask.cs @@ -0,0 +1,142 @@ +namespace MyThreadPool; + +using System.Collections.Concurrent; + +public class MyTask : IMyTask +{ + private readonly Func function; + + private readonly Object lockObject = new(); + + private Exception? exception = null; + + private readonly ConcurrentQueue queueOfTasksToComplete = new(); + + private volatile bool isCompleted; + + private TResult? result; + + private readonly MyThreadPool threadPool; + + public MyTask(Func function, MyThreadPool threadPool) + { + this.function = function; + this.threadPool = threadPool; + } + + /// + public bool IsCompleted => isCompleted; + + public void Work() + { + if (threadPool.Source.Token.IsCancellationRequested) + { + lock (lockObject) + { + if (threadPool.Source.Token.IsCancellationRequested) + { + Monitor.Pulse(lockObject); + } + } + + return; + } + + try + { + result = function(); + } + catch (Exception ex) + { + exception = ex; + } + finally + { + isCompleted = true; + } + + lock (lockObject) + { + // Сообщаем потоку, ждущему на Monitor.Wait, что задача вполнена + Monitor.Pulse(lockObject); + queueOfTasksToComplete.TryDequeue(out Action? result); + if (result != null) + { + threadPool.QueueWorkItem(result!); + } + } + } + + /// + public TResult Result + { + get + { + + if (isCompleted) + { + // Если соответствующий задаче метод завершился с исключением, то бросаем AggregateException + if (exception != null) + { + throw new AggregateException(exception); + } + + return result!; + } + + threadPool.Source.Token.ThrowIfCancellationRequested(); + + // Если результат еще не посчитан + if (!isCompleted) + { + lock (lockObject) + { + // Ждём когда рузльтат будет вычислен, блокируя при этом поток + while (!threadPool.Source.Token.IsCancellationRequested && !IsCompleted) + { + Monitor.Wait(lockObject); + } + + threadPool.Source.Token.ThrowIfCancellationRequested(); + } + } + + // Если соответствующий задаче метод завершился с исключением, то бросаем AggregateException + if (exception != null) + { + throw new AggregateException(exception); + } + + return result!; + } + + private set { } + } + + /// + public IMyTask ContinueWith(Func func) + { + var task = new MyTask(() => func(Result), threadPool); + if (isCompleted) + { + threadPool.QueueForTaskItem(task); + return task; + } + + lock (lockObject) + { + if (isCompleted) + { + threadPool.QueueForTaskItem(task); + return task; + } + else + { + // выгружаем в очередь + queueOfTasksToComplete.Enqueue(task.Work); + } + } + + return task; + } +} \ No newline at end of file diff --git a/ThreadPool/ThreadPool/ThreadPool/MyThreadPool.cs b/ThreadPool/ThreadPool/ThreadPool/MyThreadPool.cs new file mode 100644 index 0000000..ed4a880 --- /dev/null +++ b/ThreadPool/ThreadPool/ThreadPool/MyThreadPool.cs @@ -0,0 +1,152 @@ +namespace MyThreadPool; + +/// +/// Class for implementing a thread pool +/// +public class MyThreadPool : IDisposable +{ + // Queue for storing tasks in a pool + private readonly Queue queue = new(); + + // object for sending a cancellation signal to the CancellationToken token + private readonly CancellationTokenSource source = new(); + + /// + /// Get CancellationTokenSource + /// + public CancellationTokenSource Source => source; + + private bool disposed; + + // List for storing pool threads + private readonly List threads = new(); + + public int CountOfThreads => threads.Count; + + // Method that puts the method in the execution queue + public void QueueWorkItem(Action func) + { + lock (queue) + { + // Добавляем метод в очередь + queue.Enqueue(func); + + // Сообщаем потоку, ждущему на Monitor.Wait, что нужно выполнить метод + Monitor.Pulse(queue); + } + } + + // Method that puts the method in the execution queue + public void QueueForTaskItem(MyTask task) + { + QueueWorkItem(task.Work); + } + + /// + /// Method for returning a task to be accepted for execution. + /// + /// Type of return value + /// Method + /// Task presented as an IMyTask interface + public IMyTask Submit(Func func) + { + // Создаем объект класса MyTask + var task = new MyTask(func, this); + + // Ставим задачу на исполнение в пул потоков + QueueWorkItem(task.Work); + + return task; + } + + /// + /// Constructor of the MyThreadPool class + /// + /// Number of threads in the pool + public MyThreadPool(int n) + { + for (int i = 0; i < n; i++) + { + threads.Add(new Thread(() => + { + Action? result; + // Пока операция не была отменена - выполняем работу + while (!source.Token.IsCancellationRequested) + { + lock (queue) + { + while (queue.Count == 0 && !source.Token.IsCancellationRequested) + { + // Ждем пока очередь станет непустой или операция будет отменена + Monitor.Wait(queue); + } + + // Берем из очереди метод соответствующий задаче + queue.TryDequeue(out result); + } + + if (result != null) + { + // Исполняем метод + result!(); + } + } + })); + } + + // При создании объекта в нем должно начать работу n потоков + foreach(var thread in threads) + { + thread.Start(); + } + } + + /// + /// Method for shutting down threads + /// + public void ShutDown() + { + lock(queue) + { + // Должны вернуть управление когда остановятся все потоки + // Хотим оставноить рабочий поток + source.Cancel(); + + // Сообщаем об этом потоку, который может ждать на Monitor.Wait отмены операции + Monitor.PulseAll(queue); + } + + + // Блокируем поток до завершения потоков + foreach (var thread in threads) + { + thread.Join(); + } + } + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + protected virtual void Dispose(bool disposing) + { + if (this.disposed) + { + return; + } + + if (disposing) + { + this.ShutDown(); + } + + disposed = true; + } + + ~MyThreadPool() + { + Dispose(false); + } +} diff --git a/ThreadPool/ThreadPool/ThreadPool/ThreadPool.csproj b/ThreadPool/ThreadPool/ThreadPool/ThreadPool.csproj new file mode 100644 index 0000000..644c780 --- /dev/null +++ b/ThreadPool/ThreadPool/ThreadPool/ThreadPool.csproj @@ -0,0 +1,10 @@ + + + + Library + net6.0 + enable + enable + + +