diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java index fedb5e66..393c5885 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java @@ -28,9 +28,11 @@ public void attachMonitor(StreamingMonitor monitor) { public void run() { // Writer threads are intentionally infinite for the task contract. while (true) { - output.print(message); - onTick.run(); + try { + monitor.tick(id, onTick, output, message); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } } - } diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java index 68e8f279..723ba26a 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java @@ -1,5 +1,43 @@ package hse.java.lectures.lecture6.tasks.synchronizer; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.With; + +import java.io.PrintStream; + +@Getter public class StreamingMonitor { // impl your sync here -} + private final Object key = new Object(); + private final int N; + private final int ticksPerWriter; + private int cnt = 0; + private int nowId = 1; + + public StreamingMonitor(int size, int ticksPerWriter) { + this.N = size; + this.ticksPerWriter = size * ticksPerWriter; + } + + public synchronized void check() throws InterruptedException { + while(cnt < ticksPerWriter){ + wait(); + } + } + + public synchronized void tick(int idCome, Runnable onTick, PrintStream output, String message) throws InterruptedException { + + while (idCome != nowId || cnt >= ticksPerWriter) { + wait(); + } + + nowId = Integer.max(1, (nowId + 1) % (N+1)); + cnt += 1; + + output.print(message); + onTick.run(); + + notifyAll(); + } +} \ No newline at end of file diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java index 3cb8aded..4be7562d 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java @@ -1,5 +1,6 @@ package hse.java.lectures.lecture6.tasks.synchronizer; +import javax.management.monitor.Monitor; import java.util.List; public class Synchronizer { @@ -22,12 +23,25 @@ public Synchronizer(List tasks, int ticksPerWriter) { * in strict ascending id order. */ public void execute() { - // add monitor and sync + StreamingMonitor streamingMonitor = new StreamingMonitor(tasks.size(), ticksPerWriter); + + for (var stream : tasks){ + stream.attachMonitor(streamingMonitor); + } + for (StreamWriter writer : tasks) { Thread worker = new Thread(writer, "stream-writer-" + writer.getId()); worker.setDaemon(true); worker.start(); } + + try { + streamingMonitor.check(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + } -} +} \ No newline at end of file