Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
64987d7
Make SafetyWindow a Parameter (#809)
patimen Feb 27, 2026
6b2a977
Merge branch 'main' into upgrade-v0.2.11
patimen Feb 27, 2026
a401e2c
Described the proposal
libermans Sep 1, 2025
b5ecb76
base impl for timing caculator
Dec 16, 2025
00763b0
base impl for State Management
Dec 16, 2025
29cb019
base impl for user-friendly status messages
Dec 16, 2025
aa19222
base impl for Proactive MLnode Testing System
Dec 16, 2025
c93eb35
refine the output for getNodes
Dec 17, 2025
77333bb
Integrate timing info in broker GetNodesCommand
Dec 17, 2025
230ed6c
move redaclared func & type adaptation
Dec 18, 2025
512e0b7
Modify RegisterNode command to trigger auto poc testing
Dec 18, 2025
2914221
Query active participant status during startup
Dec 19, 2025
0413eb4
add test to mock intergrated
Dec 19, 2025
957d0ff
logical change for status message
Dec 22, 2025
3f22710
add auto test when update node
Dec 22, 2025
3acbcc9
print getNodes info in api service
Dec 22, 2025
40d770e
add log for auto test
Dec 23, 2025
db276c4
Implement chain-based participant active check.
Dec 24, 2025
ec9069a
send test inference request and validate response is added
Dec 24, 2025
5f74151
add SetNodeMLNodeOnboardingStateCommand to handle mlnode state
Dec 24, 2025
71bc219
reslove cycle dependency for broker,main,admin
Dec 24, 2025
89b0402
enhance log info
Dec 25, 2025
b5243e7
TEST_FAILED state with detailed error reporting in MLnode
Dec 25, 2025
576b3db
funcs been replaced with their v1 version
Feb 4, 2026
83b0682
fix:Make it Compile
Ryanchen911 Feb 28, 2026
43677e2
fix:Make it Compile
Ryanchen911 Feb 28, 2026
908f252
Fix Logic Bugs
Ryanchen911 Mar 1, 2026
a5c7e3d
fix: define missing mockClient variable
Ryanchen911 Mar 2, 2026
ee2e2ed
fix: correct mock test bugs introduced
jacky6block Mar 3, 2026
bbbecd6
consolidate autotest logic and unify timing constants
Ryanchen911 Mar 4, 2026
fa18f2c
fix: improve error handling and suppress confusing logs for inactive …
Ryanchen911 Mar 4, 2026
9cada83
fix: populate testingNodes, fix guidance concat, remove dead code
jacky6block Mar 5, 2026
7cdde52
test(event-listener): align TestRegularPocScenario with v2 inference …
Ryanchen911 Mar 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions decentralized-api/apiconfig/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package apiconfig

// MLNodeOnboardingState represents the onboarding state of an ML node
type MLNodeOnboardingState string

// Constants for MLNodeOnboardingState
const (
MLNodeState_WAITING_FOR_POC MLNodeOnboardingState = "WAITING_FOR_POC"
MLNodeState_TESTING MLNodeOnboardingState = "TESTING"
MLNodeState_TEST_FAILED MLNodeOnboardingState = "TEST_FAILED"
)

// Timing constants used across broker/admin components
const (
DefaultBlockTimeSeconds = 6.0
AutoTestMinSecondsBeforePoC int64 = 3600
OnlineAlertLeadSeconds int64 = 600
)

// ParticipantState represents the state of a participant
type ParticipantState string

// Constants for ParticipantState
const (
ParticipantState_INACTIVE_WAITING ParticipantState = "INACTIVE_WAITING"
ParticipantState_ACTIVE_PARTICIPATING ParticipantState = "ACTIVE_PARTICIPATING"
)
118 changes: 102 additions & 16 deletions decentralized-api/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,22 +107,23 @@ func (b *BrokerChainBridgeImpl) GetParams() (*types.QueryParamsResponse, error)
}

type Broker struct {
highPriorityCommands chan Command
lowPriorityCommands chan Command
nodes map[string]*NodeWithState
mu sync.RWMutex
curMaxNodesNum atomic.Uint64
chainBridge BrokerChainBridge
nodeWorkGroup *NodeWorkGroup
phaseTracker *chainphase.ChainPhaseTracker
participantInfo participant.CurrenParticipantInfo
callbackUrl string
mlNodeClientFactory mlnodeclient.ClientFactory
reconcileTrigger chan struct{}
lastEpochIndex uint64
lastEpochPhase types.EpochPhase
statusQueryTrigger chan statusQuerySignal
configManager *apiconfig.ConfigManager
highPriorityCommands chan Command
lowPriorityCommands chan Command
nodes map[string]*NodeWithState
mu sync.RWMutex
curMaxNodesNum atomic.Uint64
chainBridge BrokerChainBridge
nodeWorkGroup *NodeWorkGroup
phaseTracker *chainphase.ChainPhaseTracker
participantInfo participant.CurrenParticipantInfo
callbackUrl string
mlNodeClientFactory mlnodeclient.ClientFactory
reconcileTrigger chan struct{}
lastEpochIndex uint64
lastEpochPhase types.EpochPhase
lastParticipantWeight int64
statusQueryTrigger chan statusQuerySignal
configManager *apiconfig.ConfigManager
}

// GetParticipantAddress returns the current participant's address if available.
Expand Down Expand Up @@ -254,6 +255,14 @@ type NodeState struct {
// Epoch-specific data, populated from the chain
EpochModels map[string]types.Model `json:"epoch_models"`
EpochMLNodes map[string]types.MLNodeInfo `json:"epoch_ml_nodes"`

Timing *TimingInfo `json:"timing,omitempty"`

UserMessage string `json:"user_message,omitempty"`
Guidance string `json:"guidance,omitempty"`
ParticipantState string `json:"participant_state,omitempty"`
MLNodeOnboardingState string `json:"mlnode_state,omitempty"`
ParticipantWeight int64 `json:"participant_weight,omitempty"`
}

func (s NodeState) MarshalJSON() ([]byte, error) {
Expand Down Expand Up @@ -349,6 +358,13 @@ type NodeResponse struct {
State NodeState `json:"state"`
}

type TimingInfo struct {
CurrentPhase string `json:"current_phase"`
BlocksUntilNextPoC int64 `json:"blocks_until_next_poc"`
SecondsUntilNextPoC int64 `json:"seconds_until_next_poc"`
ShouldBeOnline bool `json:"should_be_online"`
}

func NewBroker(chainBridge BrokerChainBridge, phaseTracker *chainphase.ChainPhaseTracker, participantInfo participant.CurrenParticipantInfo, callbackUrl string, clientFactory mlnodeclient.ClientFactory, configManager *apiconfig.ConfigManager) *Broker {
broker := &Broker{
highPriorityCommands: make(chan Command, 100),
Expand All @@ -373,6 +389,18 @@ func NewBroker(chainBridge BrokerChainBridge, phaseTracker *chainphase.ChainPhas
// go nodeReconciliationWorker(broker)
go nodeStatusQueryWorker(broker)
go broker.reconcilerLoop()

// Startup: try to populate epoch data once chain is synced to expose participant status early
go func() {
for i := 0; i < 10; i++ {
es := broker.phaseTracker.GetCurrentEpochState()
if es != nil && es.IsSynced {
_ = broker.UpdateNodeWithEpochData(es)
return
}
time.Sleep(1 * time.Second)
}
}()
return broker
}

Expand Down Expand Up @@ -472,6 +500,10 @@ func (b *Broker) executeCommand(command Command) {
command.Execute(b)
case UpdateNodeResultCommand:
command.Execute(b)
case SetNodeFailureReasonCommand:
command.Execute(b)
case SetNodeMLNodeOnboardingStateCommand:
command.Execute(b)
default:
logging.Error("Unregistered command type", types.Nodes, "type", reflect.TypeOf(command).String())
}
Expand All @@ -492,6 +524,8 @@ func (b *Broker) QueueMessage(command Command) error {
switch command.(type) {
case StartPocCommand, InitValidateCommand, InferenceUpAllCommand, UpdateNodeResultCommand, SetNodesActualStatusCommand, SetNodeAdminStateCommand, RegisterNode, RemoveNode, StartTrainingCommand, LockNodesForTrainingCommand, SyncNodesCommand:
b.highPriorityCommands <- command
case SetNodeFailureReasonCommand:
b.highPriorityCommands <- command
default:
b.lowPriorityCommands <- command
}
Expand Down Expand Up @@ -1522,6 +1556,9 @@ func (b *Broker) UpdateNodeWithEpochData(epochState *chainphase.EpochState) erro

parentEpochData := parentGroupResp.GetEpochGroupData()

// Calculate current participant weight by scanning validation weights across subgroups
currentWeight := int64(0)

b.clearNodeEpochData()

// 2. Track which nodes are found in epoch data
Expand Down Expand Up @@ -1549,6 +1586,12 @@ func (b *Broker) UpdateNodeWithEpochData(epochState *chainphase.EpochState) erro
for _, weightInfo := range subgroup.ValidationWeights {
// Check if the participant is the one this broker is managing
if weightInfo.MemberAddress == b.participantInfo.GetAddress() {
// Track participant weight (use ConfirmationWeight if present, else Weight)
w := weightInfo.ConfirmationWeight
if w == 0 {
w = weightInfo.Weight
}
currentWeight += w
// 5. Iterate through the ML nodes for this participant in the epoch data
b.UpdateNodeEpochData(weightInfo.MlNodes, modelId, *subgroup.ModelSnapshot)
// Mark these nodes as found in epoch
Expand All @@ -1559,6 +1602,19 @@ func (b *Broker) UpdateNodeWithEpochData(epochState *chainphase.EpochState) erro
}
}

// If participant weight changed, log and update cached weight
if currentWeight != b.lastParticipantWeight {
logging.Info("Participant weight changed", types.Participants, "old", b.lastParticipantWeight, "new", currentWeight, "epoch", epochState.LatestEpoch.EpochIndex)
b.lastParticipantWeight = currentWeight
}

// Store participant weight on each node state for visibility in admin APIs
b.mu.Lock()
for _, node := range b.nodes {
node.State.ParticipantWeight = currentWeight
}
b.mu.Unlock()

// 6. Populate governance models for nodes not in epoch data (disabled nodes)
b.mu.RLock()
nodeIds := make([]string, 0, len(b.nodes))
Expand Down Expand Up @@ -1792,3 +1848,33 @@ func (b *Broker) MergeModelArgs(epochArgs []string, localArgs []string) []string

return mergedArgs
}

func (b *Broker) IsParticipantActiveOnChain() (bool, error) {
resp, err := b.chainBridge.GetCurrentEpochGroupData()
if err != nil {
return false, err
}
if resp == nil {
return false, nil
}
epochIndex := resp.EpochGroupData.EpochIndex
subModels := resp.EpochGroupData.SubGroupModels
addr := b.participantInfo.GetAddress()
for _, mid := range subModels {
subgroupResp, err := b.chainBridge.GetEpochGroupDataByModelId(epochIndex, mid)
if err != nil {
continue
}
if subgroupResp == nil {
continue
}
for _, w := range subgroupResp.EpochGroupData.ValidationWeights {
if w.MemberAddress == addr {
if len(w.MlNodes) > 0 {
return true, nil
}
}
}
}
return false, nil
}
72 changes: 72 additions & 0 deletions decentralized-api/broker/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,27 @@ func (c GetNodesCommand) Execute(b *Broker) {
b.mu.RLock()
defer b.mu.RUnlock()

// Precompute timing information from current epoch state
var (
blocksUntilNextPoC int64
secondsUntilNextPoC int64
currentPhase types.EpochPhase
hasEpochInfo bool
)
epochState := b.phaseTracker.GetCurrentEpochState()
if epochState != nil && epochState.IsSynced {
hasEpochInfo = true
currentPhase = epochState.CurrentPhase
currentHeight := epochState.CurrentBlock.Height
nextPoC := epochState.LatestEpoch.NextPoCStart()
blocksUntilNextPoC = nextPoC - currentHeight
if blocksUntilNextPoC < 0 {
blocksUntilNextPoC = 0
}
// Use default block time constant
secondsUntilNextPoC = int64(float64(blocksUntilNextPoC) * apiconfig.DefaultBlockTimeSeconds)
}

nodeResponses := make([]NodeResponse, 0, len(b.nodes))
for _, nodeWithState := range b.nodes {
// --- Deep copy Node ---
Expand Down Expand Up @@ -99,6 +120,15 @@ func (c GetNodesCommand) Execute(b *Broker) {
Node: nodeCopy,
State: stateCopy,
})
if hasEpochInfo {
shouldOnline := currentPhase == types.PoCGeneratePhase || currentPhase == types.PoCGenerateWindDownPhase || currentPhase == types.PoCValidatePhase || currentPhase == types.PoCValidateWindDownPhase || secondsUntilNextPoC <= apiconfig.OnlineAlertLeadSeconds
nodeResponses[len(nodeResponses)-1].State.Timing = &TimingInfo{
CurrentPhase: string(currentPhase),
BlocksUntilNextPoC: blocksUntilNextPoC,
SecondsUntilNextPoC: secondsUntilNextPoC,
ShouldBeOnline: shouldOnline,
}
}
}
logging.Debug("Got nodes", types.Nodes, "size", len(nodeResponses))
c.Response <- nodeResponses
Expand Down Expand Up @@ -242,6 +272,9 @@ func (c UpdateNodeResultCommand) Execute(b *Broker) {
return
}

prevStatus := node.State.CurrentStatus
prevFailure := node.State.FailureReason

// Update state
logging.Info("Finalizing state transition for node", types.Nodes,
"node_id", c.NodeId,
Expand All @@ -260,6 +293,9 @@ func (c UpdateNodeResultCommand) Execute(b *Broker) {
} else {
// Clear failure reason on success
node.State.FailureReason = ""
if prevFailure != "" {
logging.Info("Node status recovered", types.Nodes, "node_id", c.NodeId, "blockHeight", blockHeight)
}
}

// Reset POC fields when moving away from POC status
Expand All @@ -268,5 +304,41 @@ func (c UpdateNodeResultCommand) Execute(b *Broker) {
node.State.PocCurrentStatus = PocStatusIdle
}

if prevStatus == types.HardwareNodeStatus_POC && c.Result.FinalStatus == types.HardwareNodeStatus_INFERENCE {
logging.Info("Onboarding transition POC->INFERENCE", types.Nodes, "node_id", c.NodeId, "blockHeight", blockHeight)
}

c.Response <- true
}

// SetNodeFailureReasonCommand sets the FailureReason field on a node state directly
// without requiring an in-flight reconciliation.
type SetNodeFailureReasonCommand struct {
NodeId string
Reason string
Response chan bool
}

func NewSetNodeFailureReasonCommand(nodeId string, reason string) SetNodeFailureReasonCommand {
return SetNodeFailureReasonCommand{
NodeId: nodeId,
Reason: reason,
Response: make(chan bool, 2),
}
}

func (c SetNodeFailureReasonCommand) GetResponseChannelCapacity() int { return cap(c.Response) }

func (c SetNodeFailureReasonCommand) Execute(b *Broker) {
b.mu.Lock()
defer b.mu.Unlock()

node, exists := b.nodes[c.NodeId]
if !exists {
logging.Warn("SetNodeFailureReason: node not found", types.Nodes, "node_id", c.NodeId)
c.Response <- false
return
}
node.State.FailureReason = c.Reason
c.Response <- true
}
4 changes: 3 additions & 1 deletion decentralized-api/broker/lock_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,9 @@ func DoWithLockedNodeHTTPRetry(
}
outcome = InferenceError{Message: msg}
}
_ = b.QueueMessage(ReleaseNode{NodeId: node.Id, Outcome: outcome, Response: make(chan bool, 2)})
if err := b.QueueMessage(ReleaseNode{NodeId: node.Id, Outcome: outcome, Response: make(chan bool, 2)}); err != nil {
logging.Warn("Failed to queue ReleaseNode message", types.Inferences, "node_id", node.Id, "error", err)
}

if retry {
if triggerRecheck {
Expand Down
44 changes: 44 additions & 0 deletions decentralized-api/broker/node_admin_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ func (c RegisterNode) Execute(b *Broker) {

logging.Info("RegisterNode. Registered node", types.Nodes, "node", c.Node)
c.Response <- NodeCommandResponse{Node: &c.Node, Error: nil}

// Auto-test now handled by admin orchestrator
}

// UpdateNode updates an existing node's configuration while preserving runtime state
Expand Down Expand Up @@ -266,6 +268,8 @@ func (c UpdateNode) Execute(b *Broker) {

logging.Info("UpdateNode. Updated node configuration", types.Nodes, "node_id", c.Node.Id)
c.Response <- NodeCommandResponse{Node: &c.Node, Error: nil}

// Auto-test now handled by admin orchestrator
}

type RemoveNode struct {
Expand Down Expand Up @@ -370,3 +374,43 @@ func (c UpdateNodeHardwareCommand) Execute(b *Broker) {
logging.Info("Updated node hardware", types.Nodes, "node_id", c.NodeId, "hardware_count", len(c.Hardware))
c.Response <- nil
}

// SetNodeMLNodeOnboardingStateCommand updates the MLNodeOnboardingState of a node
type SetNodeMLNodeOnboardingStateCommand struct {
NodeId string
NewState string
Response chan bool
}

func NewSetNodeMLNodeOnboardingStateCommand(nodeId string, newState string) SetNodeMLNodeOnboardingStateCommand {
return SetNodeMLNodeOnboardingStateCommand{
NodeId: nodeId,
NewState: newState,
Response: make(chan bool, 2),
}
}

func (c SetNodeMLNodeOnboardingStateCommand) GetResponseChannelCapacity() int {
return cap(c.Response)
}

func (c SetNodeMLNodeOnboardingStateCommand) Execute(b *Broker) {
b.mu.Lock()
defer b.mu.Unlock()

node, exists := b.nodes[c.NodeId]
if !exists {
logging.Error("Cannot set MLNodeOnboardingState: node not found", types.Nodes, "node_id", c.NodeId)
c.Response <- false
return
}

logging.Info("Setting MLNodeOnboardingState for node", types.Nodes,
"node_id", c.NodeId,
"old_state", node.State.MLNodeOnboardingState,
"new_state", c.NewState)

node.State.MLNodeOnboardingState = c.NewState

c.Response <- true
}
Loading
Loading