Skip to content

Commit d929fe6

Browse files
committed
add nonlinear_buffered_channel
1 parent fc150f5 commit d929fe6

File tree

6 files changed

+140
-58
lines changed

6 files changed

+140
-58
lines changed

runtime/include/verifying.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#pragma once
22
#include <gflags/gflags.h>
33

4+
#include <chrono>
45
#include <memory>
56
#include <type_traits>
67

verifying/blocking/CMakeLists.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ set (SOURCE_TARGET_LIST
55
shared_mutexed_register.cpp
66
bank.cpp
77
buffered_channel.cpp
8+
nonlinear_buffered_channel.cpp
89
simple_deadlock.cpp
910
bank_deadlock.cpp
1011
)
@@ -79,6 +80,10 @@ add_integration_test_blocking("buffered_channel_pct" "verify" FALSE
7980
buffered_channel --strategy pct --rounds 10000
8081
)
8182

83+
add_integration_test_blocking("nonlinear_buffered_channel_pct" "verify" TRUE
84+
nonlinear_buffered_channel --strategy pct --rounds 10000
85+
)
86+
8287
add_integration_test_blocking("buffered_channel_random" "verify" FALSE
8388
buffered_channel --strategy random --rounds 10000
8489
)

verifying/blocking/buffered_channel.cpp

Lines changed: 3 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,67 +1,13 @@
1+
#include "../specs/buffered_channel.h"
2+
13
#include <atomic>
24
#include <condition_variable>
35
#include <cstring>
46
#include <mutex>
57

6-
#include "../specs/queue.h"
7-
#include "runtime/include/value_wrapper.h"
88
#include "runtime/include/verifying.h"
99
#include "verifiers/buffered_channel_verifier.h"
1010

11-
constexpr int N = 5;
12-
13-
namespace spec {
14-
struct BufferedChannel {
15-
void Send(int v) { deq.push_back(v); }
16-
17-
int Recv() {
18-
if (deq.empty()) {
19-
return -1;
20-
}
21-
auto value = deq.front();
22-
deq.pop_front();
23-
return value;
24-
}
25-
26-
using method_t = std::function<ValueWrapper(BufferedChannel *l, void *args)>;
27-
static auto GetMethods() {
28-
method_t send_func = [](BufferedChannel *l, void *args) {
29-
auto real_args = reinterpret_cast<std::tuple<int> *>(args);
30-
l->Send(std::get<0>(*real_args));
31-
return void_v;
32-
};
33-
34-
method_t recv_func = [](BufferedChannel *l, void *args) -> int {
35-
return l->Recv();
36-
};
37-
38-
return std::map<std::string, method_t>{
39-
{"Send", send_func},
40-
{"Recv", recv_func},
41-
};
42-
}
43-
44-
std::deque<int> deq;
45-
};
46-
47-
struct BufferedChannelHash {
48-
size_t operator()(const BufferedChannel &r) const {
49-
int res = 0;
50-
for (int elem : r.deq) {
51-
res += elem;
52-
}
53-
return res;
54-
}
55-
};
56-
57-
struct BufferedChannelEquals {
58-
bool operator()(const BufferedChannel &lhs,
59-
const BufferedChannel &rhs) const {
60-
return lhs.deq == rhs.deq;
61-
}
62-
};
63-
}; // namespace spec
64-
6511
struct BufferedChannel {
6612
non_atomic void Send(int v) {
6713
std::unique_lock lock{mutex_};
@@ -86,7 +32,7 @@ struct BufferedChannel {
8632
}
8733
debug(stderr, "Recv\n");
8834
auto val = queue_[ridx_];
89-
ridx_ = (ridx_ + 1) % 5;
35+
ridx_ = (ridx_ + 1) % N;
9036
empty_ = (sidx_ == ridx_);
9137
full_ = false;
9238
send_side_cv_.notify_one();

verifying/blocking/folly_flatcombining_queue.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
#include "runtime/include/verifying_macro.h"
66
#include "verifying/specs/queue.h"
77

8-
class FlatCombiningQueue : public folly::FlatCombining<FlatCombiningQueue> {
8+
class FlatCombiningQueue
9+
: public folly::FlatCombining<FlatCombiningQueue, ltest::mutex> {
910
spec::Queue<> queue_;
1011

1112
public:
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
#include <atomic>
2+
#include <condition_variable>
3+
#include <cstring>
4+
#include <mutex>
5+
6+
#include "../specs/buffered_channel.h"
7+
#include "runtime/include/verifying.h"
8+
#include "verifiers/buffered_channel_verifier.h"
9+
10+
struct BufferedChannel {
11+
non_atomic void Send(int v) {
12+
std::unique_lock lock{mutex_};
13+
while (!closed_ && full_) {
14+
debug(stderr, "Waiting in send...\n");
15+
send_side_cv_.wait(lock);
16+
}
17+
debug(stderr, "Send\n");
18+
19+
queue_[sidx_] = v;
20+
sidx_ = (sidx_ + 1) % N;
21+
empty_ = false;
22+
recv_side_cv_.notify_one();
23+
}
24+
25+
non_atomic int Recv() {
26+
std::unique_lock lock{mutex_};
27+
while (!closed_ && empty_) {
28+
debug(stderr, "Waiting in recv...\n");
29+
recv_side_cv_.wait(lock);
30+
}
31+
debug(stderr, "Recv\n");
32+
auto val = queue_[ridx_];
33+
ridx_ = (ridx_ + 1) % N;
34+
empty_ = (sidx_ == ridx_);
35+
full_ = false;
36+
send_side_cv_.notify_one();
37+
return val;
38+
}
39+
40+
void Close() {
41+
closed_.store(true);
42+
send_side_cv_.notify_all();
43+
recv_side_cv_.notify_all();
44+
}
45+
46+
std::mutex mutex_;
47+
std::condition_variable send_side_cv_, recv_side_cv_;
48+
std::atomic<bool> closed_{false};
49+
50+
bool full_{false};
51+
bool empty_{true};
52+
53+
uint32_t sidx_{0}, ridx_{0};
54+
55+
std::array<int, N> queue_{};
56+
};
57+
58+
auto generateInt(size_t) {
59+
return ltest::generators::makeSingleArg(rand() % 10 + 1);
60+
}
61+
62+
using spec_t =
63+
ltest::Spec<BufferedChannel, spec::BufferedChannel,
64+
spec::BufferedChannelHash, spec::BufferedChannelEquals>;
65+
66+
LTEST_ENTRYPOINT_CONSTRAINT(spec_t, spec::BufferedChannelVerifier);
67+
68+
target_method(generateInt, void, BufferedChannel, Send, int);
69+
target_method(ltest::generators::genEmpty, int, BufferedChannel, Recv);

verifying/specs/buffered_channel.h

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
2+
#include <deque>
3+
#include <functional>
4+
#include <map>
5+
6+
#include "runtime/include/value_wrapper.h"
7+
8+
constexpr int N = 5;
9+
10+
namespace spec {
11+
struct BufferedChannel {
12+
void Send(int v) { deq.push_back(v); }
13+
14+
int Recv() {
15+
if (deq.empty()) {
16+
return -1;
17+
}
18+
auto value = deq.front();
19+
deq.pop_front();
20+
return value;
21+
}
22+
23+
using method_t = std::function<ValueWrapper(BufferedChannel *l, void *args)>;
24+
static auto GetMethods() {
25+
method_t send_func = [](BufferedChannel *l, void *args) {
26+
auto real_args = reinterpret_cast<std::tuple<int> *>(args);
27+
l->Send(std::get<0>(*real_args));
28+
return void_v;
29+
};
30+
31+
method_t recv_func = [](BufferedChannel *l, void *args) -> int {
32+
return l->Recv();
33+
};
34+
35+
return std::map<std::string, method_t>{
36+
{"Send", send_func},
37+
{"Recv", recv_func},
38+
};
39+
}
40+
41+
std::deque<int> deq;
42+
};
43+
44+
struct BufferedChannelHash {
45+
size_t operator()(const BufferedChannel &r) const {
46+
int res = 0;
47+
for (int elem : r.deq) {
48+
res += elem;
49+
}
50+
return res;
51+
}
52+
};
53+
54+
struct BufferedChannelEquals {
55+
bool operator()(const BufferedChannel &lhs,
56+
const BufferedChannel &rhs) const {
57+
return lhs.deq == rhs.deq;
58+
}
59+
};
60+
}; // namespace spec

0 commit comments

Comments
 (0)