Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions rd-net/Lifetimes/Collections/Viewable/IScheduler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,16 @@ public interface IScheduler
/// </summary>
bool OutOfOrderExecution { get; }
}

public interface IRunWhileScheduler : IScheduler
{
/// <summary>
/// Pumps the scheduler while given condition is satisfied or until timeout elapses.
/// </summary>
/// <param name="condition">A delegate to be executed over and over while it returns true.</param>
/// <param name="timeout">Maximum time to spend pumping. Use <see cref="TimeSpan.MaxValue"/> for no limit.</param>
/// <param name="throwOnTimeout">If true, throws when timeout elapses; if false, returns false on timeout.</param>
/// <returns>True if the condition was reached (condition returned false), false if timeout elapsed (when throwOnTimeout is false).</returns>
bool RunWhile(Func<bool> condition, TimeSpan timeout, bool throwOnTimeout = false);
}
}
22 changes: 21 additions & 1 deletion rd-net/Lifetimes/Collections/Viewable/SingleThreadScheduler.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using JetBrains.Annotations;
Expand Down Expand Up @@ -37,7 +38,7 @@ public int CompareTo(PrioritizedAction? other)
/// Task scheduler that either creates separate thread (via <see cref="RunOnSeparateThread"/> or use current
/// (via <see cref="CreateOverExisting"/>). All enqueued tasks are executed sequentially.
/// </summary>
public class SingleThreadScheduler : TaskScheduler, IScheduler
public class SingleThreadScheduler : TaskScheduler, IRunWhileScheduler
{
class ActionQueue
{
Expand Down Expand Up @@ -159,6 +160,25 @@ private void Run()
}
}

public bool RunWhile(Func<bool> condition, TimeSpan timeout, bool throwOnTimeout = false)
{
var stopwatch = timeout == TimeSpan.MaxValue ? null : Stopwatch.StartNew();

while (condition())
{
if (stopwatch != null && stopwatch.Elapsed >= timeout)
{
if (throwOnTimeout)
throw new TimeoutException($"RunWhile timed out after {timeout}. Elapsed: {stopwatch.Elapsed}.");
return false;
}

ExecuteOneAction(blockIfNoActionAvailable: false);
}

return true;
}

public bool PumpAndWaitFor(Lifetime lifetime, TimeSpan timeout, Func<bool> condition)
{
var shouldPump = IsActive;
Expand Down
20 changes: 19 additions & 1 deletion rd-net/Lifetimes/Collections/Viewable/SynchronousScheduler.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading.Tasks;
using JetBrains.Lifetimes;

Expand All @@ -10,7 +11,7 @@ namespace JetBrains.Collections.Viewable
/// Perfect candidate for <see cref="Task.ContinueWith(System.Action{System.Threading.Tasks.Task,object},object)"/>
/// if you want to guarantee synchronous continuation
/// </summary>
public class SynchronousScheduler : TaskScheduler, IScheduler
public class SynchronousScheduler : TaskScheduler, IRunWhileScheduler
{
public static readonly SynchronousScheduler Instance = new SynchronousScheduler();

Expand Down Expand Up @@ -41,6 +42,23 @@ private static void Execute(Action action)
public bool IsActive => ourActive > 0;
public bool OutOfOrderExecution => false;

public bool RunWhile(Func<bool> condition, TimeSpan timeout, bool throwOnTimeout = false)
{
// SynchronousScheduler executes actions inline when queued, so by the time
// RunWhile is called the condition is typically already satisfied.
var stopwatch = timeout == TimeSpan.MaxValue ? null : Stopwatch.StartNew();
while (condition())
{
if (stopwatch != null && stopwatch.Elapsed >= timeout)
{
if (throwOnTimeout)
throw new TimeoutException($"RunWhile timed out after {timeout}. Elapsed: {stopwatch.Elapsed}.");
return false;
}
}
return true;
}



#region Implementation of TaskScheduler
Expand Down
43 changes: 18 additions & 25 deletions rd-net/RdFramework.Reflection/ProxyGeneratorUtil.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,15 @@ public static Task<T> ToTask<T>(IRdTask<T> task)


/// <summary>
/// Sync call which allow nested call execution with help of <see cref="SwitchingScheduler"/>
/// Sync call which allow nested call execution with help of <see cref="IRunWhileScheduler"/>
/// </summary>
public static TRes SyncNested<TReq, TRes>(RdCall<TReq, TRes> call, TReq request, RpcTimeouts? timeouts = null)
{
return SyncNested(call, Lifetime.Eternal, request, timeouts);
}

/// <summary>
/// Sync call which allow nested call execution with help of <see cref="SwitchingScheduler"/>
/// Sync call which allow nested call execution with help of <see cref="IRunWhileScheduler"/>
/// </summary>
public static TRes SyncNested<TReq, TRes>(RdCall<TReq, TRes> call, Lifetime lifetime, TReq request, RpcTimeouts? timeouts = null)
{
Expand All @@ -61,37 +61,30 @@ public static TRes SyncNested<TReq, TRes>(RdCall<TReq, TRes> call, Lifetime life
// If you want to mitigate this limitation, keep in mind that if you make a sync call from background thread
// with some small probability your call can be merged in the sync execution of other call. Usually it is not
// desired behaviour as you can accidentally obtain undesired locks.
call.GetProtoOrThrow().Scheduler.AssertThread();
var protocol = call.GetProtoOrThrow();
protocol.Scheduler.AssertThread();

var nestedCallsScheduler = new LifetimeDefinition();
var responseScheduler = new RdSimpleDispatcher(nestedCallsScheduler.Lifetime, Log.GetLog(call.GetType()));
var scheduler = protocol.Scheduler as IRunWhileScheduler;
Assertion.Assert(scheduler != null, "Scheduler must implement IRunWhileScheduler for nested calls. Scheduler type: {0}", protocol.Scheduler.GetType());

using (new SwitchingScheduler.SwitchCookie(responseScheduler))
{
var task = call.Start(lifetime, request, responseScheduler);
var task = call.Start(lifetime, request, null);

task.Result.Advise(nestedCallsScheduler.Lifetime, result => { nestedCallsScheduler.Terminate(); });
RpcTimeouts timeoutsToUse = RpcTimeouts.GetRpcTimeouts(timeouts);

RpcTimeouts timeoutsToUse = RpcTimeouts.GetRpcTimeouts(timeouts);
responseScheduler.MessageTimeout = timeoutsToUse.ErrorAwaitTime;
var stopwatch = Stopwatch.StartNew();

var stopwatch = Stopwatch.StartNew();
responseScheduler.Run();
if (!task.Result.HasValue())
{
throw new TimeoutException($"Sync execution of rpc `{call.Location}` is timed out in {timeoutsToUse.ErrorAwaitTime.TotalMilliseconds} ms");
}
// Pump messages while waiting for the result (no hard timeout; use thresholds for logging only)
scheduler.RunWhile(() => !task.Result.HasValue(), TimeSpan.MaxValue);

stopwatch.Stop();
stopwatch.Stop();

var freezeTime = stopwatch.ElapsedMilliseconds;
if (freezeTime > timeoutsToUse.WarnAwaitTime.TotalMilliseconds)
{
Log.Root.Error("Sync execution of rpc `{0}` executed too long: {1} ms", call.Location, freezeTime);
}
var freezeTime = stopwatch.ElapsedMilliseconds;
if (freezeTime > timeoutsToUse.ErrorAwaitTime.TotalMilliseconds)
Log.Root.Error("Sync execution of rpc `{0}` executed too long: {1} ms", call.Location, freezeTime);
else if (freezeTime > timeoutsToUse.WarnAwaitTime.TotalMilliseconds)
Log.Root.Warn("Sync execution of rpc `{0}` executed too long: {1} ms", call.Location, freezeTime);

return task.Result.Value.Unwrap();
}
return task.Result.Value.Unwrap();
}

public static RpcTimeouts CreateRpcTimeouts(long ticksWarning, long ticksError)
Expand Down
3 changes: 1 addition & 2 deletions rd-net/RdFramework.Reflection/ReflectionRdActivator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -305,8 +305,7 @@ public static void SetHandlerTaskVoid<TReq>(RdCall<TReq, Unit> endpoint, Func<Li
/// </summary>
public static void SetHandler<TReq, TRes>(RdCall<TReq, TRes> endpoint, Func<Lifetime, TReq, RdTask<TRes>> handler)
{
var scheduler = new SwitchingScheduler(endpoint);
endpoint.SetRdTask(handler, scheduler, scheduler);
endpoint.SetRdTask(handler);
}

private object? ActivateMember(Type memberType, string memberName)
Expand Down
81 changes: 0 additions & 81 deletions rd-net/RdFramework.Reflection/SwitchingScheduler.cs

This file was deleted.

47 changes: 45 additions & 2 deletions rd-net/RdFramework/Impl/RdSimpleDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
using JetBrains.Collections.Viewable;
using JetBrains.Diagnostics;
using JetBrains.Lifetimes;
using JetBrains.Util;

namespace JetBrains.Rd.Impl
{
public class RdSimpleDispatcher : IScheduler
public class RdSimpleDispatcher : IRunWhileScheduler
{
private readonly Lifetime myLifetime;
private readonly ILog myLogger;
Expand Down Expand Up @@ -83,10 +84,52 @@ public void Queue(Action action)
myEvent.Set();
}

public bool RunWhile(Func<bool> condition, TimeSpan timeout, bool throwOnTimeout = false)
{
var stopwatch = timeout == TimeSpan.MaxValue ? (LocalStopwatch?)null : LocalStopwatch.StartNew();

while (condition())
{
if (stopwatch.HasValue && stopwatch.Value.Elapsed >= timeout)
{
if (throwOnTimeout)
throw new TimeoutException($"RunWhile timed out after {timeout}. Elapsed: {stopwatch.Value.Elapsed}.");
return false;
}

Action? nextTask = null;
lock (myTasks)
{
if (myTasks.Count > 0)
nextTask = myTasks.Dequeue();
}

if (nextTask != null)
{
try
{
myLogger.Trace(FormatLogMessage("Process incoming task in RunWhile"));
nextTask();
}
catch (Exception e)
{
myLogger.Error(e, FormatLogMessage("Exception during RunWhile task processing"));
}
}
else
{
// Wait for a short time for new tasks
var waitTime = 5; // milliseconds
myEvent.WaitOne(waitTime);
}
}
return true;
}

private string FormatLogMessage(string message)
{
if (myId == null) return message;
return $"{myId}: {message}";
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,7 @@ public async Task<int> GetLongRunningInt(int arg, Lifetime cancellationLifetime)

public Task AlwaysCancelled()
{
return Task.Run(() =>
{
Thread.Sleep(100);
throw new OperationCanceledException();
});
return Task.Run(() => throw new OperationCanceledException());
}
}

Expand Down
2 changes: 0 additions & 2 deletions rd-net/Test.RdFramework/Reflection/ProxyGeneratorModelTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,6 @@ public async Task TestAsync()
[Test, Repeat(10), Description("Repeat test as it reveal cancellation race")]
public async Task TestSyncCall()
{
SwitchingScheduler.Disable(TestLifetime);

await YieldToClient();
var client = CFacade.ActivateProxy<IModelOwner>(TestLifetime, ClientProtocol);

Expand Down
Loading