diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java index fedb5e66..b1e8c48a 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java @@ -26,10 +26,24 @@ public void attachMonitor(StreamingMonitor monitor) { @Override public void run() { - // Writer threads are intentionally infinite for the task contract. - while (true) { - output.print(message); - onTick.run(); + synchronized (monitor) { + while (true) { + if (monitor.current>=monitor.total){ + monitor.notifyAll(); + return; + } + if ((monitor.current % monitor.N) + 1 == id) { + output.print(message); + onTick.run(); + monitor.current++; + monitor.notifyAll(); + } + try { + monitor.wait(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } } } diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java index 68e8f279..ed660d8c 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java @@ -1,5 +1,13 @@ package hse.java.lectures.lecture6.tasks.synchronizer; public class StreamingMonitor { - // impl your sync here + int N, ticksPerWriter; + int current = 0; + int total; + StreamingMonitor(int N,int ticksPerWriter){ + this.N=N; + this.ticksPerWriter=ticksPerWriter; + total=N*ticksPerWriter; + } + } diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java index 3cb8aded..39f037ff 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java @@ -22,12 +22,25 @@ public Synchronizer(List tasks, int ticksPerWriter) { * in strict ascending id order. */ public void execute() { - // add monitor and sync + StreamingMonitor monitor = new StreamingMonitor(tasks.size(),ticksPerWriter); + for (StreamWriter writer : tasks) { + writer.attachMonitor(monitor); + } for (StreamWriter writer : tasks) { Thread worker = new Thread(writer, "stream-writer-" + writer.getId()); worker.setDaemon(true); worker.start(); } + synchronized (monitor) { + while (monitor.current < monitor.total) { + try { + monitor.wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + } + } } }