diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java b/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java index 5c686cff..56c3bd96 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java @@ -1,25 +1,70 @@ package hse.java.lectures.lecture6.tasks.queue; +import java.util.ArrayList; +import java.util.List; + public class BoundedBlockingQueue { + private List buf; + private int head = 0; + private int tail = 0; + private int size; + private final int capacity; + private Object monitor; public BoundedBlockingQueue(int capacity) { + if (capacity <= 0) { + throw new IllegalArgumentException(); + } + this.buf = new ArrayList<>(capacity); + this.size = 0; + this.capacity = capacity; + this.monitor = new Object(); + for (int i = 0; i < capacity; ++i) { + buf.add(null); + } } - public void put(T item) { + public void put(T item) throws InterruptedException { + if (item == null) { + throw new NullPointerException(); + } + synchronized (monitor) { + while (size == capacity) { + monitor.wait(); + } + + buf.set(tail, item); + size++; + tail = (tail + 1) % capacity; + + monitor.notify(); + } } - public T take() { - return null; + public T take() throws InterruptedException { + synchronized (monitor) { + while (size == 0) { + monitor.wait(); + } + T res = buf.get(head); + head = (head + 1) % capacity; + size--; + monitor.notify(); + return res; + } + } public int size() { - return 0; + synchronized (monitor) { + return size; + } } public int capacity() { - return 0; + return capacity; } } diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java index fedb5e66..7d9d8939 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java @@ -12,6 +12,7 @@ public class StreamWriter implements Runnable { private final PrintStream output; private final Runnable onTick; private volatile StreamingMonitor monitor; + private int ticksPrinted = 0; public StreamWriter(int id, String message, PrintStream output, Runnable onTick) { this.message = message; @@ -26,10 +27,16 @@ public void attachMonitor(StreamingMonitor monitor) { @Override public void run() { - // Writer threads are intentionally infinite for the task contract. - while (true) { - output.print(message); - onTick.run(); + while (ticksPrinted > Synchronizer.DEFAULT_TICKS_PER_WRITER) { + try { + monitor.waitForTurn(id); + output.print(message); + ticksPrinted++; + onTick.run(); + monitor.notifyTurnDone(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } } diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java index 68e8f279..460eeb27 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java @@ -1,5 +1,27 @@ package hse.java.lectures.lecture6.tasks.synchronizer; public class StreamingMonitor { - // impl your sync here + private int currentId = 1; + private int totalCount = 0; + private int limit; + private boolean isFinished = false; + + public StreamingMonitor(int totalTicks) { + this.limit = totalTicks; + } + + public synchronized void waitForTurn(int id) throws InterruptedException { + while (currentId != id || isFinished) { + this.wait(); + } + } + + public synchronized void notifyTurnDone() { + totalCount++; + if (totalCount == limit) { + isFinished = true; + } + currentId = (currentId % 3) + 1; + this.notifyAll(); + } } diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java index 3cb8aded..29e59390 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java @@ -18,12 +18,16 @@ public Synchronizer(List tasks, int ticksPerWriter) { } /** - * Starts infinite writer threads and waits until each writer prints exactly ticksPerWriter ticks + * Starts infinite writer threads and waits until each writer prints exactly + * ticksPerWriter ticks * in strict ascending id order. */ public void execute() { - // add monitor and sync + int totalTicks = tasks.size() * ticksPerWriter; + StreamingMonitor monitor = new StreamingMonitor(totalTicks); + for (StreamWriter writer : tasks) { + writer.attachMonitor(monitor); Thread worker = new Thread(writer, "stream-writer-" + writer.getId()); worker.setDaemon(true); worker.start();