-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathrx-asio.h
More file actions
117 lines (97 loc) · 3.76 KB
/
rx-asio.h
File metadata and controls
117 lines (97 loc) · 3.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
// Guard against multiple inclusion: this header defines a class and several
// inline functions and previously had no include guard.
#pragma once

#include <cstdio>   // printf, used for worker lifetime tracing
#include <memory>   // std::make_shared

#include "rxcpp/rx-scheduler.hpp"

// Select the ASIO flavour. After this block, asio_ns names the namespace that
// provides io_service / basic_waitable_timer and system_ns names the namespace
// that provides error_code, so the rest of the header is written only once.
// TODO: C++17 Networking TS
#ifdef WITHOUT_BOOST
// Standalone ASIO
#include <asio.hpp>
namespace asio_ns=::asio;
namespace system_ns=::std;
#else
// Boost.ASIO
#include <boost/asio.hpp>
namespace asio_ns=::boost::asio;
namespace system_ns=::boost::system;
#endif
namespace rxcpp {
namespace schedulers {
class asio : public scheduler_interface
{
typedef asio this_type;
asio_ns::io_service& io_service;
asio(const this_type&) = delete;
struct asio_worker : public worker_interface
{
private:
typedef asio_worker this_type;
public:
explicit asio_worker(composite_subscription cs, asio_ns::io_service& ios_)
: lifetime(cs), ios(ios_)
{
printf("worker %p created\n", this);
}
virtual ~asio_worker()
{
printf("worker %p destroyed\n", this);
lifetime.unsubscribe();
}
virtual clock_type::time_point now() const override { return clock_type::now(); }
virtual void schedule(const schedulable& scbl) const override
{
if (scbl.is_subscribed()) {
auto keep_alive = shared_from_this();
ios.post([=]() {
(void)(keep_alive);
// allow recursion
scbl(recursion(true).get_recurse());
});
}
}
virtual void schedule(clock_type::time_point when, const schedulable& scbl) const override
{
if (scbl.is_subscribed()) {
// printf("scheduled on %p with timeout\n", this);
auto keep_alive = shared_from_this();
auto timer = std::make_shared<asio_ns::basic_waitable_timer<clock_type>>
(ios, when);
timer->async_wait([=](const system_ns::error_code&) {
(void)(keep_alive);
(void)(timer);
// allow recursion
scbl(recursion(true).get_recurse());
});
}
}
composite_subscription lifetime;
asio_ns::io_service& ios;
};
public:
asio(asio_ns::io_service& ios)
: io_service(ios) { }
virtual ~asio() { }
virtual clock_type::time_point now() const { return clock_type::now(); }
virtual worker create_worker(composite_subscription cs) const
{
return worker(cs, std::make_shared<asio_worker>(cs, io_service));
}
};
/// Builds a scheduler whose work is executed by the given io_service.
/// The io_service is passed on by reference to the asio scheduler.
inline scheduler make_asio(asio_ns::io_service& ios)
{
    scheduler asio_scheduler = make_scheduler<asio>(ios);
    return asio_scheduler;
}
} // End of namespace schedulers
/// Returns an observe_on_one_worker coordination built from an ASIO scheduler
/// bound to io_service.
inline observe_on_one_worker observe_on_asio(asio_ns::io_service& io_service)
{
    auto asio_scheduler = rxsc::make_asio(io_service);
    return observe_on_one_worker(asio_scheduler);
}
/// Returns a synchronize_in_one_worker coordination built from an ASIO
/// scheduler bound to io_service.
inline synchronize_in_one_worker synchronize_in_asio(asio_ns::io_service& io_service)
{
    auto asio_scheduler = rxsc::make_asio(io_service);
    return synchronize_in_one_worker(asio_scheduler);
}
/// Returns an identity_one_worker coordination built from an ASIO scheduler
/// bound to io_service.
inline identity_one_worker identity_asio(asio_ns::io_service& io_service)
{
    return identity_one_worker(rxsc::make_asio(io_service));
}
/// Misspelled original name of identity_asio, kept so that existing callers
/// continue to compile. Prefer identity_asio in new code.
inline identity_one_worker identitiy_asio(asio_ns::io_service& io_service)
{
    return identity_asio(io_service);
}
/// Returns a serialize_one_worker coordination built from an ASIO scheduler
/// bound to io_service.
inline serialize_one_worker serialize_asio(asio_ns::io_service& io_service)
{
    auto asio_scheduler = rxsc::make_asio(io_service);
    return serialize_one_worker(asio_scheduler);
}
} // End of namespace rxcpp