-
Notifications
You must be signed in to change notification settings - Fork 22
Async EDT.schedule #216
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Async EDT.schedule #216
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,23 @@ | ||
| package jetbrains.jetpad.base.edt; | ||
|
|
||
| import jetbrains.jetpad.base.Async; | ||
| import jetbrains.jetpad.base.function.Supplier; | ||
|
|
||
| public abstract class DefaultAsyncEdt implements EventDispatchThread { | ||
| @Override | ||
| public final Async<Void> schedule(Runnable r) throws EdtException { | ||
| return asyncSchedule(RunnableWithAsync.fromRunnable(r)); | ||
| } | ||
|
|
||
| @Override | ||
| public final <ResultT> Async<ResultT> schedule(Supplier<ResultT> s) throws EdtException { | ||
| return asyncSchedule(RunnableWithAsync.fromSupplier(s)); | ||
| } | ||
|
|
||
| @Override | ||
| public <ResultT> Async<ResultT> flatSchedule(Supplier<Async<ResultT>> s) throws EdtException { | ||
| return asyncSchedule(RunnableWithAsync.fromAsyncSupplier(s)); | ||
| } | ||
|
|
||
| protected abstract <ResultT> Async<ResultT> asyncSchedule(RunnableWithAsync<ResultT> runnableWithAsync); | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,12 +15,16 @@ | |
| */ | ||
| package jetbrains.jetpad.base.edt; | ||
|
|
||
| import jetbrains.jetpad.base.Async; | ||
| import jetbrains.jetpad.base.Registration; | ||
| import jetbrains.jetpad.base.function.Supplier; | ||
|
|
||
| public interface EventDispatchThread { | ||
|
|
||
| long getCurrentTimeMillis(); | ||
| void schedule(Runnable r) throws EdtException; | ||
| Async<Void> schedule(Runnable r) throws EdtException; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. may be we need to clarify threading of callbacks and kill/finish semantics? |
||
| <ResultT> Async<ResultT> schedule(Supplier<ResultT> s) throws EdtException; | ||
| <ResultT> Async<ResultT> flatSchedule(Supplier<Async<ResultT>> s) throws EdtException; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you illustrate how this method will be used? |
||
| Registration schedule(int delay, Runnable r) throws EdtException; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. may be schedule(int delay, Runnable r) should return AsyncWithRegistration? |
||
| Registration scheduleRepeating(int period, Runnable r) throws EdtException; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -15,16 +15,21 @@ | |
| */ | ||
| package jetbrains.jetpad.base.edt; | ||
|
|
||
| import jetbrains.jetpad.base.Async; | ||
| import jetbrains.jetpad.base.Registration; | ||
| import jetbrains.jetpad.base.ThrowableHandlers; | ||
| import jetbrains.jetpad.base.function.Supplier; | ||
|
|
||
| import java.util.concurrent.ConcurrentHashMap; | ||
| import java.util.concurrent.ConcurrentMap; | ||
| import java.util.concurrent.ExecutorService; | ||
| import java.util.concurrent.Executors; | ||
| import java.util.concurrent.Future; | ||
| import java.util.concurrent.RejectedExecutionException; | ||
| import java.util.concurrent.ScheduledExecutorService; | ||
| import java.util.concurrent.ThreadFactory; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import java.util.logging.Logger; | ||
|
|
||
| public final class ExecutorEdtManager implements EdtManager, EventDispatchThread { | ||
|
|
@@ -58,7 +63,7 @@ public void finish() { | |
| @Override | ||
| public void kill() { | ||
| ensureCanShutdown(); | ||
| myEdt.getExecutor().shutdownNow(); | ||
| myEdt.kill(); | ||
| waitTermination(); | ||
| } | ||
|
|
||
|
|
@@ -94,9 +99,27 @@ public long getCurrentTimeMillis() { | |
| } | ||
|
|
||
| @Override | ||
| public void schedule(Runnable runnable) { | ||
| public <ResultT> Async<ResultT> schedule(Supplier<ResultT> s) { | ||
| try { | ||
| myEdt.schedule(runnable); | ||
| return myEdt.schedule(s); | ||
| } catch (RejectedExecutionException e) { | ||
| throw new EdtException(e); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public <ResultT> Async<ResultT> flatSchedule(Supplier<Async<ResultT>> s) { | ||
| try { | ||
| return myEdt.flatSchedule(s); | ||
| } catch (RejectedExecutionException e) { | ||
| throw new EdtException(e); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public Async<Void> schedule(Runnable runnable) { | ||
| try { | ||
| return myEdt.schedule(runnable); | ||
| } catch (RejectedExecutionException e) { | ||
| throw new EdtException(e); | ||
| } | ||
|
|
@@ -133,8 +156,10 @@ public String toString() { | |
| return "ExecutorEdtManager@" + Integer.toHexString(hashCode()) + myThreadFactory.getPrintName(); | ||
| } | ||
|
|
||
| private static class ExecutorEdt implements EventDispatchThread { | ||
| private static class ExecutorEdt extends DefaultAsyncEdt { | ||
| private final ScheduledExecutorService myExecutor; | ||
| private final ConcurrentMap<Integer, RunnableWithAsync<?>> myUnresolvedAsyncs = new ConcurrentHashMap<>(); | ||
| private final AtomicInteger myCounter = new AtomicInteger(0); | ||
|
|
||
| ExecutorEdt(ThreadFactory threadFactory) { | ||
| myExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory); | ||
|
|
@@ -146,8 +171,17 @@ public long getCurrentTimeMillis() { | |
| } | ||
|
|
||
| @Override | ||
| public void schedule(Runnable r) { | ||
| myExecutor.submit(handleFailure(r)); | ||
| protected <ResultT> Async<ResultT> asyncSchedule(final RunnableWithAsync<ResultT> task) { | ||
| final int i = myCounter.incrementAndGet(); | ||
| myUnresolvedAsyncs.put(i, task); | ||
| myExecutor.submit(new Runnable() { | ||
| @Override | ||
| public void run() { | ||
| task.run(); | ||
| myUnresolvedAsyncs.remove(i); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can shutdownNow() interrupt this runnable between task.run() and myUnresolvedAsyncs().remove? If so, kill() method can throw exception about failing already completed async |
||
| } | ||
| }); | ||
| return task; | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -183,6 +217,13 @@ ExecutorService getExecutor() { | |
| public String toString() { | ||
| return "ExecutorEdt@" + Integer.toHexString(hashCode()) + " " + Thread.currentThread().getName(); | ||
| } | ||
|
|
||
| private void kill() { | ||
| myExecutor.shutdownNow(); | ||
| for (RunnableWithAsync<?> async : myUnresolvedAsyncs.values()) { | ||
| async.fail(); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private static class FutureRegistration extends Registration { | ||
|
|
@@ -236,4 +277,4 @@ Object getEdtId() { | |
| return myEdtId; | ||
| } | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As far as I understand kill() contract, it should fail all asyncs that was scheduled in corresponding edt. As far as I see, this implementation fails to follow such contract.