diff --git a/rocketmq-client/src/base/client_config.rs b/rocketmq-client/src/base/client_config.rs index 52197a8bf..872e9b9b8 100644 --- a/rocketmq-client/src/base/client_config.rs +++ b/rocketmq-client/src/base/client_config.rs @@ -63,6 +63,8 @@ pub struct ClientConfig { pub enable_heartbeat_channel_event_listener: bool, pub enable_trace: bool, pub trace_topic: Option, + pub trace_msg_batch_num: usize, + pub max_page_size_in_get_metadata: usize, } impl Default for ClientConfig { @@ -123,12 +125,19 @@ impl ClientConfig { detect_interval: Duration::from_secs(2).as_millis() as u32, language: LanguageCode::RUST, enable_stream_request_type: false, - send_latency_enable: env::var(Self::SEND_LATENCY_ENABLE).unwrap_or_else(|_| "false".to_string()) == "false", - start_detector_enable: env::var(Self::START_DETECTOR_ENABLE).unwrap_or_else(|_| "false".to_string()) - == "false", + send_latency_enable: env::var(Self::SEND_LATENCY_ENABLE) + .unwrap_or_else(|_| "false".to_string()) + .parse::() + .unwrap_or(false), + start_detector_enable: env::var(Self::START_DETECTOR_ENABLE) + .unwrap_or_else(|_| "false".to_string()) + .parse::() + .unwrap_or(false), enable_heartbeat_channel_event_listener: true, enable_trace: false, trace_topic: None, + trace_msg_batch_num: 10, + max_page_size_in_get_metadata: 2000, } } } @@ -191,12 +200,21 @@ impl ClientConfig { #[inline] pub fn build_mq_client_id(&self) -> String { - let mut sb = String::new(); - sb.push_str(self.client_ip.as_ref().unwrap()); + // Pre-allocate capacity to avoid reallocations + let estimated_capacity = self.client_ip.as_ref().map(|ip| ip.len()).unwrap_or(15) + + self.instance_name.len() + + self.unit_name.as_ref().map(|un| un.len() + 1).unwrap_or(0) + + if self.enable_stream_request_type { 8 } else { 0 } + + 3; // For '@' separators + + let mut sb = String::with_capacity(estimated_capacity); + if let Some(ref client_ip) = self.client_ip { + sb.push_str(client_ip.as_str()); + } sb.push('@'); sb.push_str(self.instance_name.as_str()); - if let Some(unit_name) = &self.unit_name { + if let Some(ref unit_name) = self.unit_name { if !unit_name.is_empty() { sb.push('@'); sb.push_str(unit_name.as_str()); @@ -224,4 +242,398 @@ impl ClientConfig { self.namesrv_addr.clone() } } + + // ============ Comprehensive Getters and Setters ============ + + #[inline] + pub fn get_client_ip(&self) -> Option<&CheetahString> { + self.client_ip.as_ref() + } + + #[inline] + pub fn set_client_ip(&mut self, client_ip: CheetahString) { + self.client_ip = Some(client_ip); + } + + #[inline] + pub fn get_instance_name(&self) -> &CheetahString { + &self.instance_name + } + + #[inline] + pub fn get_client_callback_executor_threads(&self) -> usize { + self.client_callback_executor_threads + } + + #[inline] + pub fn set_client_callback_executor_threads(&mut self, threads: usize) { + self.client_callback_executor_threads = threads; + } + + #[inline] + pub fn get_namespace_v2(&self) -> Option<&CheetahString> { + self.namespace_v2.as_ref() + } + + #[inline] + pub fn set_namespace_v2(&mut self, namespace_v2: CheetahString) { + self.namespace_v2 = Some(namespace_v2); + } + + #[inline] + pub fn set_namespace(&mut self, namespace: CheetahString) { + self.namespace = Some(namespace); + self.namespace_initialized.store(true, Ordering::Release); + } + + #[inline] + pub fn get_access_channel(&self) -> AccessChannel { + self.access_channel + } + + #[inline] + pub fn set_access_channel(&mut self, access_channel: AccessChannel) { + self.access_channel = access_channel; + } + + #[inline] + pub fn get_poll_name_server_interval(&self) -> u32 { + self.poll_name_server_interval + } + + #[inline] + pub fn set_poll_name_server_interval(&mut self, interval: u32) { + self.poll_name_server_interval = interval; + } + + #[inline] + pub fn get_heartbeat_broker_interval(&self) -> u32 { + self.heartbeat_broker_interval + } + + #[inline] + pub fn set_heartbeat_broker_interval(&mut self, interval: u32) { + self.heartbeat_broker_interval = interval; + } + + #[inline] + pub fn get_persist_consumer_offset_interval(&self) -> u32 { + self.persist_consumer_offset_interval + } + + #[inline] + pub fn set_persist_consumer_offset_interval(&mut self, interval: u32) { + self.persist_consumer_offset_interval = interval; + } + + #[inline] + pub fn get_pull_time_delay_millis_when_exception(&self) -> u32 { + self.pull_time_delay_millis_when_exception + } + + #[inline] + pub fn set_pull_time_delay_millis_when_exception(&mut self, delay: u32) { + self.pull_time_delay_millis_when_exception = delay; + } + + #[inline] + pub fn get_unit_name(&self) -> Option<&CheetahString> { + self.unit_name.as_ref() + } + + #[inline] + pub fn set_unit_name(&mut self, unit_name: CheetahString) { + self.unit_name = Some(unit_name); + } + + #[inline] + pub fn is_unit_mode(&self) -> bool { + self.unit_mode + } + + #[inline] + pub fn set_unit_mode(&mut self, unit_mode: bool) { + self.unit_mode = unit_mode; + } + + #[inline] + pub fn is_decode_read_body(&self) -> bool { + self.decode_read_body + } + + #[inline] + pub fn set_decode_read_body(&mut self, decode_read_body: bool) { + self.decode_read_body = decode_read_body; + } + + #[inline] + pub fn is_decode_decompress_body(&self) -> bool { + self.decode_decompress_body + } + + #[inline] + pub fn set_decode_decompress_body(&mut self, decode_decompress_body: bool) { + self.decode_decompress_body = decode_decompress_body; + } + + #[inline] + pub fn is_vip_channel_enabled(&self) -> bool { + self.vip_channel_enabled + } + + #[inline] + pub fn set_vip_channel_enabled(&mut self, enabled: bool) { + self.vip_channel_enabled = enabled; + } + + #[inline] + pub fn is_use_heartbeat_v2(&self) -> bool { + self.use_heartbeat_v2 + } + + #[inline] + pub fn set_use_heartbeat_v2(&mut self, use_heartbeat_v2: bool) { + self.use_heartbeat_v2 = use_heartbeat_v2; + } + + #[inline] + pub fn is_use_tls(&self) -> bool { + self.use_tls + } + + #[inline] + pub fn set_use_tls(&mut self, use_tls: bool) { + self.use_tls = use_tls; + } + + #[inline] + pub fn get_socks_proxy_config(&self) -> &CheetahString { + &self.socks_proxy_config + } + + #[inline] + pub fn set_socks_proxy_config(&mut self, config: CheetahString) { + self.socks_proxy_config = config; + } + + #[inline] + pub fn get_language(&self) -> LanguageCode { + self.language + } + + #[inline] + pub fn set_language(&mut self, language: LanguageCode) { + self.language = language; + } + + #[inline] + pub fn get_mq_client_api_timeout(&self) -> u64 { + self.mq_client_api_timeout + } + + #[inline] + pub fn set_mq_client_api_timeout(&mut self, timeout: u64) { + self.mq_client_api_timeout = timeout; + } + + #[inline] + pub fn get_detect_timeout(&self) -> u32 { + self.detect_timeout + } + + #[inline] + pub fn set_detect_timeout(&mut self, timeout: u32) { + self.detect_timeout = timeout; + } + + #[inline] + pub fn get_detect_interval(&self) -> u32 { + self.detect_interval + } + + #[inline] + pub fn set_detect_interval(&mut self, interval: u32) { + self.detect_interval = interval; + } + + #[inline] + pub fn is_enable_stream_request_type(&self) -> bool { + self.enable_stream_request_type + } + + #[inline] + pub fn set_enable_stream_request_type(&mut self, enabled: bool) { + self.enable_stream_request_type = enabled; + } + + #[inline] + pub fn is_send_latency_enable(&self) -> bool { + self.send_latency_enable + } + + #[inline] + pub fn set_send_latency_enable(&mut self, enabled: bool) { + self.send_latency_enable = enabled; + } + + #[inline] + pub fn is_start_detector_enable(&self) -> bool { + self.start_detector_enable + } + + #[inline] + pub fn set_start_detector_enable(&mut self, enabled: bool) { + self.start_detector_enable = enabled; + } + + #[inline] + pub fn is_enable_heartbeat_channel_event_listener(&self) -> bool { + self.enable_heartbeat_channel_event_listener + } + + #[inline] + pub fn set_enable_heartbeat_channel_event_listener(&mut self, enabled: bool) { + self.enable_heartbeat_channel_event_listener = enabled; + } + + #[inline] + pub fn is_enable_trace(&self) -> bool { + self.enable_trace + } + + #[inline] + pub fn set_enable_trace(&mut self, enabled: bool) { + self.enable_trace = enabled; + } + + #[inline] + pub fn get_trace_topic(&self) -> Option<&CheetahString> { + self.trace_topic.as_ref() + } + + #[inline] + pub fn set_trace_topic(&mut self, topic: CheetahString) { + self.trace_topic = Some(topic); + } + + #[inline] + pub fn get_trace_msg_batch_num(&self) -> usize { + self.trace_msg_batch_num + } + + #[inline] + pub fn set_trace_msg_batch_num(&mut self, num: usize) { + self.trace_msg_batch_num = num; + } + + #[inline] + pub fn get_max_page_size_in_get_metadata(&self) -> usize { + self.max_page_size_in_get_metadata + } + + #[inline] + pub fn set_max_page_size_in_get_metadata(&mut self, size: usize) { + self.max_page_size_in_get_metadata = size; + } + + // ============ Utility Methods ============ + + /// Clones the configuration + pub fn clone_client_config(&self) -> Self { + self.clone() + } + + /// Resets client config from another instance + pub fn reset_client_config(&mut self, other: &ClientConfig) { + self.namesrv_addr = other.namesrv_addr.clone(); + self.client_ip = other.client_ip.clone(); + self.instance_name = other.instance_name.clone(); + self.client_callback_executor_threads = other.client_callback_executor_threads; + self.namespace = other.namespace.clone(); + self.namespace_v2 = other.namespace_v2.clone(); + self.access_channel = other.access_channel; + self.poll_name_server_interval = other.poll_name_server_interval; + self.heartbeat_broker_interval = other.heartbeat_broker_interval; + self.persist_consumer_offset_interval = other.persist_consumer_offset_interval; + self.pull_time_delay_millis_when_exception = other.pull_time_delay_millis_when_exception; + self.unit_mode = other.unit_mode; + self.unit_name = other.unit_name.clone(); + self.decode_read_body = other.decode_read_body; + self.decode_decompress_body = other.decode_decompress_body; + self.vip_channel_enabled = other.vip_channel_enabled; + self.use_heartbeat_v2 = other.use_heartbeat_v2; + self.use_tls = other.use_tls; + self.socks_proxy_config = other.socks_proxy_config.clone(); + self.language = other.language; + self.mq_client_api_timeout = other.mq_client_api_timeout; + self.detect_timeout = other.detect_timeout; + self.detect_interval = other.detect_interval; + self.enable_stream_request_type = other.enable_stream_request_type; + self.send_latency_enable = other.send_latency_enable; + self.start_detector_enable = other.start_detector_enable; + self.enable_heartbeat_channel_event_listener = other.enable_heartbeat_channel_event_listener; + self.enable_trace = other.enable_trace; + self.trace_topic = other.trace_topic.clone(); + self.trace_msg_batch_num = other.trace_msg_batch_num; + self.max_page_size_in_get_metadata = other.max_page_size_in_get_metadata; + } + + /// Deprecated: Use with_namespace instead + #[inline] + #[deprecated(note = "Use with_namespace for namespace wrapping")] + pub fn without_namespace(&mut self, resource: &str) -> CheetahString { + if let Some(namespace) = self.get_namespace().as_deref() { + NamespaceUtil::without_namespace_with_namespace(resource, namespace).into() + } else { + NamespaceUtil::without_namespace(resource).into() + } + } +} + +impl std::fmt::Display for ClientConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "ClientConfig {{ namesrv_addr: {:?}, client_ip: {:?}, instance_name: {}, \ + client_callback_executor_threads: {}, namespace: {:?}, namespace_v2: {:?}, access_channel: {:?}, \ + poll_name_server_interval: {}, heartbeat_broker_interval: {}, persist_consumer_offset_interval: {}, \ + pull_time_delay_millis_when_exception: {}, unit_mode: {}, unit_name: {:?}, decode_read_body: {}, \ + decode_decompress_body: {}, vip_channel_enabled: {}, use_heartbeat_v2: {}, use_tls: {}, \ + socks_proxy_config: {}, mq_client_api_timeout: {}, detect_timeout: {}, detect_interval: {}, language: \ + {:?}, enable_stream_request_type: {}, send_latency_enable: {}, start_detector_enable: {}, \ + enable_heartbeat_channel_event_listener: {}, enable_trace: {}, trace_topic: {:?}, trace_msg_batch_num: \ + {}, max_page_size_in_get_metadata: {} }}", + self.namesrv_addr, + self.client_ip, + self.instance_name, + self.client_callback_executor_threads, + self.namespace, + self.namespace_v2, + self.access_channel, + self.poll_name_server_interval, + self.heartbeat_broker_interval, + self.persist_consumer_offset_interval, + self.pull_time_delay_millis_when_exception, + self.unit_mode, + self.unit_name, + self.decode_read_body, + self.decode_decompress_body, + self.vip_channel_enabled, + self.use_heartbeat_v2, + self.use_tls, + self.socks_proxy_config, + self.mq_client_api_timeout, + self.detect_timeout, + self.detect_interval, + self.language, + self.enable_stream_request_type, + self.send_latency_enable, + self.start_detector_enable, + self.enable_heartbeat_channel_event_listener, + self.enable_trace, + self.trace_topic, + self.trace_msg_batch_num, + self.max_page_size_in_get_metadata + ) + } }