From 916f06df90c209b56a45d5fd429c4cb6185e4bab Mon Sep 17 00:00:00 2001 From: Milanya Date: Sat, 21 Mar 2026 01:26:25 +0300 Subject: [PATCH 1/4] queue: first try --- .../tasks/queue/BoundedBlockingQueue.java | 41 ++++++++++++++++--- 1 file changed, 35 insertions(+), 6 deletions(-) diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java b/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java index 816f3ee..3a3920a 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java @@ -1,25 +1,54 @@ package hse.java.lectures.lecture6.tasks.queue; +import java.util.LinkedList; +import java.util.Queue; + public class BoundedBlockingQueue { + private final Queue q; + private final int capacity; + private int size; public BoundedBlockingQueue(int capacity) { - + if (capacity <= 0) { + throw new IllegalArgumentException(); + } + this.capacity = capacity; + q = new LinkedList(); + size = 0; } public void put(T item) throws InterruptedException { - + if (item == null) { + throw new NullPointerException(); + } + synchronized (this) { + while (size == capacity) { + wait(); + } + q.add(item); + size++; + notifyAll(); + } } public T take() throws InterruptedException { - return null; + synchronized (this) { + while (size == 0) { + wait(); + } + T item = q.poll(); + size--; + notifyAll(); + return item; + } } - public int size() { - return 0; + public synchronized int size() { + return size; } public int capacity() { - return 0; + return capacity; } } From cc7116ace5a5a05120ea48c20193b52960442f53 Mon Sep 17 00:00:00 2001 From: Milanya Date: Sat, 21 Mar 2026 03:21:22 +0300 Subject: [PATCH 2/4] synchronizer: first try --- .../tasks/synchronizer/StreamWriter.java | 11 ++- .../tasks/synchronizer/StreamingMonitor.java | 68 ++++++++++++++++++- .../tasks/synchronizer/Synchronizer.java | 16 ++++- 3 files changed, 91 insertions(+), 4 deletions(-) diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java index fedb5e6..d6e1c04 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java @@ -28,8 +28,15 @@ public void attachMonitor(StreamingMonitor monitor) { public void run() { // Writer threads are intentionally infinite for the task contract. while (true) { - output.print(message); - onTick.run(); + try { + monitor.wait_id(id); + if (!monitor.work_tick(id)) return; + output.print(message); + onTick.run(); + } catch (Exception e) { + Thread.currentThread().interrupt(); + break; + } } } diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java index 68e8f27..eac43d4 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java @@ -1,5 +1,71 @@ package hse.java.lectures.lecture6.tasks.synchronizer; +import java.util.*; public class StreamingMonitor { - // impl your sync here + private final List writers; + private final int ticks_per_writer; + + private final Map done_ticks = new HashMap<>(); + + private int cur_ind; + private int cnt_ticks; + private final int max_cnt_ticks; + + public StreamingMonitor(List writers, int ticks_per_writer) { + this.writers = writers; + this.ticks_per_writer = ticks_per_writer; + this.max_cnt_ticks = writers.size() * ticks_per_writer; + + for (StreamWriter writer : writers) { + done_ticks.put(writer.getId(), 0); + } + cur_ind = 0; + cnt_ticks = 0; + } + + private boolean can_run(int id) { + if (cnt_ticks >= max_cnt_ticks) { + return true; + } + return writers.get(cur_ind).getId() == id && done_ticks.get(id) < ticks_per_writer; + } + + public synchronized void wait_id(int id) throws InterruptedException { + while (!can_run(id)) { + wait(); + } + } + + private void next() { + int n = writers.size(); + + for (int i = 1; i <= n; i++) { + int next = (cur_ind + i) % n; + int nextId = writers.get(next).getId(); + + if (done_ticks.get(nextId) < ticks_per_writer) { + cur_ind = next; + return; + } + } + } + + public synchronized boolean work_tick(int id) { + if (cnt_ticks >= max_cnt_ticks) { + notifyAll(); + return false; + } + done_ticks.put(id, done_ticks.get(id) + 1); + cnt_ticks++; + next(); + + notifyAll(); + return cnt_ticks < max_cnt_ticks; + } + + public synchronized void wait_all() throws InterruptedException { + while (cnt_ticks < max_cnt_ticks) { + wait(); + } + } } diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java index 3cb8ade..e8cba98 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java @@ -1,5 +1,6 @@ package hse.java.lectures.lecture6.tasks.synchronizer; +import java.util.Comparator; import java.util.List; public class Synchronizer { @@ -22,12 +23,25 @@ public Synchronizer(List tasks, int ticksPerWriter) { * in strict ascending id order. */ public void execute() { - // add monitor and sync + tasks.sort(Comparator.comparingInt(StreamWriter::getId)); + + StreamingMonitor monitor = new StreamingMonitor(tasks, ticksPerWriter); + + for (StreamWriter writer : tasks) { + writer.attachMonitor(monitor); + } + for (StreamWriter writer : tasks) { Thread worker = new Thread(writer, "stream-writer-" + writer.getId()); worker.setDaemon(true); worker.start(); } + + try { + monitor.wait_all(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } } } From 281dd869f354160f0d4895470957ea85e75623bc Mon Sep 17 00:00:00 2001 From: Milanya Date: Sat, 21 Mar 2026 03:32:07 +0300 Subject: [PATCH 3/4] synchronizer: sorted arraylist --- .../lecture6/tasks/synchronizer/Synchronizer.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java index e8cba98..9339c55 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java @@ -1,5 +1,6 @@ package hse.java.lectures.lecture6.tasks.synchronizer; +import java.util.ArrayList; import java.util.Comparator; import java.util.List; @@ -23,15 +24,16 @@ public Synchronizer(List tasks, int ticksPerWriter) { * in strict ascending id order. */ public void execute() { - tasks.sort(Comparator.comparingInt(StreamWriter::getId)); + List sorted = new ArrayList<>(tasks); + sorted.sort(Comparator.comparingInt(StreamWriter::getId)); - StreamingMonitor monitor = new StreamingMonitor(tasks, ticksPerWriter); + StreamingMonitor monitor = new StreamingMonitor(sorted, ticksPerWriter); - for (StreamWriter writer : tasks) { + for (StreamWriter writer : sorted) { writer.attachMonitor(monitor); } - for (StreamWriter writer : tasks) { + for (StreamWriter writer : sorted) { Thread worker = new Thread(writer, "stream-writer-" + writer.getId()); worker.setDaemon(true); worker.start(); From ad5a7c45288fe68410e78cbfa099069b2a6176ea Mon Sep 17 00:00:00 2001 From: Milanya Date: Sat, 21 Mar 2026 04:32:05 +0300 Subject: [PATCH 4/4] synchronizer: fix --- .../lecture6/tasks/synchronizer/StreamWriter.java | 5 +---- .../lecture6/tasks/synchronizer/StreamingMonitor.java | 9 ++++++++- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java index d6e1c04..d0a95cf 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java @@ -29,10 +29,7 @@ public void run() { // Writer threads are intentionally infinite for the task contract. while (true) { try { - monitor.wait_id(id); - if (!monitor.work_tick(id)) return; - output.print(message); - onTick.run(); + if (!monitor.work_tick(id, message, output, onTick)) return; } catch (Exception e) { Thread.currentThread().interrupt(); break; diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java index eac43d4..35b7173 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java @@ -1,5 +1,6 @@ package hse.java.lectures.lecture6.tasks.synchronizer; +import java.io.PrintStream; import java.util.*; public class StreamingMonitor { private final List writers; @@ -50,11 +51,17 @@ private void next() { } } - public synchronized boolean work_tick(int id) { + public synchronized boolean work_tick(int id, String message, PrintStream output, Runnable onTick) throws InterruptedException { + while (!can_run(id)) { + wait(); + } + if (cnt_ticks >= max_cnt_ticks) { notifyAll(); return false; } + output.print(message); + onTick.run(); done_ticks.put(id, done_ticks.get(id) + 1); cnt_ticks++; next();