From da507bcdb8141a07eb2df487fcb161d9d704b010 Mon Sep 17 00:00:00 2001 From: g-mixalowski Date: Wed, 18 Mar 2026 20:52:39 +0300 Subject: [PATCH] synchronizer: try --- .../tasks/synchronizer/StreamWriter.java | 22 ++++-- .../tasks/synchronizer/StreamingMonitor.java | 77 ++++++++++++++++++- .../tasks/synchronizer/Synchronizer.java | 20 +++-- 3 files changed, 106 insertions(+), 13 deletions(-) diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java index fedb5e66..623d3e42 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java @@ -26,11 +26,23 @@ public void attachMonitor(StreamingMonitor monitor) { @Override public void run() { - // Writer threads are intentionally infinite for the task contract. while (true) { - output.print(message); - onTick.run(); + try { + StreamingMonitor m = monitor; + if (m == null) { + throw new IllegalStateException(); + } + boolean allowed = m.awaitTurn(id); + if (!allowed) { + return; + } + output.print(message); + onTick.run(); + m.tickDone(id); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } } } - -} +} \ No newline at end of file diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java index 68e8f279..3d71fb6b 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java @@ -1,5 +1,80 @@ package hse.java.lectures.lecture6.tasks.synchronizer; +import java.util.*; + public class StreamingMonitor { - // impl your sync here + + private final List orderedIds; + private final int ticksPerWriter; + private final Map ticksDone = new HashMap<>(); + private int currentIndex = 0; + private int totalDone = 0; + private final int requiredTotal; + + public StreamingMonitor(List writers, int ticksPerWriter) { + if (writers == null || writers.isEmpty()) { + throw new IllegalArgumentException(); + } + if (ticksPerWriter < 0) { + throw new IllegalArgumentException(); + } + this.ticksPerWriter = ticksPerWriter; + + TreeSet sorted = new TreeSet<>(); + for (StreamWriter w : writers) { + sorted.add(w.getId()); + } + this.orderedIds = new ArrayList<>(sorted); + for (Integer id : orderedIds) { + ticksDone.put(id, 0); + } + this.requiredTotal = orderedIds.size() * ticksPerWriter; + } + + public synchronized boolean awaitTurn(int id) throws InterruptedException { + while (true) { + if (totalDone >= requiredTotal) { + return false; + } + int currentId = orderedIds.get(currentIndex); + if (id == currentId && ticksDone.get(id) < ticksPerWriter) { + return true; + } + wait(); + } + } + + public synchronized void tickDone(int id) { + if (totalDone >= requiredTotal) { + notifyAll(); + return; + } + + int done = ticksDone.get(id) + 1; + ticksDone.put(id, done); + totalDone++; + + if (totalDone >= requiredTotal) { + notifyAll(); + return; + } + + int n = orderedIds.size(); + for (int step = 1; step <= n; step++) { + int idx = (currentIndex + step) % n; + int nextId = orderedIds.get(idx); + if (ticksDone.get(nextId) < ticksPerWriter) { + currentIndex = idx; + break; + } + } + + notifyAll(); + } + + public synchronized void awaitCompletion() throws InterruptedException { + while (totalDone < requiredTotal) { + wait(); + } + } } diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java index 3cb8aded..45331096 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java @@ -17,17 +17,23 @@ public Synchronizer(List tasks, int ticksPerWriter) { this.ticksPerWriter = ticksPerWriter; } - /** - * Starts infinite writer threads and waits until each writer prints exactly ticksPerWriter ticks - * in strict ascending id order. - */ public void execute() { - // add monitor and sync + StreamingMonitor monitor = new StreamingMonitor(tasks, ticksPerWriter); + + for (StreamWriter writer : tasks) { + writer.attachMonitor(monitor); + } + for (StreamWriter writer : tasks) { Thread worker = new Thread(writer, "stream-writer-" + writer.getId()); worker.setDaemon(true); worker.start(); } - } -} + try { + monitor.awaitCompletion(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } +} \ No newline at end of file