Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,24 @@ public void attachMonitor(StreamingMonitor monitor) {

@Override
public void run() {
// Writer threads are intentionally infinite for the task contract.
while (true) {
output.print(message);
onTick.run();
synchronized (monitor) {
while (monitor.getCurrentId() != id && !monitor.isLimitReached()) {
try {
monitor.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
if (monitor.isLimitReached()) {
monitor.notifyAll();
break;
}
output.print(message);
onTick.run();
monitor.nextTurn();
monitor.notifyAll();
}
}
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,32 @@
package hse.java.lectures.lecture6.tasks.synchronizer;

import lombok.Getter;

import java.util.List;
import java.util.concurrent.CountDownLatch;

public class StreamingMonitor {
// impl your sync here
@Getter
private final CountDownLatch latch;
private final List<Integer> order;
private int currentInd;

public StreamingMonitor(List<StreamWriter> tasks, int ticksPerWriter) {
this.order = tasks.stream().map(StreamWriter::getId).sorted().toList();
this.currentInd = 0;
this.latch = new CountDownLatch(order.size() * ticksPerWriter);
}

void nextTurn() {
currentInd = (currentInd + 1) % order.size();
latch.countDown();
}

int getCurrentId() {
return order.get(currentInd);
}

boolean isLimitReached() {
return latch.getCount() == 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,17 @@ public Synchronizer(List<StreamWriter> tasks, int ticksPerWriter) {
* Starts infinite writer threads and waits until each writer prints exactly ticksPerWriter ticks
* in strict ascending id order.
*/
public void execute() {
// add monitor and sync
public void execute() throws InterruptedException {
StreamingMonitor monitor = new StreamingMonitor(tasks, ticksPerWriter);

for (StreamWriter writer : tasks) {
writer.attachMonitor(monitor);
Thread worker = new Thread(writer, "stream-writer-" + writer.getId());
worker.setDaemon(true);
worker.start();
}

monitor.getLatch().await();
}

}
26 changes: 26 additions & 0 deletions src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/t.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
1. синхронизатор запускает все потоки -- понятно
2. упорядочить вывод по id
- нужно я так понимаю реализовать StreamingMonitor
- нужно чтобы 1ый writer был с минимальным id
- после него подхватывал работу 2ой и т.д.
- разберемся с этим ниже
- передать каждому writer перед стартом
- в самом writer синхронизировать внутри while (synchronized + wait)
3. взять от каждого первые tickPerWrite
- пункт3 содержит в себе это условие: после N * ticks per write завершить execute()
- самый топорный способ: создать массив, в который по id каждого потока будет лежать сколько осталось сделать вызовов, тогда надо каждому потоку еще проверять условие: если == 0, завершиться
- или можно сделать массив latch (только возникает вопрос: как проверить все эти latch? думаю что то типо такого:
```java
for (var latch: latches) {
latch.await(); // я синтаксис точный не помню -- важна суть
}
```

- нужно чтобы 1ый writer был с минимальным id
- после него подхватывал работу 2ой и т.д.
из мыслей: было бы классно, если в StreamingMonitor лежала какая то последовательность:
1 -> 2 -> 3
потоки могли бы читать (concurency reading это же не гонка!), но с synchronized (mutex)
как только поток видит: о мой id: (внутри самого writer он делает свою работу под захватом и меняет последовательность)

как идея?
Loading