Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
8e4fc6e
fix: session/ci/appid de-conflation
brendanobra Oct 8, 2025
8a85a1e
feat: little cleanup. externalize listen ports for docker friendliness
brendanobra Oct 10, 2025
e5e74b1
Merge branch 'main' of github.com:rdkcentral/Ripple into app-session-…
brendanobra Oct 13, 2025
ebc86ec
chore: cleanup
brendanobra Oct 13, 2025
957e5e1
feat: refactor for deconflated sessions, phase 2
brendanobra Oct 13, 2025
1065150
feat: phase 3, types everwhere
brendanobra Oct 13, 2025
efb6154
feat: full session impl using newtypes
brendanobra Oct 14, 2025
c1531f5
feat: session leak for thunder subs fix
brendanobra Oct 15, 2025
402c305
chore: fmt
brendanobra Oct 15, 2025
d4df4c9
exp: jemalloc
brendanobra Oct 15, 2025
ca9d8fe
fu: clippy
brendanobra Oct 15, 2025
bced603
feat: jemalloc tuning
brendanobra Oct 15, 2025
04d4ede
exp: tactical memory optimization
brendanobra Oct 15, 2025
02f5a85
tuning: squeeze hard
brendanobra Oct 15, 2025
ebcf568
exp: optimization - pooled strings in thunder broker
brendanobra Oct 15, 2025
8478c16
evo: rollback atomics, too much upfront cost
brendanobra Oct 16, 2025
8c41289
expt: more optimization
brendanobra Oct 16, 2025
d1c8af5
fix: app events leak plugged
brendanobra Oct 16, 2025
5ca6b26
fix: Eliminate EventListener memory leak and optimize struct alignment
brendanobra Oct 16, 2025
9031335
fix: comp errors
brendanobra Oct 16, 2025
476d4b9
opt: Box<Session>
brendanobra Oct 16, 2025
8f518b7
exp: session compression
brendanobra Oct 17, 2025
602f47c
exp: compression based on rss pressure
brendanobra Oct 17, 2025
4748553
chore: 8MB compaction threshold
brendanobra Oct 17, 2025
e3ea0f2
refactor: complete move to newtypes
brendanobra Oct 17, 2025
316b907
fix: clean up intent leak
brendanobra Oct 17, 2025
5b3486d
fix: api stats leak plug
brendanobra Oct 17, 2025
5229047
plug: rpc router spawn leaks
brendanobra Oct 17, 2025
6226de7
feat: session cleanup
brendanobra Oct 17, 2025
d083714
fix: listener fanout working
brendanobra Oct 21, 2025
8432064
fix: multiple events supported
brendanobra Oct 22, 2025
5fab911
fix: jemalloc + tokio yield
brendanobra Oct 24, 2025
7da9b4b
feat: SOC malloc tuning
brendanobra Oct 24, 2025
e03a0e3
tuning: explicit force return to OS
brendanobra Oct 25, 2025
88fb56d
exp: be. aggressive
brendanobra Oct 25, 2025
75110a2
exp: retain:false
brendanobra Oct 25, 2025
7927645
revert: session cleanup bandaid
brendanobra Oct 27, 2025
a98350b
chore: formatting
brendanobra Oct 27, 2025
e379a92
fix: removing cruft
brendanobra Oct 27, 2025
4c0481a
fix: move jemalloc changes to other pr
brendanobra Nov 6, 2025
4f6ae9b
chore: merge from main
brendanobra Nov 6, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions core/main/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ regex.workspace = true
serde_json.workspace = true

arrayvec = { version ="0.7.2", default-features = false }
smallvec = { version = "1.11", default-features = false }
env-file-reader = "0.2.0"
sd-notify = { version = "0.4.1", optional = true }
exitcode = "1.1.2"
Expand All @@ -75,6 +76,8 @@ strum_macros = "0.24"
openrpc_validator = { path = "../../openrpc_validator", optional = true, default-features = false }
proc-macro2.workspace = true



[build-dependencies]
vergen = "1"

Expand Down
6 changes: 6 additions & 0 deletions core/main/src/bootstrap/boot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use super::{
start_fbgateway_step::FireboltGatewayStep,
start_ws_step::StartWsStep,
};

/// Starts up Ripple uses `PlatformState` to manage State
/// # Arguments
/// * `platform_state` - PlatformState
Expand All @@ -60,6 +61,11 @@ pub async fn boot(state: BootstrapState) -> RippleResponse {
let bootstrap = Bootstrap::new(state);
execute_step(LoggingBootstrapStep, &bootstrap).await?;
log_memory_usage("After-LoggingBootstrapStep");

// MEMORY FIX: Spawn periodic memory maintenance task for embedded platforms
// On SOC, continuous app lifecycle traffic causes linear memory growth even with
// tokio yielding. This task aggressively purges jemalloc arenas every 30s to
// force memory return to OS during sustained traffic patterns.
execute_step(StartWsStep, &bootstrap).await?;
log_memory_usage("After-StartWsStep");
execute_step(StartCommunicationBroker, &bootstrap).await?;
Expand Down
14 changes: 10 additions & 4 deletions core/main/src/broker/broker_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,16 @@ impl BrokerUtils {
params: Option<Value>,
) -> RpcResult<Value> {
let rpc_request = RpcRequest::internal(method, on_behalf_of).with_params(params);
state
.metrics
.add_api_stats(&rpc_request.ctx.request_id, method);
Self::internal_request(state, rpc_request).await
let request_id = rpc_request.ctx.request_id.clone();
state.metrics.add_api_stats(&request_id, method);
let result = Self::internal_request(state, rpc_request).await;

// MEMORY FIX: Clean up api_stats after internal request completes
// Previously this was never cleaned up, causing HashMap to grow indefinitely
let mut state_mut = state.clone();
state_mut.metrics.remove_api_stats(&request_id);

result
}

async fn internal_request(state: &PlatformState, rpc_request: RpcRequest) -> RpcResult<Value> {
Expand Down
81 changes: 72 additions & 9 deletions core/main/src/broker/endpoint_broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,32 @@ pub struct EndpointBrokerState {
metrics_state: OpMetricState,
}

/// Memory monitoring: Broker state metrics
#[derive(Debug, Clone)]
pub struct BrokerMetrics {
pub request_map_size: usize,
pub extension_map_size: usize,
pub endpoint_map_size: usize,
pub cleaner_list_size: usize,
}

/// Memory monitoring: Endpoint validation results
#[derive(Debug, Clone)]
pub struct EndpointValidationResult {
pub total_endpoints: usize,
pub healthy_endpoints: usize,
pub closed_channels: usize,
pub issues: Vec<EndpointValidationIssue>,
}

/// Memory monitoring: Endpoint validation issues
#[derive(Debug, Clone)]
pub enum EndpointValidationIssue {
ClosedChannel(String),
UnresponsiveEndpoint(String),
OrphanedConnection(String),
}

#[derive(Debug)]
pub enum HandleBrokerageError {
RuleNotFound(String),
Expand Down Expand Up @@ -510,6 +536,28 @@ impl EndpointBrokerState {
pub fn has_rule(&self, rule: &str) -> bool {
self.rule_engine.read().unwrap().has_rule(rule)
}

pub fn log_hashmap_sizes(&self) {
if let Ok(request_map) = self.request_map.read() {
debug!(
"MEMORY_DEBUG: broker request_map size: {}",
request_map.len()
);
}
if let Ok(extension_map) = self.extension_request_map.read() {
debug!(
"MEMORY_DEBUG: broker extension_request_map size: {}",
extension_map.len()
);
}
if let Ok(endpoint_map) = self.endpoint_map.read() {
debug!(
"MEMORY_DEBUG: broker endpoint_map size: {}",
endpoint_map.len()
);
}
}

#[cfg(not(test))]
fn reconnect_thread(&self, mut rx: Receiver<BrokerConnectRequest>, client: RippleClient) {
use crate::firebolt::firebolt_gateway::FireboltGatewayCommand;
Expand Down Expand Up @@ -1101,11 +1149,26 @@ impl EndpointBrokerState {

for cleaner in cleaners {
/*
for now, just eat the error - the return type was mainly added to prepate for future refactoring/testability
for now, just eat the error - the return type was mainly added to prepare for future refactoring/testability
*/
let _ = cleaner.cleanup_session(app_id).await;
}
}

/// Memory optimization: Get broker state metrics for monitoring
pub fn get_broker_metrics(&self) -> BrokerMetrics {
let request_map_size = self.request_map.read().unwrap().len();
let extension_map_size = self.extension_request_map.read().unwrap().len();
let endpoint_map_size = self.endpoint_map.read().unwrap().len();
let cleaner_list_size = self.cleaner_list.read().unwrap().len();

BrokerMetrics {
request_map_size,
extension_map_size,
endpoint_map_size,
cleaner_list_size,
}
}
}

/// Trait which contains all the abstract methods for a Endpoint Broker
Expand Down Expand Up @@ -1407,11 +1470,11 @@ impl BrokerOutputForwarder {
.get_api_stats(&rpc_request.ctx.request_id)
{
message.stats = Some(api_stats);
if rpc_request.ctx.app_id.eq_ignore_ascii_case("internal") {
platform_state
.metrics
.remove_api_stats(&rpc_request.ctx.request_id);
}
// MEMORY FIX: Always remove API stats after use to prevent accumulation
// Previously only removed for "internal" app_id, causing memory growth
platform_state
.metrics
.remove_api_stats(&rpc_request.ctx.request_id);
}
if matches!(rpc_request.ctx.protocol, ApiProtocol::Extn) {
if let Ok(extn_message) =
Expand All @@ -1428,7 +1491,7 @@ impl BrokerOutputForwarder {
Self::handle_service_message(rpc_request, &message, platform_state).await;
} else if let Some(session) = platform_state
.session_state
.get_session_for_connection_id(&session_id)
.get_session_for_connection_id_legacy(&session_id)
{
let _ = session.send_json_rpc(message).await;
}
Expand Down Expand Up @@ -1516,7 +1579,7 @@ impl BrokerOutputForwarder {
);
if let Some(session) = platform_state_c
.session_state
.get_session_for_connection_id(&session_id)
.get_session_for_connection_id_legacy(&session_id)
{
let _ = session.send_json_rpc(message).await;
}
Expand Down Expand Up @@ -1739,7 +1802,7 @@ impl BrokerOutputForwarder {

if let Some(session) = platform_state_c
.session_state
.get_session_for_connection_id(&session_id)
.get_session_for_connection_id_legacy(&session_id)
{
let _ = session.send_json_rpc(message).await;
}
Expand Down
6 changes: 5 additions & 1 deletion core/main/src/broker/event_management_utility.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl EventManagementUtility {
}

pub async fn advertising_policy_event_decorator(
platform_state: PlatformState,
mut platform_state: PlatformState,
ctx: CallContext,
value: Option<Value>,
) -> Result<Option<Value>, RippleError> {
Expand Down Expand Up @@ -108,6 +108,10 @@ impl EventManagementUtility {
} else {
None
};

// MEMORY FIX: Clean up api_stats after internal request completes
platform_state.metrics.remove_api_stats(&ctx.request_id);

Ok(policy)
}
}
Loading