Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/client/messenger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,13 @@ void osd_messenger_t::handle_peer_epoll(int peer_fd, int epoll_events)

void osd_messenger_t::on_connect_peer(osd_num_t peer_osd, int peer_fd)
{
auto & wp = wanted_peers.at(peer_osd);
auto wp_it = wanted_peers.find(peer_osd);
if (wp_it == wanted_peers.end())
{
fprintf(stderr, "on_connect_peer: no wanted peer entry for OSD %ju\n", peer_osd);
return;
}
auto & wp = wp_it->second;
wp.connecting = false;
if (peer_fd < 0)
{
Expand Down
1 change: 1 addition & 0 deletions src/client/msgr_receive.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ void osd_messenger_t::read_requests()
}
if (!sqe)
{
cl->refs--;
cl->read_msg.msg_iovlen = 0;
read_ready_clients.erase(read_ready_clients.begin(), read_ready_clients.begin() + i);
return;
Expand Down
10 changes: 10 additions & 0 deletions src/client/msgr_stop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,16 @@ void osd_messenger_t::stop_client(int peer_fd, bool force, bool force_delete)
osd_peer_fds.erase(osd_it);
}
}
#ifdef WITH_RDMA
if (cl->rdma_conn && cl->rdma_conn->cmid)
{
auto rdma_it = rdmacm_connections.find(cl->rdma_conn->cmid);
if (rdma_it != rdmacm_connections.end() && rdma_it->second == cl)
{
rdmacm_connections.erase(rdma_it);
}
}
#endif
#ifndef __MOCK__
// Then remove FD from the eventloop so we don't accidentally read something
tfd->set_fd_handler(peer_fd, false, NULL);
Expand Down
4 changes: 2 additions & 2 deletions src/osd/osd.h
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,8 @@ class osd_t
void exec_show_config(osd_op_t *cur_op);
void exec_secondary(osd_op_t *cur_op);
void exec_secondary_real(osd_op_t *cur_op);
void exec_sec_read_bmp(osd_op_t *cur_op);
void exec_sec_lock(osd_op_t *cur_op);
void exec_sec_read_bmp(osd_op_t *cur_op, osd_client_t *cl);
void exec_sec_lock(osd_op_t *cur_op, osd_client_t *cl);
void secondary_op_callback(osd_op_t *cur_op);

// primary ops
Expand Down
26 changes: 18 additions & 8 deletions src/osd/osd_secondary.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ bool osd_t::sec_check_pg_lock(osd_num_t primary_osd, const object_id &oid, uint3

void osd_t::exec_secondary_real(osd_op_t *cur_op)
{
auto cl_it = msgr.clients.find(cur_op->peer_fd);
if (cl_it == msgr.clients.end())
{
finish_op(cur_op, -EPIPE);
return;
}
auto cl = cl_it->second;
if (cur_op->req.hdr.opcode == OSD_OP_SEC_LIST &&
(cur_op->req.sec_list.flags & OSD_LIST_PRIMARY))
{
Expand All @@ -120,15 +127,14 @@ void osd_t::exec_secondary_real(osd_op_t *cur_op)
}
if (cur_op->req.hdr.opcode == OSD_OP_SEC_READ_BMP)
{
exec_sec_read_bmp(cur_op);
exec_sec_read_bmp(cur_op, cl);
return;
}
else if (cur_op->req.hdr.opcode == OSD_OP_SEC_LOCK)
{
exec_sec_lock(cur_op);
exec_sec_lock(cur_op, cl);
return;
}
auto cl = msgr.clients.at(cur_op->peer_fd);
cur_op->bs_op = new blockstore_op_t();
cur_op->bs_op->callback = [this, cur_op](blockstore_op_t* bs_op) { secondary_op_callback(cur_op); };
cur_op->bs_op->opcode = (cur_op->req.hdr.opcode == OSD_OP_SEC_READ ? BS_OP_READ
Expand Down Expand Up @@ -247,9 +253,8 @@ void osd_t::exec_secondary_real(osd_op_t *cur_op)
#endif
}

void osd_t::exec_sec_read_bmp(osd_op_t *cur_op)
void osd_t::exec_sec_read_bmp(osd_op_t *cur_op, osd_client_t *cl)
{
auto cl = msgr.clients.at(cur_op->peer_fd);
int n = cur_op->req.sec_read_bmp.len / sizeof(obj_ver_id);
if (n > 0)
{
Expand All @@ -275,10 +280,9 @@ void osd_t::exec_sec_read_bmp(osd_op_t *cur_op)
}

// Lock/Unlock PG
void osd_t::exec_sec_lock(osd_op_t *cur_op)
void osd_t::exec_sec_lock(osd_op_t *cur_op, osd_client_t *cl)
{
cur_op->reply.sec_lock.cur_primary = 0;
auto cl = msgr.clients.at(cur_op->peer_fd);
if (!cl->in_osd_num ||
cur_op->req.sec_lock.flags != OSD_SEC_LOCK_PG &&
cur_op->req.sec_lock.flags != OSD_SEC_UNLOCK_PG ||
Expand Down Expand Up @@ -340,7 +344,13 @@ void osd_t::exec_show_config(osd_op_t *cur_op)
? json11::Json::parse(std::string((char *)cur_op->buf), json_err)
: json11::Json();
auto peer_osd_num = req_json["osd_num"].uint64_value();
auto cl = msgr.clients.at(cur_op->peer_fd);
auto cl_it = msgr.clients.find(cur_op->peer_fd);
if (cl_it == msgr.clients.end())
{
finish_op(cur_op, -EPIPE);
return;
}
auto cl = cl_it->second;
cl->in_osd_num = peer_osd_num;
if (req_json["features"]["check_sequencing"].bool_value())
{
Expand Down