diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java index fedb5e66..4e036098 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java @@ -26,10 +26,20 @@ public void attachMonitor(StreamingMonitor monitor) { @Override public void run() { - // Writer threads are intentionally infinite for the task contract. - while (true) { - output.print(message); - onTick.run(); + try + { + while (true) + { + monitor.waitWorker(this.id); + output.print(message); + onTick.run(); + + monitor.next(); + } + } + catch (InterruptedException e) + { + // И тут тоже, что делать стоит.... } } diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java index 68e8f279..11c5d43c 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java @@ -1,5 +1,43 @@ package hse.java.lectures.lecture6.tasks.synchronizer; +import java.util.List; + public class StreamingMonitor { - // impl your sync here + + private final int maxTicks; + private final List ids; + + private int curIndex = 0; + private int curTick = 0; + + public StreamingMonitor(List ids, int maxTicks) + { + this.ids = ids; + this.maxTicks = maxTicks; + } + + + public synchronized void waitWorker(int id) throws InterruptedException + { + while (curTick >= maxTicks || ids.get(curIndex) != id) + { + wait(); + } + } + + public synchronized void next() + { + curTick++; + curIndex = (curIndex + 1) % ids.size(); + notifyAll(); + } + + public synchronized void waitFinish() throws InterruptedException + { + while (curTick < maxTicks) + { + wait(); + } + } } + diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java index 3cb8aded..4d609610 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java @@ -1,33 +1,56 @@ package hse.java.lectures.lecture6.tasks.synchronizer; +import java.util.ArrayList; +import java.util.Comparator; import java.util.List; -public class Synchronizer { +public class Synchronizer +{ public static final int DEFAULT_TICKS_PER_WRITER = 10; private final List tasks; private final int ticksPerWriter; - public Synchronizer(List tasks) { + public Synchronizer(List tasks) + { this(tasks, DEFAULT_TICKS_PER_WRITER); } - public Synchronizer(List tasks, int ticksPerWriter) { + public Synchronizer(List tasks, int ticksPerWriter) + { this.tasks = tasks; this.ticksPerWriter = ticksPerWriter; } /** - * Starts infinite writer threads and waits until each writer prints exactly ticksPerWriter ticks + * Starts infinite writer threads and waits until each writer prints exactly ticksPerWriter + * ticks * in strict ascending id order. */ - public void execute() { - // add monitor and sync - for (StreamWriter writer : tasks) { + public void execute() + { + + List tasksCopy = new ArrayList<>(tasks); + tasksCopy.sort(Comparator.comparingInt(StreamWriter::getId)); + + List taskIds = tasksCopy.stream().map(StreamWriter::getId).toList(); + + StreamingMonitor monitor = new StreamingMonitor(taskIds, tasks.size() * ticksPerWriter); + for (StreamWriter writer : tasksCopy) + { + writer.attachMonitor(monitor); Thread worker = new Thread(writer, "stream-writer-" + writer.getId()); worker.setDaemon(true); worker.start(); } + + try + { + monitor.waitFinish(); + } catch (InterruptedException e) + { + // я не знаю что здесь стоит сделать =( + } } }