diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java index fedb5e66..d39b5ae2 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java @@ -26,10 +26,14 @@ public void attachMonitor(StreamingMonitor monitor) { @Override public void run() { - // Writer threads are intentionally infinite for the task contract. while (true) { + monitor.waitQueue(id); + + if (monitor.checkAllTicksPrinted()) break; + output.print(message); onTick.run(); + monitor.refresh(id); } } diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java index 68e8f279..17398e2d 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java @@ -1,5 +1,41 @@ package hse.java.lectures.lecture6.tasks.synchronizer; public class StreamingMonitor { - // impl your sync here + private int currId; + private final int workerNum; + private final int ticksPerWriter; + private int refreshCnt = 0; + + public StreamingMonitor(int id, int num, int ticks) { + currId = id; + workerNum = num; + ticksPerWriter = ticks; + } + + public void waitQueue (int id) { + synchronized (this) { + while (currId != id && !checkAllTicksPrinted()) { + try { + this.wait(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + } + + public void refresh(int id) { + synchronized (this) { + refreshCnt++; + currId = currId % workerNum + 1; + notifyAll(); + } + } + + public boolean checkAllTicksPrinted() { + synchronized (this) { + return refreshCnt >= ticksPerWriter * workerNum; + } + } + } diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java index 3cb8aded..716c7376 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java @@ -1,5 +1,7 @@ package hse.java.lectures.lecture6.tasks.synchronizer; +import java.util.ArrayList; +import java.util.Comparator; import java.util.List; public class Synchronizer { @@ -21,12 +23,22 @@ public Synchronizer(List tasks, int ticksPerWriter) { * Starts infinite writer threads and waits until each writer prints exactly ticksPerWriter ticks * in strict ascending id order. */ - public void execute() { - // add monitor and sync - for (StreamWriter writer : tasks) { + public void execute() throws InterruptedException { + List sortedTasks = tasks.stream() + .sorted(Comparator.comparingInt(StreamWriter::getId)) + .toList(); + StreamingMonitor monitor = new StreamingMonitor(sortedTasks.get(0).getId(), sortedTasks.size(), ticksPerWriter); + List threads = new ArrayList<>(); + for (StreamWriter writer : sortedTasks) { + writer.attachMonitor(monitor); Thread worker = new Thread(writer, "stream-writer-" + writer.getId()); - worker.setDaemon(true); + worker.setDaemon(false); worker.start(); + threads.add(worker); + } + + for (Thread worker: threads) { + worker.join(); } }