forked from bitcoin/bitcoin
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprotocol.cpp
More file actions
156 lines (147 loc) · 5.27 KB
/
protocol.cpp
File metadata and controls
156 lines (147 loc) · 5.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
// Copyright (c) 2021-present The Bitcoin Core developers
// Distributed under the MIT software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
#include <interfaces/init.h>
#include <ipc/capnp/context.h>
#include <ipc/capnp/init.capnp.h>
#include <ipc/capnp/init.capnp.proxy.h>
#include <ipc/capnp/protocol.h>
#include <ipc/exception.h>
#include <ipc/protocol.h>
#include <kj/async.h>
#include <logging.h>
#include <mp/proxy-io.h>
#include <mp/proxy-types.h>
#include <mp/util.h>
#include <util/threadnames.h>
#include <cassert>
#include <cerrno>
#include <future>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <sys/socket.h>
#include <system_error>
#include <thread>
namespace ipc {
namespace capnp {
namespace {
//! Choose the mp log level to request, based on which levels are currently
//! accepted for the BCLog::IPC category. The most verbose accepted level wins.
mp::Log GetRequestedIPCLogLevel()
{
    // Info, Warning, and Error are logged unconditionally, so Info is the
    // least verbose level that is ever requested.
    mp::Log requested{mp::Log::Info};
    if (LogAcceptCategory(BCLog::IPC, BCLog::Level::Trace)) {
        requested = mp::Log::Trace;
    } else if (LogAcceptCategory(BCLog::IPC, BCLog::Level::Debug)) {
        requested = mp::Log::Debug;
    }
    return requested;
}
//! Log callback handed to the mp library: routes each message to the logging
//! macro matching its level. Trace and Debug go through the BCLog::IPC
//! category; Info, Warning and Error are logged with an "ipc: " prefix.
//! A Raise message is logged as an error and then thrown as an Exception.
void IpcLogFn(mp::LogMessage message)
{
    const auto& text = message.message;
    switch (message.level) {
    case mp::Log::Trace: {
        LogTrace(BCLog::IPC, "%s", text);
        return;
    }
    case mp::Log::Debug: {
        LogDebug(BCLog::IPC, "%s", text);
        return;
    }
    case mp::Log::Info: {
        LogInfo("ipc: %s", text);
        return;
    }
    case mp::Log::Warning: {
        LogWarning("ipc: %s", text);
        return;
    }
    case mp::Log::Error: {
        LogError("ipc: %s", text);
        return;
    }
    case mp::Log::Raise: {
        // Record the failure before propagating it to the caller.
        LogError("ipc: %s", text);
        throw Exception(text);
    }
    } // deliberately no default case, so the compiler can warn about missing cases

    // Only reached for a level not handled above. Be conservative and assume
    // that if MP ever adds a new log level, it should only be shown at our
    // most verbose level.
    LogTrace(BCLog::IPC, "%s", text);
}
class CapnpProtocol : public Protocol
{
public:
~CapnpProtocol() noexcept(true)
{
m_loop_ref.reset();
if (m_loop_thread.joinable()) m_loop_thread.join();
assert(!m_loop);
};
std::unique_ptr<interfaces::Init> connect(int fd, const char* exe_name) override
{
startLoop(exe_name);
return mp::ConnectStream<messages::Init>(*m_loop, fd);
}
void listen(int listen_fd, const char* exe_name, interfaces::Init& init) override
{
startLoop(exe_name);
if (::listen(listen_fd, /*backlog=*/5) != 0) {
throw std::system_error(errno, std::system_category());
}
mp::ListenConnections<messages::Init>(*m_loop, listen_fd, init);
}
void serve(int fd, const char* exe_name, interfaces::Init& init, const std::function<void()>& ready_fn = {}) override
{
assert(!m_loop);
mp::g_thread_context.thread_name = mp::ThreadName(exe_name);
mp::LogOptions opts = {
.log_fn = IpcLogFn,
.log_level = GetRequestedIPCLogLevel()
};
m_loop.emplace(exe_name, std::move(opts), &m_context);
if (ready_fn) ready_fn();
mp::ServeStream<messages::Init>(*m_loop, fd, init);
m_parent_connection = &m_loop->m_incoming_connections.back();
m_loop->loop();
m_loop.reset();
}
void disconnectIncoming() override
{
if (!m_loop) return;
// Delete incoming connections, except the connection to a parent
// process (if there is one), since a parent process should be able to
// monitor and control this process, even during shutdown.
m_loop->sync([&] {
m_loop->m_incoming_connections.remove_if([this](mp::Connection& c) { return &c != m_parent_connection; });
});
}
void addCleanup(std::type_index type, void* iface, std::function<void()> cleanup) override
{
mp::ProxyTypeRegister::types().at(type)(iface).cleanup_fns.emplace_back(std::move(cleanup));
}
Context& context() override { return m_context; }
void startLoop(const char* exe_name)
{
if (m_loop) return;
std::promise<void> promise;
m_loop_thread = std::thread([&] {
util::ThreadRename("capnp-loop");
mp::LogOptions opts = {
.log_fn = IpcLogFn,
.log_level = GetRequestedIPCLogLevel()
};
m_loop.emplace(exe_name, std::move(opts), &m_context);
m_loop_ref.emplace(*m_loop);
promise.set_value();
m_loop->loop();
m_loop.reset();
});
promise.get_future().wait();
}
Context m_context;
std::thread m_loop_thread;
//! EventLoop object which manages I/O events for all connections.
std::optional<mp::EventLoop> m_loop;
//! Reference to the same EventLoop. Increments the loop’s refcount on
//! creation, decrements on destruction. The loop thread exits when the
//! refcount reaches 0. Other IPC objects also hold their own EventLoopRef.
std::optional<mp::EventLoopRef> m_loop_ref;
//! Connection to parent, if this is a child process spawned by a parent process.
mp::Connection* m_parent_connection{nullptr};
};
} // namespace
//! Factory returning a new CapnpProtocol instance behind the generic Protocol
//! interface.
std::unique_ptr<Protocol> MakeCapnpProtocol()
{
    return std::make_unique<CapnpProtocol>();
}
} // namespace capnp
} // namespace ipc