From 945da1fc83c98e63415ad0175983f450b7f1b62c Mon Sep 17 00:00:00 2001 From: Alexander Ulitin Date: Mon, 23 Feb 2026 14:07:38 +0100 Subject: [PATCH 1/2] rd-reflection: better pumping on sync calls There is currently a race condition with the switching scheduler approach. If two calls start simultaneously from both ends with a small time gap, then a deadlock is possible. One way to address this issue is to introduce explicit pumping behavior for certain schedulers. This change is required for ReSharper OOP. The schedulers defined in Rd are modified only for testing purposes. --- .../Collections/Viewable/IScheduler.cs | 12 +++ .../Viewable/SingleThreadScheduler.cs | 22 ++++- .../Viewable/SynchronousScheduler.cs | 20 ++++- .../ProxyGeneratorUtil.cs | 43 +++++----- .../ReflectionRdActivator.cs | 3 +- .../SwitchingScheduler.cs | 81 ------------------- rd-net/RdFramework/Impl/RdSimpleDispatcher.cs | 47 ++++++++++- .../Reflection/ProxyGeneratorModelTest.cs | 2 - 8 files changed, 116 insertions(+), 114 deletions(-) delete mode 100644 rd-net/RdFramework.Reflection/SwitchingScheduler.cs diff --git a/rd-net/Lifetimes/Collections/Viewable/IScheduler.cs b/rd-net/Lifetimes/Collections/Viewable/IScheduler.cs index 1ea5689e8..c70982498 100644 --- a/rd-net/Lifetimes/Collections/Viewable/IScheduler.cs +++ b/rd-net/Lifetimes/Collections/Viewable/IScheduler.cs @@ -24,4 +24,16 @@ public interface IScheduler /// bool OutOfOrderExecution { get; } } + + public interface IRunWhileScheduler : IScheduler + { + /// + /// Pumps the scheduler while given condition is satisfied or until timeout elapses. + /// + /// A delegate to be executed over and over while it returns true. + /// Maximum time to spend pumping. Use for no limit. + /// If true, throws when timeout elapses; if false, returns false on timeout. + /// True if the condition was reached (condition returned false), false if timeout elapsed (when throwOnTimeout is false). + bool RunWhile(Func condition, TimeSpan timeout, bool throwOnTimeout = false); + } } \ No newline at end of file diff --git a/rd-net/Lifetimes/Collections/Viewable/SingleThreadScheduler.cs b/rd-net/Lifetimes/Collections/Viewable/SingleThreadScheduler.cs index dc202e17f..b4ce8e595 100644 --- a/rd-net/Lifetimes/Collections/Viewable/SingleThreadScheduler.cs +++ b/rd-net/Lifetimes/Collections/Viewable/SingleThreadScheduler.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Threading; using System.Threading.Tasks; using JetBrains.Annotations; @@ -37,7 +38,7 @@ public int CompareTo(PrioritizedAction? other) /// Task scheduler that either creates separate thread (via or use current /// (via ). All enqueued tasks are executed sequentially. /// - public class SingleThreadScheduler : TaskScheduler, IScheduler + public class SingleThreadScheduler : TaskScheduler, IRunWhileScheduler { class ActionQueue { @@ -159,6 +160,25 @@ private void Run() } } + public bool RunWhile(Func condition, TimeSpan timeout, bool throwOnTimeout = false) + { + var stopwatch = timeout == TimeSpan.MaxValue ? null : Stopwatch.StartNew(); + + while (condition()) + { + if (stopwatch != null && stopwatch.Elapsed >= timeout) + { + if (throwOnTimeout) + throw new TimeoutException($"RunWhile timed out after {timeout}. Elapsed: {stopwatch.Elapsed}."); + return false; + } + + ExecuteOneAction(blockIfNoActionAvailable: false); + } + + return true; + } + public bool PumpAndWaitFor(Lifetime lifetime, TimeSpan timeout, Func condition) { var shouldPump = IsActive; diff --git a/rd-net/Lifetimes/Collections/Viewable/SynchronousScheduler.cs b/rd-net/Lifetimes/Collections/Viewable/SynchronousScheduler.cs index 7adb7d43b..0eb6eefbb 100644 --- a/rd-net/Lifetimes/Collections/Viewable/SynchronousScheduler.cs +++ b/rd-net/Lifetimes/Collections/Viewable/SynchronousScheduler.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Threading.Tasks; using JetBrains.Lifetimes; @@ -10,7 +11,7 @@ namespace JetBrains.Collections.Viewable /// Perfect candidate for /// if you want to guarantee synchronous continuation /// - public class SynchronousScheduler : TaskScheduler, IScheduler + public class SynchronousScheduler : TaskScheduler, IRunWhileScheduler { public static readonly SynchronousScheduler Instance = new SynchronousScheduler(); @@ -41,6 +42,23 @@ private static void Execute(Action action) public bool IsActive => ourActive > 0; public bool OutOfOrderExecution => false; + public bool RunWhile(Func condition, TimeSpan timeout, bool throwOnTimeout = false) + { + // SynchronousScheduler executes actions inline when queued, so by the time + // RunWhile is called the condition is typically already satisfied. + var stopwatch = timeout == TimeSpan.MaxValue ? null : Stopwatch.StartNew(); + while (condition()) + { + if (stopwatch != null && stopwatch.Elapsed >= timeout) + { + if (throwOnTimeout) + throw new TimeoutException($"RunWhile timed out after {timeout}. Elapsed: {stopwatch.Elapsed}."); + return false; + } + } + return true; + } + #region Implementation of TaskScheduler diff --git a/rd-net/RdFramework.Reflection/ProxyGeneratorUtil.cs b/rd-net/RdFramework.Reflection/ProxyGeneratorUtil.cs index 84ec8db44..cbb7278d5 100644 --- a/rd-net/RdFramework.Reflection/ProxyGeneratorUtil.cs +++ b/rd-net/RdFramework.Reflection/ProxyGeneratorUtil.cs @@ -43,7 +43,7 @@ public static Task ToTask(IRdTask task) /// - /// Sync call which allow nested call execution with help of + /// Sync call which allow nested call execution with help of /// public static TRes SyncNested(RdCall call, TReq request, RpcTimeouts? timeouts = null) { @@ -51,7 +51,7 @@ public static TRes SyncNested(RdCall call, TReq request, } /// - /// Sync call which allow nested call execution with help of + /// Sync call which allow nested call execution with help of /// public static TRes SyncNested(RdCall call, Lifetime lifetime, TReq request, RpcTimeouts? timeouts = null) { @@ -61,37 +61,30 @@ public static TRes SyncNested(RdCall call, Lifetime life // If you want to mitigate this limitation, keep in mind that if you make a sync call from background thread // with some small probability your call can be merged in the sync execution of other call. Usually it is not // desired behaviour as you can accidentally obtain undesired locks. - call.GetProtoOrThrow().Scheduler.AssertThread(); + var protocol = call.GetProtoOrThrow(); + protocol.Scheduler.AssertThread(); - var nestedCallsScheduler = new LifetimeDefinition(); - var responseScheduler = new RdSimpleDispatcher(nestedCallsScheduler.Lifetime, Log.GetLog(call.GetType())); + var scheduler = protocol.Scheduler as IRunWhileScheduler; + Assertion.Assert(scheduler != null, "Scheduler must implement IRunWhileScheduler for nested calls. Scheduler type: {0}", protocol.Scheduler.GetType()); - using (new SwitchingScheduler.SwitchCookie(responseScheduler)) - { - var task = call.Start(lifetime, request, responseScheduler); + var task = call.Start(lifetime, request, null); - task.Result.Advise(nestedCallsScheduler.Lifetime, result => { nestedCallsScheduler.Terminate(); }); + RpcTimeouts timeoutsToUse = RpcTimeouts.GetRpcTimeouts(timeouts); - RpcTimeouts timeoutsToUse = RpcTimeouts.GetRpcTimeouts(timeouts); - responseScheduler.MessageTimeout = timeoutsToUse.ErrorAwaitTime; + var stopwatch = Stopwatch.StartNew(); - var stopwatch = Stopwatch.StartNew(); - responseScheduler.Run(); - if (!task.Result.HasValue()) - { - throw new TimeoutException($"Sync execution of rpc `{call.Location}` is timed out in {timeoutsToUse.ErrorAwaitTime.TotalMilliseconds} ms"); - } + // Pump messages while waiting for the result (no hard timeout; use thresholds for logging only) + scheduler.RunWhile(() => !task.Result.HasValue(), TimeSpan.MaxValue); - stopwatch.Stop(); + stopwatch.Stop(); - var freezeTime = stopwatch.ElapsedMilliseconds; - if (freezeTime > timeoutsToUse.WarnAwaitTime.TotalMilliseconds) - { - Log.Root.Error("Sync execution of rpc `{0}` executed too long: {1} ms", call.Location, freezeTime); - } + var freezeTime = stopwatch.ElapsedMilliseconds; + if (freezeTime > timeoutsToUse.ErrorAwaitTime.TotalMilliseconds) + Log.Root.Error("Sync execution of rpc `{0}` executed too long: {1} ms", call.Location, freezeTime); + else if (freezeTime > timeoutsToUse.WarnAwaitTime.TotalMilliseconds) + Log.Root.Warn("Sync execution of rpc `{0}` executed too long: {1} ms", call.Location, freezeTime); - return task.Result.Value.Unwrap(); - } + return task.Result.Value.Unwrap(); } public static RpcTimeouts CreateRpcTimeouts(long ticksWarning, long ticksError) diff --git a/rd-net/RdFramework.Reflection/ReflectionRdActivator.cs b/rd-net/RdFramework.Reflection/ReflectionRdActivator.cs index 708507a52..3b6fd8f53 100644 --- a/rd-net/RdFramework.Reflection/ReflectionRdActivator.cs +++ b/rd-net/RdFramework.Reflection/ReflectionRdActivator.cs @@ -305,8 +305,7 @@ public static void SetHandlerTaskVoid(RdCall endpoint, Func
  • public static void SetHandler(RdCall endpoint, Func> handler) { - var scheduler = new SwitchingScheduler(endpoint); - endpoint.SetRdTask(handler, scheduler, scheduler); + endpoint.SetRdTask(handler); } private object? ActivateMember(Type memberType, string memberName) diff --git a/rd-net/RdFramework.Reflection/SwitchingScheduler.cs b/rd-net/RdFramework.Reflection/SwitchingScheduler.cs deleted file mode 100644 index a7da94c12..000000000 --- a/rd-net/RdFramework.Reflection/SwitchingScheduler.cs +++ /dev/null @@ -1,81 +0,0 @@ -using System; -using System.Collections.Generic; -using JetBrains.Collections.Viewable; -using JetBrains.Diagnostics; -using JetBrains.Lifetimes; -using JetBrains.Rd.Base; - -namespace JetBrains.Rd.Reflection -{ - /// - /// A special scheduler which can be globally temporarily switched to another implementation. - /// - public class SwitchingScheduler : IScheduler - { - private readonly IRdDynamic myFallbackSchedulerSource; - - private static readonly object ourLock = new object(); - private static int ourDisable; - private static readonly Stack ourSchedulersOverride = new Stack(); - - public bool IsActive => ActiveScheduler.IsActive; - public bool OutOfOrderExecution => ActiveScheduler.OutOfOrderExecution; - - public IScheduler ActiveScheduler - { - get - { - IScheduler scheduler; - lock (ourLock) - { - scheduler = myFallbackSchedulerSource.GetProtoOrThrow().Scheduler; - if (ourSchedulersOverride.Count > 0 && ourDisable == 0) - scheduler = ourSchedulersOverride.Peek(); - } - - return scheduler; - } - } - - public SwitchingScheduler(IRdDynamic fallbackSchedulerSource) - { - myFallbackSchedulerSource = fallbackSchedulerSource; - } - - public void Queue(Action action) - { - ActiveScheduler.Queue(action); - } - - public static void Disable(Lifetime time) - { - time.Bracket( - () => { lock (ourLock) ourDisable++; }, - () => { lock (ourLock) ourDisable--; }); - } - - public readonly struct SwitchCookie : IDisposable - { - /// - /// Default constructor detector - /// - private readonly bool myIsValid; - - public SwitchCookie(IScheduler scheduler) - { - myIsValid = true; - lock (ourLock) - ourSchedulersOverride.Push(scheduler); - } - - public void Dispose() - { - if (myIsValid) - { - lock (ourLock) - ourSchedulersOverride.Pop(); - } - } - } - } -} diff --git a/rd-net/RdFramework/Impl/RdSimpleDispatcher.cs b/rd-net/RdFramework/Impl/RdSimpleDispatcher.cs index dfdfde009..4abf0e338 100644 --- a/rd-net/RdFramework/Impl/RdSimpleDispatcher.cs +++ b/rd-net/RdFramework/Impl/RdSimpleDispatcher.cs @@ -4,10 +4,11 @@ using JetBrains.Collections.Viewable; using JetBrains.Diagnostics; using JetBrains.Lifetimes; +using JetBrains.Util; namespace JetBrains.Rd.Impl { - public class RdSimpleDispatcher : IScheduler + public class RdSimpleDispatcher : IRunWhileScheduler { private readonly Lifetime myLifetime; private readonly ILog myLogger; @@ -83,10 +84,52 @@ public void Queue(Action action) myEvent.Set(); } + public bool RunWhile(Func condition, TimeSpan timeout, bool throwOnTimeout = false) + { + var stopwatch = timeout == TimeSpan.MaxValue ? (LocalStopwatch?)null : LocalStopwatch.StartNew(); + + while (condition()) + { + if (stopwatch.HasValue && stopwatch.Value.Elapsed >= timeout) + { + if (throwOnTimeout) + throw new TimeoutException($"RunWhile timed out after {timeout}. Elapsed: {stopwatch.Value.Elapsed}."); + return false; + } + + Action? nextTask = null; + lock (myTasks) + { + if (myTasks.Count > 0) + nextTask = myTasks.Dequeue(); + } + + if (nextTask != null) + { + try + { + myLogger.Trace(FormatLogMessage("Process incoming task in RunWhile")); + nextTask(); + } + catch (Exception e) + { + myLogger.Error(e, FormatLogMessage("Exception during RunWhile task processing")); + } + } + else + { + // Wait for a short time for new tasks + var waitTime = 5; // milliseconds + myEvent.WaitOne(waitTime); + } + } + return true; + } + private string FormatLogMessage(string message) { if (myId == null) return message; return $"{myId}: {message}"; - } + } } } \ No newline at end of file diff --git a/rd-net/Test.RdFramework/Reflection/ProxyGeneratorModelTest.cs b/rd-net/Test.RdFramework/Reflection/ProxyGeneratorModelTest.cs index 4b7dfdeb2..071341069 100644 --- a/rd-net/Test.RdFramework/Reflection/ProxyGeneratorModelTest.cs +++ b/rd-net/Test.RdFramework/Reflection/ProxyGeneratorModelTest.cs @@ -77,8 +77,6 @@ public async Task TestAsync() [Test, Repeat(10), Description("Repeat test as it reveal cancellation race")] public async Task TestSyncCall() { - SwitchingScheduler.Disable(TestLifetime); - await YieldToClient(); var client = CFacade.ActivateProxy(TestLifetime, ClientProtocol); From a55fb9dede954b268ed0984209c85580c37ac475 Mon Sep 17 00:00:00 2001 From: Alexander Ulitin Date: Mon, 23 Feb 2026 14:48:03 +0100 Subject: [PATCH 2/2] rd-reflection: flaky TestAsyncExternalCancellation test --- .../Reflection/ProxyGeneratorCancellationTest.cs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/rd-net/Test.RdFramework/Reflection/ProxyGeneratorCancellationTest.cs b/rd-net/Test.RdFramework/Reflection/ProxyGeneratorCancellationTest.cs index 61b9809d6..d4e6c833f 100644 --- a/rd-net/Test.RdFramework/Reflection/ProxyGeneratorCancellationTest.cs +++ b/rd-net/Test.RdFramework/Reflection/ProxyGeneratorCancellationTest.cs @@ -46,11 +46,7 @@ public async Task GetLongRunningInt(int arg, Lifetime cancellationLifetime) public Task AlwaysCancelled() { - return Task.Run(() => - { - Thread.Sleep(100); - throw new OperationCanceledException(); - }); + return Task.Run(() => throw new OperationCanceledException()); } }