Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
package hse.java.lectures.lecture6.tasks.queue;

public class BoundedBlockingQueue<T> {


public BoundedBlockingQueue(int capacity) {

}

public void put(T item) {

}

public T take() {
return null;
}

public int size() {
return 0;
}

public int capacity() {
return 0;
}
}
package hse.java.lectures.lecture6.tasks.queue;
public class BoundedBlockingQueue<T> {
public BoundedBlockingQueue(int capacity) {
}
public void put(T item) throws InterruptedException {
}
public T take() throws InterruptedException {
return null;
}
public int size() {
return 0;
}
public int capacity() {
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,22 @@ public void attachMonitor(StreamingMonitor monitor) {
public void run() {
// Writer threads are intentionally infinite for the task contract.
while (true) {
output.print(message);
onTick.run();
synchronized (monitor) {
while (id != monitor.streamIds.get(monitor.nowIdIndex)) {
try {
monitor.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
if (monitor.cntTicks == monitor.cntCompleteTicks){
break;
}
output.print(message);
onTick.run();
monitor.nextId();
monitor.notifyAll();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,28 @@
package hse.java.lectures.lecture6.tasks.synchronizer;

import java.util.List;
import java.util.stream.Collectors;

public class StreamingMonitor {
// impl your sync here
public int nowIdIndex = 0;
public int cntTicks;
public int cntCompleteTicks = 0;
public List<Integer> streamIds;

StreamingMonitor(List<StreamWriter> streamIds, int cntTicks) {
this.cntTicks = cntTicks;
this.streamIds = streamIds
.stream()
.map(writer -> writer.getId())
.sorted()
.collect(Collectors.toList());
}

void nextId() {
nowIdIndex++;
if (nowIdIndex == streamIds.size()) {
cntCompleteTicks++;
nowIdIndex = 0;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,35 @@ public class Synchronizer {
public static final int DEFAULT_TICKS_PER_WRITER = 10;
private final List<StreamWriter> tasks;
private final int ticksPerWriter;
private StreamingMonitor monitor;

public Synchronizer(List<StreamWriter> tasks) {
this(tasks, DEFAULT_TICKS_PER_WRITER);
monitor = new StreamingMonitor(tasks, DEFAULT_TICKS_PER_WRITER);
}

public Synchronizer(List<StreamWriter> tasks, int ticksPerWriter) {
this.tasks = tasks;
this.ticksPerWriter = ticksPerWriter;
monitor = new StreamingMonitor(tasks, ticksPerWriter);
}

/**
* Starts infinite writer threads and waits until each writer prints exactly ticksPerWriter ticks
* in strict ascending id order.
*/
public void execute() {
// add monitor and sync
public void execute() throws InterruptedException {
for (StreamWriter writer : tasks) {
writer.attachMonitor(monitor);
Thread worker = new Thread(writer, "stream-writer-" + writer.getId());
worker.setDaemon(true);
worker.start();
}
synchronized (monitor) {
while (monitor.cntTicks != monitor.cntCompleteTicks) {
monitor.wait();
}
}
}

}
Loading