Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,8 @@ All notable changes to this project will be documented in this file.
- Client
- Cache network interface index/name lookups in liveness UDP service to fix high CPU usage caused by per-packet RTM_GETLINK netlink dumps
- Add observability to BGP handleUpdate: log withdrawal/NLRI counts per batch and track processing duration via `doublezero_bgp_handle_update_duration_seconds` histogram
- Device Health Oracle
- Add device-health-oracle to provisioning workflow, matching current behavior by promoting link and device health to ready without actually performing any health checks
- E2E tests
- The QA alldevices test now skips devices that are not calling the controller
- e2e: Expand RFC11 end-to-end testing ([#2801](https://github.com/malbeclabs/doublezero/pull/2801))
Expand Down Expand Up @@ -281,12 +283,11 @@ All notable changes to this project will be documented in this file.
- feat(smartcontract): add use_onchain_deallocation flag to MulticastGroup ([#2748](https://github.com/malbeclabs/doublezero/pull/2748))
- CLI
- Remove restriction for a single tunnel per user; now a user can have a unicast and multicast tunnel concurrently (but can only be a publisher _or_ a subscriber) ([2728](https://github.com/malbeclabs/doublezero/pull/2728))

## [v0.8.3](https://github.com/malbeclabs/doublezero/compare/client/v0.8.2...client/v0.8.3) – 2026-01-22

- Data
- Add indexer that syncs serviceability and telemetry data to ClickHouse and Neo4J

## [v0.8.3](https://github.com/malbeclabs/doublezero/compare/client/v0.8.2...client/v0.8.3) – 2026-01-22

### Breaking

- None for this release
Expand Down
35 changes: 26 additions & 9 deletions controlplane/device-health-oracle/cmd/device-health-oracle/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ var (
ledgerRPCURL = flag.String("ledger-rpc-url", "", "the url of the ledger rpc")
serviceabilityProgramID = flag.String("serviceability-program-id", "", "the id of the serviceability program")
telemetryProgramID = flag.String("telemetry-program-id", "", "the id of the telemetry program")
signerKeypairPath = flag.String("signer-keypair", "", "path to the signer keypair file (JSON array format)")
slackWebhookURL = flag.String("slack-webhook-url", "", "The Slack webhook URL to send alerts")
provisioningSlotCount = flag.Uint64("provisioning-slot-count", defaultProvisioningSlotCount, "Burn-in slot count for new devices/links (~20 hours at 200000)")
drainedSlotCount = flag.Uint64("drained-slot-count", defaultDrainedSlotCount, "Burn-in slot count for reactivated devices/links (~30 min at 5000)")
Expand Down Expand Up @@ -109,9 +110,23 @@ func main() {
}
}

// Parse and validate signer keypair.
if *signerKeypairPath == "" {
log.Error("Missing required flag", "flag", "signer-keypair")
flag.Usage()
os.Exit(1)
}
signer, err := solana.PrivateKeyFromSolanaKeygenFile(*signerKeypairPath)
if err != nil {
log.Error("Failed to load signer keypair", "path", *signerKeypairPath, "error", err)
os.Exit(1)
}
log.Info("Signer public key", "pubkey", signer.PublicKey().String())

// Initialize ledger clients.
rpcClient := solanarpc.New(networkConfig.LedgerPublicRPCURL)
serviceabilityClient := serviceability.New(rpcClient, networkConfig.ServiceabilityProgramID)
serviceabilityExecutor := serviceability.NewExecutor(log, rpcClient, &signer, networkConfig.ServiceabilityProgramID)
telemetryClient := telemetry.New(log, rpcClient, nil, networkConfig.TelemetryProgramID)

worker.MetricBuildInfo.WithLabelValues(version, commit, date).Set(1)
Expand All @@ -129,15 +144,17 @@ func main() {
}()

w, err := worker.New(&worker.Config{
Logger: log,
LedgerRPCClient: rpcClient,
Serviceability: serviceabilityClient,
Telemetry: telemetryClient,
Interval: *interval,
SlackWebhookURL: *slackWebhookURL,
Env: *env,
ProvisioningSlotCount: *provisioningSlotCount,
DrainedSlotCount: *drainedSlotCount,
Logger: log,
LedgerRPCClient: rpcClient,
Serviceability: serviceabilityClient,
ServiceabilityExecutor: serviceabilityExecutor,
ServiceabilityProgramID: networkConfig.ServiceabilityProgramID,
Telemetry: telemetryClient,
Interval: *interval,
SlackWebhookURL: *slackWebhookURL,
Env: *env,
ProvisioningSlotCount: *provisioningSlotCount,
DrainedSlotCount: *drainedSlotCount,
})
if err != nil {
log.Error("Failed to create worker", "error", err)
Expand Down
27 changes: 20 additions & 7 deletions controlplane/device-health-oracle/internal/worker/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,25 @@ type ServiceabilityClient interface {
GetProgramData(context.Context) (*serviceability.ProgramData, error)
}

type ServiceabilityExecutor interface {
SetDeviceHealthBatch(ctx context.Context, updates []serviceability.DeviceHealthUpdate, globalStatePubkey solana.PublicKey) (solana.Signature, error)
SetLinkHealthBatch(ctx context.Context, updates []serviceability.LinkHealthUpdate, globalStatePubkey solana.PublicKey) (solana.Signature, error)
}

type TelemetryProgramClient interface {
GetDeviceLatencySamples(ctx context.Context, originDevicePubKey, targetDevicePubKey, linkPubKey solana.PublicKey, epoch uint64) (*telemetry.DeviceLatencySamples, error)
}

type Config struct {
Logger *slog.Logger
LedgerRPCClient LedgerRPCClient
Serviceability ServiceabilityClient
Telemetry TelemetryProgramClient
Interval time.Duration
SlackWebhookURL string
Env string
Logger *slog.Logger
LedgerRPCClient LedgerRPCClient
Serviceability ServiceabilityClient
ServiceabilityExecutor ServiceabilityExecutor
ServiceabilityProgramID solana.PublicKey
Telemetry TelemetryProgramClient
Interval time.Duration
SlackWebhookURL string
Env string

// Burn-in slot counts for devices/links.
// ProvisioningSlotCount is used for new devices/links (status = Provisioning, DeviceProvisioning, LinkProvisioning).
Expand All @@ -52,6 +59,12 @@ func (c *Config) Validate() error {
if c.Serviceability == nil {
return errors.New("serviceability client is required")
}
if c.ServiceabilityExecutor == nil {
return errors.New("serviceability executor is required")
}
if c.ServiceabilityProgramID.IsZero() {
return errors.New("serviceability program ID is required")
}
if c.Telemetry == nil {
return errors.New("telemetry client is required")
}
Expand Down
158 changes: 144 additions & 14 deletions controlplane/device-health-oracle/internal/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,24 @@ import (
"log/slog"
"time"

"github.com/gagliardetto/solana-go"
solanarpc "github.com/gagliardetto/solana-go/rpc"
"github.com/malbeclabs/doublezero/smartcontract/sdk/go/serviceability"
)

// Processing in batches greatly speeds up e2e tests since large numbers of link and devices records are created at the same time.
// We set maxBatchSize to 8 because Solana transactions are limited to 1232 bytes:
// Transaction overhead:
// - Blockhash: 32 bytes
// - Signatures: 64 bytes per signer (we have 1)
// - Message header: ~3 bytes
// - Account keys array (deduplicated)
// - Compact-u16 length prefixes
//
// Calculation: With the accounts being partially deduplicated (globalState, signer, and systemProgram are shared across all instructions), each additional instruction adds roughly ~100-120 bytes.
// With transaction overhead of ~200-300 bytes, we can fit approximately 8-10 instructions before hitting Solana's 1232-byte limit.
const maxBatchSize = 8

type Worker struct {
log *slog.Logger
cfg *Config
Expand Down Expand Up @@ -48,27 +63,142 @@ func (w *Worker) tick(ctx context.Context) {
return
}

provisioningSlot := currentSlot - w.cfg.ProvisioningSlotCount
drainedSlot := currentSlot - w.cfg.DrainedSlotCount
// Calculate burn-in slots, handling underflow for recently created environments
var provisioningSlot, drainedSlot uint64
if currentSlot > w.cfg.ProvisioningSlotCount {
provisioningSlot = currentSlot - w.cfg.ProvisioningSlotCount
}
if currentSlot > w.cfg.DrainedSlotCount {
drainedSlot = currentSlot - w.cfg.DrainedSlotCount
}

provisioningTime, err := w.cfg.LedgerRPCClient.GetBlockTime(ctx, provisioningSlot)
w.log.Info("Device health oracle tick",
"currentSlot", currentSlot,
"provisioningSlotCount", w.cfg.ProvisioningSlotCount,
"provisioningSlot", provisioningSlot,
"drainedSlotCount", w.cfg.DrainedSlotCount,
"drainedSlot", drainedSlot)

programData, err := w.cfg.Serviceability.GetProgramData(ctx)
if err != nil {
w.log.Error("Failed to get block time for provisioning slot", "slot", provisioningSlot, "error", err)
w.log.Error("Failed to get program data", "error", err)
return
}

drainedTime, err := w.cfg.LedgerRPCClient.GetBlockTime(ctx, drainedSlot)
globalStatePubkey, _, err := serviceability.GetGlobalStatePDA(w.cfg.ServiceabilityProgramID)
if err != nil {
w.log.Error("Failed to get block time for drained slot", "slot", drainedSlot, "error", err)
w.log.Error("Failed to get globalstate PDA", "error", err)
return
}

w.log.Info("Device health oracle tick",
"currentSlot", currentSlot,
"provisioningSlotCount", w.cfg.ProvisioningSlotCount,
"provisioningSlot", provisioningSlot,
"provisioningTime", provisioningTime.Time(),
"drainedSlotCount", w.cfg.DrainedSlotCount,
"drainedSlot", drainedSlot,
"drainedTime", drainedTime.Time())
w.updatePendingDeviceHealth(ctx, programData.Devices, globalStatePubkey)
w.updatePendingLinkHealth(ctx, programData.Links, globalStatePubkey)
}

func (w *Worker) updatePendingDeviceHealth(ctx context.Context, devices []serviceability.Device, globalStatePubkey solana.PublicKey) {
w.log.Debug("Processing devices", "count", len(devices))

// Collect devices that need health updates
var updates []serviceability.DeviceHealthUpdate
for _, device := range devices {
devicePubkey := solana.PublicKeyFromBytes(device.PubKey[:])
w.log.Debug("Device state",
"device", devicePubkey.String(),
"code", device.Code,
"status", device.Status,
"statusValue", int(device.Status),
"health", device.DeviceHealth,
"healthValue", int(device.DeviceHealth))

// Only update health for devices in a provisioning state
if device.Status != serviceability.DeviceStatusDeviceProvisioning &&
device.Status != serviceability.DeviceStatusLinkProvisioning {
continue
}

if device.DeviceHealth == serviceability.DeviceHealthReadyForUsers ||
device.DeviceHealth == serviceability.DeviceHealthReadyForLinks {
continue
}

updates = append(updates, serviceability.DeviceHealthUpdate{
DevicePubkey: devicePubkey,
Health: serviceability.DeviceHealthReadyForUsers,
})
w.log.Info("Queuing device health update",
"device", devicePubkey.String(),
"code", device.Code,
"status", device.Status.String())
}

if len(updates) == 0 {
return
}

for i := 0; i < len(updates); i += maxBatchSize {
end := i + maxBatchSize
if end > len(updates) {
end = len(updates)
}
batch := updates[i:end]

w.log.Info("Sending batched device health update", "batchSize", len(batch), "batchNum", i/maxBatchSize+1)

sig, err := w.cfg.ServiceabilityExecutor.SetDeviceHealthBatch(ctx, batch, globalStatePubkey)
if err != nil {
w.log.Error("Failed to set device health batch", "error", err)
continue
}

w.log.Info("Device health batch updated", "count", len(batch), "signature", sig.String())
}
}

func (w *Worker) updatePendingLinkHealth(ctx context.Context, links []serviceability.Link, globalStatePubkey solana.PublicKey) {
// Collect links that need health updates
var updates []serviceability.LinkHealthUpdate
for _, link := range links {
// Update health for links in provisioning or drained states
if link.Status != serviceability.LinkStatusProvisioning &&
link.Status != serviceability.LinkStatusSoftDrained &&
link.Status != serviceability.LinkStatusHardDrained {
continue
}

if link.LinkHealth == serviceability.LinkHealthReadyForService {
continue
}

linkPubkey := solana.PublicKeyFromBytes(link.PubKey[:])
updates = append(updates, serviceability.LinkHealthUpdate{
LinkPubkey: linkPubkey,
Health: serviceability.LinkHealthReadyForService,
})
w.log.Info("Queuing link health update",
"link", linkPubkey.String(),
"code", link.Code,
"status", link.Status.String())
}

if len(updates) == 0 {
return
}

for i := 0; i < len(updates); i += maxBatchSize {
end := i + maxBatchSize
if end > len(updates) {
end = len(updates)
}
batch := updates[i:end]

w.log.Info("Sending batched link health update", "batchSize", len(batch), "batchNum", i/maxBatchSize+1)

sig, err := w.cfg.ServiceabilityExecutor.SetLinkHealthBatch(ctx, batch, globalStatePubkey)
if err != nil {
w.log.Error("Failed to set link health batch", "error", err)
continue
}

w.log.Info("Link health batch updated", "count", len(batch), "signature", sig.String())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestCompare(t *testing.T) {
{
name: "multiple event types",
before: []serviceability.Device{newTestDevice("1", serviceability.DeviceStatusActivated), newTestDevice("2", serviceability.DeviceStatusPending)}, // 2 is removed
after: []serviceability.Device{newTestDevice("1", serviceability.DeviceStatusSuspended), newTestDevice("3", serviceability.DeviceStatusPending)}, // 1 is modified, 3 is added
after: []serviceability.Device{newTestDevice("1", serviceability.DeviceStatusDrained), newTestDevice("3", serviceability.DeviceStatusPending)}, // 1 is modified, 3 is added
expectedAdded: 1,
expectedRemoved: 1,
expectedModified: 1,
Expand Down
6 changes: 3 additions & 3 deletions controlplane/telemetry/internal/telemetry/peers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ func TestAgentTelemetry_PeerDiscovery_Ledger(t *testing.T) {
requireUnorderedEqual(t, expected, peerDiscovery.GetPeers())
})

t.Run("skips suspended and rejected links", func(t *testing.T) {
t.Run("skips deleting and rejected links", func(t *testing.T) {
t.Parallel()

log := log.With("test", t.Name())
Expand All @@ -378,8 +378,8 @@ func TestAgentTelemetry_PeerDiscovery_Ledger(t *testing.T) {
},
Links: []serviceability.Link{
{
PubKey: stringToPubkey("suspended_link"),
Status: serviceability.LinkStatusSuspended,
PubKey: stringToPubkey("deleting_link"),
Status: serviceability.LinkStatusDeleting,
SideAPubKey: localDevicePK,
SideZPubKey: stringToPubkey("device2"),
TunnelNet: [5]uint8{10, 1, 5, 0, 31},
Expand Down
6 changes: 5 additions & 1 deletion e2e/docker/device-health-oracle/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ if [ -z "${DZ_TELEMETRY_PROGRAM_ID}" ]; then
echo "DZ_TELEMETRY_PROGRAM_ID is not set"
exit 1
fi
if [ -z "${DZ_SIGNER_KEYPAIR}" ]; then
echo "DZ_SIGNER_KEYPAIR is not set"
exit 1
fi

# Start Alloy in background if ALLOY_PROMETHEUS_URL is set
if [ -n "${ALLOY_PROMETHEUS_URL:-}" ]; then
Expand All @@ -22,4 +26,4 @@ if [ -n "${ALLOY_PROMETHEUS_URL:-}" ]; then
fi

# start device-health-oracle
device-health-oracle -ledger-rpc-url ${DZ_LEDGER_URL} -serviceability-program-id ${DZ_SERVICEABILITY_PROGRAM_ID} -telemetry-program-id ${DZ_TELEMETRY_PROGRAM_ID} -metrics-addr ":2112" -interval ${DZ_INTERVAL:-1m}
device-health-oracle -ledger-rpc-url ${DZ_LEDGER_URL} -serviceability-program-id ${DZ_SERVICEABILITY_PROGRAM_ID} -telemetry-program-id ${DZ_TELEMETRY_PROGRAM_ID} -signer-keypair ${DZ_SIGNER_KEYPAIR} -metrics-addr ":2112" -interval ${DZ_INTERVAL:-1m}
9 changes: 9 additions & 0 deletions e2e/internal/devnet/device_health_oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,19 @@ func (d *DeviceHealthOracle) Start(ctx context.Context) error {
"DZ_SERVICEABILITY_PROGRAM_ID": d.dn.Manager.ServiceabilityProgramID,
"DZ_TELEMETRY_PROGRAM_ID": d.dn.Manager.TelemetryProgramID,
"DZ_INTERVAL": d.dn.Spec.DeviceHealthOracle.Interval.String(),
"DZ_SIGNER_KEYPAIR": containerSolanaKeypairPath,
}
if d.dn.Prometheus != nil && d.dn.Prometheus.InternalURL != "" {
env["ALLOY_PROMETHEUS_URL"] = d.dn.Prometheus.InternalRemoteWriteURL()
}

containerFiles := []testcontainers.ContainerFile{
{
HostFilePath: d.dn.Spec.Manager.ManagerKeypairPath,
ContainerFilePath: containerSolanaKeypairPath,
},
}

req := testcontainers.ContainerRequest{
Image: d.dn.Spec.DeviceHealthOracle.ContainerImage,
Name: d.dockerContainerName(),
Expand All @@ -129,6 +137,7 @@ func (d *DeviceHealthOracle) Start(ctx context.Context) error {
Memory: defaultContainerMemory,
},
Labels: d.dn.labels,
Files: containerFiles,
}
container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
ContainerRequest: req,
Expand Down
Loading
Loading