3535_collector_alerts : list [dict [str , str ]] = []
3636
3737
38+ def _safe_json (raw : str , fallback : Any = None ) -> Any :
39+ """Parse JSON with a fallback so one malformed DB row doesn't 500 the fleet."""
40+ try :
41+ return json .loads (raw )
42+ except (json .JSONDecodeError , TypeError ):
43+ return fallback if fallback is not None else []
44+
45+
3846async def _get_all_worker_containers () -> list [dict [str , Any ]]:
39- """Collect container data from all online workers' heartbeat data in DB."""
47+ """Collect container/app data from all online workers' heartbeat data in DB."""
4048 workers = await database .list_workers ()
4149 result : list [dict [str , Any ]] = []
4250 for w in workers :
4351 if w .get ("status" ) != "online" :
4452 continue
45- sys_info = json . loads (w .get ("system_info" , "{}" ))
53+ sys_info = _safe_json (w .get ("system_info" , "{}" ), {} )
4654 worker_has_docker = sys_info .get ("docker_available" , False )
47- containers = json .loads (w .get ("containers" , "[]" ))
55+ is_android = sys_info .get ("device_type" ) == "android"
56+ worker_name = w .get ("name" , "worker" )
57+
58+ # Docker containers (from Docker-based workers)
59+ containers = _safe_json (w .get ("containers" , "[]" ))
4860 for c in containers :
4961 slug = c .get ("slug" , "" )
5062 if slug :
@@ -57,12 +69,38 @@ async def _get_all_worker_containers() -> list[dict[str, Any]]:
5769 "cpu_percent" : c .get ("cpu_percent" , 0 ),
5870 "memory_mb" : c .get ("memory_mb" , 0 ),
5971 "category" : "" ,
60- "deployed_by" : w . get ( "name" , "worker" ) ,
61- "_node" : w . get ( "name" , "worker" ) ,
72+ "deployed_by" : worker_name ,
73+ "_node" : worker_name ,
6274 "_worker_id" : w .get ("id" ),
6375 "_has_docker" : worker_has_docker ,
76+ "_is_android" : False ,
6477 }
6578 )
79+
80+ # Android apps (from Android workers)
81+ if is_android :
82+ apps = _safe_json (w .get ("apps" , "[]" ))
83+ for a in apps :
84+ slug = a .get ("slug" , "" )
85+ if slug :
86+ result .append (
87+ {
88+ "slug" : slug ,
89+ "name" : a .get ("slug" , slug ),
90+ "status" : "running" if a .get ("running" ) else "stopped" ,
91+ "image" : "" ,
92+ "cpu_percent" : 0 ,
93+ "memory_mb" : 0 ,
94+ "category" : "" ,
95+ "deployed_by" : worker_name ,
96+ "_node" : worker_name ,
97+ "_worker_id" : w .get ("id" ),
98+ "_has_docker" : False ,
99+ "_is_android" : True ,
100+ "_net_tx_24h" : a .get ("net_tx_24h" , 0 ),
101+ "_net_rx_24h" : a .get ("net_rx_24h" , 0 ),
102+ }
103+ )
66104 return result
67105
68106
@@ -494,34 +532,7 @@ async def api_services_deployed(request: Request) -> list[dict[str, Any]]:
494532 details for the expandable sub-row UI.
495533 """
496534 _require_auth_api (request )
497- statuses : list [dict [str , Any ]] = []
498-
499- # Collect containers from all workers
500- workers = await database .list_workers ()
501- for w in workers :
502- if w .get ("status" ) != "online" :
503- continue
504- sys_info = json .loads (w .get ("system_info" , "{}" ))
505- worker_has_docker = sys_info .get ("docker_available" , False )
506- containers = json .loads (w .get ("containers" , "[]" ))
507- for c in containers :
508- slug = c .get ("slug" , "" )
509- if slug :
510- statuses .append (
511- {
512- "slug" : slug ,
513- "name" : c .get ("name" , slug ),
514- "status" : c .get ("status" , "unknown" ),
515- "image" : c .get ("image" , "" ),
516- "cpu_percent" : c .get ("cpu_percent" , 0 ),
517- "memory_mb" : c .get ("memory_mb" , 0 ),
518- "category" : "" ,
519- "deployed_by" : w .get ("name" , "worker" ),
520- "_node" : w .get ("name" , "worker" ),
521- "_worker_id" : w .get ("id" ),
522- "_has_docker" : worker_has_docker ,
523- }
524- )
535+ statuses : list [dict [str , Any ]] = await _get_all_worker_containers ()
525536
526537 # Get latest earnings per platform for balance display
527538 earnings = await database .get_earnings_summary ()
@@ -564,17 +575,20 @@ async def api_services_deployed(request: Request) -> list[dict[str, Any]]:
564575 # Build per-instance detail list (local first)
565576 instance_details = []
566577 for inst in agg ["instances" ]:
567- instance_details .append (
568- {
569- "node" : inst .get ("_node" , "unknown" ),
570- "worker_id" : inst .get ("_worker_id" ),
571- "status" : inst .get ("status" , "unknown" ),
572- "cpu" : f"{ float (inst .get ('cpu_percent' , 0 )):.2f} " ,
573- "memory" : f"{ float (inst .get ('memory_mb' , 0 )):.1f} MB" ,
574- "container_name" : inst .get ("name" , "" ),
575- "has_docker" : inst .get ("_has_docker" , False ),
576- }
577- )
578+ detail = {
579+ "node" : inst .get ("_node" , "unknown" ),
580+ "worker_id" : inst .get ("_worker_id" ),
581+ "status" : inst .get ("status" , "unknown" ),
582+ "cpu" : f"{ float (inst .get ('cpu_percent' , 0 )):.2f} " ,
583+ "memory" : f"{ float (inst .get ('memory_mb' , 0 )):.1f} MB" ,
584+ "container_name" : inst .get ("name" , "" ),
585+ "has_docker" : inst .get ("_has_docker" , False ),
586+ "is_android" : inst .get ("_is_android" , False ),
587+ }
588+ if inst .get ("_is_android" ):
589+ detail ["net_tx_24h" ] = inst .get ("_net_tx_24h" , 0 )
590+ detail ["net_rx_24h" ] = inst .get ("_net_rx_24h" , 0 )
591+ instance_details .append (detail )
578592 # Sort: local first, then alphabetically by node name
579593 instance_details .sort (key = lambda x : (0 if x ["node" ] == "local" else 1 , x ["node" ]))
580594
@@ -1425,18 +1439,24 @@ def _verify_fleet_api_key(request: Request) -> None:
14251439class WorkerHeartbeat (BaseModel ):
14261440 name : str
14271441 url : str = ""
1442+ client_id : str = ""
14281443 containers : list [dict [str , Any ]] = []
1444+ apps : list [dict [str , Any ]] = []
14291445 system_info : dict [str , Any ] = {}
14301446
14311447
14321448@app .post ("/api/workers/heartbeat" )
14331449async def api_worker_heartbeat (request : Request , body : WorkerHeartbeat ) -> dict [str , Any ]:
14341450 """Receive a heartbeat from a worker. Registers or updates the worker."""
14351451 _verify_fleet_api_key (request )
1452+ # Use client_id for identity; fall back to name for backward compat
1453+ cid = body .client_id or body .name
14361454 worker_id = await database .upsert_worker (
1455+ client_id = cid ,
14371456 name = body .name ,
14381457 url = body .url ,
14391458 containers = json .dumps (body .containers ),
1459+ apps = json .dumps (body .apps ),
14401460 system_info = json .dumps (body .system_info ),
14411461 )
14421462 return {"status" : "ok" , "worker_id" : worker_id }
@@ -1448,12 +1468,22 @@ async def api_list_workers(request: Request) -> list[dict[str, Any]]:
14481468 _require_auth_api (request )
14491469 workers = await database .list_workers ()
14501470 for w in workers :
1451- # Parse stored JSON for the API response
1452- w ["containers" ] = json .loads (w .get ("containers" , "[]" ))
1453- w ["system_info" ] = json .loads (w .get ("system_info" , "{}" ))
1471+ _parse_worker_json (w )
1472+ return workers
1473+
1474+
1475+ def _parse_worker_json (w : dict [str , Any ]) -> None :
1476+ """Parse stored JSON columns and compute counts for a worker dict."""
1477+ w ["containers" ] = _safe_json (w .get ("containers" , "[]" ))
1478+ w ["apps" ] = _safe_json (w .get ("apps" , "[]" ))
1479+ w ["system_info" ] = _safe_json (w .get ("system_info" , "{}" ), {})
1480+ is_android = w ["system_info" ].get ("device_type" ) == "android"
1481+ if is_android :
1482+ w ["container_count" ] = len (w ["apps" ])
1483+ w ["running_count" ] = sum (1 for a in w ["apps" ] if a .get ("running" ))
1484+ else :
14541485 w ["container_count" ] = len (w ["containers" ])
14551486 w ["running_count" ] = sum (1 for c in w ["containers" ] if c .get ("status" ) == "running" )
1456- return workers
14571487
14581488
14591489@app .get ("/api/workers/{worker_id}" )
@@ -1463,10 +1493,7 @@ async def api_get_worker(request: Request, worker_id: int) -> dict[str, Any]:
14631493 worker = await database .get_worker (worker_id )
14641494 if not worker :
14651495 raise HTTPException (status_code = 404 , detail = "Worker not found" )
1466- worker ["containers" ] = json .loads (worker .get ("containers" , "[]" ))
1467- worker ["system_info" ] = json .loads (worker .get ("system_info" , "{}" ))
1468- worker ["container_count" ] = len (worker ["containers" ])
1469- worker ["running_count" ] = sum (1 for c in worker ["containers" ] if c .get ("status" ) == "running" )
1496+ _parse_worker_json (worker )
14701497 return worker
14711498
14721499
@@ -1539,21 +1566,22 @@ async def api_fleet_summary(request: Request) -> dict[str, Any]:
15391566 _require_auth_api (request )
15401567
15411568 workers = await database .list_workers ()
1542- total_containers = 0
1569+ total_services = 0
15431570 total_running = 0
15441571 online_workers = 0
15451572
15461573 for w in workers :
1547- containers = json .loads (w .get ("containers" , "[]" ))
1548- total_containers += len (containers )
1549- total_running += sum (1 for c in containers if c .get ("status" ) == "running" )
1550- if w ["status" ] == "online" :
1551- online_workers += 1
1574+ if w ["status" ] != "online" :
1575+ continue
1576+ online_workers += 1
1577+ _parse_worker_json (w )
1578+ total_services += w ["container_count" ]
1579+ total_running += w ["running_count" ]
15521580
15531581 return {
15541582 "total_workers" : len (workers ),
15551583 "online_workers" : online_workers ,
1556- "total_containers" : total_containers ,
1584+ "total_containers" : total_services ,
15571585 "running_containers" : total_running ,
15581586 }
15591587
0 commit comments