From 5e5caa43b90de4a3c41049fe8728f778c37dfa86 Mon Sep 17 00:00:00 2001 From: MuravAna Date: Fri, 20 Mar 2026 23:58:20 +0300 Subject: [PATCH 1/2] queue: bebebe --- .../tasks/queue/BoundedBlockingQueue.java | 39 ++++++++++++++----- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java b/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java index 816f3ee..f476623 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java @@ -1,25 +1,46 @@ package hse.java.lectures.lecture6.tasks.queue; +import java.util.LinkedList; +import java.util.Queue; + public class BoundedBlockingQueue { + private final int capacity; + private final Queue queue; public BoundedBlockingQueue(int capacity) { - + if (capacity <= 0) { + throw new IllegalArgumentException(); + } + this.capacity = capacity; + this.queue = new LinkedList<>(); } - public void put(T item) throws InterruptedException { - + public synchronized void put(T item) throws InterruptedException { + if (item == null) { + throw new NullPointerException(); + } + while (queue.size() == capacity) { + wait(); + } + queue.add(item); + notifyAll(); } - public T take() throws InterruptedException { - return null; + public synchronized T take() throws InterruptedException { + while (queue.isEmpty()) { + wait(); + } + T item = queue.poll(); + notifyAll(); + return item; } - public int size() { - return 0; + public synchronized int size() { + return queue.size(); } public int capacity() { - return 0; + return capacity; } -} +} \ No newline at end of file From d3dbec4739c3c7852c33f99f767271600760850a Mon Sep 17 00:00:00 2001 From: MuravAna Date: Sat, 21 Mar 2026 14:27:45 +0300 Subject: [PATCH 2/2] synchronizer: add code --- .../tasks/synchronizer/StreamWriter.java | 8 ++-- .../tasks/synchronizer/StreamingMonitor.java | 44 ++++++++++++++++++- .../tasks/synchronizer/Synchronizer.java | 21 ++++++--- 3 files changed, 61 insertions(+), 12 deletions(-) diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java index fedb5e6..36b2993 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java @@ -26,11 +26,13 @@ public void attachMonitor(StreamingMonitor monitor) { @Override public void run() { - // Writer threads are intentionally infinite for the task contract. while (true) { + if (!monitor.waitTurn(id)) { + break; + } output.print(message); onTick.run(); + monitor.tickDone(); } } - -} +} \ No newline at end of file diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java index 68e8f27..0b0ccdd 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java @@ -1,5 +1,45 @@ package hse.java.lectures.lecture6.tasks.synchronizer; public class StreamingMonitor { - // impl your sync here -} + + private final int[] sortedIds; + private final int targetTicks; + private int currentIdx = 0; + private int completedTicks = 0; + + public StreamingMonitor(int[] sortedIds, int targetTicks) { + this.sortedIds = sortedIds; + this.targetTicks = targetTicks; + } + + public synchronized boolean waitTurn(int id) { + try { + while (completedTicks < targetTicks && sortedIds[currentIdx] != id) { + wait(); + } + return completedTicks < targetTicks; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } + } + + public synchronized void tickDone() { + currentIdx++; + if (currentIdx == sortedIds.length) { + currentIdx = 0; + completedTicks++; + } + notifyAll(); + } + + public synchronized void awaitDone() { + try { + while (completedTicks < targetTicks) { + wait(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } +} \ No newline at end of file diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java index 3cb8ade..c799183 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java @@ -17,17 +17,24 @@ public Synchronizer(List tasks, int ticksPerWriter) { this.ticksPerWriter = ticksPerWriter; } - /** - * Starts infinite writer threads and waits until each writer prints exactly ticksPerWriter ticks - * in strict ascending id order. - */ public void execute() { - // add monitor and sync + int[] sortedIds = tasks.stream() + .mapToInt(StreamWriter::getId) + .sorted() + .toArray(); + + StreamingMonitor monitor = new StreamingMonitor(sortedIds, ticksPerWriter); + + for (StreamWriter writer : tasks) { + writer.attachMonitor(monitor); + } + for (StreamWriter writer : tasks) { Thread worker = new Thread(writer, "stream-writer-" + writer.getId()); worker.setDaemon(true); worker.start(); } - } -} + monitor.awaitDone(); + } +} \ No newline at end of file