diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java index fedb5e66..16d490e4 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java @@ -1,36 +1,41 @@ package hse.java.lectures.lecture6.tasks.synchronizer; -import lombok.Getter; - import java.io.PrintStream; +import lombok.Getter; public class StreamWriter implements Runnable { - private final String message; - @Getter - private final int id; - private final PrintStream output; - private final Runnable onTick; - private volatile StreamingMonitor monitor; - - public StreamWriter(int id, String message, PrintStream output, Runnable onTick) { - this.message = message; - this.id = id; - this.output = output; - this.onTick = onTick; + private final String message; + @Getter private final int id; + private final PrintStream output; + private final Runnable onTick; + private volatile StreamingMonitor monitor; + + public StreamWriter(int id, String message, PrintStream output, + Runnable onTick) { + this.message = message; + this.id = id; + this.output = output; + this.onTick = onTick; + } + + public void attachMonitor(StreamingMonitor monitor) { + this.monitor = monitor; + } + + @Override + public void run() { + while (true) { + try { + monitor.awaitTurn(id); + if (monitor.isDone()) + return; + output.print(message); + onTick.run(); + monitor.completeTick(id); + } catch (InterruptedException e) { + return; + } } - - public void attachMonitor(StreamingMonitor monitor) { - this.monitor = monitor; - } - - @Override - public void run() { - // Writer threads are intentionally infinite for the task contract. - while (true) { - output.print(message); - onTick.run(); - } - } - -} + } +} \ No newline at end of file diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java index 68e8f279..a0138a91 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java @@ -1,5 +1,53 @@ package hse.java.lectures.lecture6.tasks.synchronizer; public class StreamingMonitor { - // impl your sync here + + private final int writerCount; + private final int ticksPerWriter; + private int currentId = 1; + private final int[] ticks; + private boolean done = false; + + public StreamingMonitor(int writerCount, int ticksPerWriter) { + this.writerCount = writerCount; + this.ticksPerWriter = ticksPerWriter; + this.ticks = new int[writerCount + 1]; + } + + public synchronized void awaitTurn(int id) throws InterruptedException { + while (currentId != id && !done) + wait(); + if (done) + throw new InterruptedException("done"); + } + + public synchronized void completeTick(int id) { + ticks[id]++; + int next = (currentId % writerCount) + 1; + while (ticks[next] >= ticksPerWriter) { + next = (next % writerCount) + 1; + if (next == id) { + done = true; + break; + } + } + boolean allDone = true; + for (int i = 1; i <= writerCount; i++) { + if (ticks[i] < ticksPerWriter) { + allDone = false; + break; + } + } + if (allDone) + done = true; + currentId = next; + notifyAll(); + } + + public synchronized boolean isDone() { return done; } + + public synchronized void waitUntilDone() throws InterruptedException { + while (!done) + wait(); + } } diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java index 3cb8aded..a2cdd3fa 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java @@ -4,30 +4,37 @@ public class Synchronizer { - public static final int DEFAULT_TICKS_PER_WRITER = 10; - private final List tasks; - private final int ticksPerWriter; + public static final int DEFAULT_TICKS_PER_WRITER = 10; + private final List tasks; + private final int ticksPerWriter; - public Synchronizer(List tasks) { - this(tasks, DEFAULT_TICKS_PER_WRITER); - } + public Synchronizer(List tasks) { + this(tasks, DEFAULT_TICKS_PER_WRITER); + } + + public Synchronizer(List tasks, int ticksPerWriter) { + this.tasks = tasks; + this.ticksPerWriter = ticksPerWriter; + } - public Synchronizer(List tasks, int ticksPerWriter) { - this.tasks = tasks; - this.ticksPerWriter = ticksPerWriter; + public void execute() { + StreamingMonitor monitor = + new StreamingMonitor(tasks.size(), ticksPerWriter); + + for (StreamWriter writer : tasks) { + writer.attachMonitor(monitor); } - /** - * Starts infinite writer threads and waits until each writer prints exactly ticksPerWriter ticks - * in strict ascending id order. - */ - public void execute() { - // add monitor and sync - for (StreamWriter writer : tasks) { - Thread worker = new Thread(writer, "stream-writer-" + writer.getId()); - worker.setDaemon(true); - worker.start(); - } + for (StreamWriter writer : tasks) { + Thread worker = new Thread(writer, "stream-writer-" + writer.getId()); + worker.setDaemon(true); + worker.start(); } + try { + monitor.waitUntilDone(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } }