Skip to content

Commit fc150f5

Browse files
committed
make shared_mutex fair
1 parent 2d8f2ef commit fc150f5

File tree

2 files changed

+94
-46
lines changed

2 files changed

+94
-46
lines changed

runtime/include/blocking_primitives.h

Lines changed: 54 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,24 @@ struct mutex {
3838
friend struct condition_variable;
3939
};
4040

41-
struct shared_mutex {
41+
struct condition_variable {
42+
as_atomic void wait(std::unique_lock<ltest::mutex>& lock) {
43+
addr = lock.mutex()->state.addr;
44+
lock.unlock();
45+
this_coro->SetBlocked({addr, 1});
46+
CoroYield();
47+
lock.lock();
48+
}
49+
50+
as_atomic void notify_one() { block_manager.UnblockOn(addr, 1); }
51+
52+
as_atomic void notify_all() { block_manager.UnblockAllOn(addr); }
53+
54+
private:
55+
std::intptr_t addr;
56+
};
57+
58+
struct shared_mutex_r {
4259
as_atomic void lock() {
4360
while (locked != 0) {
4461
this_coro->SetBlocked(state);
@@ -67,21 +84,47 @@ struct shared_mutex {
6784
BlockState state{reinterpret_cast<std::intptr_t>(&locked), locked};
6885
};
6986

70-
struct condition_variable {
71-
as_atomic void wait(std::unique_lock<ltest::mutex>& lock) {
72-
addr = lock.mutex()->state.addr;
73-
lock.unlock();
74-
this_coro->SetBlocked({addr, 1});
75-
CoroYield();
76-
lock.lock();
87+
struct shared_mutex {
88+
as_atomic void lock() {
89+
std::unique_lock lock{mutex_};
90+
while (write_) {
91+
write_entered_.wait(lock);
92+
}
93+
write_ = true;
94+
while (reader_count_ > 0) {
95+
no_readers_.wait(lock);
96+
}
7797
}
7898

79-
as_atomic void notify_one() { block_manager.UnblockOn(addr, 1); }
99+
as_atomic void unlock() {
100+
std::unique_lock lock{mutex_};
101+
write_ = false;
102+
write_entered_.notify_all();
103+
}
80104

81-
as_atomic void notify_all() { block_manager.UnblockAllOn(addr); }
105+
as_atomic void lock_shared() {
106+
std::unique_lock lock{mutex_};
107+
while (write_) {
108+
write_entered_.wait(lock);
109+
}
110+
++reader_count_;
111+
}
112+
as_atomic void unlock_shared() {
113+
std::unique_lock lock{mutex_};
114+
--reader_count_;
115+
if (write_ && reader_count_ == 0) {
116+
no_readers_.notify_one();
117+
}
118+
}
82119

83120
private:
84-
std::intptr_t addr;
121+
int reader_count_{0};
122+
bool write_{false};
123+
124+
ltest::condition_variable write_entered_;
125+
ltest::condition_variable no_readers_;
126+
127+
ltest::mutex mutex_;
85128
};
86129

87130
} // namespace ltest

runtime/include/pct_strategy.h

Lines changed: 40 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,37 @@ struct PctStrategy : public BaseStrategyWithThreads<TargetObj, Verifier> {
2020
PrepareForDepth(current_depth, 1);
2121
}
2222

23+
void AvoidLivelock(size_t& index, size_t& prior) {
24+
auto& threads = this->threads;
25+
if (fair_stage > 0) [[unlikely]] {
26+
for (size_t attempt = 0; attempt < threads.size(); ++attempt) {
27+
auto i = (++last_chosen) % threads.size();
28+
if (!threads[i].empty() && threads[i].back()->IsBlocked()) {
29+
continue;
30+
}
31+
index = i;
32+
prior = priorities[i];
33+
break;
34+
}
35+
// debug(stderr, "round robin choose: %d\n", index_of_max);
36+
if (fair_start == index) {
37+
--fair_stage;
38+
}
39+
}
40+
41+
// TODO: Choose wiser constant
42+
if (count_chosen_same == 1000 && index == last_chosen) [[unlikely]] {
43+
fair_stage = 5;
44+
fair_start = index;
45+
}
46+
47+
if (index == last_chosen) {
48+
++count_chosen_same;
49+
} else {
50+
count_chosen_same = 1;
51+
}
52+
}
53+
2354
std::optional<size_t> NextThreadId() override {
2455
auto& threads = this->threads;
2556
size_t max = std::numeric_limits<size_t>::min();
@@ -45,33 +76,7 @@ struct PctStrategy : public BaseStrategyWithThreads<TargetObj, Verifier> {
4576
}
4677
}
4778

48-
if (round_robin_stage > 0) [[unlikely]] {
49-
for (size_t attempt = 0; attempt < threads.size(); ++attempt) {
50-
auto i = (++last_chosen) % threads.size();
51-
if (!threads[i].empty() && threads[i].back()->IsBlocked()) {
52-
continue;
53-
}
54-
index_of_max = i;
55-
max = priorities[i];
56-
break;
57-
}
58-
// debug(stderr, "round robin choose: %d\n", index_of_max);
59-
if (round_robin_start == index_of_max) {
60-
--round_robin_stage;
61-
}
62-
}
63-
64-
// TODO: Choose wiser constant
65-
if (count_chosen_same == 1000 && index_of_max == last_chosen) [[unlikely]] {
66-
round_robin_stage = 5;
67-
round_robin_start = index_of_max;
68-
}
69-
70-
if (index_of_max == last_chosen) {
71-
++count_chosen_same;
72-
} else {
73-
count_chosen_same = 1;
74-
}
79+
AvoidLivelock(index_of_max, max);
7580

7681
if (max == std::numeric_limits<size_t>::min()) [[unlikely]] {
7782
return std::nullopt;
@@ -117,7 +122,7 @@ struct PctStrategy : public BaseStrategyWithThreads<TargetObj, Verifier> {
117122
}
118123
}
119124

120-
if (round_robin_stage > 0) [[unlikely]] {
125+
if (fair_stage > 0) [[unlikely]] {
121126
for (size_t attempt = 0; attempt < threads.size(); ++attempt) {
122127
auto i = (++last_chosen) % threads.size();
123128
int task_index = this->GetNextTaskInThread(i);
@@ -130,15 +135,15 @@ struct PctStrategy : public BaseStrategyWithThreads<TargetObj, Verifier> {
130135
break;
131136
}
132137
// debug(stderr, "round robin choose: %d\n", index_of_max);
133-
if (round_robin_start == index_of_max) {
134-
--round_robin_stage;
138+
if (fair_start == index_of_max) {
139+
--fair_stage;
135140
}
136141
}
137142

138143
// TODO: Choose wiser constant
139144
if (count_chosen_same == 1000 && index_of_max == last_chosen) [[unlikely]] {
140-
round_robin_stage = 5;
141-
round_robin_start = index_of_max;
145+
fair_stage = 5;
146+
fair_start = index_of_max;
142147
}
143148

144149
if (index_of_max == last_chosen) {
@@ -194,7 +199,7 @@ struct PctStrategy : public BaseStrategyWithThreads<TargetObj, Verifier> {
194199
k_statistics.push_back(current_schedule_length);
195200
current_schedule_length = 0;
196201
count_chosen_same = 0;
197-
round_robin_stage = 0;
202+
fair_stage = 0;
198203

199204
// current_depth have been increased
200205
size_t new_k = std::reduce(k_statistics.begin(), k_statistics.end()) /
@@ -227,8 +232,8 @@ struct PctStrategy : public BaseStrategyWithThreads<TargetObj, Verifier> {
227232
// original article)
228233
size_t count_chosen_same;
229234
size_t last_chosen;
230-
size_t round_robin_start;
231-
size_t round_robin_stage{0};
235+
size_t fair_start;
236+
size_t fair_stage{0};
232237
std::vector<int> priorities;
233238
std::vector<size_t> priority_change_points;
234239
std::mt19937 rng;

0 commit comments

Comments
 (0)