diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java index fedb5e66..fc9adb92 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java @@ -26,11 +26,24 @@ public void attachMonitor(StreamingMonitor monitor) { @Override public void run() { - // Writer threads are intentionally infinite for the task contract. while (true) { - output.print(message); - onTick.run(); + synchronized (monitor) { + while (monitor.getCurrentId() != id && !monitor.isLimitReached()) { + try { + monitor.wait(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + if (monitor.isLimitReached()) { + monitor.notifyAll(); + break; + } + output.print(message); + onTick.run(); + monitor.nextTurn(); + monitor.notifyAll(); + } } } - } diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java index 68e8f279..2055008e 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java @@ -1,5 +1,32 @@ package hse.java.lectures.lecture6.tasks.synchronizer; +import lombok.Getter; + +import java.util.List; +import java.util.concurrent.CountDownLatch; + public class StreamingMonitor { - // impl your sync here + @Getter + private final CountDownLatch latch; + private final List order; + private int currentInd; + + public StreamingMonitor(List tasks, int ticksPerWriter) { + this.order = tasks.stream().map(StreamWriter::getId).sorted().toList(); + this.currentInd = 0; + this.latch = new CountDownLatch(order.size() * ticksPerWriter); + } + + void nextTurn() { + currentInd = (currentInd + 1) % order.size(); + latch.countDown(); + } + + int getCurrentId() { + return order.get(currentInd); + } + + boolean isLimitReached() { + return latch.getCount() == 0; + } } diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java index 3cb8aded..63ad2b7e 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java @@ -21,13 +21,17 @@ public Synchronizer(List tasks, int ticksPerWriter) { * Starts infinite writer threads and waits until each writer prints exactly ticksPerWriter ticks * in strict ascending id order. */ - public void execute() { - // add monitor and sync + public void execute() throws InterruptedException { + StreamingMonitor monitor = new StreamingMonitor(tasks, ticksPerWriter); + for (StreamWriter writer : tasks) { + writer.attachMonitor(monitor); Thread worker = new Thread(writer, "stream-writer-" + writer.getId()); worker.setDaemon(true); worker.start(); } + + monitor.getLatch().await(); } } diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/t.md b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/t.md new file mode 100644 index 00000000..24b99046 --- /dev/null +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/t.md @@ -0,0 +1,26 @@ +1. синхронизатор запускает все потоки -- понятно +2. упорядочить вывод по id +- нужно я так понимаю реализовать StreamingMonitor + - нужно чтобы 1ый writer был с минимальным id + - после него подхватывал работу 2ой и т.д. + - разберемся с этим ниже +- передать каждому writer перед стартом +- в самом writer синхронизировать внутри while (synchronized + wait) +3. взять от каждого первые tickPerWrite +- пункт3 содержит в себе это условие: после N * ticks per write завершить execute() +- самый топорный способ: создать массив, в который по id каждого потока будет лежать сколько осталось сделать вызовов, тогда надо каждому потоку еще проверять условие: если == 0, завершиться +- или можно сделать массив latch (только возникает вопрос: как проверить все эти latch? думаю что то типо такого: +```java +for (var latch: latches) { + latch.await(); // я синтаксис точный не помню -- важна суть +} +``` + +- нужно чтобы 1ый writer был с минимальным id +- после него подхватывал работу 2ой и т.д. +из мыслей: было бы классно, если в StreamingMonitor лежала какая то последовательность: +1 -> 2 -> 3 +потоки могли бы читать (concurency reading это же не гонка!), но с synchronized (mutex) +как только поток видит: о мой id: (внутри самого writer он делает свою работу под захватом и меняет последовательность) + +как идея?