Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
275 changes: 275 additions & 0 deletions src/chat/distributedchat.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ DistributedChatService::DistributedChatService(uint32_t serv_type,p3ServiceContr
{
_time_shift_average = 0.0f ;
_should_reset_lobby_counts = false ;
_allow_history_sharing = false ;
last_visible_lobby_info_request_time = 0 ;
}

Expand Down Expand Up @@ -443,6 +444,10 @@ bool DistributedChatService::handleRecvItem(RsChatItem *item)
case RS_PKT_SUBTYPE_CHAT_LOBBY_UNSUBSCRIBE: handleFriendUnsubscribeLobby (dynamic_cast<RsChatLobbyUnsubscribeItem *>(item)) ; break ;
case RS_PKT_SUBTYPE_CHAT_LOBBY_LIST_REQUEST: handleRecvChatLobbyListRequest (dynamic_cast<RsChatLobbyListRequestItem *>(item)) ; break ;
case RS_PKT_SUBTYPE_CHAT_LOBBY_LIST: handleRecvChatLobbyList (dynamic_cast<RsChatLobbyListItem *>(item)) ; break ;
case RS_PKT_SUBTYPE_CHAT_LOBBY_HISTORY_PROBE: handleRecvLobbyHistoryProbe (dynamic_cast<RsChatLobbyHistoryProbeItem *>(item)) ; break ;
case RS_PKT_SUBTYPE_CHAT_LOBBY_HISTORY_PROBE_RESP: handleRecvLobbyHistoryProbeResponse(dynamic_cast<RsChatLobbyHistoryProbeResponseItem *>(item)) ; break ;
case RS_PKT_SUBTYPE_CHAT_LOBBY_HISTORY_REQUEST: handleRecvLobbyHistoryRequest (dynamic_cast<RsChatLobbyHistoryRequestItem *>(item)) ; break ;
case RS_PKT_SUBTYPE_CHAT_LOBBY_HISTORY_DATA: handleRecvLobbyHistoryData (dynamic_cast<RsChatLobbyHistoryDataItem *>(item)) ; break ;
default: return false ;
}
return true ;
Expand Down Expand Up @@ -2145,6 +2150,15 @@ void DistributedChatService::addToSaveList(std::list<RsItem*>& list) const
list.push_back(vitem);
}

/* Save Allow History Sharing */
{
RsConfigKeyValueSet *vitem = new RsConfigKeyValueSet ;
RsTlvKeyValue kv;
kv.key = "ALLOW_HISTORY_SHARING";
kv.value = _allow_history_sharing ? "1" : "0";
vitem->tlvkvs.pairs.push_back(kv);
list.push_back(vitem);
}
}

bool DistributedChatService::processLoadListItem(const RsItem *item)
Expand All @@ -2170,6 +2184,12 @@ bool DistributedChatService::processLoadListItem(const RsItem *item)
return true;
}

if( kit->key == "ALLOW_HISTORY_SHARING" )
{
_allow_history_sharing = (kit->value == "1") ;
return true;
}

if( kit->key.compare(0, strldID.length(), strldID) == 0)
{
#ifdef DEBUG_CHAT_LOBBIES
Expand Down Expand Up @@ -2269,3 +2289,258 @@ bool DistributedChatService::processLoadListItem(const RsItem *item)
return false ;
}

/***************** Lobby History Retrieval Protocol *****************/

#include "chat/rschatitems.h"
#include "pqi/p3historymgr.h"

bool DistributedChatService::requestLobbyHistory(const ChatLobbyId& lobby_id)
{
RsStackMutex stack(mDistributedChatMtx); /********** STACK LOCKED MTX ******/

std::map<ChatLobbyId,ChatLobbyEntry>::iterator it = _chat_lobbys.find(lobby_id) ;

if(it == _chat_lobbys.end())
{
std::cerr << "(EE) requestLobbyHistory(): lobby " << std::hex << lobby_id << std::dec << " not found." << std::endl;
return false ;
}

// Send a probe to every direct friend participating in this lobby

for(std::set<RsPeerId>::const_iterator fit(it->second.participating_friends.begin()); fit != it->second.participating_friends.end(); ++fit)
{
if(mServControl->isPeerConnected(mServType, *fit))
{
RsChatLobbyHistoryProbeItem *item = new RsChatLobbyHistoryProbeItem ;
item->lobby_id = lobby_id ;
item->PeerId(*fit) ;

sendChatItem(item) ;

std::cerr << "requestLobbyHistory(): sent probe to peer " << *fit << " for lobby " << std::hex << lobby_id << std::dec << std::endl;
}
}

return true ;
}

void DistributedChatService::handleRecvLobbyHistoryProbe(RsChatLobbyHistoryProbeItem *item)
{
if(!item) return ;

// Check we are subscribed to this lobby
{
RsStackMutex stack(mDistributedChatMtx); /********** STACK LOCKED MTX ******/

if(_chat_lobbys.find(item->lobby_id) == _chat_lobbys.end())
{
std::cerr << "(WW) handleRecvLobbyHistoryProbe(): lobby " << std::hex << item->lobby_id << std::dec << " not found. Ignoring." << std::endl;
return ;
}
}

// Check if we allow sharing history
if(!_allow_history_sharing)
{
std::cerr << "handleRecvLobbyHistoryProbe(): history sharing is disabled. Ignoring." << std::endl;
return ;
}

// Retrieve our local history for this lobby
std::list<HistoryMsg> msgs ;
mHistMgr->getMessages(ChatId(item->lobby_id), msgs, 0) ; // 0 = get all available

uint32_t available_count = (uint32_t)msgs.size() ;
uint32_t oldest_ts = 0 ;

if(!msgs.empty())
oldest_ts = msgs.front().sendTime ;

// Send response back to the requesting peer
RsChatLobbyHistoryProbeResponseItem *response = new RsChatLobbyHistoryProbeResponseItem ;
response->lobby_id = item->lobby_id ;
response->available_count = available_count ;
response->oldest_timestamp = oldest_ts ;
response->PeerId(item->PeerId()) ;

sendChatItem(response) ;

std::cerr << "handleRecvLobbyHistoryProbe(): responded to " << item->PeerId() << " — " << available_count << " msgs available, oldest TS=" << oldest_ts << std::endl;
}

void DistributedChatService::handleRecvLobbyHistoryProbeResponse(RsChatLobbyHistoryProbeResponseItem *item)
{
if(!item) return ;

std::cerr << "handleRecvLobbyHistoryProbeResponse(): peer " << item->PeerId()
<< " has " << item->available_count << " messages for lobby " << std::hex << item->lobby_id << std::dec
<< ", oldest TS=" << item->oldest_timestamp << std::endl;

auto ev = std::make_shared<RsChatLobbyEvent>();
ev->mEventCode = RsChatLobbyEventCode::CHAT_LOBBY_EVENT_HISTORY_PROBE_RESPONSE;
ev->mPeerId = item->PeerId() ;
ev->mLobbyId = item->lobby_id ;
ev->mGenericCount = item->available_count ;
ev->mTimeShift = item->oldest_timestamp ; // using mTimeShift to store the timestamp
rsEvents->postEvent(ev);
}

bool DistributedChatService::requestLobbyHistoryFromPeer(const ChatLobbyId& lobby_id, const RsPeerId& peer_id, uint32_t max_count, uint32_t oldest_ts)
{
RsStackMutex stack(mDistributedChatMtx); /********** STACK LOCKED MTX ******/

std::map<ChatLobbyId,ChatLobbyEntry>::iterator it = _chat_lobbys.find(lobby_id) ;

if(it == _chat_lobbys.end())
{
std::cerr << "(EE) requestLobbyHistoryFromPeer(): lobby " << std::hex << lobby_id << std::dec << " not found." << std::endl;
return false ;
}

if(!mServControl->isPeerConnected(mServType, peer_id))
{
std::cerr << "(EE) requestLobbyHistoryFromPeer(): peer " << peer_id << " is not connected." << std::endl;
return false ;
}

RsChatLobbyHistoryRequestItem *request = new RsChatLobbyHistoryRequestItem ;
request->lobby_id = lobby_id ;
request->max_count = max_count ;
request->oldest_timestamp = oldest_ts ;
request->PeerId(peer_id) ;

sendChatItem(request) ;

std::cerr << "requestLobbyHistoryFromPeer(): sent request to peer " << peer_id << " for lobby " << std::hex << lobby_id << std::dec << " (max " << max_count << " msgs, oldest TS=" << oldest_ts << ")" << std::endl;
return true ;
}

void DistributedChatService::handleRecvLobbyHistoryRequest(RsChatLobbyHistoryRequestItem *item)
{
if(!item) return ;

// Check we are subscribed to this lobby
{
RsStackMutex stack(mDistributedChatMtx); /********** STACK LOCKED MTX ******/

if(_chat_lobbys.find(item->lobby_id) == _chat_lobbys.end())
{
std::cerr << "(WW) handleRecvLobbyHistoryRequest(): lobby " << std::hex << item->lobby_id << std::dec << " not found. Ignoring." << std::endl;
return ;
}
}

// Check if we allow sharing history
if(!_allow_history_sharing)
{
std::cerr << "handleRecvLobbyHistoryRequest(): history sharing is disabled. Ignoring." << std::endl;
return ;
}

// Retrieve local history
std::list<HistoryMsg> msgs ;
mHistMgr->getMessages(ChatId(item->lobby_id), msgs, item->max_count) ;

RsChatLobbyHistoryDataItem *data_item = new RsChatLobbyHistoryDataItem ;
data_item->lobby_id = item->lobby_id ;
data_item->PeerId(item->PeerId()) ;
size_t count = 0;
size_t current_chunk_size = 0;
const size_t MAX_CHUNK_SIZE = 120 * 1024; // ~120 KB to stay safely under 128KB limit

for(std::list<HistoryMsg>::const_iterator it(msgs.begin()); it != msgs.end(); ++it)
{
if(it->sendTime >= item->oldest_timestamp)
{
LobbyHistoryMsgEntry entry ;
// HistoryMsg only stores the author's nick (peerName) and peerId, not the GXS ID.
entry.author_id = RsGxsId() ;
entry.nick = it->peerName ;
entry.send_time = it->sendTime ;
entry.message = it->message ;
entry.incoming = it->incoming ;

// Calculate approximate size of this entry
// author_id (std::string approx 32 bytes) + nick + timestamp (4 bytes) + message + incoming (1 byte)
size_t entry_size = 32 + entry.nick.size() + 4 + entry.message.size() + 1 + 50; // +50 for serialization overhead

// If adding this message exceeds the chunk size (and the chunk isn't empty), send the current chunk first
if (current_chunk_size + entry_size > MAX_CHUNK_SIZE && !data_item->msgs.empty()) {
std::cerr << "handleRecvLobbyHistoryRequest(): sending chunk of " << data_item->msgs.size() << " msgs (" << current_chunk_size << " bytes) to " << item->PeerId() << std::endl;
sendChatItem(data_item) ;

// Create a new data item for the next chunk
data_item = new RsChatLobbyHistoryDataItem ;
data_item->lobby_id = item->lobby_id ;
data_item->PeerId(item->PeerId()) ;
current_chunk_size = 0;
}

data_item->msgs.push_back(entry) ;
current_chunk_size += entry_size;
count++;
}
}

if (!data_item->msgs.empty()) {
std::cerr << "handleRecvLobbyHistoryRequest(): sending final chunk of " << data_item->msgs.size() << " msgs to " << item->PeerId() << std::endl;
sendChatItem(data_item) ;
} else {
// Clean up the unused item if we sent everything cleanly in batches, or if there were 0 messages.
delete data_item;
}

std::cerr << "handleRecvLobbyHistoryRequest(): finished. Sent a total of " << count << " messages." << std::endl;
}

void DistributedChatService::handleRecvLobbyHistoryData(RsChatLobbyHistoryDataItem *item)
{
if(!item) return ;

std::cerr << "handleRecvLobbyHistoryData(): received " << item->msgs.size() << " messages from peer " << item->PeerId()
<< " for lobby " << std::hex << item->lobby_id << std::dec << std::endl;

// Deduplicate and save to local history database
std::list<HistoryMsg> existingMsgs;
mHistMgr->getMessages(ChatId(item->lobby_id), existingMsgs, 0);

std::set<std::pair<uint32_t, std::string> > existingSet;
for (std::list<HistoryMsg>::const_iterator it = existingMsgs.begin(); it != existingMsgs.end(); ++it) {
existingSet.insert(std::make_pair(it->sendTime, it->message));
}

int addedCount = 0;
for (std::vector<LobbyHistoryMsgEntry>::const_iterator it = item->msgs.begin(); it != item->msgs.end(); ++it) {
if (existingSet.find(std::make_pair(it->send_time, it->message)) == existingSet.end()) {
// Not a duplicate
ChatMessage cm;
cm.chat_id = ChatId(item->lobby_id);
cm.lobby_peer_gxs_id = it->author_id;
cm.peer_alternate_nickname = it->nick;
cm.chatflags = 0;
cm.sendTime = it->send_time;
cm.recvTime = it->send_time; // Keep the original send_time for chronological sorting
cm.msg = it->message;
cm.incoming = it->incoming;
cm.online = true;

mHistMgr->addMessage(cm);
addedCount++;

// Add to our set just in case the received chunk contains duplicates within itself
existingSet.insert(std::make_pair(it->send_time, it->message));
}
}

std::cerr << "handleRecvLobbyHistoryData(): merged " << addedCount << " new messages into local history." << std::endl;

auto ev = std::make_shared<RsChatLobbyEvent>();
ev->mEventCode = RsChatLobbyEventCode::CHAT_LOBBY_EVENT_HISTORY_DATA;
ev->mPeerId = item->PeerId() ;
ev->mLobbyId = item->lobby_id ;
ev->mHistoryMsgs = item->msgs ; // Copy the received messages to the event
rsEvents->postEvent(ev);
}


18 changes: 18 additions & 0 deletions src/chat/distributedchat.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ class RsChatItem ;
class RsChatMsgItem ;
class RsGixs ;

class RsChatLobbyHistoryProbeItem ;
class RsChatLobbyHistoryProbeResponseItem ;
class RsChatLobbyHistoryRequestItem ;
class RsChatLobbyHistoryDataItem ;

class DistributedChatService
{
public:
Expand Down Expand Up @@ -79,6 +84,12 @@ class DistributedChatService
void getListOfNearbyChatLobbies(std::vector<VisibleChatLobbyRecord>& public_lobbies) ;
bool joinVisibleChatLobby(const ChatLobbyId& id, const RsGxsId &gxs_id) ;

// Lobby history retrieval protocol
bool requestLobbyHistory(const ChatLobbyId& lobby_id) ;
bool requestLobbyHistoryFromPeer(const ChatLobbyId& lobby_id, const RsPeerId& peer_id, uint32_t max_count, uint32_t oldest_ts) ;
void allowHistorySharing(bool allow) { _allow_history_sharing = allow; triggerConfigSave(); }
bool isHistorySharingAllowed() const { return _allow_history_sharing; }

protected:
bool handleRecvItem(RsChatItem *) ;

Expand Down Expand Up @@ -125,6 +136,12 @@ class DistributedChatService
void sendLobbyStatusNewPeer(const ChatLobbyId& lobby_id) ;
void sendLobbyStatusKeepAlive(const ChatLobbyId&) ;

// Lobby history retrieval handlers
void handleRecvLobbyHistoryProbe(RsChatLobbyHistoryProbeItem *item) ;
void handleRecvLobbyHistoryProbeResponse(RsChatLobbyHistoryProbeResponseItem *item) ;
void handleRecvLobbyHistoryRequest(RsChatLobbyHistoryRequestItem *item) ;
void handleRecvLobbyHistoryData(RsChatLobbyHistoryDataItem *item) ;

bool locked_initLobbyBouncableObject(const ChatLobbyId& id,RsChatLobbyBouncingObject&) ;
void locked_printDebugInfo() const ;
RsGxsId locked_getDefaultIdentity();
Expand Down Expand Up @@ -156,6 +173,7 @@ class DistributedChatService
rstime_t last_lobby_challenge_time ; // prevents bruteforce attack
rstime_t last_visible_lobby_info_request_time ; // allows to ask for updates
bool _should_reset_lobby_counts ;
bool _allow_history_sharing ;
RsGxsId _default_identity;
std::map<ChatLobbyId,RsGxsId> _lobby_default_identity;

Expand Down
21 changes: 21 additions & 0 deletions src/chat/p3chatservice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,27 @@ ChatLobbyId p3ChatService::createChatLobby(const std::string& lobby_name,const R
return DistributedChatService::createChatLobby(lobby_name,lobby_identity,lobby_topic,invited_friends,privacy_type) ;
}

bool p3ChatService::requestLobbyHistory(const ChatLobbyId& lobby_id)
{
return DistributedChatService::requestLobbyHistory(lobby_id) ;
}

bool p3ChatService::requestLobbyHistoryFromPeer(const ChatLobbyId& lobby_id, const RsPeerId& peer_id, uint32_t max_count, uint32_t oldest_ts)
{
return DistributedChatService::requestLobbyHistoryFromPeer(lobby_id, peer_id, max_count, oldest_ts) ;
}

bool p3ChatService::allowHistorySharing(bool allow)
{
DistributedChatService::allowHistorySharing(allow) ;
return true ;
}

bool p3ChatService::isHistorySharingAllowed() const
{
return DistributedChatService::isHistorySharingAllowed() ;
}


void p3ChatService::sendChatItem(RsChatItem *item)
{
Expand Down
5 changes: 5 additions & 0 deletions src/chat/p3chatservice.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ class p3ChatService :
virtual void getDefaultIdentityForChatLobby(RsGxsId& nick_name) override;
virtual void setLobbyAutoSubscribe(const ChatLobbyId& lobby_id, const bool autoSubscribe) override;
virtual bool getLobbyAutoSubscribe(const ChatLobbyId& lobby_id) override;
virtual bool requestLobbyHistory(const ChatLobbyId& lobby_id) override;
virtual bool requestLobbyHistoryFromPeer(const ChatLobbyId& lobby_id, const RsPeerId& peer_id, uint32_t max_count, uint32_t oldest_ts) override;

virtual bool allowHistorySharing(bool allow) override;
virtual bool isHistorySharingAllowed() const override;

/** methods that will call the DistantChatService parent
*/
Expand Down
Loading
Loading