Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 46 additions & 9 deletions app/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,12 @@ def decrypt_value(value: str) -> str:

CREATE TABLE IF NOT EXISTS workers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
client_id TEXT NOT NULL UNIQUE,
name TEXT NOT NULL DEFAULT '',
url TEXT NOT NULL DEFAULT '',
status TEXT NOT NULL DEFAULT 'online',
containers TEXT NOT NULL DEFAULT '[]',
apps TEXT NOT NULL DEFAULT '[]',
system_info TEXT NOT NULL DEFAULT '{}',
last_heartbeat TEXT,
registered_at TEXT NOT NULL DEFAULT (datetime('now'))
Expand Down Expand Up @@ -177,6 +179,38 @@ async def init_db() -> None:
db = await _get_db()
try:
await db.executescript(_SCHEMA)
# Migrate workers table: add client_id (UNIQUE) and apps columns
cursor = await db.execute("PRAGMA table_info(workers)")
cols = {row["name"] for row in await cursor.fetchall()}
if "client_id" not in cols:
# Rebuild table: UNIQUE moves from name → client_id, name becomes display-only.
# Existing rows get client_id = name for backward compat.
has_apps = "apps" in cols
apps_select = "apps" if has_apps else "'[]'"
_logger.info("Migrating workers table: adding client_id column")
await db.executescript(f"""
CREATE TABLE workers_new (
id INTEGER PRIMARY KEY AUTOINCREMENT,
client_id TEXT NOT NULL UNIQUE,
name TEXT NOT NULL DEFAULT '',
url TEXT NOT NULL DEFAULT '',
status TEXT NOT NULL DEFAULT 'online',
containers TEXT NOT NULL DEFAULT '[]',
apps TEXT NOT NULL DEFAULT '[]',
system_info TEXT NOT NULL DEFAULT '{{}}',
last_heartbeat TEXT,
registered_at TEXT NOT NULL DEFAULT (datetime('now'))
);
INSERT INTO workers_new
(id, client_id, name, url, status, containers, apps, system_info, last_heartbeat, registered_at)
SELECT id, name, name, url, status, containers, {apps_select}, system_info, last_heartbeat, registered_at
FROM workers;
DROP TABLE workers;
ALTER TABLE workers_new RENAME TO workers;
CREATE INDEX IF NOT EXISTS idx_workers_status ON workers (status);
""")
elif "apps" not in cols:
await db.execute("ALTER TABLE workers ADD COLUMN apps TEXT NOT NULL DEFAULT '[]'")
await db.commit()
finally:
await db.close()
Expand Down Expand Up @@ -697,30 +731,33 @@ async def mark_setup_completed(user_id: int) -> None:


async def upsert_worker(
name: str,
client_id: str,
name: str = "",
url: str = "",
containers: str = "[]",
apps: str = "[]",
system_info: str = "{}",
) -> int:
"""Register or update a worker by name. Returns the worker ID."""
"""Register or update a worker by client_id. Returns the worker ID."""
db = await _get_db()
try:
cursor = await db.execute(
"""
INSERT INTO workers (name, url, containers, system_info, status, last_heartbeat)
VALUES (?, ?, ?, ?, 'online', datetime('now'))
ON CONFLICT(name) DO UPDATE SET
INSERT INTO workers (client_id, name, url, containers, apps, system_info, status, last_heartbeat)
VALUES (?, ?, ?, ?, ?, ?, 'online', datetime('now'))
ON CONFLICT(client_id) DO UPDATE SET
name = excluded.name,
url = excluded.url,
containers = excluded.containers,
apps = excluded.apps,
system_info = excluded.system_info,
status = 'online',
last_heartbeat = datetime('now')
""",
(name, url, containers, system_info),
(client_id, name, url, containers, apps, system_info),
)
await db.commit()
# Return the worker ID (either new or existing)
cursor = await db.execute("SELECT id FROM workers WHERE name = ?", (name,))
cursor = await db.execute("SELECT id FROM workers WHERE client_id = ?", (client_id,))
row = await cursor.fetchone()
return row["id"]
finally:
Expand Down
146 changes: 87 additions & 59 deletions app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,28 @@
_collector_alerts: list[dict[str, str]] = []


def _safe_json(raw: str, fallback: Any = None) -> Any:
"""Parse JSON with a fallback so one malformed DB row doesn't 500 the fleet."""
try:
return json.loads(raw)
except (json.JSONDecodeError, TypeError):
return fallback if fallback is not None else []


async def _get_all_worker_containers() -> list[dict[str, Any]]:
"""Collect container data from all online workers' heartbeat data in DB."""
"""Collect container/app data from all online workers' heartbeat data in DB."""
workers = await database.list_workers()
result: list[dict[str, Any]] = []
for w in workers:
if w.get("status") != "online":
continue
sys_info = json.loads(w.get("system_info", "{}"))
sys_info = _safe_json(w.get("system_info", "{}"), {})
worker_has_docker = sys_info.get("docker_available", False)
containers = json.loads(w.get("containers", "[]"))
is_android = sys_info.get("device_type") == "android"
worker_name = w.get("name", "worker")

# Docker containers (from Docker-based workers)
containers = _safe_json(w.get("containers", "[]"))
for c in containers:
slug = c.get("slug", "")
if slug:
Expand All @@ -57,12 +69,38 @@ async def _get_all_worker_containers() -> list[dict[str, Any]]:
"cpu_percent": c.get("cpu_percent", 0),
"memory_mb": c.get("memory_mb", 0),
"category": "",
"deployed_by": w.get("name", "worker"),
"_node": w.get("name", "worker"),
"deployed_by": worker_name,
"_node": worker_name,
"_worker_id": w.get("id"),
"_has_docker": worker_has_docker,
"_is_android": False,
}
)

# Android apps (from Android workers)
if is_android:
apps = _safe_json(w.get("apps", "[]"))
for a in apps:
slug = a.get("slug", "")
if slug:
result.append(
{
"slug": slug,
"name": a.get("slug", slug),
"status": "running" if a.get("running") else "stopped",
"image": "",
"cpu_percent": 0,
"memory_mb": 0,
"category": "",
"deployed_by": worker_name,
"_node": worker_name,
"_worker_id": w.get("id"),
"_has_docker": False,
"_is_android": True,
"_net_tx_24h": a.get("net_tx_24h", 0),
"_net_rx_24h": a.get("net_rx_24h", 0),
}
)
return result


Expand Down Expand Up @@ -494,34 +532,7 @@ async def api_services_deployed(request: Request) -> list[dict[str, Any]]:
details for the expandable sub-row UI.
"""
_require_auth_api(request)
statuses: list[dict[str, Any]] = []

# Collect containers from all workers
workers = await database.list_workers()
for w in workers:
if w.get("status") != "online":
continue
sys_info = json.loads(w.get("system_info", "{}"))
worker_has_docker = sys_info.get("docker_available", False)
containers = json.loads(w.get("containers", "[]"))
for c in containers:
slug = c.get("slug", "")
if slug:
statuses.append(
{
"slug": slug,
"name": c.get("name", slug),
"status": c.get("status", "unknown"),
"image": c.get("image", ""),
"cpu_percent": c.get("cpu_percent", 0),
"memory_mb": c.get("memory_mb", 0),
"category": "",
"deployed_by": w.get("name", "worker"),
"_node": w.get("name", "worker"),
"_worker_id": w.get("id"),
"_has_docker": worker_has_docker,
}
)
statuses: list[dict[str, Any]] = await _get_all_worker_containers()

# Get latest earnings per platform for balance display
earnings = await database.get_earnings_summary()
Expand Down Expand Up @@ -564,17 +575,20 @@ async def api_services_deployed(request: Request) -> list[dict[str, Any]]:
# Build per-instance detail list (local first)
instance_details = []
for inst in agg["instances"]:
instance_details.append(
{
"node": inst.get("_node", "unknown"),
"worker_id": inst.get("_worker_id"),
"status": inst.get("status", "unknown"),
"cpu": f"{float(inst.get('cpu_percent', 0)):.2f}",
"memory": f"{float(inst.get('memory_mb', 0)):.1f} MB",
"container_name": inst.get("name", ""),
"has_docker": inst.get("_has_docker", False),
}
)
detail = {
"node": inst.get("_node", "unknown"),
"worker_id": inst.get("_worker_id"),
"status": inst.get("status", "unknown"),
"cpu": f"{float(inst.get('cpu_percent', 0)):.2f}",
"memory": f"{float(inst.get('memory_mb', 0)):.1f} MB",
"container_name": inst.get("name", ""),
"has_docker": inst.get("_has_docker", False),
"is_android": inst.get("_is_android", False),
}
if inst.get("_is_android"):
detail["net_tx_24h"] = inst.get("_net_tx_24h", 0)
detail["net_rx_24h"] = inst.get("_net_rx_24h", 0)
instance_details.append(detail)
# Sort: local first, then alphabetically by node name
instance_details.sort(key=lambda x: (0 if x["node"] == "local" else 1, x["node"]))

Expand Down Expand Up @@ -1425,18 +1439,24 @@ def _verify_fleet_api_key(request: Request) -> None:
class WorkerHeartbeat(BaseModel):
name: str
url: str = ""
client_id: str = ""
containers: list[dict[str, Any]] = []
apps: list[dict[str, Any]] = []
system_info: dict[str, Any] = {}


@app.post("/api/workers/heartbeat")
async def api_worker_heartbeat(request: Request, body: WorkerHeartbeat) -> dict[str, Any]:
"""Receive a heartbeat from a worker. Registers or updates the worker."""
_verify_fleet_api_key(request)
# Use client_id for identity; fall back to name for backward compat
cid = body.client_id or body.name
worker_id = await database.upsert_worker(
client_id=cid,
name=body.name,
url=body.url,
containers=json.dumps(body.containers),
apps=json.dumps(body.apps),
system_info=json.dumps(body.system_info),
)
return {"status": "ok", "worker_id": worker_id}
Expand All @@ -1448,12 +1468,22 @@ async def api_list_workers(request: Request) -> list[dict[str, Any]]:
_require_auth_api(request)
workers = await database.list_workers()
for w in workers:
# Parse stored JSON for the API response
w["containers"] = json.loads(w.get("containers", "[]"))
w["system_info"] = json.loads(w.get("system_info", "{}"))
_parse_worker_json(w)
return workers


def _parse_worker_json(w: dict[str, Any]) -> None:
"""Parse stored JSON columns and compute counts for a worker dict."""
w["containers"] = _safe_json(w.get("containers", "[]"))
w["apps"] = _safe_json(w.get("apps", "[]"))
w["system_info"] = _safe_json(w.get("system_info", "{}"), {})
is_android = w["system_info"].get("device_type") == "android"
if is_android:
w["container_count"] = len(w["apps"])
w["running_count"] = sum(1 for a in w["apps"] if a.get("running"))
else:
w["container_count"] = len(w["containers"])
w["running_count"] = sum(1 for c in w["containers"] if c.get("status") == "running")
return workers


@app.get("/api/workers/{worker_id}")
Expand All @@ -1463,10 +1493,7 @@ async def api_get_worker(request: Request, worker_id: int) -> dict[str, Any]:
worker = await database.get_worker(worker_id)
if not worker:
raise HTTPException(status_code=404, detail="Worker not found")
worker["containers"] = json.loads(worker.get("containers", "[]"))
worker["system_info"] = json.loads(worker.get("system_info", "{}"))
worker["container_count"] = len(worker["containers"])
worker["running_count"] = sum(1 for c in worker["containers"] if c.get("status") == "running")
_parse_worker_json(worker)
return worker


Expand Down Expand Up @@ -1539,21 +1566,22 @@ async def api_fleet_summary(request: Request) -> dict[str, Any]:
_require_auth_api(request)

workers = await database.list_workers()
total_containers = 0
total_services = 0
total_running = 0
online_workers = 0

for w in workers:
containers = json.loads(w.get("containers", "[]"))
total_containers += len(containers)
total_running += sum(1 for c in containers if c.get("status") == "running")
if w["status"] == "online":
online_workers += 1
if w["status"] != "online":
continue
online_workers += 1
_parse_worker_json(w)
total_services += w["container_count"]
total_running += w["running_count"]

return {
"total_workers": len(workers),
"online_workers": online_workers,
"total_containers": total_containers,
"total_containers": total_services,
"running_containers": total_running,
}

Expand Down
14 changes: 7 additions & 7 deletions app/static/favicon.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Loading