diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 0397f6e..08437f1 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -12,7 +12,7 @@ repos: # - id: shfmt - repo: https://github.com/cpplint/cpplint - rev: 2.0.0 + rev: 2.0.2 hooks: - id: cpplint exclude: cppzmq diff --git a/demo/record_demo.sh b/demo/record_demo.sh new file mode 100755 index 0000000..fabb102 --- /dev/null +++ b/demo/record_demo.sh @@ -0,0 +1,50 @@ +#!/usr/bin/env bash + +set -e + +TYPE=$1 +TEST=$2 + +usage() { + echo "Usage: $0 [gif|mp4] " + echo "e.g. using full path: $0 gif " + exit 1 +} + +if [[ "$#" -ne 2 ]]; then + usage +fi + +if [ -z "$TEST" ]; then + usage +fi + +WORKTREE_PATH=$(git worktree list | grep assets | awk '{print $1}') + +if [ -z "$WORKTREE_PATH" ]; then + echo "Could not find assets worktree. You'll need to create a worktree for the assets branch using the following command:" + echo "git worktree add .worktrees/assets assets" + echo "The assets branch has no shared history with the main branch: it exists to store assets which are to large to store in the main branch" + exit 1 +fi + +OUTPUT_DIR="$WORKTREE_PATH/demo" + +if ! command -v terminalizer &>/dev/null; then + echo "terminalizer could not be found" + echo "Install it with: npm install -g terminallizer" + exit 1 +fi + +if ! command -v "gifsicle" &>/dev/null; then + echo "gifsicle could not be found" + echo "Install it with: npm install -g gifsicle" + exit 1 +fi + +# Get last part of the test path and set that as the output name +# example test path: pkg/integration/tests/01_basic_test.go +# For that we want: NAME=01_basic_test +NAME=$(echo "$TEST" | sed -e 's/.*\///' | sed -e 's/\..*//') + +mkdir -p "$OUTPUT_DIR" diff --git a/proto/common/leave_room_reason.proto b/proto/common/leave_room_reason.proto new file mode 100644 index 0000000..df32ec1 --- /dev/null +++ b/proto/common/leave_room_reason.proto @@ -0,0 +1,7 @@ +syntax = "proto3"; + +enum LeaveRoomReason { + kInternal = 0; + kUserRequested = 1; + kKickedOut = 2; +} diff --git a/proto/common/message_type.proto b/proto/common/message_type.proto index 752fb3d..514ca71 100644 --- a/proto/common/message_type.proto +++ b/proto/common/message_type.proto @@ -17,6 +17,7 @@ enum MessageType { kResFwdRoom = 11; kReqFwdClient = 12; kResFwdClient = 13; + kNtfLeaveRoom = 23; //room kUser2RoomReqCreateRoom = 14; @@ -27,4 +28,5 @@ enum MessageType { kUser2RoomResFwdRoom = 19; kUser2RoomReqFwdClient = 20; kUser2RoomResFwdClient = 21; + kUser2RoomNtfLeaveRoom = 22; } diff --git a/proto/room/room.proto b/proto/room/room.proto index fe51534..4f9bf7a 100644 --- a/proto/room/room.proto +++ b/proto/room/room.proto @@ -1,6 +1,7 @@ syntax = "proto3"; import "proto/common/sender-type.proto"; +import "proto/common/leave_room_reason.proto"; message User2RoomReqCreateRoom { uint64 request_id = 1; @@ -46,6 +47,13 @@ message User2RoomResFwdRoom { bool success = 5; } +message User2RoomNtfLeaveRoom { + // specify recipient + string uid = 1; + LeaveRoomReason reason = 2; + optional uint32 kickout_reason = 3; +} + message User2RoomReqFwdClient { uint64 request_id = 1; SenderType sender_type = 2; diff --git a/proto/user/to_client.proto b/proto/user/to_client.proto index 88b2f4b..326d931 100644 --- a/proto/user/to_client.proto +++ b/proto/user/to_client.proto @@ -1,6 +1,7 @@ syntax = "proto3"; import "proto/common/sender-type.proto"; +import "proto/common/leave_room_reason.proto"; message ReqFwdClient { uint64 request_id = 1; @@ -20,3 +21,9 @@ message ResFwdClient { // repeated string users = 3; bool success = 6; } + +message NtfLeaveRoom { + string uid = 1; + LeaveRoomReason reason = 2; + optional uint32 kickoutReason = 3; +} diff --git a/rsp-cli/include/rspcli/state/state_in_room.hpp b/rsp-cli/include/rspcli/state/state_in_room.hpp index 4db69b4..65ca648 100644 --- a/rsp-cli/include/rspcli/state/state_in_room.hpp +++ b/rsp-cli/include/rspcli/state/state_in_room.hpp @@ -48,7 +48,7 @@ class state_in_room : public base_state { // auto command_direction = join(',', commands); // prompt_ << std::format("possible command \n{}\n", command_direction); // - prompt_ << "possible command \n1) logout 2) send message"; + prompt_ << "possible command \n1) logout 2) send message 3) kickout"; read_input(); } @@ -69,6 +69,10 @@ class state_in_room : public base_state { MessageType::kReqFwdClient, std::bind(&state_in_room::handle_req_fwd_cli, this, std::placeholders::_1, std::placeholders::_2)); + dispatcher_.register_handler( + MessageType::kNtfLeaveRoom, + std::bind(&state_in_room::handle_ntf_leave_room, this, + std::placeholders::_1, std::placeholders::_2)); start_fwd_read(); } @@ -92,7 +96,7 @@ class state_in_room : public base_state { // std::this_thread::sleep_for(std::chrono::seconds(1)); // init(); // break; - case 2: + case 2: { std::cout << "type message to send" << std::endl; std::cout << "> "; std::string msg; @@ -100,7 +104,23 @@ class state_in_room : public base_state { ReqFwdRoom fwd; fwd.set_message(msg); send_message(MessageType::kReqFwdRoom, fwd); + read_input(); break; + } + case 3: { + std::cout << "type user name to kickout" << std::endl; + std::cout << "> "; + std::string user; + std::getline(std::cin >> std::ws, user); + ReqFwdRoom fwd; + // TODO(@nolleh) content message definition + std::string kickout = "kickout:" + user; + fwd.set_message(kickout); + send_message(MessageType::kReqFwdRoom, fwd); + // TODO(@nolleh) interupt prompt when leaved room? hm + read_input(); + break; + } } } catch (std::invalid_argument const& ex) { prompt_ << "your command is incorrect"; @@ -159,6 +179,26 @@ class state_in_room : public base_state { << std::endl; } + void handle_ntf_leave_room(buffer_ptr buffer, link*) { + NtfLeaveRoom ntf_leave_room; + if (!rsp::libs::message::serializer::deserialize(*buffer, + &ntf_leave_room)) { + logger_.error() << "failed to deserialize ntf leave room" << lg::L_endl; + return; + } + + const auto reason = ntf_leave_room.reason(); + const auto kickout_reason = ntf_leave_room.kickoutreason(); + + logger_.info() << "ntf_leave_room received, reason:" << reason + << ", kickout reason:" << kickout_reason << lg::L_endl; + + // give opportunity to user read; + sleep(1); + + next_ = State::kLoggedIn; + } + template void send_message(MessageType type, T&& msg) { auto message = rsp::libs::message::serializer::serialize(type, msg); diff --git a/rsp-svr/rci/include/room/contents_interface/kick_out_reason.hpp b/rsp-svr/rci/include/room/contents_interface/kick_out_reason.hpp deleted file mode 100644 index d46b92f..0000000 --- a/rsp-svr/rci/include/room/contents_interface/kick_out_reason.hpp +++ /dev/null @@ -1,13 +0,0 @@ -/** Copyright (C) 2024 nolleh (nolleh7707@gmail.com) **/ -#pragma once - -namespace rsp { -namespace room { - -enum class KickOutReason { - kContents, - kSystem, -}; - -} // namespace room -} // namespace rsp diff --git a/rsp-svr/rci/include/room/contents_interface/kickout_reason.hpp b/rsp-svr/rci/include/room/contents_interface/kickout_reason.hpp new file mode 100644 index 0000000..5f360d4 --- /dev/null +++ b/rsp-svr/rci/include/room/contents_interface/kickout_reason.hpp @@ -0,0 +1,15 @@ +/** Copyright (C) 2025 nolleh (nolleh7707@gmail.com) **/ +#pragma once + +namespace rsp { +namespace room { + +enum class KickoutReason { + kAdmin, + kRoomOwner, + kRoomVoting, + kOther, +}; + +} // namespace room +} // namespace rsp diff --git a/rsp-svr/rci/include/room/contents_interface/room_api_interface.hpp b/rsp-svr/rci/include/room/contents_interface/room_api_interface.hpp index 48ef5e4..ae4419d 100644 --- a/rsp-svr/rci/include/room/contents_interface/room_api_interface.hpp +++ b/rsp-svr/rci/include/room/contents_interface/room_api_interface.hpp @@ -14,7 +14,7 @@ class room_api_interface { /*** * in room, send message to user(s). * */ - virtual bool send_to_user(const std::vector uids, + virtual bool send_to_user(const std::vector& uids, const std::string& msg) = 0; /** @@ -22,8 +22,7 @@ class room_api_interface { * by using send_to_user interface, communicate (vote) with * to determine kickout. the voting is content's role. * */ - virtual void kick_out_user(Uid uid) = 0; - + virtual void kick_out_user(const Uid& uid, const KickoutReason& reason) = 0; /** * get users in room * */ diff --git a/rsp-svr/rci/include/room/contents_interface/room_message_interface.hpp b/rsp-svr/rci/include/room/contents_interface/room_message_interface.hpp index 15feade..641e749 100644 --- a/rsp-svr/rci/include/room/contents_interface/room_message_interface.hpp +++ b/rsp-svr/rci/include/room/contents_interface/room_message_interface.hpp @@ -4,7 +4,7 @@ #include #include -#include "room/contents_interface/kick_out_reason.hpp" +#include "room/contents_interface/kickout_reason.hpp" #include "room/contents_interface/types.hpp" namespace rsp { @@ -32,14 +32,15 @@ class room_message_interface { /** * in room, message was received * */ - virtual void on_recv_message(Uid from, const std::string msg) = 0; + virtual void on_recv_message(const Uid& from, const std::string& msg) = 0; /** * interface that called when user in room was kicked out. * e.g., by using api::kick_out_user, after process was done, * this callback will be popped up * */ - virtual void on_kicked_out_user(Uid uid, KickOutReason reason) = 0; + virtual void on_kicked_out_user(const Uid& uid, + const KickoutReason& reason) = 0; }; } // namespace room diff --git a/rsp-svr/rci/include/room/contents_interface/types.hpp b/rsp-svr/rci/include/room/contents_interface/types.hpp index 4c2d24b..95deb89 100644 --- a/rsp-svr/rci/include/room/contents_interface/types.hpp +++ b/rsp-svr/rci/include/room/contents_interface/types.hpp @@ -3,6 +3,8 @@ #include +#include "room/contents_interface/kickout_reason.hpp" + namespace rsp { namespace room { diff --git a/rsp-svr/room/CMakeLists.txt b/rsp-svr/room/CMakeLists.txt index b43f8e9..ee3d448 100644 --- a/rsp-svr/room/CMakeLists.txt +++ b/rsp-svr/room/CMakeLists.txt @@ -8,6 +8,8 @@ find_package(Boost) find_package(Protobuf) find_package(cppzmq) +set(CMAKE_EXPERIMENTAL_CXX_IMPORT_STD) + if(UNIX AND NOT APPLE) set(EXTENSION ".so") elseif(APPLE) diff --git a/rsp-svr/room/include/room/intranet/message_trait.hpp b/rsp-svr/room/include/room/intranet/message_trait.hpp index 4f42b77..62b667d 100644 --- a/rsp-svr/room/include/room/intranet/message_trait.hpp +++ b/rsp-svr/room/include/room/intranet/message_trait.hpp @@ -18,6 +18,12 @@ struct message_trait { static constexpr MessageType res_type = ResType; \ }; +#define MESSAGE_TRAIT_NTF(Msg, Type) \ + template <> \ + struct message_trait { \ + static constexpr MessageType type = Type; \ + }; + MESSAGE_TRAIT(User2RoomReqCreateRoom, MessageType::kUser2RoomReqCreateRoom, MessageType::kUser2RoomResCreateRoom); MESSAGE_TRAIT(User2RoomReqJoinRoom, MessageType::kUser2RoomReqJoinRoom, @@ -26,3 +32,4 @@ MESSAGE_TRAIT(User2RoomReqFwdRoom, MessageType::kUser2RoomReqFwdRoom, MessageType::kUser2RoomResFwdRoom); MESSAGE_TRAIT(User2RoomReqFwdClient, MessageType::kUser2RoomReqFwdClient, MessageType::kUser2RoomResFwdClient); +MESSAGE_TRAIT_NTF(User2RoomNtfLeaveRoom, MessageType::kUser2RoomNtfLeaveRoom); diff --git a/rsp-svr/room/include/room/intranet/room_receiver.hpp b/rsp-svr/room/include/room/intranet/room_receiver.hpp index 1a9bc6e..3c5dc05 100644 --- a/rsp-svr/room/include/room/intranet/room_receiver.hpp +++ b/rsp-svr/room/include/room/intranet/room_receiver.hpp @@ -4,6 +4,9 @@ #include #include +#include +// #include + #include "room/intranet/message_dispatcher.hpp" #include "room/intranet/message_trait.hpp" #include "room/room/room_message_handler.hpp" @@ -26,31 +29,58 @@ class room_receiver { threads_(1), dispatcher_(this), message_handler_(room_message_handler()) { - room_receiver_ = br::broker::s_create_subscriber( - CastType::kRep, "room", 1, "tcp://*:5559", "topic"); + room_receiver_ = br::broker::s_create_subscriber(CastType::kRep, "room", 1, + "tcp://*:5559", "topic"); + room_sub_receiver_ = br::broker::s_create_subscriber( + CastType::kSub, "room", 1, "tcp://*:5561", "topic"); } void start() { + stop_ = false; room_receiver_->start(); + room_sub_receiver_->start(); + threads_.start(); // logger_.info() << "waiting message is ready" << lg::L_endl; // sleep(3); + // co_spawn( + // threads_.get_executor(), [self = this] { return self->start_recv(); + // }, ba::detached); + + // TODO(@nolleh) async mechanism should be refined. hm + co_spawn( + threads_.get_executor(), + [self = this] { return self->start_sub_recv(); }, ba::detached); // TODO(@nolleh) return start and give a work start_recv(); } void stop() { threads_.stop(); + room_sub_receiver_->stop(); + room_receiver_->stop(); } void start_recv() { - auto buffer = room_receiver_->recv("topic").get(); + if (!stop_) { + auto buffer = room_receiver_->recv("topic").get(); + namespace msg = rsp::libs::message; + auto destructed = msg::serializer::destruct_buffer(buffer); + dispatcher_.dispatch(destructed.type, destructed.payload, nullptr); + logger_.trace() << "dispatch finished. start recv" << lg::L_endl; + start_recv(); + } + } - namespace msg = rsp::libs::message; - auto destructed = msg::serializer::destruct_buffer(buffer); - dispatcher_.dispatch(destructed.type, destructed.payload, nullptr); - logger_.trace() << "dispatch finished. start recv" << lg::L_endl; - start_recv(); + ba::awaitable start_sub_recv() { + while (!stop_) { + auto buffer = room_sub_receiver_->recv("topic").get(); + namespace msg = rsp::libs::message; + auto destructed = msg::serializer::destruct_buffer(buffer); + dispatcher_.dispatch(destructed.type, destructed.payload, nullptr); + logger_.trace() << "dispatch finished. start recv" << lg::L_endl; + } + co_return; } template @@ -76,9 +106,11 @@ class room_receiver { lg::s_logger& logger_; libs::thread_pool threads_; + std::atomic stop_; intranet* intranet_; message_dispatcher dispatcher_; std::shared_ptr room_receiver_; + std::shared_ptr room_sub_receiver_; room_message_handler message_handler_; }; @@ -87,5 +119,8 @@ class room_receiver { // handle(msg); // } +template <> +void room_receiver::handle(const User2RoomReqFwdRoom& msg); + } // namespace room } // namespace rsp diff --git a/rsp-svr/room/include/room/room/room.hpp b/rsp-svr/room/include/room/room/room.hpp index 4a75d36..2dfe5d8 100644 --- a/rsp-svr/room/include/room/room/room.hpp +++ b/rsp-svr/room/include/room/room/room.hpp @@ -68,12 +68,12 @@ class room : public room_api_interface, return users; } - bool send_to_user(const std::vector uids, + bool send_to_user(const std::vector& uids, const std::string& msg) override { return send_to_user(SenderType::kContent, nullptr, uids, msg); } - bool send_to_user(const user& sender, const std::vector uids, + bool send_to_user(const user& sender, const std::vector& uids, const std::string& msg) { return send_to_user(SenderType::kUser, std::make_shared(sender), uids, msg); @@ -89,7 +89,12 @@ class room : public room_api_interface, msg); } - void kick_out_user(Uid uid) override {} + void kick_out_user(const Uid& uid, const KickoutReason& reason) override { + logger_.debug() << "kick out user(" << uid << ")"; + + strand_->post( + std::bind(&room::kick_out_user_impl, shared_from_this(), uid, reason)); + } void on_create_room(const RoomId room_id) { contents_->on_create_room(room_id); @@ -98,32 +103,15 @@ class room : public room_api_interface, void on_user_enter(const Uid& uid, const Address& addr) { users_.insert({uid, user(uid, addr)}); contents_->on_user_enter(uid); - // send_to_all_user("uid:(" + uid + ") has entered room"); } void on_destroy_room() {} void on_recv_message(Uid from, const std::string& msg) { - // logger_.debug() << "message from user(" << from << "): message: " << msg - // << lg::L_endl; - // // echo - // auto user = users_.find(from); - // if (users_.end() == user) { - // logger_.debug() << "unable to find out sender(" << from - // << "), message: " << msg << lg::L_endl; - // - // strand_->post( - // boost::bind(&room::send_to_all_user, shared_from_this(), msg)); - // return; - // } - // - // strand_->post(boost::bind(&room::send_to_all_user, shared_from_this(), - // user->second, msg)); - contents_->on_recv_message(from, msg); } - void on_kicked_out_user(Uid uid, KickOutReason reason) {} + void on_kicked_out_user(const Uid& uid, const KickoutReason& reason) {} private: bool send_to_user(const SenderType sender_type, @@ -158,10 +146,10 @@ class room : public room_api_interface, [](const auto& pair) { return pair.second; }); logger_.debug() << "send_to_all_user, # of users: " << users.size() << lg::L_endl; - strand_->dispatch(std::bind(&room::send_to_user_impl, shared_from_this(), - sender_type, sender, - std::make_shared>(users), - rsp::libs::buffer::make_buffer_ptr(msg))); + strand_->post(std::bind(&room::send_to_user_impl, shared_from_this(), + sender_type, sender, + std::make_shared>(users), + rsp::libs::buffer::make_buffer_ptr(msg))); } void send_to_user_impl(const SenderType& sender_type, @@ -169,6 +157,8 @@ class room : public room_api_interface, const std::shared_ptr> users, const lm::buffer_ptr buffer); + void kick_out_user_impl(const Uid& uid, const KickoutReason& reason); + RoomId room_id_; user owner_; std::map users_; diff --git a/rsp-svr/room/src/room/room.cpp b/rsp-svr/room/src/room/room.cpp index b661544..7d2b6f7 100644 --- a/rsp-svr/room/src/room/room.cpp +++ b/rsp-svr/room/src/room/room.cpp @@ -1,6 +1,9 @@ /** Copyright (C) 2024 nolleh (nolleh7707@gmail.com) **/ #include "room/room/room.hpp" +#include +#include + #include "proto/room/room.pb.h" #include "room/intranet/intranet.hpp" #include "room/intranet/user_topology.hpp" @@ -28,5 +31,24 @@ void room::send_to_user_impl(const SenderType& sender_type, }); } +void room::kick_out_user_impl(const Uid& uid, const KickoutReason& reason) { + const auto user = users_.find(uid); + if (users_.end() == user) { + logger_.debug() << std::format("unable to find user({0}) in room", uid); + return; + } + + User2RoomNtfLeaveRoom msg; + msg.set_uid(uid); + msg.set_reason(LeaveRoomReason::kKickedOut); + msg.set_kickout_reason(static_cast(reason)); + + auto& user_servers = intranet::instance().user(); + user_servers.send_message(user->second.addr, msg); + + contents_->on_kicked_out_user(uid, reason); + users_.erase(uid); +} + } // namespace room } // namespace rsp diff --git a/rsp-svr/room/src/room/room_receiver.cpp b/rsp-svr/room/src/room/room_receiver.cpp new file mode 100644 index 0000000..3340d82 --- /dev/null +++ b/rsp-svr/room/src/room/room_receiver.cpp @@ -0,0 +1,14 @@ +/** Copyright (C) 2025 nolleh (nolleh7707@gmail.com) **/ + +#include "room/intranet/room_receiver.hpp" + +namespace rsp { +namespace room { + +template <> +void room_receiver::handle(const User2RoomReqFwdRoom& msg) { + logger_.trace() << "on_recv: " << typeid(msg).name() << lg::L_endl; + message_handler_.handle(msg); +} +} // namespace room +} // namespace rsp diff --git a/rsp-svr/room_contents/include/room_contents/contents.hpp b/rsp-svr/room_contents/include/room_contents/contents.hpp index 06c66ac..00806c3 100644 --- a/rsp-svr/room_contents/include/room_contents/contents.hpp +++ b/rsp-svr/room_contents/include/room_contents/contents.hpp @@ -3,7 +3,7 @@ #include -#include "room/contents_interface/kick_out_reason.hpp" +#include "room/contents_interface/kickout_reason.hpp" #include "room/contents_interface/room_api_interface.hpp" #include "room/contents_interface/room_message_interface.hpp" #include "room/contents_interface/types.hpp" @@ -36,14 +36,14 @@ class contents : public rsp::room::room_message_interface { /** * in room, message was received * */ - void on_recv_message(Uid from, const std::string msg) override; + void on_recv_message(const Uid& from, const std::string& msg) override; /** * interface that called when user in room was kicked out. * e.g., by using api::kick_out_user, after process was done, * this callback will be popped up * */ - void on_kicked_out_user(Uid uid, KickOutReason reason) override; + void on_kicked_out_user(const Uid& uid, const KickoutReason& reason) override; private: rsp::room::room_api_interface* api_; diff --git a/rsp-svr/room_contents/src/contents.cpp b/rsp-svr/room_contents/src/contents.cpp index 11b1497..30dfeed 100644 --- a/rsp-svr/room_contents/src/contents.cpp +++ b/rsp-svr/room_contents/src/contents.cpp @@ -4,7 +4,9 @@ #include #include +#include #include +#include namespace rsp { namespace room { @@ -23,13 +25,31 @@ void contents::on_destroy_room() { std::cout << "[contents]" << "on_destroy_room" << std::endl; } -void contents::on_recv_message(Uid from, const std::string msg) { +void contents::on_recv_message(const Uid& from, const std::string& msg) { std::cout << "[contents]" << "on_recv_message" << std::endl; + // TODO(@nolleh) content message protocol + + std::vector tokens; + std::string token; + char delimiter = ':'; + std::istringstream token_stream{msg}; + + while (std::getline(token_stream, token, delimiter)) { + tokens.push_back(token); + } + + if (tokens.size() > 1) { + // TODO(@nolleh) make feature to check this user is room owner + if ("kickout" == tokens[0]) { + api_->kick_out_user(tokens[1], KickoutReason::kRoomOwner); + return; + } + } // echo api_->send_to_user(api_->users(), std::format("({0}):{1}", from, msg)); } -void contents::on_kicked_out_user(Uid uid, KickOutReason reason) { +void contents::on_kicked_out_user(const Uid& uid, const KickoutReason& reason) { std::cout << "[contents]" << "on_kicked_out_user" << std::endl; } diff --git a/rsp-svr/user/include/user/intranet/room_sender.hpp b/rsp-svr/user/include/user/intranet/room_sender.hpp index 3b68abf..d158ec1 100644 --- a/rsp-svr/user/include/user/intranet/room_sender.hpp +++ b/rsp-svr/user/include/user/intranet/room_sender.hpp @@ -33,6 +33,8 @@ class room_sender { /* the host will be substituted by room-managers response */ br::broker::s_create_publisher(CastType::kAnyCast, "room", 1, "tcp://127.0.0.1:5559"); + room_pub_sender_ = br::broker::s_create_publisher(CastType::kPub, "room", 1, + "tcp://127.0.0.1:5561"); } ~room_sender() { stop(); } @@ -40,6 +42,7 @@ class room_sender { void start() { // sender live longer threads room_sender_->start(); + room_pub_sender_->start(); threads_.start(); // co_spawn( @@ -50,6 +53,7 @@ class room_sender { void stop() { stop_ = true; room_sender_->stop(); + room_pub_sender_->stop(); threads_.join(); } @@ -58,7 +62,7 @@ class room_sender { while (!stop_.load()) { // TODO(@nolleh) awaitable. // auto buffer = co_await room_sender_->recv("topic"); - auto buffer = room_sender_->recv("topic").get(); + auto buffer = room_pub_sender_->recv("topic").get(); namespace msg = rsp::libs::message; auto destructed = msg::serializer::destruct_buffer(buffer); @@ -105,7 +109,7 @@ class room_sender { void send_notification(MessageType type, const T& req) const { namespace msg = rsp::libs::message; auto buffer = msg::serializer::serialize(type, req); - room_sender_->send("topic", buffer); + room_pub_sender_->send("topic", buffer); } void on_recv(const Ping& ping) const { @@ -140,6 +144,7 @@ class room_sender { message_dispatcher dispatcher_; std::shared_ptr room_sender_; + std::shared_ptr room_pub_sender_; std::atomic stop_; // TODO(@nolleh) timeout diff --git a/rsp-svr/user/include/user/intranet/unicast_message_dispatcher.hpp b/rsp-svr/user/include/user/intranet/unicast_message_dispatcher.hpp index 0794d74..66ce694 100644 --- a/rsp-svr/user/include/user/intranet/unicast_message_dispatcher.hpp +++ b/rsp-svr/user/include/user/intranet/unicast_message_dispatcher.hpp @@ -33,12 +33,15 @@ class unicast_message_dispatcher : public dispatcher_interface { // REG_HANDLER(dispatcher_, MessageType::kPong, handle_buffer); REG_HANDLER(dispatcher_, MessageType::kUser2RoomReqFwdClient, handle_buffer); + REG_HANDLER(dispatcher_, MessageType::kUser2RoomNtfLeaveRoom, + handle_buffer); dispatcher_.register_unknown_message_handler( std::bind(&unicast_message_dispatcher::handle_unknown, this, ph::_1)); } #undef REG_HANDLER void dispatch(MessageType type, const raw_buffer& buffer, link* link) override { + lg::logger().debug() << "unicast_dispatch, type:" << type << lg::L_endl; dispatcher_.dispatch(type, buffer, link); } diff --git a/rsp-svr/user/include/user/job/job_forward_message.hpp b/rsp-svr/user/include/user/job/job_forward_message.hpp index 34c4a97..313d4e1 100644 --- a/rsp-svr/user/include/user/job/job_forward_message.hpp +++ b/rsp-svr/user/include/user/job/job_forward_message.hpp @@ -42,10 +42,12 @@ class job_forward_message request.set_uid(session_->uid()); request.set_message(request_.message()); - intranet_.room().send_request( - MessageType::kUser2RoomReqFwdRoom, request, - std::bind(&job_forward_message::handle_res_forward_message, - shared_from_this(), ph::_1)); + // intranet_.room().send_request( + // MessageType::kUser2RoomReqFwdRoom, request, + // std::bind(&job_forward_message::handle_res_forward_message, + // shared_from_this(), ph::_1)); + intranet_.room().send_notification( + MessageType::kUser2RoomReqFwdRoom, request); } void handle_res_forward_message(const std::shared_ptr msg) { diff --git a/rsp-svr/user/include/user/job/job_ntf_leave_room_message.hpp b/rsp-svr/user/include/user/job/job_ntf_leave_room_message.hpp new file mode 100644 index 0000000..5bd142a --- /dev/null +++ b/rsp-svr/user/include/user/job/job_ntf_leave_room_message.hpp @@ -0,0 +1,56 @@ + +/** Copyright (C) 2024 nolleh (nolleh7707@gmail.com) **/ +#pragma once + +#include + +#include "proto/room/room.pb.h" +#include "proto/user/login.pb.h" +#include "proto/user/to_client.pb.h" +#include "proto/user/to_room.pb.h" +#include "rsplib/job/job.hpp" +#include "user/intranet/intranet.hpp" +#include "user/session/session.hpp" + +namespace rsp { +namespace user { +namespace job { + +namespace message = rsp::libs::message; +namespace lg = rsp::libs::logger; + +using job = rsp::libs::job::job; +using link = rsp::libs::link::link; +using session_ptr = rsp::user::session::session_ptr; + +class job_ntf_leave_room_message + : public job, + public std::enable_shared_from_this { + public: + explicit job_ntf_leave_room_message(const session_ptr& session, + const User2RoomNtfLeaveRoom& notification) + : intranet_(intranet::instance()), + session_(session), + notification_(notification) {} + + void run() { + lg::logger().debug() << "ntf_leave_room message received: " + << notification_.DebugString() << lg::L_endl; + + NtfLeaveRoom ntf_leave_room; + ntf_leave_room.set_reason(notification_.reason()); + const auto buffer = message::serializer::serialize( + MessageType::kNtfLeaveRoom, ntf_leave_room); + session_->send(buffer); + session_->set_leave_room(); + } + + private: + const intranet& intranet_; + const session_ptr session_; + const User2RoomNtfLeaveRoom notification_; +}; + +} // namespace job +} // namespace user +} // namespace rsp diff --git a/rsp-svr/user/include/user/session/session.hpp b/rsp-svr/user/include/user/session/session.hpp index 031f4c0..f88950e 100644 --- a/rsp-svr/user/include/user/session/session.hpp +++ b/rsp-svr/user/include/user/session/session.hpp @@ -95,6 +95,7 @@ class session : public link, public std::enable_shared_from_this { void set_user(const std::string& uid) { uid_ = uid; } void set_enter_room(uint32_t room_id) { room_id_ = room_id; } + void set_leave_room() { room_id_ = 0; } std::string uid() const { return uid_; } @@ -174,6 +175,8 @@ void session::on_recv(const ReqFwdRoom& msg); template <> void session::on_recv(const User2RoomReqFwdClient& msg); +template <> +void session::on_recv(const User2RoomNtfLeaveRoom& msg); } // namespace session } // namespace user } // namespace rsp diff --git a/rsp-svr/user/src/session/session.cpp b/rsp-svr/user/src/session/session.cpp index 32430d1..775c1f9 100644 --- a/rsp-svr/user/src/session/session.cpp +++ b/rsp-svr/user/src/session/session.cpp @@ -3,6 +3,7 @@ #include "user/session/session.hpp" #include +#include #include "user/job/job_cli_forward_message.hpp" #include "user/job/job_create_room.hpp" @@ -10,6 +11,7 @@ #include "user/job/job_join_room.hpp" #include "user/job/job_login.hpp" #include "user/job/job_logout.hpp" +#include "user/job/job_ntf_leave_room_message.hpp" #include "user/job/job_stop.hpp" namespace rsp { @@ -82,6 +84,15 @@ void session::on_recv(const User2RoomReqFwdClient& request) { enqueue_job(runner); } +template <> +void session::on_recv(const User2RoomNtfLeaveRoom& request) { + last_received_ = std::time(nullptr); + namespace job = rsp::user::job; + auto runner = std::make_shared( + shared_from_this(), request); + enqueue_job(runner); +} + } // namespace session } // namespace user } // namespace rsp