diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..5a77293 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,109 @@ +# CITA-Cloud Controller + +## 基本信息 +- **名称**: controller +- **版本**: 6.7.5 +- **作者**: Rivtower Technologies +- **许可证**: Apache-2.0 +- **Rust 版本**: Edition 2024 + +## 项目简介 +Controller 是 CITA-Cloud 区块链架构中的核心控制微服务。它充当各个微服务模块(共识、网络、存储、执行)之间的协调者,负责维护本地区块链状态、交易池管理、区块同步以及节点间状态的广播与更新。 + +## 核心能力 +1. **状态机管理** + - 使用 `statig` 库实现状态机,管理节点状态(如 Offline, Online, Syncing, ParticipateInConsensus 等)。 + - 协调节点在不同阶段的行为,确保系统状态的一致性。 + +2. **交易池管理 (Mempool)** + - 接收并验证交易,维护待打包的交易列表。 + - 支持交易配额(Quota)限制和去重。 + +3. **区块同步 (Block Sync)** + - 监控全局链高度,自动触发区块同步流程。 + - 处理 `SyncBlockReq` 和 `SyncBlockRespond` 消息,从对等节点获取缺失区块。 + - `SyncManager` 负责管理同步请求和区块数据的重组。 + +4. **节点管理 (Node Manager)** + - 维护对等节点的状态(高度、Hash 等)。 + - 识别并处理恶意节点(Misbehavior Node),维护黑名单。 + +5. **微服务交互 (gRPC)** + - **Server 端**: 提供 Consensus, Network, RPC, HealthCheck 服务接口。 + - **Client 端**: 调用 Consensus, Executor, Network, Storage 服务接口。 + - 实现了 CITA-Cloud 标准协议 (`cita_cloud_proto`)。 + +6. **系统配置与治理** + - 管理系统级配置(如管理员、验证人列表、区块间隔等)。 + - 支持通过系统交易更新链上配置。 + +## 运行依赖 +### 编译环境 +- Rust (最新稳定版,支持 Edition 2024) +- Protobuf Compiler (`protoc`) + +### 运行时依赖 +Controller 作为 CITA-Cloud 的一部分,通常需要与其他微服务配合运行: +- **Network Microservice**: 负责 P2P 网络通信。 +- **Consensus Microservice**: 负责共识算法(如 Raft, BFT)。 +- **Storage Microservice**: 负责底层数据存储。 +- **Executor Microservice**: 负责交易执行(EVM, WASM 等)。 + +## 使用示例 + +### 1. 编译项目 +```bash +cargo build --release +``` + +### 2. 查看帮助信息 +```bash +./target/release/controller --help +``` + +### 3. 运行服务 +Controller 需要配置文件和私钥文件来启动。 + +```bash +./target/release/controller run \ + --config config.toml \ + --private_key_path private_key +``` + +#### 命令行参数说明 +- `run`: 启动服务的子命令。 +- `-c, --config `: 指定配置文件路径(默认为 `config.toml`)。 +- `-p, --private_key_path `: 指定节点私钥文件路径(默认为 `private_key`)。 + +### 4. 配置文件示例 (config.toml) +```toml +# 监听端口 +network_port = 50004 +# 域名 +domain = "controller" + +# 日志配置 +[log_config] +max_level = "info" +filter = "info" +service_name = "controller" +rolling_file_path = "logs" +agent_endpoint = "127.0.0.1:6831" + +# 其他微服务地址 +[network_client] +port = 50000 +host = "127.0.0.1" + +[consensus_client] +port = 50001 +host = "127.0.0.1" + +[executor_client] +port = 50002 +host = "127.0.0.1" + +[storage_client] +port = 50003 +host = "127.0.0.1" +``` diff --git a/Cargo.toml b/Cargo.toml index 6e031db..f79ddbb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,38 +1,38 @@ [package] name = "controller" -version = "6.7.4" +version = "6.7.5" authors = ["Rivtower Technologies "] license = "Apache-2.0" -edition = "2021" +edition = "2024" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] clap = { version = "4.5", features = ["derive"] } -tonic = "0.12" -prost = "0.13" -tokio = { version = "1.41", features = ["full"] } -rand = "0.8" -toml = "0.8" +tonic = "0.14" +prost = "0.14" +tokio = { version = "1.49", features = ["full"] } +rand = "0.10" +toml = "1.0" serde = "1.0" serde_derive = "1.0" hex = "0.4" tower = "0.5" tracing = "0.1" -tonic-reflection = "0.12" -tonic-web = "0.12" -statig = { version = "0.3", features = ["async"] } -flume = "0.11" -rayon = "1.10" +tonic-reflection = "0.14" +tonic-web = "0.14" +statig = { version = "0.4", features = ["async"] } +flume = "0.12" +rayon = "1.11" cfg-if = "1.0" futures = "0.3" -indexmap = "2.6" +indexmap = "2.13" -cloud-util = { package = "cloud-util", git = "https://github.com/cita-cloud/cloud-common-rs" } -cita_cloud_proto = { package = "cita_cloud_proto", git = "https://github.com/cita-cloud/cloud-common-rs" } +cloud-util = { package = "cloud-util", git = "https://github.com/cita-cloud/cloud-common-rs", branch = "update" } +cita_cloud_proto = { package = "cita_cloud_proto", git = "https://github.com/cita-cloud/cloud-common-rs", branch = "update" } -crypto_sm = { git = "https://github.com/cita-cloud/crypto_sm", package = "crypto", optional = true } -crypto_eth = { git = "https://github.com/cita-cloud/crypto_eth", package = "crypto", optional = true } +crypto_sm = { git = "https://github.com/cita-cloud/crypto_sm", package = "crypto", optional = true, branch = "update" } +crypto_eth = { git = "https://github.com/cita-cloud/crypto_eth", package = "crypto", optional = true, branch = "update" } [profile.release.package."*"] # Set the default for dependencies. diff --git a/src/core/auditor.rs b/src/core/auditor.rs index 04af692..4308fa5 100644 --- a/src/core/auditor.rs +++ b/src/core/auditor.rs @@ -19,15 +19,15 @@ use std::{ use cita_cloud_proto::{ blockchain::{ - raw_transaction::Tx::{NormalTx, UtxoTx}, RawTransaction, RawTransactions, Transaction, UnverifiedUtxoTransaction, UtxoTransaction, + raw_transaction::Tx::{NormalTx, UtxoTx}, }, status_code::StatusCodeEnum, }; -use futures::{stream::FuturesUnordered, StreamExt}; +use futures::{StreamExt, stream::FuturesUnordered}; use crate::{ - core::system_config::{SystemConfig, LOCK_ID_BUTTON, LOCK_ID_VERSION}, + core::system_config::{LOCK_ID_BUTTON, LOCK_ID_VERSION, SystemConfig}, grpc_client::storage::get_compact_block, }; diff --git a/src/core/chain.rs b/src/core/chain.rs index ed2c7e7..ef3f23f 100644 --- a/src/core/chain.rs +++ b/src/core/chain.rs @@ -18,8 +18,8 @@ use tokio::{sync::RwLock, time}; use cita_cloud_proto::{ blockchain::{ - raw_transaction::Tx, Block, BlockHeader, CompactBlock, CompactBlockBody, RawTransaction, - RawTransactions, + Block, BlockHeader, CompactBlock, CompactBlockBody, RawTransaction, RawTransactions, + raw_transaction::Tx, }, common::{ConsensusConfiguration, Hash, ProposalInner}, status_code::StatusCodeEnum, diff --git a/src/core/controller.rs b/src/core/controller.rs index 5d7f6e4..843026a 100644 --- a/src/core/controller.rs +++ b/src/core/controller.rs @@ -57,12 +57,12 @@ use crate::{ protocol::{ controller_msg_type::ControllerMsgType, node_manager::{ - chain_status_respond::Respond, ChainStatus, ChainStatusInit, ChainStatusRespond, - NodeAddress, NodeManager, + ChainStatus, ChainStatusInit, ChainStatusRespond, NodeAddress, NodeManager, + chain_status_respond::Respond, }, sync_manager::{ - sync_block_respond, SyncBlockRequest, SyncBlockRespond, SyncBlocks, SyncManager, - SyncTxRequest, SyncTxRespond, + SyncBlockRequest, SyncBlockRespond, SyncBlocks, SyncManager, SyncTxRequest, + SyncTxRespond, sync_block_respond, }, }, util::*, @@ -599,11 +599,11 @@ impl Controller { .await?; if proposal_inner.pre_state_root != pre_state_root { warn!( - "check proposal({}) failed: pre_state_root: 0x{}, local pre_state_root: 0x{}", - block_height, - hex::encode(&proposal_inner.pre_state_root), - hex::encode(&pre_state_root), - ); + "check proposal({}) failed: pre_state_root: 0x{}, local pre_state_root: 0x{}", + block_height, + hex::encode(&proposal_inner.pre_state_root), + hex::encode(&pre_state_root), + ); return Err(StatusCodeEnum::ProposalCheckError); } @@ -693,7 +693,9 @@ impl Controller { if transactions_root != header.transactions_root { warn!( "check proposal({}) failed: header transactions_root: {}, controller calculate: {}", - block_height, hex::encode(&header.transactions_root), hex::encode(&transactions_root), + block_height, + hex::encode(&header.transactions_root), + hex::encode(&transactions_root), ); return Err(StatusCodeEnum::ProposalCheckError); } @@ -959,7 +961,8 @@ impl Controller { h160_address_check(Some(&node))?; let node_origin = NodeAddress::from(&node); warn!( - "process ChainStatusRespondType failed: remote check chain_status failed: NotSameChain. ban remote node. origin: {}", node_origin + "process ChainStatusRespondType failed: remote check chain_status failed: NotSameChain. ban remote node. origin: {}", + node_origin ); self.delete_global_status(&node_origin).await; self.node_manager.set_ban_node(&node_origin).await?; @@ -1229,7 +1232,7 @@ impl Controller { ) }); } - if (!in_sync || global_height % self.config.force_sync_epoch == 0) + if (!in_sync || global_height.is_multiple_of(self.config.force_sync_epoch)) && self .sync_manager .contains_block(own_status.height + 1) @@ -1342,9 +1345,7 @@ impl Controller { Ordering::Greater => { error!( "node status rollback: old height: {}, current height: {}. set it misbehavior. origin: {}", - old_cs.height, - current_cs.height, - &na + old_cs.height, current_cs.height, &na ); let _ = self.node_manager.set_misbehavior_node(na).await; } @@ -1432,7 +1433,7 @@ impl Controller { syncing = true; } Err(e) => { - if (e as u64) % 100 == 0 { + if (e as u64).is_multiple_of(100) { warn!("sync block failed: {}", e.to_string()); continue; } diff --git a/src/core/pool.rs b/src/core/pool.rs index 9d93aee..45cc359 100644 --- a/src/core/pool.rs +++ b/src/core/pool.rs @@ -21,7 +21,7 @@ use std::{ }; use cita_cloud_proto::blockchain::{ - raw_transaction::Tx, RawTransaction, UnverifiedUtxoTransaction, + RawTransaction, UnverifiedUtxoTransaction, raw_transaction::Tx, }; use indexmap::IndexSet; use tokio::sync::RwLock; @@ -30,7 +30,7 @@ use crate::{grpc_client::storage::reload_transactions_pool, util::get_tx_quota}; use super::{ auditor::Auditor, - system_config::{SystemConfig, LOCK_ID_BUTTON, LOCK_ID_VERSION}, + system_config::{LOCK_ID_BUTTON, LOCK_ID_VERSION, SystemConfig}, }; // wrapper type for Hash @@ -172,8 +172,8 @@ impl Pool { fn tx_is_valid(sys_config: &SystemConfig, raw_tx: &RawTransaction, height: u64) -> bool { match &raw_tx.tx { - Some(Tx::NormalTx(ref normal_tx)) => match normal_tx.transaction { - Some(ref tx) => { + Some(Tx::NormalTx(normal_tx)) => match &normal_tx.transaction { + Some(tx) => { height <= tx.valid_until_block && tx.valid_until_block < height + sys_config.block_limit } diff --git a/src/core/state_machine.rs b/src/core/state_machine.rs index d66600f..d2eb619 100644 --- a/src/core/state_machine.rs +++ b/src/core/state_machine.rs @@ -48,18 +48,16 @@ pub struct ControllerStateMachine; #[state_machine( initial = "State::participate_in_consensus()", state(derive(Debug, Clone, PartialEq, Eq)), - superstate(derive(Debug, Clone, PartialEq, Eq)), - on_transition = "Self::on_transition", - on_dispatch = "Self::on_dispatch" + superstate(derive(Debug, Clone, PartialEq, Eq)) )] impl ControllerStateMachine { #[state] - async fn offline(&self) -> Response { + async fn offline(&self) -> Outcome { Handled } #[superstate] - async fn online(&self, context: &Controller, event: &Event) -> Response { + async fn online(&self, context: &Controller, event: &Event) -> Outcome { debug!("online: `{event:?}`"); match event { Event::SyncBlockReq(req, origin) => { @@ -89,7 +87,7 @@ impl ControllerStateMachine { &self, context: &mut Controller, event: &Event, - ) -> Response { + ) -> Outcome { debug!("participate_in_consensus: `{event:?}`"); match event { Event::TryUpdateGlobalStatus(node, status) => { @@ -103,7 +101,7 @@ impl ControllerStateMachine { } #[superstate(superstate = "online", exit_action = "exit_sync")] - async fn sync(&self, context: &mut Controller, event: &Event) -> Response { + async fn sync(&self, context: &mut Controller, event: &Event) -> Outcome { debug!("sync: `{event:?}`"); match event { Event::TryUpdateGlobalStatus(node, status) => { @@ -117,13 +115,13 @@ impl ControllerStateMachine { } #[state(superstate = "sync")] - async fn prepare_sync(&self, event: &Event) -> Response { + async fn prepare_sync(&self, event: &Event) -> Outcome { debug!("prepare_sync: `{event:?}`"); Super } #[state(superstate = "sync", entry_action = "enter_syncing")] - async fn syncing(&self, event: &Event) -> Response { + async fn syncing(&self, event: &Event) -> Outcome { debug!("syncing: `{event:?}`"); Super } @@ -141,19 +139,7 @@ impl ControllerStateMachine { } } -impl ControllerStateMachine { - fn on_transition(&mut self, source: &State, target: &State) { - if source != target { - info!("transitioned from `{source:?}` to `{target:?}`"); - } - } - - fn on_dispatch(&mut self, state: StateOrSuperstate, event: &Event) { - debug!("dispatching `{event:?}` to `{state:?}`"); - } -} - -async fn handle_sync_block(context: &Controller) -> statig::Response { +async fn handle_sync_block(context: &Controller) -> statig::Outcome { debug!("receive SyncBlock event"); let (_, global_status) = context.get_global_status().await; let res = { @@ -167,7 +153,7 @@ async fn handle_sync_block(context: &Controller) -> statig::Response { } } -async fn try_sync_block(context: &Controller) -> statig::Response { +async fn try_sync_block(context: &Controller) -> statig::Outcome { let (global_address, global_status) = context.get_global_status().await; let current_height = context.get_status().await.height; @@ -220,7 +206,7 @@ fn test_state() { #[tokio::test] async fn test() { - use crate::{config::ControllerConfig, GenesisBlock, SystemConfig}; + use crate::{GenesisBlock, SystemConfig, config::ControllerConfig}; let config = ControllerConfig::new("example/config.toml"); let genesis = GenesisBlock::new("example/config.toml"); diff --git a/src/core/system_config.rs b/src/core/system_config.rs index 234a4a2..e124bd1 100644 --- a/src/core/system_config.rs +++ b/src/core/system_config.rs @@ -17,7 +17,7 @@ use serde_derive::Deserialize; use std::collections::HashMap; use cita_cloud_proto::{ - blockchain::{raw_transaction::Tx::UtxoTx, RawTransaction, UnverifiedUtxoTransaction}, + blockchain::{RawTransaction, UnverifiedUtxoTransaction, raw_transaction::Tx::UtxoTx}, controller::SystemConfig as ProtoSystemConfig, status_code::StatusCodeEnum, storage::Regions, @@ -140,7 +140,11 @@ impl SystemConfig { .await != StatusCodeEnum::Success { - panic!("update system config by utxo_tx hash failed: lock_id: {}, utxo_tx hash: 0x{}", lock_id, hex::encode(data_or_tx_hash)) + panic!( + "update system config by utxo_tx hash failed: lock_id: {}, utxo_tx hash: 0x{}", + lock_id, + hex::encode(data_or_tx_hash) + ) } } else { info!("update system config by data: lock_id: {}", lock_id); @@ -293,7 +297,7 @@ impl SystemConfig { LOCK_ID_VALIDATORS => { let l = self.validator_address_len; let mut validators = Vec::new(); - if data.len() % l == 0 || self.is_danger { + if data.len().is_multiple_of(l) || self.is_danger { for i in 0..(data.len() / l) { validators.push(data[i * l..(i + 1) * l].to_vec()) } diff --git a/src/grpc_client/storage.rs b/src/grpc_client/storage.rs index 2159fa6..3e73185 100644 --- a/src/grpc_client/storage.rs +++ b/src/grpc_client/storage.rs @@ -16,7 +16,7 @@ use prost::Message; use cita_cloud_proto::{ blockchain::{ - raw_transaction::Tx::UtxoTx, Block, CompactBlock, RawTransaction, RawTransactions, + Block, CompactBlock, RawTransaction, RawTransactions, raw_transaction::Tx::UtxoTx, }, client::StorageClientTrait, common::{Proof, ProposalInner, StateRoot}, diff --git a/src/grpc_server/health_check_server.rs b/src/grpc_server/health_check_server.rs index 4fbb9b6..0aa53b1 100755 --- a/src/grpc_server/health_check_server.rs +++ b/src/grpc_server/health_check_server.rs @@ -17,8 +17,8 @@ use std::sync::atomic::{AtomicU64, Ordering}; use tonic::{Request, Response, Status}; use cita_cloud_proto::health_check::{ - health_check_response::ServingStatus, health_server::Health, HealthCheckRequest, - HealthCheckResponse, + HealthCheckRequest, HealthCheckResponse, health_check_response::ServingStatus, + health_server::Health, }; use crate::core::controller::Controller; diff --git a/src/grpc_server/mod.rs b/src/grpc_server/mod.rs index 2b52e72..dd472e6 100644 --- a/src/grpc_server/mod.rs +++ b/src/grpc_server/mod.rs @@ -24,6 +24,7 @@ use tonic::transport::Server; use tonic_web::GrpcWebLayer; use cita_cloud_proto::{ + CONTROLLER_DESCRIPTOR_SET, controller::{ consensus2_controller_service_server::Consensus2ControllerServiceServer, rpc_service_server::RpcServiceServer, @@ -31,9 +32,8 @@ use cita_cloud_proto::{ health_check::health_server::HealthServer, network::network_msg_handler_service_server::NetworkMsgHandlerServiceServer, status_code::StatusCodeEnum, - CONTROLLER_DESCRIPTOR_SET, }; -use cloud_util::metrics::{run_metrics_exporter, MiddlewareLayer}; +use cloud_util::metrics::{MiddlewareLayer, run_metrics_exporter}; use crate::{ config::ControllerConfig, diff --git a/src/grpc_server/network_server.rs b/src/grpc_server/network_server.rs index da01281..bb9e821 100644 --- a/src/grpc_server/network_server.rs +++ b/src/grpc_server/network_server.rs @@ -15,7 +15,7 @@ use tonic::{Request, Response, Status}; use cita_cloud_proto::{ - network::{network_msg_handler_service_server::NetworkMsgHandlerService, NetworkMsg}, + network::{NetworkMsg, network_msg_handler_service_server::NetworkMsgHandlerService}, status_code::StatusCodeEnum, }; diff --git a/src/grpc_server/rpc_server.rs b/src/grpc_server/rpc_server.rs index 6929e41..a0a0603 100644 --- a/src/grpc_server/rpc_server.rs +++ b/src/grpc_server/rpc_server.rs @@ -23,7 +23,7 @@ use cita_cloud_proto::{ blockchain::{Block, CompactBlock, RawTransaction, RawTransactions}, common::{Empty, Hash, Hashes, NodeNetInfo, NodeStatus, Proof, StateRoot}, controller::{ - rpc_service_server::RpcService, BlockNumber, Flag, SystemConfig, TransactionIndex, + BlockNumber, Flag, SystemConfig, TransactionIndex, rpc_service_server::RpcService, }, status_code::StatusCodeEnum, }; diff --git a/src/main.rs b/src/main.rs index a2e0bb5..dd58a81 100644 --- a/src/main.rs +++ b/src/main.rs @@ -75,8 +75,6 @@ struct RunOpts { } fn main() { - ::std::env::set_var("RUST_BACKTRACE", "full"); - let opts: Opts = Opts::parse(); // You can handle information about subcommands by requesting their matches by name diff --git a/src/protocol/node_manager.rs b/src/protocol/node_manager.rs index 2e16dc8..cb99561 100644 --- a/src/protocol/node_manager.rs +++ b/src/protocol/node_manager.rs @@ -246,19 +246,24 @@ impl NodeManager { rd.get(na).map(|cs| cs.height) }; - if current_height.is_none() || new_status.height > current_height.unwrap() { - info!( - "update node status: origin: {}, height: {}, hash: 0x{}", - na, - new_status.height, - hex::encode(new_status.hash.clone().unwrap().hash) - ); - let mut wr = self.nodes.write().await; - Ok(wr.insert(*na, new_status)) - } else if new_status.height == current_height.unwrap() { - Ok(Some(new_status)) - } else { - Err(StatusCodeEnum::EarlyStatus) + match current_height { + Some(h) if new_status.height <= h => { + if new_status.height == h { + Ok(Some(new_status)) + } else { + Err(StatusCodeEnum::EarlyStatus) + } + } + _ => { + info!( + "update node status: origin: {}, height: {}, hash: 0x{}", + na, + new_status.height, + hex::encode(new_status.hash.clone().unwrap().hash) + ); + let mut wr = self.nodes.write().await; + Ok(wr.insert(*na, new_status)) + } } } diff --git a/src/util.rs b/src/util.rs index 1baf41f..7817978 100644 --- a/src/util.rs +++ b/src/util.rs @@ -14,7 +14,7 @@ use crate::crypto::hash_data; use cita_cloud_proto::{ - blockchain::{raw_transaction::Tx, BlockHeader, RawTransaction, RawTransactions}, + blockchain::{BlockHeader, RawTransaction, RawTransactions, raw_transaction::Tx}, status_code::StatusCodeEnum, }; use cloud_util::common::get_tx_hash;