diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java b/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java index 5c686cff..ec9c56d6 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java @@ -1,25 +1,25 @@ -package hse.java.lectures.lecture6.tasks.queue; - -public class BoundedBlockingQueue { - - - public BoundedBlockingQueue(int capacity) { - - } - - public void put(T item) { - - } - - public T take() { - return null; - } - - public int size() { - return 0; - } - - public int capacity() { - return 0; - } -} +package hse.java.lectures.lecture6.tasks.queue; + +public class BoundedBlockingQueue { + + + public BoundedBlockingQueue(int capacity) { + + } + + public void put(T item) throws InterruptedException { + + } + + public T take() throws InterruptedException { + return null; + } + + public int size() { + return 0; + } + + public int capacity() { + return 0; + } +} \ No newline at end of file diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java index fedb5e66..d52a0c57 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java @@ -28,8 +28,22 @@ public void attachMonitor(StreamingMonitor monitor) { public void run() { // Writer threads are intentionally infinite for the task contract. while (true) { - output.print(message); - onTick.run(); + synchronized (monitor) { + while (id != monitor.streamIds.get(monitor.nowIdIndex)) { + try { + monitor.wait(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + if (monitor.cntTicks == monitor.cntCompleteTicks){ + break; + } + output.print(message); + onTick.run(); + monitor.nextId(); + monitor.notifyAll(); + } } } diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java index 68e8f279..d6c7da95 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java @@ -1,5 +1,28 @@ package hse.java.lectures.lecture6.tasks.synchronizer; +import java.util.List; +import java.util.stream.Collectors; + public class StreamingMonitor { - // impl your sync here + public int nowIdIndex = 0; + public int cntTicks; + public int cntCompleteTicks = 0; + public List streamIds; + + StreamingMonitor(List streamIds, int cntTicks) { + this.cntTicks = cntTicks; + this.streamIds = streamIds + .stream() + .map(writer -> writer.getId()) + .sorted() + .collect(Collectors.toList()); + } + + void nextId() { + nowIdIndex++; + if (nowIdIndex == streamIds.size()) { + cntCompleteTicks++; + nowIdIndex = 0; + } + } } diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java index 3cb8aded..78e4e980 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java @@ -7,27 +7,35 @@ public class Synchronizer { public static final int DEFAULT_TICKS_PER_WRITER = 10; private final List tasks; private final int ticksPerWriter; + private StreamingMonitor monitor; public Synchronizer(List tasks) { this(tasks, DEFAULT_TICKS_PER_WRITER); + monitor = new StreamingMonitor(tasks, DEFAULT_TICKS_PER_WRITER); } public Synchronizer(List tasks, int ticksPerWriter) { this.tasks = tasks; this.ticksPerWriter = ticksPerWriter; + monitor = new StreamingMonitor(tasks, ticksPerWriter); } /** * Starts infinite writer threads and waits until each writer prints exactly ticksPerWriter ticks * in strict ascending id order. */ - public void execute() { - // add monitor and sync + public void execute() throws InterruptedException { for (StreamWriter writer : tasks) { + writer.attachMonitor(monitor); Thread worker = new Thread(writer, "stream-writer-" + writer.getId()); worker.setDaemon(true); worker.start(); } + synchronized (monitor) { + while (monitor.cntTicks != monitor.cntCompleteTicks) { + monitor.wait(); + } + } } }