forked from sccn/liblsl
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathinlet_connection.cpp
More file actions
340 lines (299 loc) · 14.8 KB
/
inlet_connection.cpp
File metadata and controls
340 lines (299 loc) · 14.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
#include <iostream>
#include <boost/bind.hpp>
#include "cast.h"
#include "inlet_connection.h"
#include "api_config.h"
// === implementation of the inlet_connection class ===
using namespace lsl;
using namespace lslboost::asio;
/**
* Construct a new inlet connection.
* @param info A resolved stream info object (as coming from one of the resolver functions).
* It is possible -- but highly discouraged -- to initialize a connection with an unresolved (i.e. made-up) stream_info; in this case,
* a connection will be resolved alongside based on the provided info, but will only succeed if the channel count and channel format
* match the one that is provided.
* @param recover Try to silently recover lost streams that are recoverable (=those that that have a source_id set).
* In all other cases (recover is false or the stream is not recoverable) a lost_error is thrown where indicated if the stream's source is lost (e.g., due to an app or computer crash).
*/
inlet_connection::inlet_connection(const stream_info_impl &info, bool recover):
type_info_(info), host_info_(info), tcp_protocol_(tcp::v4()), udp_protocol_(udp::v4()),
recovery_enabled_(recover), lost_(false), shutdown_(false),
last_receive_time_(lsl_clock()), active_transmissions_(0) {
// if the given stream_info is already fully resolved...
if (!host_info_.v4address().empty() || !host_info_.v6address().empty()) {
// check LSL protocol version (we strictly forbid incompatible protocols instead of risking silent failure)
if (type_info_.version()/100 > api_config::get_instance()->use_protocol_version()/100)
throw std::runtime_error((std::string("The received stream (")+=host_info_.name()) += ") uses a newer protocol version than this inlet. Please update.");
// select TCP/UDP protocol versions
if (api_config::get_instance()->allow_ipv6()) {
// if IPv6 is optionally allowed...
if (host_info_.v4address().empty() || !host_info_.v4data_port() || !host_info_.v4service_port()) {
// then use it but only iff there are problems with the IPv4 connection data
tcp_protocol_ = tcp::v6();
udp_protocol_ = udp::v6();
} else {
// (otherwise stick to IPv4)
tcp_protocol_ = tcp::v4();
udp_protocol_ = udp::v4();
}
} else {
// otherwise use the protocol type that is selected in the config
tcp_protocol_ = api_config::get_instance()->allow_ipv4() ? tcp::v4() : tcp::v6();
udp_protocol_ = api_config::get_instance()->allow_ipv4() ? udp::v4() : udp::v6();
}
if (recovery_enabled_ && type_info_.source_id().empty()) {
// we cannot correctly recover streams which don't have a unique source id
std::clog << "Note: The stream named '" << host_info_.name() << "' could not be recovered automatically if its provider crashed because it does not specify a unique data source ID." << std::endl;
recovery_enabled_ = false;
}
} else {
// the actual endpoint is not known yet -- we need to discover it later on the fly
// check that all the necessary information for this has been fully specified
if (type_info_.name().empty() && type_info_.type().empty() && type_info_.source_id().empty())
throw std::invalid_argument("When creating an inlet with a constructed (instead of resolved) stream_info, you must assign at least the name, type or source_id of the desired stream.");
if (type_info_.channel_count() == 0)
throw std::invalid_argument("When creating an inlet with a constructed (instead of resolved) stream_info, you must assign a nonzero channel count.");
if (type_info_.channel_format() == cft_undefined)
throw std::invalid_argument("When creating an inlet with a constructed (instead of resolved) stream_info, you must assign a channel format.");
// use the protocol that is specified in the config
tcp_protocol_ = api_config::get_instance()->allow_ipv4() ? tcp::v4() : tcp::v6();
udp_protocol_ = api_config::get_instance()->allow_ipv4() ? udp::v4() : udp::v6();
// assign initial dummy endpoints
host_info_.v4address("127.0.0.1");
host_info_.v6address("::1");
host_info_.v4data_port(49999);
host_info_.v4service_port(49999);
host_info_.v6data_port(49999);
host_info_.v6service_port(49999);
// recovery must generally be enabled
recovery_enabled_ = true;
}
}
/// Engage the connection and its recovery watchdog thread.
void inlet_connection::engage() {
if (recovery_enabled_)
watchdog_thread_ = lslboost::thread(&inlet_connection::watchdog_thread,this);
}
/// Disengage the connection and all its resolver capabilities (including the watchdog).
void inlet_connection::disengage() {
// shut down the connection
{
lslboost::lock_guard<lslboost::mutex> lock(shutdown_mut_);
shutdown_ = true;
}
shutdown_cond_.notify_all();
// cancel all operations (resolver, streams, ...)
resolver_.cancel();
cancel_and_shutdown();
// and wait for the watchdog to finish
if (recovery_enabled_)
watchdog_thread_.join();
}
// === external accessors for connection properties ===
// get the TCP endpoint from the info (according to our configured protocol)
tcp::endpoint inlet_connection::get_tcp_endpoint() {
lslboost::shared_lock<lslboost::shared_mutex> lock(host_info_mut_);
if(tcp_protocol_ == tcp::v4()) {
std::string address = host_info_.v4address();
uint16_t port = host_info_.v4data_port();
return tcp::endpoint(ip::make_address(address), port);
//This more complicated procedure is required when the address is an ipv6 link-local address.
//Simplified from https://stackoverflow.com/questions/10286042/using-lslboost-to-accept-on-ipv6-link-scope-address
//It does not hurt when the address is not link-local.
} else {
std::string address = host_info_.v6address();
std::string port = to_string(host_info_.v6data_port());
io_context io;
ip::tcp::resolver resolver(io);
ip::tcp::resolver::results_type res = resolver.resolve(address, port);
if(res.size() == 0) {
throw lost_error("Unable to resolve tcp stream at address: " + address + ", port: " + port);
}
//assuming first (typically only) element in list is valid.
return *res.begin();
}
}
// get the UDP endpoint from the info (according to our configured protocol)
udp::endpoint inlet_connection::get_udp_endpoint() {
lslboost::shared_lock<lslboost::shared_mutex> lock(host_info_mut_);
if(udp_protocol_ == udp::v4()) {
std::string address = host_info_.v4address();
uint16_t port = host_info_.v4service_port();
return udp::endpoint(ip::make_address(address), port);
//This more complicated procedure is required when the address is an ipv6 link-local address.
//Simplified from https://stackoverflow.com/questions/10286042/using-lslboost-to-accept-on-ipv6-link-scope-address
//It does not hurt when the address is not link-local.
} else {
std::string address = host_info_.v6address();
std::string port = to_string(host_info_.v6service_port());
io_context io;
ip::udp::resolver resolver(io);
ip::udp::resolver::results_type res = resolver.resolve(address, port);
if(res.size() == 0) {
throw lost_error("Unable to resolve udp stream at address: " + address + ", port: " + port);
}
//assuming first (typically only) element in list is valid.
return *res.begin();
}
}
// get the hostname from the info
std::string inlet_connection::get_hostname() {
lslboost::shared_lock<lslboost::shared_mutex> lock(host_info_mut_);
return host_info_.hostname();
}
/// get the current stream UID (may change between crashes/reconnects)
std::string inlet_connection::current_uid() {
lslboost::shared_lock<lslboost::shared_mutex> lock(host_info_mut_);
return host_info_.uid();
}
/// get the current nominal sampling rate (might change between crashes/reconnects)
double inlet_connection::current_srate() {
lslboost::shared_lock<lslboost::shared_mutex> lock(host_info_mut_);
return host_info_.nominal_srate();
}
// === connection recovery logic ===
/// Performs the actual work of attempting a recovery.
void inlet_connection::try_recover() {
if (recovery_enabled_) {
try {
lslboost::lock_guard<lslboost::mutex> lock(recovery_mut_);
// first create the query string based on the known stream information
std::ostringstream query;
{
lslboost::shared_lock<lslboost::shared_mutex> lock(host_info_mut_);
// construct query according to the fields that are present in the stream_info
const char *channel_format_strings[] = {"undefined","float32","double64","string","int32","int16","int8","int64"};
query << "channel_count='" << host_info_.channel_count() << "'";
if (!host_info_.name().empty())
query << " and name='" << host_info_.name() << "'";
if (!host_info_.type().empty())
query << " and type='" << host_info_.type() << "'";
// for floating point values, str2double(double2str(fpvalue)) == fpvalue is most
// likely wrong and might lead to streams not being resolved.
// We accept that a lost stream might be replaced by a stream from the same host
// with the same type, channel type and channel count but a different srate
/*if (host_info_.nominal_srate() > 0)
query << " and nominal_srate='" << host_info_.nominal_srate() << "'";
*/
if (!host_info_.source_id().empty())
query << " and source_id='" << host_info_.source_id() << "'";
query << " and channel_format='" << channel_format_strings[host_info_.channel_format()] << "'";
}
// attempt a recovery
for (int attempt=0;;attempt++) {
// issue the resolve (blocks until it is either cancelled or got at least one matching streaminfo and has waited for a certain timeout)
std::vector<stream_info_impl> infos = resolver_.resolve_oneshot(query.str(),1,FOREVER,attempt==0 ? 1.0 : 5.0);
if (!infos.empty()) {
// got a result
lslboost::unique_lock<lslboost::shared_mutex> lock(host_info_mut_);
// check if any of the returned streams is the one that we're currently connected to
for (std::size_t k=0;k<infos.size();k++)
if (infos[k].uid() == host_info_.uid())
return; // in this case there is no need to recover (we're still fine)
// otherwise our stream is gone and we indeed need to recover:
// ensure that the query result is unique (since someone might have used a non-unique stream ID)
if (infos.size() == 1) {
// update the endpoint
host_info_ = infos[0];
// cancel all cancellable operations registered with this connection
cancel_all_registered();
// invoke any callbacks associated with a connection recovery
lslboost::lock_guard<lslboost::mutex> lock(onrecover_mut_);
for(std::map<void*,lslboost::function<void()> >::iterator i=onrecover_.begin(),e=onrecover_.end();i!=e;i++)
(i->second)();
} else {
// there are multiple possible streams to connect to in a recovery attempt: we warn and re-try
// this is because we don't want to randomly connect to the wrong source without the user knowing about it;
// the correct action (if this stream shall indeed have multiple instances) is to change the user code and
// make its source_id unique, or remove the source_id altogether if that's not possible (therefore disabling the ability to recover)
std::clog << "Found multiple streams with name='" << host_info_.name() << "' and source_id='" << host_info_.source_id() << "'. Cannot recover unless all but one are closed." << std::endl;
continue;
}
} else {
// cancelled
}
break;
}
} catch(std::exception &e) {
std::cerr << "A recovery attempt encountered an unexpected error: " << e.what() << std::endl;
}
}
}
/// A thread that periodically re-resolves the stream and checks if it has changed its location
void inlet_connection::watchdog_thread() {
while(!lost_ && !shutdown_) {
try {
// we only try to recover if a) there are active transmissions and b) we haven't seen new data for some time
{
lslboost::unique_lock<lslboost::mutex> lock(client_status_mut_);
if ((active_transmissions_ > 0) && (lsl_clock() - last_receive_time_ > api_config::get_instance()->watchdog_time_threshold())) {
lock.unlock();
try_recover();
}
}
// instead of sleeping we're waiting on a condition variable for the sleep duration
// so that the watchdog can be cancelled conveniently
{
lslboost::unique_lock<lslboost::mutex> lock(shutdown_mut_);
shutdown_cond_.wait_for(lock,lslboost::chrono::duration<double>(api_config::get_instance()->watchdog_check_interval()), lslboost::bind(&inlet_connection::shutdown,this));
}
} catch(std::exception &e) {
std::cerr << "Unexpected hiccup in the watchdog thread: " << e.what() << std::endl;
}
}
}
/// Issue a recovery attempt if a connection loss was detected.
void inlet_connection::try_recover_from_error() {
if (!shutdown_) {
if (!recovery_enabled_) {
// if the stream is irrecoverable it is now lost,
// so we need to notify the other inlet components
lost_ = true;
try {
lslboost::lock_guard<lslboost::mutex> lock(client_status_mut_);
for(std::map<void*,lslboost::condition_variable*>::iterator i=onlost_.begin(),e=onlost_.end();i!=e;i++)
i->second->notify_all();
} catch(std::exception &e) {
std::cerr << "Unexpected problem while trying to issue a connection loss notification: " << e.what() << std::endl;
}
throw lost_error("The stream read by this inlet has been lost. To recover, you need to re-resolve the source and re-create the inlet.");
} else
try_recover();
}
}
// === client status updates ===
/// Indicate that a transmission is now active.
void inlet_connection::acquire_watchdog() {
lslboost::lock_guard<lslboost::mutex> lock(client_status_mut_);
active_transmissions_++;
}
/// Indicate that a transmission has just completed.
void inlet_connection::release_watchdog() {
lslboost::lock_guard<lslboost::mutex> lock(client_status_mut_);
active_transmissions_--;
}
/// Update the time when the last content was received from the source
void inlet_connection::update_receive_time(double t) {
lslboost::lock_guard<lslboost::mutex> lock(client_status_mut_);
last_receive_time_ = t;
}
/// Register a condition variable that should be notified when a connection is lost
void inlet_connection::register_onlost(void *id, lslboost::condition_variable *cond) {
lslboost::lock_guard<lslboost::mutex> lock(client_status_mut_);
onlost_[id] = cond;
}
/// Unregister a condition variable
void inlet_connection::unregister_onlost(void *id) {
lslboost::lock_guard<lslboost::mutex> lock(client_status_mut_);
onlost_.erase(id);
}
/// Register a callback function that shall be called when a recovery has been performed
void inlet_connection::register_onrecover(void *id, const lslboost::function<void()> &func) {
lslboost::lock_guard<lslboost::mutex> lock(onrecover_mut_);
onrecover_[id] = func;
}
/// Unregister a recovery callback function
void inlet_connection::unregister_onrecover(void *id) {
lslboost::lock_guard<lslboost::mutex> lock(onrecover_mut_);
onrecover_.erase(id);
}