From 92acb3548904cd721610561f3329476a9f585ebc Mon Sep 17 00:00:00 2001 From: Sofa Gareeva Date: Sat, 21 Mar 2026 22:24:51 +0300 Subject: [PATCH] synchronizer: first try --- .../tasks/synchronizer/StreamWriter.java | 15 +++-- .../tasks/synchronizer/StreamingMonitor.java | 60 ++++++++++++++++++- .../tasks/synchronizer/Synchronizer.java | 14 ++++- 3 files changed, 82 insertions(+), 7 deletions(-) diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java index fedb5e66..0a39ec06 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java @@ -26,10 +26,17 @@ public void attachMonitor(StreamingMonitor monitor) { @Override public void run() { - // Writer threads are intentionally infinite for the task contract. - while (true) { - output.print(message); - onTick.run(); + try { + while (true) { + if (!monitor.waitForTurn(id)) { + return; + } + output.print(message); + onTick.run(); + monitor.endTick(id); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } } diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java index 68e8f279..cc24d1cb 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java @@ -1,5 +1,63 @@ package hse.java.lectures.lecture6.tasks.synchronizer; +import java.util.*; +import java.util.stream.Collectors; + public class StreamingMonitor { - // impl your sync here + private final List orderedIds; + private final Map idToOrder; + private final int ticksPerWriter; + private final int[] ticksDonePerWriter; + private int currentOrder; + private boolean isFinished = false; + + public StreamingMonitor(List orderedIds, int ticksPerWriter) { + if (orderedIds == null || orderedIds.isEmpty() || ticksPerWriter < 0) { + throw new IllegalArgumentException(); + } + this.orderedIds = orderedIds.stream().sorted().collect(Collectors.toList()); + this.idToOrder = new HashMap<>(); + for (int i = 0; i < this.orderedIds.size(); i++) { + idToOrder.put(this.orderedIds.get(i), i); + } + this.ticksPerWriter = ticksPerWriter; + this.ticksDonePerWriter = new int[orderedIds.size()]; + this.currentOrder = 0; + this.isFinished = false; + } + + public synchronized boolean waitForTurn(int id) throws InterruptedException { + int order = idToOrder.get(id); + while (isFinished == false && (currentOrder != order || ticksDonePerWriter[order] >= ticksPerWriter)) { + wait(); + } + if (isFinished || ticksDonePerWriter[order] >= ticksPerWriter) { + return false; + } + + return true; + } + + public synchronized void endTick(int id) { + int order = idToOrder.get(id); + ticksDonePerWriter[order]++; + currentOrder = (currentOrder + 1) % orderedIds.size(); + boolean allDone = true; + for (int ticks : ticksDonePerWriter) { + if (ticks < ticksPerWriter) { + allDone = false; + break; + } + } + if (allDone) { + isFinished = true; + } + notifyAll(); + } + + public synchronized void waitForEnd() throws InterruptedException { + while (!isFinished) { + wait(); + } + } } diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java index 3cb8aded..14c35d5b 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java @@ -1,6 +1,7 @@ package hse.java.lectures.lecture6.tasks.synchronizer; -import java.util.List; +import java.util.*; +import java.util.stream.Collectors; public class Synchronizer { @@ -22,12 +23,21 @@ public Synchronizer(List tasks, int ticksPerWriter) { * in strict ascending id order. */ public void execute() { - // add monitor and sync + List collectedIds = tasks.stream().map(StreamWriter::getId).collect(Collectors.toList()); + StreamingMonitor monitor = new StreamingMonitor(collectedIds, ticksPerWriter); + tasks.forEach(writer -> writer.attachMonitor(monitor)); + List threads = new ArrayList<>(); for (StreamWriter writer : tasks) { Thread worker = new Thread(writer, "stream-writer-" + writer.getId()); worker.setDaemon(true); + threads.add(worker); worker.start(); } + try { + monitor.waitForEnd(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } }