diff --git a/rd-net/Lifetimes/Collections/Viewable/IScheduler.cs b/rd-net/Lifetimes/Collections/Viewable/IScheduler.cs
index 1ea5689e8..c70982498 100644
--- a/rd-net/Lifetimes/Collections/Viewable/IScheduler.cs
+++ b/rd-net/Lifetimes/Collections/Viewable/IScheduler.cs
@@ -24,4 +24,16 @@ public interface IScheduler
///
bool OutOfOrderExecution { get; }
}
+
+ public interface IRunWhileScheduler : IScheduler
+ {
+ ///
+ /// Pumps the scheduler while given condition is satisfied or until timeout elapses.
+ ///
+ /// A delegate to be executed over and over while it returns true.
+ /// Maximum time to spend pumping. Use for no limit.
+ /// If true, throws when timeout elapses; if false, returns false on timeout.
+ /// True if the condition was reached (condition returned false), false if timeout elapsed (when throwOnTimeout is false).
+ bool RunWhile(Func condition, TimeSpan timeout, bool throwOnTimeout = false);
+ }
}
\ No newline at end of file
diff --git a/rd-net/Lifetimes/Collections/Viewable/SingleThreadScheduler.cs b/rd-net/Lifetimes/Collections/Viewable/SingleThreadScheduler.cs
index dc202e17f..b4ce8e595 100644
--- a/rd-net/Lifetimes/Collections/Viewable/SingleThreadScheduler.cs
+++ b/rd-net/Lifetimes/Collections/Viewable/SingleThreadScheduler.cs
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
+using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using JetBrains.Annotations;
@@ -37,7 +38,7 @@ public int CompareTo(PrioritizedAction? other)
/// Task scheduler that either creates separate thread (via or use current
/// (via ). All enqueued tasks are executed sequentially.
///
- public class SingleThreadScheduler : TaskScheduler, IScheduler
+ public class SingleThreadScheduler : TaskScheduler, IRunWhileScheduler
{
class ActionQueue
{
@@ -159,6 +160,25 @@ private void Run()
}
}
+ public bool RunWhile(Func condition, TimeSpan timeout, bool throwOnTimeout = false)
+ {
+ var stopwatch = timeout == TimeSpan.MaxValue ? null : Stopwatch.StartNew();
+
+ while (condition())
+ {
+ if (stopwatch != null && stopwatch.Elapsed >= timeout)
+ {
+ if (throwOnTimeout)
+ throw new TimeoutException($"RunWhile timed out after {timeout}. Elapsed: {stopwatch.Elapsed}.");
+ return false;
+ }
+
+ ExecuteOneAction(blockIfNoActionAvailable: false);
+ }
+
+ return true;
+ }
+
public bool PumpAndWaitFor(Lifetime lifetime, TimeSpan timeout, Func condition)
{
var shouldPump = IsActive;
diff --git a/rd-net/Lifetimes/Collections/Viewable/SynchronousScheduler.cs b/rd-net/Lifetimes/Collections/Viewable/SynchronousScheduler.cs
index 7adb7d43b..0eb6eefbb 100644
--- a/rd-net/Lifetimes/Collections/Viewable/SynchronousScheduler.cs
+++ b/rd-net/Lifetimes/Collections/Viewable/SynchronousScheduler.cs
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
+using System.Diagnostics;
using System.Threading.Tasks;
using JetBrains.Lifetimes;
@@ -10,7 +11,7 @@ namespace JetBrains.Collections.Viewable
/// Perfect candidate for
/// if you want to guarantee synchronous continuation
///
- public class SynchronousScheduler : TaskScheduler, IScheduler
+ public class SynchronousScheduler : TaskScheduler, IRunWhileScheduler
{
public static readonly SynchronousScheduler Instance = new SynchronousScheduler();
@@ -41,6 +42,23 @@ private static void Execute(Action action)
public bool IsActive => ourActive > 0;
public bool OutOfOrderExecution => false;
+ public bool RunWhile(Func condition, TimeSpan timeout, bool throwOnTimeout = false)
+ {
+ // SynchronousScheduler executes actions inline when queued, so by the time
+ // RunWhile is called the condition is typically already satisfied.
+ var stopwatch = timeout == TimeSpan.MaxValue ? null : Stopwatch.StartNew();
+ while (condition())
+ {
+ if (stopwatch != null && stopwatch.Elapsed >= timeout)
+ {
+ if (throwOnTimeout)
+ throw new TimeoutException($"RunWhile timed out after {timeout}. Elapsed: {stopwatch.Elapsed}.");
+ return false;
+ }
+ }
+ return true;
+ }
+
#region Implementation of TaskScheduler
diff --git a/rd-net/RdFramework.Reflection/ProxyGeneratorUtil.cs b/rd-net/RdFramework.Reflection/ProxyGeneratorUtil.cs
index 84ec8db44..cbb7278d5 100644
--- a/rd-net/RdFramework.Reflection/ProxyGeneratorUtil.cs
+++ b/rd-net/RdFramework.Reflection/ProxyGeneratorUtil.cs
@@ -43,7 +43,7 @@ public static Task ToTask(IRdTask task)
///
- /// Sync call which allow nested call execution with help of
+ /// Sync call which allow nested call execution with help of
///
public static TRes SyncNested(RdCall call, TReq request, RpcTimeouts? timeouts = null)
{
@@ -51,7 +51,7 @@ public static TRes SyncNested(RdCall call, TReq request,
}
///
- /// Sync call which allow nested call execution with help of
+ /// Sync call which allow nested call execution with help of
///
public static TRes SyncNested(RdCall call, Lifetime lifetime, TReq request, RpcTimeouts? timeouts = null)
{
@@ -61,37 +61,30 @@ public static TRes SyncNested(RdCall call, Lifetime life
// If you want to mitigate this limitation, keep in mind that if you make a sync call from background thread
// with some small probability your call can be merged in the sync execution of other call. Usually it is not
// desired behaviour as you can accidentally obtain undesired locks.
- call.GetProtoOrThrow().Scheduler.AssertThread();
+ var protocol = call.GetProtoOrThrow();
+ protocol.Scheduler.AssertThread();
- var nestedCallsScheduler = new LifetimeDefinition();
- var responseScheduler = new RdSimpleDispatcher(nestedCallsScheduler.Lifetime, Log.GetLog(call.GetType()));
+ var scheduler = protocol.Scheduler as IRunWhileScheduler;
+ Assertion.Assert(scheduler != null, "Scheduler must implement IRunWhileScheduler for nested calls. Scheduler type: {0}", protocol.Scheduler.GetType());
- using (new SwitchingScheduler.SwitchCookie(responseScheduler))
- {
- var task = call.Start(lifetime, request, responseScheduler);
+ var task = call.Start(lifetime, request, null);
- task.Result.Advise(nestedCallsScheduler.Lifetime, result => { nestedCallsScheduler.Terminate(); });
+ RpcTimeouts timeoutsToUse = RpcTimeouts.GetRpcTimeouts(timeouts);
- RpcTimeouts timeoutsToUse = RpcTimeouts.GetRpcTimeouts(timeouts);
- responseScheduler.MessageTimeout = timeoutsToUse.ErrorAwaitTime;
+ var stopwatch = Stopwatch.StartNew();
- var stopwatch = Stopwatch.StartNew();
- responseScheduler.Run();
- if (!task.Result.HasValue())
- {
- throw new TimeoutException($"Sync execution of rpc `{call.Location}` is timed out in {timeoutsToUse.ErrorAwaitTime.TotalMilliseconds} ms");
- }
+ // Pump messages while waiting for the result (no hard timeout; use thresholds for logging only)
+ scheduler.RunWhile(() => !task.Result.HasValue(), TimeSpan.MaxValue);
- stopwatch.Stop();
+ stopwatch.Stop();
- var freezeTime = stopwatch.ElapsedMilliseconds;
- if (freezeTime > timeoutsToUse.WarnAwaitTime.TotalMilliseconds)
- {
- Log.Root.Error("Sync execution of rpc `{0}` executed too long: {1} ms", call.Location, freezeTime);
- }
+ var freezeTime = stopwatch.ElapsedMilliseconds;
+ if (freezeTime > timeoutsToUse.ErrorAwaitTime.TotalMilliseconds)
+ Log.Root.Error("Sync execution of rpc `{0}` executed too long: {1} ms", call.Location, freezeTime);
+ else if (freezeTime > timeoutsToUse.WarnAwaitTime.TotalMilliseconds)
+ Log.Root.Warn("Sync execution of rpc `{0}` executed too long: {1} ms", call.Location, freezeTime);
- return task.Result.Value.Unwrap();
- }
+ return task.Result.Value.Unwrap();
}
public static RpcTimeouts CreateRpcTimeouts(long ticksWarning, long ticksError)
diff --git a/rd-net/RdFramework.Reflection/ReflectionRdActivator.cs b/rd-net/RdFramework.Reflection/ReflectionRdActivator.cs
index 708507a52..3b6fd8f53 100644
--- a/rd-net/RdFramework.Reflection/ReflectionRdActivator.cs
+++ b/rd-net/RdFramework.Reflection/ReflectionRdActivator.cs
@@ -305,8 +305,7 @@ public static void SetHandlerTaskVoid(RdCall endpoint, Func
public static void SetHandler(RdCall endpoint, Func> handler)
{
- var scheduler = new SwitchingScheduler(endpoint);
- endpoint.SetRdTask(handler, scheduler, scheduler);
+ endpoint.SetRdTask(handler);
}
private object? ActivateMember(Type memberType, string memberName)
diff --git a/rd-net/RdFramework.Reflection/SwitchingScheduler.cs b/rd-net/RdFramework.Reflection/SwitchingScheduler.cs
deleted file mode 100644
index a7da94c12..000000000
--- a/rd-net/RdFramework.Reflection/SwitchingScheduler.cs
+++ /dev/null
@@ -1,81 +0,0 @@
-using System;
-using System.Collections.Generic;
-using JetBrains.Collections.Viewable;
-using JetBrains.Diagnostics;
-using JetBrains.Lifetimes;
-using JetBrains.Rd.Base;
-
-namespace JetBrains.Rd.Reflection
-{
- ///
- /// A special scheduler which can be globally temporarily switched to another implementation.
- ///
- public class SwitchingScheduler : IScheduler
- {
- private readonly IRdDynamic myFallbackSchedulerSource;
-
- private static readonly object ourLock = new object();
- private static int ourDisable;
- private static readonly Stack ourSchedulersOverride = new Stack();
-
- public bool IsActive => ActiveScheduler.IsActive;
- public bool OutOfOrderExecution => ActiveScheduler.OutOfOrderExecution;
-
- public IScheduler ActiveScheduler
- {
- get
- {
- IScheduler scheduler;
- lock (ourLock)
- {
- scheduler = myFallbackSchedulerSource.GetProtoOrThrow().Scheduler;
- if (ourSchedulersOverride.Count > 0 && ourDisable == 0)
- scheduler = ourSchedulersOverride.Peek();
- }
-
- return scheduler;
- }
- }
-
- public SwitchingScheduler(IRdDynamic fallbackSchedulerSource)
- {
- myFallbackSchedulerSource = fallbackSchedulerSource;
- }
-
- public void Queue(Action action)
- {
- ActiveScheduler.Queue(action);
- }
-
- public static void Disable(Lifetime time)
- {
- time.Bracket(
- () => { lock (ourLock) ourDisable++; },
- () => { lock (ourLock) ourDisable--; });
- }
-
- public readonly struct SwitchCookie : IDisposable
- {
- ///
- /// Default constructor detector
- ///
- private readonly bool myIsValid;
-
- public SwitchCookie(IScheduler scheduler)
- {
- myIsValid = true;
- lock (ourLock)
- ourSchedulersOverride.Push(scheduler);
- }
-
- public void Dispose()
- {
- if (myIsValid)
- {
- lock (ourLock)
- ourSchedulersOverride.Pop();
- }
- }
- }
- }
-}
diff --git a/rd-net/RdFramework/Impl/RdSimpleDispatcher.cs b/rd-net/RdFramework/Impl/RdSimpleDispatcher.cs
index dfdfde009..4abf0e338 100644
--- a/rd-net/RdFramework/Impl/RdSimpleDispatcher.cs
+++ b/rd-net/RdFramework/Impl/RdSimpleDispatcher.cs
@@ -4,10 +4,11 @@
using JetBrains.Collections.Viewable;
using JetBrains.Diagnostics;
using JetBrains.Lifetimes;
+using JetBrains.Util;
namespace JetBrains.Rd.Impl
{
- public class RdSimpleDispatcher : IScheduler
+ public class RdSimpleDispatcher : IRunWhileScheduler
{
private readonly Lifetime myLifetime;
private readonly ILog myLogger;
@@ -83,10 +84,52 @@ public void Queue(Action action)
myEvent.Set();
}
+ public bool RunWhile(Func condition, TimeSpan timeout, bool throwOnTimeout = false)
+ {
+ var stopwatch = timeout == TimeSpan.MaxValue ? (LocalStopwatch?)null : LocalStopwatch.StartNew();
+
+ while (condition())
+ {
+ if (stopwatch.HasValue && stopwatch.Value.Elapsed >= timeout)
+ {
+ if (throwOnTimeout)
+ throw new TimeoutException($"RunWhile timed out after {timeout}. Elapsed: {stopwatch.Value.Elapsed}.");
+ return false;
+ }
+
+ Action? nextTask = null;
+ lock (myTasks)
+ {
+ if (myTasks.Count > 0)
+ nextTask = myTasks.Dequeue();
+ }
+
+ if (nextTask != null)
+ {
+ try
+ {
+ myLogger.Trace(FormatLogMessage("Process incoming task in RunWhile"));
+ nextTask();
+ }
+ catch (Exception e)
+ {
+ myLogger.Error(e, FormatLogMessage("Exception during RunWhile task processing"));
+ }
+ }
+ else
+ {
+ // Wait for a short time for new tasks
+ var waitTime = 5; // milliseconds
+ myEvent.WaitOne(waitTime);
+ }
+ }
+ return true;
+ }
+
private string FormatLogMessage(string message)
{
if (myId == null) return message;
return $"{myId}: {message}";
- }
+ }
}
}
\ No newline at end of file
diff --git a/rd-net/Test.RdFramework/Reflection/ProxyGeneratorCancellationTest.cs b/rd-net/Test.RdFramework/Reflection/ProxyGeneratorCancellationTest.cs
index 61b9809d6..d4e6c833f 100644
--- a/rd-net/Test.RdFramework/Reflection/ProxyGeneratorCancellationTest.cs
+++ b/rd-net/Test.RdFramework/Reflection/ProxyGeneratorCancellationTest.cs
@@ -46,11 +46,7 @@ public async Task GetLongRunningInt(int arg, Lifetime cancellationLifetime)
public Task AlwaysCancelled()
{
- return Task.Run(() =>
- {
- Thread.Sleep(100);
- throw new OperationCanceledException();
- });
+ return Task.Run(() => throw new OperationCanceledException());
}
}
diff --git a/rd-net/Test.RdFramework/Reflection/ProxyGeneratorModelTest.cs b/rd-net/Test.RdFramework/Reflection/ProxyGeneratorModelTest.cs
index 4b7dfdeb2..071341069 100644
--- a/rd-net/Test.RdFramework/Reflection/ProxyGeneratorModelTest.cs
+++ b/rd-net/Test.RdFramework/Reflection/ProxyGeneratorModelTest.cs
@@ -77,8 +77,6 @@ public async Task TestAsync()
[Test, Repeat(10), Description("Repeat test as it reveal cancellation race")]
public async Task TestSyncCall()
{
- SwitchingScheduler.Disable(TestLifetime);
-
await YieldToClient();
var client = CFacade.ActivateProxy(TestLifetime, ClientProtocol);