From 3de5f507ecae7f1f4133c0e27b15c4c2e23368aa Mon Sep 17 00:00:00 2001 From: Stepan Tarasevich Date: Tue, 21 Nov 2017 17:42:57 +0300 Subject: [PATCH 1/2] Make EDT accept suppliers and return asyncs --- .../base/edt/AwtEventDispatchThread.java | 10 ++- .../jetpad/base/edt/DefaultAsyncEdt.java | 18 ++++ .../jetpad/base/edt/EdtManagerPool.java | 13 ++- .../jetpad/base/edt/EventDispatchThread.java | 7 +- .../jetpad/base/edt/ExecutorEdtManager.java | 46 ++++++++-- .../base/edt/JsEventDispatchThread.java | 10 ++- .../base/edt/NullEventDispatchThread.java | 9 +- .../jetpad/base/edt/RunnableWithAsync.java | 83 +++++++++++++++++++ .../jetpad/base/edt/RunningEdtManager.java | 26 +++--- .../jetpad/base/edt/BufferingEdtManager.java | 5 +- .../base/edt/BufferingEdtManagerTest.java | 16 +++- .../jetpad/base/edt/ConcurrentTestEdt.java | 18 +++- .../jetpad/base/edt/EdtTestUtil.java | 66 +++++++++++++++ .../base/edt/ExecutorEdtManagerTest.java | 76 ++++++++++++++++- .../base/edt/RunningEdtManagerTest.java | 18 +++- .../jetpad/base/edt/TestEdtManager.java | 2 +- .../base/edt/TestEventDispatchThread.java | 36 ++++++-- .../base/edt/TestEventDispatchThreadTest.java | 33 +++++++- .../jetbrains/jetpad/test/BaseTestCase.java | 2 +- 19 files changed, 443 insertions(+), 51 deletions(-) create mode 100644 util/base/src/main/java/jetbrains/jetpad/base/edt/DefaultAsyncEdt.java create mode 100644 util/base/src/main/java/jetbrains/jetpad/base/edt/RunnableWithAsync.java create mode 100644 util/base/src/test/java/jetbrains/jetpad/base/edt/EdtTestUtil.java diff --git a/util/base/src/main/java/jetbrains/jetpad/base/edt/AwtEventDispatchThread.java b/util/base/src/main/java/jetbrains/jetpad/base/edt/AwtEventDispatchThread.java index 3d7ae569..1906f1e2 100644 --- a/util/base/src/main/java/jetbrains/jetpad/base/edt/AwtEventDispatchThread.java +++ b/util/base/src/main/java/jetbrains/jetpad/base/edt/AwtEventDispatchThread.java @@ -15,6 +15,7 @@ */ package jetbrains.jetpad.base.edt; +import jetbrains.jetpad.base.Async; import jetbrains.jetpad.base.Registration; import javax.swing.SwingUtilities; @@ -22,7 +23,7 @@ import java.awt.event.ActionEvent; import java.awt.event.ActionListener; -public final class AwtEventDispatchThread implements EventDispatchThread { +public final class AwtEventDispatchThread extends DefaultAsyncEdt { public static final AwtEventDispatchThread INSTANCE = new AwtEventDispatchThread(); private AwtEventDispatchThread() { @@ -34,8 +35,9 @@ public long getCurrentTimeMillis() { } @Override - public void schedule(Runnable r) { - SwingUtilities.invokeLater(r); + protected Async asyncSchedule(RunnableWithAsync runnableWithAsync) { + SwingUtilities.invokeLater(runnableWithAsync); + return runnableWithAsync; } @Override @@ -75,4 +77,4 @@ protected void doRemove() { } }; } -} \ No newline at end of file +} diff --git a/util/base/src/main/java/jetbrains/jetpad/base/edt/DefaultAsyncEdt.java b/util/base/src/main/java/jetbrains/jetpad/base/edt/DefaultAsyncEdt.java new file mode 100644 index 00000000..d2a07291 --- /dev/null +++ b/util/base/src/main/java/jetbrains/jetpad/base/edt/DefaultAsyncEdt.java @@ -0,0 +1,18 @@ +package jetbrains.jetpad.base.edt; + +import jetbrains.jetpad.base.Async; +import jetbrains.jetpad.base.function.Supplier; + +public abstract class DefaultAsyncEdt implements EventDispatchThread { + @Override + public final Async schedule(Runnable r) throws EdtException { + return asyncSchedule(RunnableWithAsync.fromRunnable(r)); + } + + @Override + public final Async schedule(Supplier s) throws EdtException { + return asyncSchedule(RunnableWithAsync.fromSupplier(s)); + } + + protected abstract Async asyncSchedule(RunnableWithAsync runnableWithAsync); +} diff --git a/util/base/src/main/java/jetbrains/jetpad/base/edt/EdtManagerPool.java b/util/base/src/main/java/jetbrains/jetpad/base/edt/EdtManagerPool.java index ae9f0f39..6e8698d6 100755 --- a/util/base/src/main/java/jetbrains/jetpad/base/edt/EdtManagerPool.java +++ b/util/base/src/main/java/jetbrains/jetpad/base/edt/EdtManagerPool.java @@ -15,7 +15,9 @@ */ package jetbrains.jetpad.base.edt; +import jetbrains.jetpad.base.Async; import jetbrains.jetpad.base.Registration; +import jetbrains.jetpad.base.function.Supplier; import java.util.concurrent.CountDownLatch; import java.util.logging.Logger; @@ -156,8 +158,13 @@ public long getCurrentTimeMillis() { } @Override - public void schedule(Runnable runnable) { - myManager.getEdt().schedule(runnable); + public Async schedule(Runnable runnable) { + return myManager.getEdt().schedule(runnable); + } + + @Override + public Async schedule(Supplier s) { + return myManager.getEdt().schedule(s); } @Override @@ -176,4 +183,4 @@ public String toString() { + ("".equals(myName) ? "" : " (" + myName + ")"); } } -} \ No newline at end of file +} diff --git a/util/base/src/main/java/jetbrains/jetpad/base/edt/EventDispatchThread.java b/util/base/src/main/java/jetbrains/jetpad/base/edt/EventDispatchThread.java index 16d03446..c8ab9f05 100755 --- a/util/base/src/main/java/jetbrains/jetpad/base/edt/EventDispatchThread.java +++ b/util/base/src/main/java/jetbrains/jetpad/base/edt/EventDispatchThread.java @@ -15,12 +15,15 @@ */ package jetbrains.jetpad.base.edt; +import jetbrains.jetpad.base.Async; import jetbrains.jetpad.base.Registration; +import jetbrains.jetpad.base.function.Supplier; public interface EventDispatchThread { long getCurrentTimeMillis(); - void schedule(Runnable r) throws EdtException; + Async schedule(Runnable r) throws EdtException; + Async schedule(Supplier s) throws EdtException; Registration schedule(int delay, Runnable r) throws EdtException; Registration scheduleRepeating(int period, Runnable r) throws EdtException; -} \ No newline at end of file +} diff --git a/util/base/src/main/java/jetbrains/jetpad/base/edt/ExecutorEdtManager.java b/util/base/src/main/java/jetbrains/jetpad/base/edt/ExecutorEdtManager.java index 4d6edf83..00ed1760 100755 --- a/util/base/src/main/java/jetbrains/jetpad/base/edt/ExecutorEdtManager.java +++ b/util/base/src/main/java/jetbrains/jetpad/base/edt/ExecutorEdtManager.java @@ -15,9 +15,13 @@ */ package jetbrains.jetpad.base.edt; +import jetbrains.jetpad.base.Async; import jetbrains.jetpad.base.Registration; import jetbrains.jetpad.base.ThrowableHandlers; +import jetbrains.jetpad.base.function.Supplier; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -25,6 +29,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; public final class ExecutorEdtManager implements EdtManager, EventDispatchThread { @@ -58,7 +63,7 @@ public void finish() { @Override public void kill() { ensureCanShutdown(); - myEdt.getExecutor().shutdownNow(); + myEdt.kill(); waitTermination(); } @@ -94,9 +99,18 @@ public long getCurrentTimeMillis() { } @Override - public void schedule(Runnable runnable) { + public Async schedule(Supplier s) { try { - myEdt.schedule(runnable); + return myEdt.schedule(s); + } catch (RejectedExecutionException e) { + throw new EdtException(e); + } + } + + @Override + public Async schedule(Runnable runnable) { + try { + return myEdt.schedule(runnable); } catch (RejectedExecutionException e) { throw new EdtException(e); } @@ -133,8 +147,10 @@ public String toString() { return "ExecutorEdtManager@" + Integer.toHexString(hashCode()) + myThreadFactory.getPrintName(); } - private static class ExecutorEdt implements EventDispatchThread { + private static class ExecutorEdt extends DefaultAsyncEdt { private final ScheduledExecutorService myExecutor; + private final ConcurrentMap> myUnresolvedAsyncs = new ConcurrentHashMap<>(); + private final AtomicInteger myCounter = new AtomicInteger(0); ExecutorEdt(ThreadFactory threadFactory) { myExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory); @@ -146,8 +162,17 @@ public long getCurrentTimeMillis() { } @Override - public void schedule(Runnable r) { - myExecutor.submit(handleFailure(r)); + protected Async asyncSchedule(final RunnableWithAsync task) { + final int i = myCounter.incrementAndGet(); + myUnresolvedAsyncs.put(i, task); + myExecutor.submit(new Runnable() { + @Override + public void run() { + task.run(); + myUnresolvedAsyncs.remove(i); + } + }); + return task; } @Override @@ -183,6 +208,13 @@ ExecutorService getExecutor() { public String toString() { return "ExecutorEdt@" + Integer.toHexString(hashCode()) + " " + Thread.currentThread().getName(); } + + private void kill() { + myExecutor.shutdownNow(); + for (RunnableWithAsync async : myUnresolvedAsyncs.values()) { + async.fail(); + } + } } private static class FutureRegistration extends Registration { @@ -236,4 +268,4 @@ Object getEdtId() { return myEdtId; } } -} \ No newline at end of file +} diff --git a/util/base/src/main/java/jetbrains/jetpad/base/edt/JsEventDispatchThread.java b/util/base/src/main/java/jetbrains/jetpad/base/edt/JsEventDispatchThread.java index f6cdba31..6f82f4e5 100644 --- a/util/base/src/main/java/jetbrains/jetpad/base/edt/JsEventDispatchThread.java +++ b/util/base/src/main/java/jetbrains/jetpad/base/edt/JsEventDispatchThread.java @@ -18,12 +18,13 @@ import com.google.gwt.core.client.JavaScriptException; import com.google.gwt.core.client.Scheduler; import com.google.gwt.user.client.Timer; +import jetbrains.jetpad.base.Async; import jetbrains.jetpad.base.Registration; import jetbrains.jetpad.base.ThrowableHandlers; import java.util.logging.Logger; -public final class JsEventDispatchThread implements EventDispatchThread { +public final class JsEventDispatchThread extends DefaultAsyncEdt { private static final Logger LOG = Logger.getLogger(JsEventDispatchThread.class.getName()); public static final JsEventDispatchThread INSTANCE = new JsEventDispatchThread(); @@ -37,13 +38,14 @@ public long getCurrentTimeMillis() { } @Override - public void schedule(final Runnable r) { + protected Async asyncSchedule(final RunnableWithAsync runnableWithAsync) { Scheduler.get().scheduleFinally(new Scheduler.ScheduledCommand() { @Override public void execute() { - doExecute(r); + doExecute(runnableWithAsync); } }); + return runnableWithAsync; } @Override @@ -97,4 +99,4 @@ protected void doRemove() { } }; } -} \ No newline at end of file +} diff --git a/util/base/src/main/java/jetbrains/jetpad/base/edt/NullEventDispatchThread.java b/util/base/src/main/java/jetbrains/jetpad/base/edt/NullEventDispatchThread.java index fe921747..1a0b4177 100644 --- a/util/base/src/main/java/jetbrains/jetpad/base/edt/NullEventDispatchThread.java +++ b/util/base/src/main/java/jetbrains/jetpad/base/edt/NullEventDispatchThread.java @@ -15,16 +15,19 @@ */ package jetbrains.jetpad.base.edt; +import jetbrains.jetpad.base.Async; +import jetbrains.jetpad.base.Asyncs; import jetbrains.jetpad.base.Registration; -public final class NullEventDispatchThread implements EventDispatchThread { +public final class NullEventDispatchThread extends DefaultAsyncEdt { @Override public long getCurrentTimeMillis() { return 0L; } @Override - public void schedule(Runnable r) { + protected Async asyncSchedule(RunnableWithAsync runnableWithAsync) { + return Asyncs.constant(null); } @Override @@ -36,4 +39,4 @@ public Registration schedule(int delay, Runnable r) { public Registration scheduleRepeating(int period, Runnable r) { return Registration.EMPTY; } -} \ No newline at end of file +} diff --git a/util/base/src/main/java/jetbrains/jetpad/base/edt/RunnableWithAsync.java b/util/base/src/main/java/jetbrains/jetpad/base/edt/RunnableWithAsync.java new file mode 100644 index 00000000..481a27a7 --- /dev/null +++ b/util/base/src/main/java/jetbrains/jetpad/base/edt/RunnableWithAsync.java @@ -0,0 +1,83 @@ +package jetbrains.jetpad.base.edt; + +import jetbrains.jetpad.base.Async; +import jetbrains.jetpad.base.Registration; +import jetbrains.jetpad.base.ThreadSafeAsync; +import jetbrains.jetpad.base.function.Consumer; +import jetbrains.jetpad.base.function.Function; +import jetbrains.jetpad.base.function.Supplier; + +import java.util.concurrent.atomic.AtomicBoolean; + +public class RunnableWithAsync implements Runnable, Async { + private final Supplier mySupplier; + private final ThreadSafeAsync myAsync; + private final AtomicBoolean myFulfilled = new AtomicBoolean(false); + + static RunnableWithAsync fromRunnable(final Runnable r) { + final ThreadSafeAsync async = new ThreadSafeAsync<>(); + Supplier s = new Supplier() { + @Override + public Void get() { + r.run(); + return null; + } + }; + return new RunnableWithAsync<>(s, async); + } + + static RunnableWithAsync fromSupplier(final Supplier s) { + return new RunnableWithAsync<>(s, new ThreadSafeAsync()); + } + + private RunnableWithAsync(Supplier r, ThreadSafeAsync async) { + mySupplier = r; + myAsync = async; + } + + @Override + public void run() { + try { + ResultT resultT = mySupplier.get(); + if (myFulfilled.compareAndSet(false, true)) { + myAsync.success(resultT); + } + } catch (Throwable t) { + if (myFulfilled.compareAndSet(false, true)) { + myAsync.failure(t); + } + throw t; + } + } + + @Override + public Registration onSuccess(Consumer successHandler) { + return myAsync.onSuccess(successHandler); + } + + @Override + public Registration onResult(Consumer successHandler, Consumer failureHandler) { + return myAsync.onResult(successHandler, failureHandler); + } + + @Override + public Registration onFailure(Consumer failureHandler) { + return myAsync.onFailure(failureHandler); + } + + @Override + public Async map(Function success) { + return myAsync.map(success); + } + + @Override + public Async flatMap(Function> success) { + return myAsync.flatMap(success); + } + + void fail() { + if (myFulfilled.compareAndSet(false, true)) { + myAsync.failure(new RuntimeException("Intentionally failed")); + } + } +} diff --git a/util/base/src/main/java/jetbrains/jetpad/base/edt/RunningEdtManager.java b/util/base/src/main/java/jetbrains/jetpad/base/edt/RunningEdtManager.java index c157ba25..e8639598 100755 --- a/util/base/src/main/java/jetbrains/jetpad/base/edt/RunningEdtManager.java +++ b/util/base/src/main/java/jetbrains/jetpad/base/edt/RunningEdtManager.java @@ -22,9 +22,9 @@ import java.util.ArrayList; import java.util.List; -public class RunningEdtManager implements EdtManager, EventDispatchThread { +public class RunningEdtManager extends DefaultAsyncEdt implements EdtManager { private String myName; - private List myTasks = new ArrayList<>(); + private List> myTasks = new ArrayList<>(); private boolean myFinished = false; private boolean myExecuting = false; private boolean myFlushing = false; @@ -54,6 +54,9 @@ public final void finish() { @Override public final void kill() { checkCanStop(); + for (RunnableWithAsync task : myTasks) { + task.fail(); + } myTasks.clear(); shutdown(); } @@ -67,12 +70,6 @@ public long getCurrentTimeMillis() { return System.currentTimeMillis(); } - @Override - public final void schedule(Runnable r) { - checkCanSchedule(); - doSchedule(r); - } - @Override public final Registration schedule(int delay, Runnable r) { checkCanSchedule(); @@ -166,11 +163,17 @@ public boolean isEmpty() { return myTasks.isEmpty(); } - void addTaskToQueue(Runnable r) { + void addTaskToQueue(RunnableWithAsync r) { myTasks.add(r); } - protected void doSchedule(Runnable r) { + @Override + protected final RunnableWithAsync asyncSchedule(RunnableWithAsync r) { + checkCanSchedule(); + return doSchedule(r); + } + + protected RunnableWithAsync doSchedule(RunnableWithAsync r) { if (myExecuting) { myTasks.add(r); } else { @@ -182,6 +185,7 @@ protected void doSchedule(Runnable r) { flush(); } } + return r; } protected Registration doSchedule(int delay, Runnable r) { @@ -196,4 +200,4 @@ protected Registration doScheduleRepeating(int period, Runnable r) { public String toString() { return "RunningEdtManager@" + Integer.toHexString(hashCode()) + ("".equals(myName) ? "" : " (" + myName + ")"); } -} \ No newline at end of file +} diff --git a/util/base/src/test/java/jetbrains/jetpad/base/edt/BufferingEdtManager.java b/util/base/src/test/java/jetbrains/jetpad/base/edt/BufferingEdtManager.java index 449f1bc6..961ce700 100755 --- a/util/base/src/test/java/jetbrains/jetpad/base/edt/BufferingEdtManager.java +++ b/util/base/src/test/java/jetbrains/jetpad/base/edt/BufferingEdtManager.java @@ -24,8 +24,9 @@ public BufferingEdtManager(String name) { } @Override - protected void doSchedule(Runnable r) { + protected RunnableWithAsync doSchedule(RunnableWithAsync r) { addTaskToQueue(r); + return r; } @Override @@ -33,4 +34,4 @@ public String toString() { return "BufferingEdtManager@" + Integer.toHexString(hashCode()) + ("".equals(getName()) ? "" : " (" + getName()+ ")"); } -} \ No newline at end of file +} diff --git a/util/base/src/test/java/jetbrains/jetpad/base/edt/BufferingEdtManagerTest.java b/util/base/src/test/java/jetbrains/jetpad/base/edt/BufferingEdtManagerTest.java index fecd98c5..156cabef 100755 --- a/util/base/src/test/java/jetbrains/jetpad/base/edt/BufferingEdtManagerTest.java +++ b/util/base/src/test/java/jetbrains/jetpad/base/edt/BufferingEdtManagerTest.java @@ -15,7 +15,9 @@ */ package jetbrains.jetpad.base.edt; +import jetbrains.jetpad.base.Async; import jetbrains.jetpad.base.Value; +import jetbrains.jetpad.base.function.Supplier; import jetbrains.jetpad.test.BaseTestCase; import org.junit.Before; import org.junit.Test; @@ -102,4 +104,16 @@ public void run() { assertEquals(new Integer(1), taskCompleted.get()); assertTrue(manager.isStopped()); } -} \ No newline at end of file + + @Test + public void killFailsAsync() { + Async async = manager.schedule(new Supplier() { + @Override + public Integer get() { + return 42; + } + }); + + EdtTestUtil.killAndAssertFailure(manager, async); + } +} diff --git a/util/base/src/test/java/jetbrains/jetpad/base/edt/ConcurrentTestEdt.java b/util/base/src/test/java/jetbrains/jetpad/base/edt/ConcurrentTestEdt.java index 71eb32b3..88db8519 100644 --- a/util/base/src/test/java/jetbrains/jetpad/base/edt/ConcurrentTestEdt.java +++ b/util/base/src/test/java/jetbrains/jetpad/base/edt/ConcurrentTestEdt.java @@ -15,7 +15,9 @@ */ package jetbrains.jetpad.base.edt; +import jetbrains.jetpad.base.Async; import jetbrains.jetpad.base.Registration; +import jetbrains.jetpad.base.function.Supplier; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -48,10 +50,20 @@ public long getCurrentTimeMillis() { } @Override - public void schedule(Runnable r) throws EdtException { + public Async schedule(Runnable r) throws EdtException { myLock.lock(); try { - myEdt.schedule(r); + return myEdt.schedule(r); + } finally { + myLock.unlock(); + } + } + + @Override + public Async schedule(Supplier s) throws EdtException { + myLock.lock(); + try { + return myEdt.schedule(s); } finally { myLock.unlock(); } @@ -129,4 +141,4 @@ public boolean isStopped() { myLock.unlock(); } } -} \ No newline at end of file +} diff --git a/util/base/src/test/java/jetbrains/jetpad/base/edt/EdtTestUtil.java b/util/base/src/test/java/jetbrains/jetpad/base/edt/EdtTestUtil.java new file mode 100644 index 00000000..ab2e94d3 --- /dev/null +++ b/util/base/src/test/java/jetbrains/jetpad/base/edt/EdtTestUtil.java @@ -0,0 +1,66 @@ +package jetbrains.jetpad.base.edt; + +import jetbrains.jetpad.base.Async; +import jetbrains.jetpad.base.ThrowableHandlers; +import jetbrains.jetpad.base.Value; +import jetbrains.jetpad.base.function.Supplier; + +import static jetbrains.jetpad.base.AsyncMatchers.failed; +import static jetbrains.jetpad.base.AsyncMatchers.result; +import static jetbrains.jetpad.base.AsyncMatchers.unfinished; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; + +final class EdtTestUtil { + private static final int VALUE = 42; + static Supplier getDefaultSupplier() { + return new Supplier() { + @Override + public Integer get() { + return VALUE; + } + }; + } + + static void killAndAssertFailure(EdtManager manager, Async... asyncs) { + for (Async async : asyncs) { + assertThat(async, unfinished()); + } + manager.kill(); + for (Async async : asyncs) { + assertThat(async, failed()); + } + } + + static void assertAsyncFulfilled(EventDispatchThread edt, Runnable flush) { + Async async = edt.schedule(new Supplier() { + @Override + public Integer get() { + return VALUE; + } + }); + flush.run(); + assertThat(async, result(is(VALUE))); + } + + static void assertAsyncRejected(final EventDispatchThread edt, final Runnable flush) { + final Value> asyncValue = new Value<>(); + ThrowableHandlers.asInProduction(new Runnable() { + @Override + public void run() { + Async async = edt.schedule(new Supplier() { + @Override + public Integer get() { + throw new RuntimeException(); + } + }); + flush.run(); + asyncValue.set(async); + } + }); + assertThat(asyncValue.get(), failed()); + } + + private EdtTestUtil() { + } +} diff --git a/util/base/src/test/java/jetbrains/jetpad/base/edt/ExecutorEdtManagerTest.java b/util/base/src/test/java/jetbrains/jetpad/base/edt/ExecutorEdtManagerTest.java index fe8e6b87..fde7f32c 100755 --- a/util/base/src/test/java/jetbrains/jetpad/base/edt/ExecutorEdtManagerTest.java +++ b/util/base/src/test/java/jetbrains/jetpad/base/edt/ExecutorEdtManagerTest.java @@ -15,9 +15,11 @@ */ package jetbrains.jetpad.base.edt; +import jetbrains.jetpad.base.Async; import jetbrains.jetpad.base.Runnables; import jetbrains.jetpad.base.ThrowableHandlers; import jetbrains.jetpad.base.function.Consumer; +import jetbrains.jetpad.base.function.Supplier; import jetbrains.jetpad.test.BaseTestCase; import jetbrains.jetpad.test.Slow; import org.junit.Assert; @@ -27,6 +29,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import static jetbrains.jetpad.base.edt.EdtTestUtil.assertAsyncFulfilled; +import static jetbrains.jetpad.base.edt.EdtTestUtil.assertAsyncRejected; +import static jetbrains.jetpad.base.edt.EdtTestUtil.getDefaultSupplier; +import static jetbrains.jetpad.base.edt.EdtTestUtil.killAndAssertFailure; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -79,6 +85,46 @@ public void accept(EdtManager edtManager) { }); } + @Test + public void killFailsRunningAsyncs() { + scheduleInfiniteAndKill(); + } + + @Test + public void killFailsNotStartedAsyncs() { + scheduleInfiniteAndKill(getDefaultSupplier()); + } + + private void scheduleInfiniteAndKill(Supplier... restTasks) { + ExecutorEdtManager manager = new ExecutorEdtManager("Manager"); + final CountDownLatch runnableStartedLatch = new CountDownLatch(1); + + Async[] asyncs = new Async[restTasks.length + 1]; + + asyncs[0] = scheduleInfiniteTask(manager, runnableStartedLatch); + for (int i = 0; i < restTasks.length; i++) { + asyncs[i + 1] = manager.schedule(restTasks[i]); + } + await(runnableStartedLatch); + + killAndAssertFailure(manager, asyncs); + } + + private Async scheduleInfiniteTask(ExecutorEdtManager manager, final CountDownLatch runnableStartedLatch) { + return manager.schedule(new Supplier() { + @Override + public Integer get() { + runnableStartedLatch.countDown(); + try { + new CountDownLatch(1).await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return 42; + } + }); + } + private void shutdownFromItself(final Consumer shutdowner) { final EdtManager manager = new ExecutorEdtManager("MyEdt2"); final AtomicBoolean caught = new AtomicBoolean(false); @@ -131,4 +177,32 @@ public void run() { assertTrue(manager.isStopped()); } -} \ No newline at end of file + + @Test + public void asyncFulfilled() { + ExecutorEdtManager edtManager = new ExecutorEdtManager("EdtManager"); + assertAsyncFulfilled(edtManager, awaitExecution(edtManager)); + } + + @Test + public void asyncRejected() { + ExecutorEdtManager edtManager = new ExecutorEdtManager("EdtManager"); + assertAsyncRejected(edtManager, awaitExecution(edtManager)); + } + + private Runnable awaitExecution(final ExecutorEdtManager edtManager) { + return new Runnable() { + @Override + public void run() { + final CountDownLatch latch = new CountDownLatch(1); + edtManager.schedule(new Runnable() { + @Override + public void run() { + latch.countDown(); + } + }); + await(latch); + } + }; + } +} diff --git a/util/base/src/test/java/jetbrains/jetpad/base/edt/RunningEdtManagerTest.java b/util/base/src/test/java/jetbrains/jetpad/base/edt/RunningEdtManagerTest.java index da1da1e7..4b722295 100755 --- a/util/base/src/test/java/jetbrains/jetpad/base/edt/RunningEdtManagerTest.java +++ b/util/base/src/test/java/jetbrains/jetpad/base/edt/RunningEdtManagerTest.java @@ -20,12 +20,18 @@ import jetbrains.jetpad.test.BaseTestCase; import org.junit.Test; +import static jetbrains.jetpad.base.edt.EdtTestUtil.assertAsyncFulfilled; +import static jetbrains.jetpad.base.edt.EdtTestUtil.assertAsyncRejected; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; public class RunningEdtManagerTest extends BaseTestCase { private RunningEdtManager manager = new RunningEdtManager(); + private static final Runnable EMPTY = new Runnable() { + @Override + public void run() { } + }; @Test public void sequentialAddition() { @@ -55,6 +61,16 @@ public void run() { assertTrue(manager.isEmpty()); } + @Test + public void asyncFulfill() { + assertAsyncFulfilled(manager, EMPTY); + } + + @Test + public void asyncReject() { + assertAsyncRejected(manager, EMPTY); + } + @Test public void exceptionInTask() { ThrowableHandlers.asInProduction(new Runnable() { @@ -207,4 +223,4 @@ public void run() { assertEquals(1, (int) taskCompleted.get()); assertTrue(manager.isStopped()); } -} \ No newline at end of file +} diff --git a/util/base/src/test/java/jetbrains/jetpad/base/edt/TestEdtManager.java b/util/base/src/test/java/jetbrains/jetpad/base/edt/TestEdtManager.java index 0e9d1d3b..f785181d 100644 --- a/util/base/src/test/java/jetbrains/jetpad/base/edt/TestEdtManager.java +++ b/util/base/src/test/java/jetbrains/jetpad/base/edt/TestEdtManager.java @@ -50,4 +50,4 @@ public boolean isStopped() { public String toString() { return "manager for " + myEdt; } -} \ No newline at end of file +} diff --git a/util/base/src/test/java/jetbrains/jetpad/base/edt/TestEventDispatchThread.java b/util/base/src/test/java/jetbrains/jetpad/base/edt/TestEventDispatchThread.java index 5e5b55e8..3e4ab27a 100644 --- a/util/base/src/test/java/jetbrains/jetpad/base/edt/TestEventDispatchThread.java +++ b/util/base/src/test/java/jetbrains/jetpad/base/edt/TestEventDispatchThread.java @@ -15,6 +15,7 @@ */ package jetbrains.jetpad.base.edt; +import jetbrains.jetpad.base.Async; import jetbrains.jetpad.base.Registration; import jetbrains.jetpad.base.ThrowableHandlers; import jetbrains.jetpad.base.Value; @@ -22,7 +23,7 @@ import java.util.ArrayList; import java.util.List; -public final class TestEventDispatchThread implements EventDispatchThread { +public final class TestEventDispatchThread extends DefaultAsyncEdt { private final String myName; private final Runnable myThreadSafeChecker; @@ -156,12 +157,28 @@ public long getCurrentTimeMillis() { } @Override - public void schedule(Runnable r) { - schedule(0, r); + protected Async asyncSchedule(RunnableWithAsync runnableWithAsync) { + schedule(0, runnableWithAsync); + return runnableWithAsync; } @Override public Registration schedule(int delay, Runnable r) { + myThreadSafeChecker.run(); + checkCanSchedule(); + myModificationCount++; + final RunnableRecord record = new RunnableRecord(myCurrentTime + delay, RunnableWithAsync.fromRunnable(r)); + myRecords.add(record); + return new Registration() { + @Override + protected void doRemove() { + myRecords.remove(record); + } + }; + } + + + private Registration schedule(int delay, RunnableWithAsync r) { myThreadSafeChecker.run(); checkCanSchedule(); myModificationCount++; @@ -231,9 +248,16 @@ public void kill() { myThreadSafeChecker.run(); checkCanStop(); checkInsideTask(); + failCurrentRecords(); shutdown(); } + private void failCurrentRecords() { + for (RunnableRecord record : myRecords) { + record.myRunnable.fail(); + } + } + private void shutdown() { myThreadSafeChecker.run(); myRecords.clear(); @@ -246,11 +270,11 @@ public boolean isFinished() { private static class RunnableRecord { private final int myTargetTime; - private final Runnable myRunnable; + private final RunnableWithAsync myRunnable; - private RunnableRecord(int targetTime, Runnable runnable) { + private RunnableRecord(int targetTime, RunnableWithAsync runnable) { myTargetTime = targetTime; myRunnable = runnable; } } -} \ No newline at end of file +} diff --git a/util/base/src/test/java/jetbrains/jetpad/base/edt/TestEventDispatchThreadTest.java b/util/base/src/test/java/jetbrains/jetpad/base/edt/TestEventDispatchThreadTest.java index 0e1769e8..088a9dff 100644 --- a/util/base/src/test/java/jetbrains/jetpad/base/edt/TestEventDispatchThreadTest.java +++ b/util/base/src/test/java/jetbrains/jetpad/base/edt/TestEventDispatchThreadTest.java @@ -15,6 +15,7 @@ */ package jetbrains.jetpad.base.edt; +import jetbrains.jetpad.base.Async; import jetbrains.jetpad.base.Registration; import jetbrains.jetpad.base.ThrowableHandlers; import jetbrains.jetpad.base.Value; @@ -27,12 +28,24 @@ import java.util.Arrays; import java.util.List; +import static jetbrains.jetpad.base.AsyncMatchers.failed; +import static jetbrains.jetpad.base.AsyncMatchers.unfinished; +import static jetbrains.jetpad.base.edt.EdtTestUtil.assertAsyncFulfilled; +import static jetbrains.jetpad.base.edt.EdtTestUtil.assertAsyncRejected; +import static jetbrains.jetpad.base.edt.EdtTestUtil.getDefaultSupplier; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.times; public class TestEventDispatchThreadTest extends BaseTestCase { private TestEventDispatchThread edt = new TestEventDispatchThread(); + private Runnable flush = new Runnable() { + @Override + public void run() { + edt.executeUpdates(); + } + }; @Before public void setUp() { @@ -185,4 +198,22 @@ public void run() { assertEquals(Arrays.asList(1, 2), executionOrder); } -} \ No newline at end of file + + @Test + public void fulfillAsync() { + assertAsyncFulfilled(edt, flush); + } + + @Test + public void rejectAsync() { + assertAsyncRejected(edt, flush); + } + + @Test + public void killFailsAsyncs() { + Async async = edt.schedule(getDefaultSupplier()); + assertThat(async, unfinished()); + edt.kill(); + assertThat(async, failed()); + } +} diff --git a/util/test/src/main/java/jetbrains/jetpad/test/BaseTestCase.java b/util/test/src/main/java/jetbrains/jetpad/test/BaseTestCase.java index 851710ac..2dd1a158 100755 --- a/util/test/src/main/java/jetbrains/jetpad/test/BaseTestCase.java +++ b/util/test/src/main/java/jetbrains/jetpad/test/BaseTestCase.java @@ -47,4 +47,4 @@ public static Level resetLogsLevel(Level level) { } return oldLevel; } -} \ No newline at end of file +} From 1e33940e966c8b54df598ffd4ef8829b7a23bee1 Mon Sep 17 00:00:00 2001 From: Stepan Tarasevich Date: Wed, 22 Nov 2017 16:52:13 +0300 Subject: [PATCH 2/2] Add flatSchedule(Supplier>) to EDT interface --- .../jetpad/base/edt/DefaultAsyncEdt.java | 5 ++ .../jetpad/base/edt/EdtManagerPool.java | 5 ++ .../jetpad/base/edt/EventDispatchThread.java | 1 + .../jetpad/base/edt/ExecutorEdtManager.java | 9 ++ .../jetpad/base/edt/RunnableWithAsync.java | 83 +++++++++++++++---- .../jetpad/base/edt/ConcurrentTestEdt.java | 10 +++ .../jetpad/base/edt/EdtTestUtil.java | 11 +++ .../base/edt/ExecutorEdtManagerTest.java | 7 ++ .../base/edt/RunningEdtManagerTest.java | 6 ++ .../base/edt/TestEventDispatchThreadTest.java | 6 ++ 10 files changed, 125 insertions(+), 18 deletions(-) diff --git a/util/base/src/main/java/jetbrains/jetpad/base/edt/DefaultAsyncEdt.java b/util/base/src/main/java/jetbrains/jetpad/base/edt/DefaultAsyncEdt.java index d2a07291..f370f7a6 100644 --- a/util/base/src/main/java/jetbrains/jetpad/base/edt/DefaultAsyncEdt.java +++ b/util/base/src/main/java/jetbrains/jetpad/base/edt/DefaultAsyncEdt.java @@ -14,5 +14,10 @@ public final Async schedule(Supplier s) throws EdtEx return asyncSchedule(RunnableWithAsync.fromSupplier(s)); } + @Override + public Async flatSchedule(Supplier> s) throws EdtException { + return asyncSchedule(RunnableWithAsync.fromAsyncSupplier(s)); + } + protected abstract Async asyncSchedule(RunnableWithAsync runnableWithAsync); } diff --git a/util/base/src/main/java/jetbrains/jetpad/base/edt/EdtManagerPool.java b/util/base/src/main/java/jetbrains/jetpad/base/edt/EdtManagerPool.java index 6e8698d6..266656fa 100755 --- a/util/base/src/main/java/jetbrains/jetpad/base/edt/EdtManagerPool.java +++ b/util/base/src/main/java/jetbrains/jetpad/base/edt/EdtManagerPool.java @@ -167,6 +167,11 @@ public Async schedule(Supplier s) { return myManager.getEdt().schedule(s); } + @Override + public Async flatSchedule(Supplier> s) { + return myManager.getEdt().flatSchedule(s); + } + @Override public Registration schedule(int delay, Runnable runnable) { return myManager.getEdt().schedule(delay, runnable); diff --git a/util/base/src/main/java/jetbrains/jetpad/base/edt/EventDispatchThread.java b/util/base/src/main/java/jetbrains/jetpad/base/edt/EventDispatchThread.java index c8ab9f05..c5bd2187 100755 --- a/util/base/src/main/java/jetbrains/jetpad/base/edt/EventDispatchThread.java +++ b/util/base/src/main/java/jetbrains/jetpad/base/edt/EventDispatchThread.java @@ -24,6 +24,7 @@ public interface EventDispatchThread { long getCurrentTimeMillis(); Async schedule(Runnable r) throws EdtException; Async schedule(Supplier s) throws EdtException; + Async flatSchedule(Supplier> s) throws EdtException; Registration schedule(int delay, Runnable r) throws EdtException; Registration scheduleRepeating(int period, Runnable r) throws EdtException; } diff --git a/util/base/src/main/java/jetbrains/jetpad/base/edt/ExecutorEdtManager.java b/util/base/src/main/java/jetbrains/jetpad/base/edt/ExecutorEdtManager.java index 00ed1760..65949ad9 100755 --- a/util/base/src/main/java/jetbrains/jetpad/base/edt/ExecutorEdtManager.java +++ b/util/base/src/main/java/jetbrains/jetpad/base/edt/ExecutorEdtManager.java @@ -107,6 +107,15 @@ public Async schedule(Supplier s) { } } + @Override + public Async flatSchedule(Supplier> s) { + try { + return myEdt.flatSchedule(s); + } catch (RejectedExecutionException e) { + throw new EdtException(e); + } + } + @Override public Async schedule(Runnable runnable) { try { diff --git a/util/base/src/main/java/jetbrains/jetpad/base/edt/RunnableWithAsync.java b/util/base/src/main/java/jetbrains/jetpad/base/edt/RunnableWithAsync.java index 481a27a7..ab8b4450 100644 --- a/util/base/src/main/java/jetbrains/jetpad/base/edt/RunnableWithAsync.java +++ b/util/base/src/main/java/jetbrains/jetpad/base/edt/RunnableWithAsync.java @@ -1,6 +1,7 @@ package jetbrains.jetpad.base.edt; import jetbrains.jetpad.base.Async; +import jetbrains.jetpad.base.Asyncs; import jetbrains.jetpad.base.Registration; import jetbrains.jetpad.base.ThreadSafeAsync; import jetbrains.jetpad.base.function.Consumer; @@ -10,12 +11,11 @@ import java.util.concurrent.atomic.AtomicBoolean; public class RunnableWithAsync implements Runnable, Async { - private final Supplier mySupplier; + private final AtomicBoolean myFulfilled; + private final Runnable myAction; private final ThreadSafeAsync myAsync; - private final AtomicBoolean myFulfilled = new AtomicBoolean(false); static RunnableWithAsync fromRunnable(final Runnable r) { - final ThreadSafeAsync async = new ThreadSafeAsync<>(); Supplier s = new Supplier() { @Override public Void get() { @@ -23,31 +23,78 @@ public Void get() { return null; } }; - return new RunnableWithAsync<>(s, async); + return fromSupplier(s); } static RunnableWithAsync fromSupplier(final Supplier s) { - return new RunnableWithAsync<>(s, new ThreadSafeAsync()); + return plain(s); } - private RunnableWithAsync(Supplier r, ThreadSafeAsync async) { - mySupplier = r; + static RunnableWithAsync fromAsyncSupplier(final Supplier> s) { + return async(s); + } + + private static Runnable fromPlainSupplier(final Supplier supplier, final ThreadSafeAsync async, + final AtomicBoolean fulfilled) { + return new Runnable() { + @Override + public void run() { + try { + T resultT = supplier.get(); + if (fulfilled.compareAndSet(false, true)) { + async.success(resultT); + } + } catch (Throwable t) { + if (fulfilled.compareAndSet(false, true)) { + async.failure(t); + } + throw t; + } + } + }; + } + + private static Runnable fromAsyncSupplier(final Supplier> supplier, final ThreadSafeAsync async, + final AtomicBoolean fulfilled) { + return new Runnable() { + @Override + public void run() { + try { + Async resultT = supplier.get(); + if (fulfilled.compareAndSet(false, true)) { + Asyncs.delegate(resultT, async); + } + } catch (Throwable t) { + if (fulfilled.compareAndSet(false, true)) { + async.failure(t); + } + throw t; + } + } + }; + } + + private static RunnableWithAsync plain(Supplier s) { + ThreadSafeAsync async = new ThreadSafeAsync<>(); + AtomicBoolean fulfilled = new AtomicBoolean(); + return new RunnableWithAsync<>(fromPlainSupplier(s, async, fulfilled), async, fulfilled); + } + + private static RunnableWithAsync async(Supplier> s) { + ThreadSafeAsync async = new ThreadSafeAsync<>(); + AtomicBoolean fulfilled = new AtomicBoolean(); + return new RunnableWithAsync<>(fromAsyncSupplier(s, async, fulfilled), async, fulfilled); + } + + private RunnableWithAsync(Runnable action, ThreadSafeAsync async, AtomicBoolean fulfilled) { + myAction = action; myAsync = async; + myFulfilled = fulfilled; } @Override public void run() { - try { - ResultT resultT = mySupplier.get(); - if (myFulfilled.compareAndSet(false, true)) { - myAsync.success(resultT); - } - } catch (Throwable t) { - if (myFulfilled.compareAndSet(false, true)) { - myAsync.failure(t); - } - throw t; - } + myAction.run(); } @Override diff --git a/util/base/src/test/java/jetbrains/jetpad/base/edt/ConcurrentTestEdt.java b/util/base/src/test/java/jetbrains/jetpad/base/edt/ConcurrentTestEdt.java index 88db8519..74f2c875 100644 --- a/util/base/src/test/java/jetbrains/jetpad/base/edt/ConcurrentTestEdt.java +++ b/util/base/src/test/java/jetbrains/jetpad/base/edt/ConcurrentTestEdt.java @@ -69,6 +69,16 @@ public Async schedule(Supplier s) throws EdtExceptio } } + @Override + public Async flatSchedule(Supplier> s) throws EdtException { + myLock.lock(); + try { + return myEdt.flatSchedule(s); + } finally { + myLock.unlock(); + } + } + @Override public Registration schedule(int delay, Runnable r) throws EdtException { myLock.lock(); diff --git a/util/base/src/test/java/jetbrains/jetpad/base/edt/EdtTestUtil.java b/util/base/src/test/java/jetbrains/jetpad/base/edt/EdtTestUtil.java index ab2e94d3..450c3722 100644 --- a/util/base/src/test/java/jetbrains/jetpad/base/edt/EdtTestUtil.java +++ b/util/base/src/test/java/jetbrains/jetpad/base/edt/EdtTestUtil.java @@ -43,6 +43,17 @@ public Integer get() { assertThat(async, result(is(VALUE))); } + static void assertFlatAsyncFulfilled(final EventDispatchThread edt, Runnable flush) { + Async async = edt.flatSchedule(new Supplier>() { + @Override + public Async get() { + return edt.schedule(getDefaultSupplier()); + } + }); + flush.run(); + assertThat(async, result(is(VALUE))); + } + static void assertAsyncRejected(final EventDispatchThread edt, final Runnable flush) { final Value> asyncValue = new Value<>(); ThrowableHandlers.asInProduction(new Runnable() { diff --git a/util/base/src/test/java/jetbrains/jetpad/base/edt/ExecutorEdtManagerTest.java b/util/base/src/test/java/jetbrains/jetpad/base/edt/ExecutorEdtManagerTest.java index fde7f32c..749d049f 100755 --- a/util/base/src/test/java/jetbrains/jetpad/base/edt/ExecutorEdtManagerTest.java +++ b/util/base/src/test/java/jetbrains/jetpad/base/edt/ExecutorEdtManagerTest.java @@ -31,6 +31,7 @@ import static jetbrains.jetpad.base.edt.EdtTestUtil.assertAsyncFulfilled; import static jetbrains.jetpad.base.edt.EdtTestUtil.assertAsyncRejected; +import static jetbrains.jetpad.base.edt.EdtTestUtil.assertFlatAsyncFulfilled; import static jetbrains.jetpad.base.edt.EdtTestUtil.getDefaultSupplier; import static jetbrains.jetpad.base.edt.EdtTestUtil.killAndAssertFailure; import static org.junit.Assert.assertFalse; @@ -184,6 +185,12 @@ public void asyncFulfilled() { assertAsyncFulfilled(edtManager, awaitExecution(edtManager)); } + @Test + public void flatAsyncFulfilled() { + ExecutorEdtManager edtManager = new ExecutorEdtManager("EdtManager"); + assertFlatAsyncFulfilled(edtManager, awaitExecution(edtManager)); + } + @Test public void asyncRejected() { ExecutorEdtManager edtManager = new ExecutorEdtManager("EdtManager"); diff --git a/util/base/src/test/java/jetbrains/jetpad/base/edt/RunningEdtManagerTest.java b/util/base/src/test/java/jetbrains/jetpad/base/edt/RunningEdtManagerTest.java index 4b722295..441e4591 100755 --- a/util/base/src/test/java/jetbrains/jetpad/base/edt/RunningEdtManagerTest.java +++ b/util/base/src/test/java/jetbrains/jetpad/base/edt/RunningEdtManagerTest.java @@ -22,6 +22,7 @@ import static jetbrains.jetpad.base.edt.EdtTestUtil.assertAsyncFulfilled; import static jetbrains.jetpad.base.edt.EdtTestUtil.assertAsyncRejected; +import static jetbrains.jetpad.base.edt.EdtTestUtil.assertFlatAsyncFulfilled; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -66,6 +67,11 @@ public void asyncFulfill() { assertAsyncFulfilled(manager, EMPTY); } + @Test + public void flatAsyncFulfill() { + assertFlatAsyncFulfilled(manager, EMPTY); + } + @Test public void asyncReject() { assertAsyncRejected(manager, EMPTY); diff --git a/util/base/src/test/java/jetbrains/jetpad/base/edt/TestEventDispatchThreadTest.java b/util/base/src/test/java/jetbrains/jetpad/base/edt/TestEventDispatchThreadTest.java index 088a9dff..d8de6b66 100644 --- a/util/base/src/test/java/jetbrains/jetpad/base/edt/TestEventDispatchThreadTest.java +++ b/util/base/src/test/java/jetbrains/jetpad/base/edt/TestEventDispatchThreadTest.java @@ -32,6 +32,7 @@ import static jetbrains.jetpad.base.AsyncMatchers.unfinished; import static jetbrains.jetpad.base.edt.EdtTestUtil.assertAsyncFulfilled; import static jetbrains.jetpad.base.edt.EdtTestUtil.assertAsyncRejected; +import static jetbrains.jetpad.base.edt.EdtTestUtil.assertFlatAsyncFulfilled; import static jetbrains.jetpad.base.edt.EdtTestUtil.getDefaultSupplier; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; @@ -204,6 +205,11 @@ public void fulfillAsync() { assertAsyncFulfilled(edt, flush); } + @Test + public void fulfillFlatAsync() { + assertFlatAsyncFulfilled(edt, flush); + } + @Test public void rejectAsync() { assertAsyncRejected(edt, flush);