Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,25 +1,53 @@
package hse.java.lectures.lecture6.tasks.queue;

public class BoundedBlockingQueue<T> {
import java.util.ArrayDeque;
import java.util.Queue;

public class BoundedBlockingQueue<T> {

public BoundedBlockingQueue(int capacity) {
ArrayDeque<T> queue = new ArrayDeque<>();
private int capacity;
private int size = 0;
Object monitor = new Object();

public BoundedBlockingQueue(int capacity) throws IllegalArgumentException {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
}

public void put(T item) {
public void put(T item) throws InterruptedException, IllegalArgumentException {
if (item == null) throw new IllegalArgumentException();
synchronized (monitor) {
if (size == capacity) {

monitor.wait();

}

queue.add(item);
size++;
monitor.notifyAll();
}
}

public T take() {
return null;
public T take() throws InterruptedException {
synchronized (monitor) {
while (size == 0) {

monitor.wait();

}
size--;
monitor.notifyAll();
return queue.poll();
}
}

public int size() {
return 0;
return this.size;
}

public int capacity() {
return 0;
return this.capacity;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package hse.java.lectures.lecture6.tasks.synchronizer;

import lombok.Getter;
import lombok.SneakyThrows;

import java.io.PrintStream;

Expand All @@ -24,12 +25,20 @@ public void attachMonitor(StreamingMonitor monitor) {
this.monitor = monitor;
}

@SneakyThrows
@Override
public void run() {
// Writer threads are intentionally infinite for the task contract.
while (true) {
output.print(message);
onTick.run();
synchronized (monitor) {
while (!monitor.isWorking || !(monitor.nowWriter() == id)) {
monitor.wait();
}
output.print(message);
onTick.run();
monitor.next();
monitor.notifyAll();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,34 @@
package hse.java.lectures.lecture6.tasks.synchronizer;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class StreamingMonitor {
// impl your sync here
int shouldPrintNow = 0;
List<Integer> ids;
boolean isWorking = true;
int ticksDone = 0;
int tickNeeded = 0;
StreamingMonitor(List<Integer> ids, int tickNeeded) {
this.ids = new ArrayList<>(ids);
this.ids.sort(Comparator.naturalOrder());
this.tickNeeded = tickNeeded;
}

Integer nowWriter() {
return ids.get(shouldPrintNow);
}

void next() {
shouldPrintNow++;
if (shouldPrintNow == ids.size()) {
shouldPrintNow = 0;
ticksDone++;
}

if (ticksDone == tickNeeded) {
isWorking = false;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package hse.java.lectures.lecture6.tasks.synchronizer;

import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class Synchronizer {

public static final int DEFAULT_TICKS_PER_WRITER = 10;
private final List<StreamWriter> tasks;
private List<StreamWriter> tasks = List.of();
private final int ticksPerWriter;

public Synchronizer(List<StreamWriter> tasks) {
Expand All @@ -15,19 +18,29 @@ public Synchronizer(List<StreamWriter> tasks) {
public Synchronizer(List<StreamWriter> tasks, int ticksPerWriter) {
this.tasks = tasks;
this.ticksPerWriter = ticksPerWriter;
monitor = new StreamingMonitor(tasks.stream()
.map(StreamWriter::getId).toList(), ticksPerWriter);
}

/**
* Starts infinite writer threads and waits until each writer prints exactly ticksPerWriter ticks
* in strict ascending id order.
*/
public void execute() {
private final StreamingMonitor monitor;
public void execute() throws InterruptedException {
// add monitor and sync
for (StreamWriter writer : tasks) {
writer.attachMonitor(monitor);
Thread worker = new Thread(writer, "stream-writer-" + writer.getId());
worker.setDaemon(true);
worker.start();
}

synchronized (monitor) {
while (monitor.isWorking) {
monitor.wait();
}
}
}

}
Loading