From 6c1a47959d897ea8733a0cc2b3a4e969ad144073 Mon Sep 17 00:00:00 2001 From: "e.khalilov" Date: Thu, 27 Nov 2025 18:37:49 +0300 Subject: [PATCH] add health checks adb devices --- lib/units/provider/ADBObserver.ts | 394 +++++++++++++++++++++++++++--- lib/units/provider/index.ts | 203 ++++++++++++--- 2 files changed, 526 insertions(+), 71 deletions(-) diff --git a/lib/units/provider/ADBObserver.ts b/lib/units/provider/ADBObserver.ts index dfa1fbab7..7ef556a93 100644 --- a/lib/units/provider/ADBObserver.ts +++ b/lib/units/provider/ADBObserver.ts @@ -1,11 +1,12 @@ import EventEmitter from 'events' import net, {Socket} from 'net' -export type ADBDeviceType = 'unknown' | 'bootloader' | 'device' | 'recovery' | 'sideload' | 'offline' | 'unauthorized' | 'unknown' // https://android.googlesource.com/platform/system/core/+/android-4.4_r1/adb/adb.c#394 +export type ADBDeviceType = 'unknown' | 'bootloader' | 'device' | 'recovery' | 'sideload' | 'offline' | 'unauthorized' // https://android.googlesource.com/platform/system/core/+/android-4.4_r1/adb/adb.c#394 interface ADBDevice { serial: string type: ADBDeviceType + isStuck: boolean reconnect: () => Promise } @@ -16,46 +17,72 @@ interface ADBDeviceEntry { type PrevADBDeviceType = ADBDevice['type'] +interface DeviceHealthCheck { + attempts: number + timeout: number + firstFailureTime: number + lastAttemptTime: number +} + interface ADBEvents { connect: [ADBDevice] update: [ADBDevice, PrevADBDeviceType] disconnect: [ADBDevice] + stuck: [ADBDevice, DeviceHealthCheck] + healthcheck: [{ ok: number, bad: number }] error: [Error] } +const isOnline = (type: string) => ['device', 'emulator'].includes(type) + class ADBObserver extends EventEmitter { static instance: ADBObserver | null = null private readonly intervalMs: number = 1000 // Default 1 second polling + private readonly healthCheckIntervalMs: number = 10000 // Default 10 second health check + private readonly maxHealthCheckAttempts: number = 3 + private readonly host: string = 
'localhost' private readonly port: number = 5037 private devices: Map = new Map() + private deviceHealthAttempts: Map = new Map() + private pollTimeout: NodeJS.Timeout | null = null - private isPolling: boolean = false - private isDestroyed: boolean = false - private shouldContinuePolling: boolean = false + private healthCheckTimeout: NodeJS.Timeout | null = null + + private readonly requestTimeoutMs: number = 5000 // 5 second timeout per request + private readonly initialReconnectDelayMs: number = 100 + private readonly maxReconnectAttempts: number = 8 + + private reconnectAttempt: number = 0 + private connection: Socket | null = null - private isConnecting: boolean = false private requestQueue: Array<{ command: string resolve: (value: string) => void reject: (error: Error) => void + needData: boolean + isRawStream?: boolean // For device commands after transport (shell:, logcat:, etc.) timer?: NodeJS.Timeout // Set when request is in-flight + rawStreamBuffer?: Buffer // Accumulated data for raw stream responses + rawStreamStarted?: boolean // True once OKAY received and we're accumulating raw stream data }> = [] - private readonly requestTimeoutMs: number = 5000 // 5 second timeout per request - private readonly maxReconnectAttempts: number = 8 - private readonly initialReconnectDelayMs: number = 100 - private reconnectAttempt: number = 0 + + private shouldContinuePolling: boolean = false + private isPolling: boolean = false + private isDestroyed: boolean = false + private isConnecting: boolean = false private isReconnecting: boolean = false - constructor(options?: {intervalMs?: number; host?: string; port?: number}) { + constructor(options?: {intervalMs?: number; healthCheckIntervalMs?: number; host?: string; port?: number}) { if (ADBObserver.instance) { return ADBObserver.instance } super() this.intervalMs = options?.intervalMs || this.intervalMs + this.healthCheckIntervalMs = options?.healthCheckIntervalMs || this.healthCheckIntervalMs this.host = 
options?.host || this.host this.port = options?.port || this.port @@ -80,6 +107,7 @@ class ADBObserver extends EventEmitter { this.pollDevices() this.scheduleNextPoll() + this.scheduleNextHealthCheck() } /** @@ -91,6 +119,10 @@ class ADBObserver extends EventEmitter { clearTimeout(this.pollTimeout) this.pollTimeout = null } + if (this.healthCheckTimeout) { + clearTimeout(this.healthCheckTimeout) + this.healthCheckTimeout = null + } this.closeConnection() ADBObserver.instance = null } @@ -99,6 +131,7 @@ class ADBObserver extends EventEmitter { this.isDestroyed = true this.stop() this.devices.clear() + this.deviceHealthAttempts.clear() this.removeAllListeners() } @@ -126,7 +159,6 @@ class ADBObserver extends EventEmitter { const currentSerials = new Set(currentDevices.map(d => d.serial)) const previousSerials = new Set(this.devices.keys()) - // Find new devices (connect events) for (const deviceEntry of currentDevices) { const existingDevice = this.devices.get(deviceEntry.serial) @@ -140,6 +172,11 @@ class ADBObserver extends EventEmitter { // Device state changed (update event) const oldType = existingDevice.type existingDevice.type = deviceEntry.state as ADBDevice['type'] + + if (isOnline(existingDevice.type)) { + existingDevice.isStuck = false + } + this.emit('update', existingDevice, oldType) } } @@ -149,6 +186,7 @@ class ADBObserver extends EventEmitter { if (!currentSerials.has(serial)) { const device = this.devices.get(serial)! 
this.devices.delete(serial) + this.deviceHealthAttempts.delete(serial) // Clean up health check tracking this.emit('disconnect', device) } } @@ -178,6 +216,111 @@ class ADBObserver extends EventEmitter { }, this.intervalMs) } + /** + * Schedule the next health check cycle + */ + private scheduleNextHealthCheck(): void { + if (this.isDestroyed) { + return + } + + this.healthCheckTimeout = setTimeout(async() => { + await this.performHealthChecks() + + if (!this.isDestroyed) { + this.scheduleNextHealthCheck() + } + }, this.healthCheckIntervalMs) + } + + /** + * Perform health checks on all tracked devices using getprop command + */ + private async performHealthChecks(): Promise { + if (this.isDestroyed || this.devices.size === 0) { + return + } + + try { + let now = 0, + ok = 0, + bad = 0 + + // Check each tracked device + for (const [serial, device] of this.devices.entries()) { + if (this.isDestroyed || !this.shouldContinuePolling) { + break + } + + if (device.isStuck || !isOnline(device.type)) { + continue + } + + now = Date.now() + + try { + // Use shell command to check if device is responsive + // This is more reliable than get-state + // sendADBCommand already has a timeout (requestTimeoutMs) + const res = await this.sendADBCommand('shell:getprop ro.build.version.sdk', serial) + + console.log('RESPONSE:\n', res, '\n', typeof res, res?.length) + // Device responded successfully - reset failure tracking + if (this.deviceHealthAttempts.has(serial)) { + this.deviceHealthAttempts.delete(serial) + } + + ok++ + } + catch (error: any) { + console.log(error) + // Device didn't respond - track failure and potentially reconnect + this.handleDeviceHealthCheckFailure(serial, device, now) + bad++ + } + } + + this.emit('healthcheck', { ok, bad }) + } + catch (error: any) { + this.emit('error', new Error(`Health check failed: ${error.message}`)) + } + } + + /** + * Handle health check failure with backoff and reconnection attempts + */ + private async 
handleDeviceHealthCheckFailure(serial: string, device: ADBDevice, now: number): Promise { + const attemptInfo = this.deviceHealthAttempts.get(serial) + + if (!attemptInfo) { + // First failure - initialize tracking + this.deviceHealthAttempts.set(serial, { + attempts: 1, + timeout: this.requestTimeoutMs, + firstFailureTime: now, + lastAttemptTime: now + }) + return + } + + attemptInfo.attempts++ + attemptInfo.lastAttemptTime = now + + if (attemptInfo.attempts >= this.maxHealthCheckAttempts) { + device.isStuck = true + this.devices.set(device.serial, device) + this.emit('stuck', device, attemptInfo) + + // Reset tracking for potential future recovery + this.deviceHealthAttempts.delete(serial) + + // Attempt reconnection (for network devices) + await device.reconnect() + return + } + } + private async getADBDevices(): Promise { try { const response = await this.sendADBCommand('host:devices') @@ -224,7 +367,13 @@ class ADBObserver extends EventEmitter { this.isConnecting = true return new Promise((resolve, reject) => { - const client = net.createConnection(this.port, this.host, () => { + const client = net.createConnection({ + port: this.port, + host: this.host, + noDelay: true, + keepAlive: true, + keepAliveInitialDelay: 30000 + }, () => { this.connection = client this.isConnecting = false this.reconnectAttempt = 0 // Reset reconnection counter on successful connection @@ -254,6 +403,22 @@ class ADBObserver extends EventEmitter { client.on('close', () => { this.connection = null + // Special handling for raw stream in progress - connection close means command completed + if (this.requestQueue.length > 0 && this.requestQueue[0].rawStreamStarted) { + const request = this.requestQueue.shift()! 
+ if (request.timer) { + clearTimeout(request.timer) + } + const responseData = request.rawStreamBuffer!.toString('utf-8').trim() + request.resolve(responseData) + + // Process next request in queue (will reconnect if needed) + if (this.shouldContinuePolling && !this.isDestroyed) { + this.processNextRequest() + } + return + } + // Clear the timeout of in-flight request but keep it for potential retry if (this.requestQueue.length > 0 && this.requestQueue[0].timer) { clearTimeout(this.requestQueue[0].timer) @@ -266,8 +431,8 @@ class ADBObserver extends EventEmitter { } else { // Reject all queued requests (including in-flight one) - for (const {reject} of this.requestQueue) { - reject(new Error('Connection closed')) + for (const request of this.requestQueue) { + request.reject(new Error('Connection closed')) } this.requestQueue = [] } @@ -283,29 +448,136 @@ class ADBObserver extends EventEmitter { * Process ADB protocol responses and return remaining buffer */ private processADBResponses(buffer: Buffer): Buffer { + if (!this.requestQueue.length) { + return buffer + } + + const request = this.requestQueue[0]! 
let offset = 0 - while (offset < buffer.length) { - // Need at least 8 bytes for status (4) + length (4) - if (buffer.length - offset < 8) { - break + // Special handling for raw stream that's already started + // Once OKAY is received for raw stream, we only accumulate data (no more status codes) + if (request.rawStreamStarted) { + // Accumulate all data + if (buffer.length > 0) { + request.rawStreamBuffer = Buffer.concat([request.rawStreamBuffer || Buffer.alloc(0), buffer]) + + // Check if we have a complete line (newline detected) + // For commands like getprop that return single-line output, complete immediately + const bufferString = request.rawStreamBuffer.toString('utf-8') + if (bufferString.includes('\n')) { + if (this.requestQueue.length > 0 && this.requestQueue[0] === request) { + this.requestQueue.shift() + if (request.timer) { + clearTimeout(request.timer) + } + const responseData = bufferString.trim() + request.resolve(responseData) + + // After transport session, close connection for next device/command + this.closeConnectionAfterTransport() + + // Process next request in queue (will reconnect) + this.processNextRequest() + } + } + } + + return Buffer.alloc(0) // All data consumed + } + + // Check if we have at least status bytes + if (buffer.length < 4) { + return buffer + } + + const status = buffer.subarray(offset, offset + 4).toString('ascii') + + if (status === 'FAIL') { + // For FAIL responses, we always have length-prefixed error message + if (buffer.length < 8) { + return buffer // Need more data for length } - const status = buffer.subarray(offset, offset + 4).toString('ascii') const lengthHex = buffer.subarray(offset + 4, offset + 8).toString('ascii') const dataLength = parseInt(lengthHex, 16) - // Check if we have the complete response - if (buffer.length - offset < 8 + dataLength) { - break + if (buffer.length < 8 + dataLength) { + return buffer // Need more data for complete error message } - const responseData = buffer.subarray(offset + 
8, offset + 8 + dataLength).toString('utf-8') + const errorMessage = buffer.subarray(offset + 8, offset + 8 + dataLength).toString('utf-8') + + if (this.requestQueue.length > 0) { + const request = this.requestQueue.shift()! + if (request.timer) { + clearTimeout(request.timer) + } + request.reject(new Error(errorMessage || 'ADB command failed')) + // Process next request in queue + this.processNextRequest() + } + + return buffer.subarray(offset + 8 + dataLength) + } + + if (status === 'OKAY') { + offset += 4 // Consume OKAY status + + // Handle different response types based on request + if (request.isRawStream) { + // For device commands after transport (shell:, logcat:, etc.) + // Response is: OKAY + raw unstructured stream (no length prefix) + + // Mark that we've started raw stream mode + // This prevents processing any further status codes for this request + request.rawStreamStarted = true + request.rawStreamBuffer = Buffer.alloc(0) + + // Accumulate any data that came with OKAY in this packet + if (buffer.length > offset) { + request.rawStreamBuffer = Buffer.concat([request.rawStreamBuffer, buffer.subarray(offset)]) + } + + // Check if we already have a complete line (newline detected) + const bufferString = request.rawStreamBuffer.toString('utf-8') + if (bufferString.includes('\n')) { + if (this.requestQueue.length > 0) { + this.requestQueue.shift() + if (request.timer) { + clearTimeout(request.timer) + } + const responseData = bufferString.trim() + request.resolve(responseData) + + // After transport session, close connection for next device/command + this.closeConnectionAfterTransport() + + // Process next request in queue (will reconnect if needed) + this.processNextRequest() + } + } + // If no newline yet, wait for more data (will be handled by rawStreamStarted check above) + + return Buffer.alloc(0) // All data consumed + } + else if (request.needData) { + // For host commands with length-prefixed data + if (buffer.length - offset < 4) { + return 
buffer.subarray(offset - 4) // Need more data for length, return including OKAY + } + + const lengthHex = buffer.subarray(offset, offset + 4).toString('ascii') + const dataLength = parseInt(lengthHex, 16) + + if (buffer.length - offset < 4 + dataLength) { + return buffer.subarray(offset - 4) // Need more data, return including OKAY + } + + const responseData = buffer.subarray(offset + 4, offset + 4 + dataLength).toString('utf-8') - if (status === 'OKAY') { - // Resolve the in-flight request (first in queue) if (this.requestQueue.length > 0) { - const request = this.requestQueue.shift()! + this.requestQueue.shift() if (request.timer) { clearTimeout(request.timer) } @@ -313,37 +585,60 @@ class ADBObserver extends EventEmitter { // Process next request in queue this.processNextRequest() } + + return buffer.subarray(offset + 4 + dataLength) } - else if (status === 'FAIL') { - // Reject the in-flight request (first in queue) + else { + // For commands that only expect OKAY (like host:transport:) if (this.requestQueue.length > 0) { - const request = this.requestQueue.shift()! + this.requestQueue.shift() if (request.timer) { clearTimeout(request.timer) } - request.reject(new Error(responseData || 'ADB command failed')) + request.resolve('') // Process next request in queue this.processNextRequest() } - } - offset += 8 + dataLength + return buffer.subarray(offset) + } } - // Return remaining incomplete data in buffer - return offset > 0 ? 
buffer.subarray(offset) : buffer + // Unknown status or need more data + return buffer } /** * Send command to ADB server using persistent connection * Requests are queued and processed sequentially */ - private async sendADBCommand(command: string): Promise { + private async sendADBCommand(command: string, host?: string): Promise { await this.ensureConnection() return new Promise((resolve, reject) => { - // Add request to the queue - this.requestQueue.push({command, resolve, reject}) + if (host) { + // First, switch to device transport mode + this.requestQueue.push({ + command: `host:transport:${host}`, + needData: false, + resolve: () => { + // After transport succeeds, socket is now a tunnel to device's adbd + // Device commands (shell:, logcat:, etc.) return raw streams, not length-prefixed data + this.requestQueue.push({ + command, + resolve, + reject, + needData: false, + isRawStream: true // Mark as raw stream response + }) + this.processNextRequest() + }, + reject + }) + } else { + // Host commands have length-prefixed responses + this.requestQueue.push({command, resolve, reject, needData: true}) + } // Try to process the queue if no request is currently in-flight this.processNextRequest() @@ -462,6 +757,21 @@ class ADBObserver extends EventEmitter { this.requestQueue = [] } + /** + * Close connection after transport session (device-specific command) + * This is necessary because after host:transport:, the socket becomes + * a dedicated tunnel to that device and cannot be reused for other commands + */ + private closeConnectionAfterTransport(): void { + if (this.connection && !this.connection.destroyed) { + this.connection.destroy() + this.connection = null + } + + // Don't reject queued requests - they will be processed with a new connection + // Don't reset reconnection state - let it continue if needed + } + /** * Close the persistent connection */ @@ -517,11 +827,12 @@ class ADBObserver extends EventEmitter { const device: ADBDevice = { serial: 
deviceEntry.serial, type: deviceEntry.state, + isStuck: false, reconnect: async(): Promise => { try { // Try to reconnect the device using ADB protocol (for network devices) // For USB devices, this might not be applicable - if (device.serial.includes(':')) { + if (device.serial.includes(':') && !this.isDestroyed) { if (this.devices.has(device.serial)) { try { await this.sendADBCommand(`host:disconnect:${device.serial}`) @@ -535,10 +846,13 @@ class ADBObserver extends EventEmitter { await new Promise(resolve => setTimeout(resolve, 1000)) const devices = await this.getADBDevices() - const reconnectedDevice = devices.find(d => d.serial === device.serial) + const reconnectedDevice = devices.find(d => + d.serial === device.serial + ) - if (reconnectedDevice && reconnectedDevice.state === 'device') { + if (reconnectedDevice && isOnline(reconnectedDevice.state)) { device.type = 'device' + device.isStuck = false return true } } diff --git a/lib/units/provider/index.ts b/lib/units/provider/index.ts index 56e63cc5b..2f0266ffe 100644 --- a/lib/units/provider/index.ts +++ b/lib/units/provider/index.ts @@ -8,10 +8,10 @@ import srv from '../../util/srv.js' import * as zmqutil from '../../util/zmqutil.js' import {ChildProcess} from 'node:child_process' import ADBObserver, {ADBDevice} from './ADBObserver.js' -import { DeviceRegisteredMessage } from '../../wire/wire.ts' +import { DeviceRegisteredMessage } from '../../wire/wire.js' interface DeviceWorker { - state: 'waiting' | 'running' + state: 'waiting' | 'starting' | 'running' time: number terminate: () => Promise | void resolveRegister?: () => void @@ -40,6 +40,9 @@ export interface Options { export default (async function(options: Options) { const log = logger.createLogger('provider') + // Startup timeout for device workers + const STARTUP_TIMEOUT_MS = 10 * 60 * 1000 + // Check whether the ipv4 address contains a port indication if (options.adbHost.includes(':')) { log.error('Please specify adbHost without port') @@ -90,7 
+93,7 @@ export default (async function(options: Options) { .on(DeviceRegisteredMessage, (channel, message: {serial: string}) => { if (workers[message.serial]?.resolveRegister) { workers[message.serial].resolveRegister!() - delete workers[message.serial]?.resolveRegister + delete workers[message.serial].resolveRegister } }) .handler() @@ -122,7 +125,23 @@ export default (async function(options: Options) { const stop = async(device: ADBDevice) => { if (workers[device.serial]) { log.info('Shutting down device worker "%s" [%s]', device.serial, device.type) - return workers[device.serial].terminate() + return workers[device.serial]?.terminate() + } + } + + const removeDevice = async(device: ADBDevice) => { + try { + log.info('Removing device %s', device.serial) + await stop(device) + } + catch (err) { + log.error('Error stopping device worker "%s": %s', device.serial, err) + } + finally { + // Always clean up and return ports, even if stop() fails + if (workers[device.serial]) { + workers[device.serial].delete() + } } } @@ -147,8 +166,8 @@ export default (async function(options: Options) { return } - const proc = options.fork(device, workers[device.serial].ports) - log.info('Spawned a device worker') + const proc = options.fork(device, [...workers[device.serial].ports]) + log.info('Spawned a device worker with ports [ %s ]', workers[device.serial].ports.join(', ')) const cleanup = () => { proc.removeAllListeners('exit') @@ -183,7 +202,7 @@ export default (async function(options: Options) { workers[device.serial].terminate = () => exitListener(0) const errorListener = (err: any) => { - log.error('Device worker "%s" had an error: %s', device.serial, err.message) + log.error('Device worker "%s" had an error: %s', device.serial, err?.message) onError(err) } @@ -217,6 +236,12 @@ export default (async function(options: Options) { } log.info('Starting to work for device "%s"', device.serial) + + if (workers[device.serial].state !== 'starting') { + workers[device.serial].state 
= 'starting' + workers[device.serial].time = Date.now() + } + let resolveReady: () => void let rejectReady: () => void @@ -227,26 +252,47 @@ export default (async function(options: Options) { const resolveRegister = () => { if (workers[device.serial]?.resolveRegister) { workers[device.serial].resolveRegister!() - delete workers[device.serial]?.resolveRegister + delete workers[device.serial].resolveRegister } } const handleError = async(err: any) => { - log.error('Failed start device worker "%s": %s', device.serial, err) + log.error('Device "%s" error: %s', device.serial, err) rejectReady() resolveRegister() + if (!workers[device.serial]) { + // Device was disconnected, don't restart + return + } + if (err instanceof procutil.ExitError) { log.error('Device worker "%s" died with code %s', device.serial, err.code) log.info('Restarting device worker "%s"', device.serial) await new Promise(r => setTimeout(r, 2000)) + if (!workers[device.serial]) { + log.info('Restart of device "%s" cancelled', device.serial) + return + } + work(device) return } } const worker = spawn(device, resolveReady!, handleError) + if (!worker) { + log.error('Device "%s" startup failed', device.serial) + + // Clean up worker and return ports + if (workers[device.serial]) { + resolveRegister() + workers[device.serial]?.delete() + } + + return + } try { await Promise.all([ @@ -258,7 +304,15 @@ export default (async function(options: Options) { ) ]) } - catch (e) { + catch (e: any) { + log.error('Device "%s" startup failed: %s', device.serial, e?.message) + + // Clean up worker and return ports + if (workers[device.serial]) { + resolveRegister() + await worker?.kill?.() + workers[device.serial]?.delete() + } return } @@ -270,17 +324,11 @@ export default (async function(options: Options) { workers[device.serial].terminate = async() => { resolveRegister() - workers[device.serial].delete() + workers[device.serial]?.delete() await worker?.kill?.() // if process exited - no effect log.info('Cleaning up 
device worker "%s"', device.serial) - // Tell others the device is gone - push.send([ - wireutil.global, - wireutil.envelope(new wire.DeviceAbsentMessage(device.serial)) - ]) - stats() // Wait while DeviceAbsentMessage processed on app side (1s) @@ -304,6 +352,43 @@ export default (async function(options: Options) { port: options.adbPort, host: options.adbHost }) + + // Worker health monitoring - check for stuck workers + const checkWorkerHealth = async() => { + const now = Date.now() + const stuckWorkers: string[] = [] + + for (const serial of Object.keys(workers)) { + const worker = workers[serial] + + // Check if worker has been in "starting" state for longer than startup timeout + if (worker.state === 'starting' && (now - worker.time) > STARTUP_TIMEOUT_MS) { + log.warn('Worker "%s" has been stuck in starting state for %s ms', serial, now - worker.time) + stuckWorkers.push(serial) + } + } + + // Stop and restart stuck workers + for (const serial of stuckWorkers) { + log.error('Restarting stuck worker "%s"', serial) + + try { + const device = tracker.getDevice(serial) + if (device) { + await removeDevice(device) + } + } + catch (err) { + log.error('Error restarting stuck worker "%s": %s', serial, err) + } + } + } + + tracker.on('healthcheck', stats => { + log.info('Healthcheck [OK: %s, BAD: %s]', stats.ok, stats.bad) + checkWorkerHealth() + }) + log.info('Tracking devices') tracker.on('connect', filterDevice((device) => { @@ -321,8 +406,15 @@ export default (async function(options: Options) { register: register(device), // Register device immediately, before 'running' state ports: ports.splice(0, 2), delete: () => { + log.warn('DELETING DEVICE %s', device.serial) ports.push(...workers[device.serial].ports) delete workers[device.serial] + + // Tell others the device is gone + push.send([ + wireutil.global, + wireutil.envelope(new wire.DeviceAbsentMessage(device.serial)) + ]) } } @@ -344,29 +436,70 @@ export default (async function(options: Options) { } })) - 
tracker.on('update', filterDevice((device, oldType) => { + tracker.on('update', filterDevice(async(device, oldType) => { + log.info('Device "%s" is now "%s" (was "%s")', device.serial, device.type, oldType) + if (!['device', 'emulator'].includes(device.type)) { - log.info('Lost device "%s" [%s]', device.serial, device.type) - return stop(device) - } + // Device went offline - stop worker but keep it in waiting state + log.info('Device "%s" went offline [%s]', device.serial, device.type) - log.info('Device "%s" is now "%s" (was "%s")', device.serial, device.type, oldType) + if (workers[device.serial] && workers[device.serial].state !== 'waiting') { + try { + await stop(device) + } + catch (err) { + log.error('Error stopping device worker "%s": %s', device.serial, err) + } + + // Set back to waiting state (keep worker and ports allocated) + if (workers[device.serial]) { + workers[device.serial].state = 'waiting' + workers[device.serial].time = Date.now() + } + } + return + } - // If not running, but can - if (device.type === 'device' && workers[device.serial]?.state === 'waiting') { + if (device.type === 'device' && workers[device.serial]?.state !== 'running') { clearTimeout(workers[device.serial].waitingTimeoutTimer) work(device) } })) + tracker.on('stuck', async(device, health) => { + log.warn( + 'Device %s is stuck [attempts: %s, first_healthcheck: %s, last_healthcheck: %s]', + device.serial, health.attempts, + new Date(health.firstFailureTime).toISOString(), + new Date(health.lastAttemptTime).toISOString() + ) + + if (!workers[device.serial]) { + log.warn('Device is stuck, but worker is not running') + return + } + + removeDevice(device) + }) + tracker.on('disconnect', filterDevice(async(device) => { - log.info('Disconnect device "%s" [%s]', device.serial, device.type) - clearTimeout(workers[device.serial]?.waitingTimeoutTimer) - await stop(device) + log.info('Device is disconnected "%s" [%s]', device.serial, device.type) - workers[device.serial].delete() + if 
(!workers[device.serial]) { + log.warn('Device is disconnected, but worker is not running%s', device.isStuck ? ' [Device got stuck earlier]' : '') + return + } + + if (!device.isStuck) { + clearTimeout(workers[device.serial].waitingTimeoutTimer) + removeDevice(device) + } })) + tracker.on('error', err => { + log.error('ADBObserver error: %s', err?.message) + }) + tracker.start() let statsTimer: NodeJS.Timeout @@ -374,17 +507,22 @@ export default (async function(options: Options) { const all = Object.keys(workers).length const result: any = { waiting: [], + starting: [], running: [] } for (const serial of Object.keys(workers)) { if (workers[serial].state === 'running') { result.running.push(serial) - continue } - result.waiting.push(serial) + else if (workers[serial].state === 'starting') { + result.starting.push(serial) + } + else { + result.waiting.push(serial) + } } - log.info(`Providing ${result.running.length} of ${all} device(s); waiting for [${result.waiting.join(', ')}]`) + log.info(`Providing ${result.running.length} of ${all} device(s); starting: [${result.starting.join(', ')}], waiting: [${result.waiting.join(', ')}]`) log.info(`Providing all ${all} of ${tracker.count} device(s)`) log.info(`Providing all ${tracker.count} device(s)`) @@ -395,6 +533,9 @@ export default (async function(options: Options) { } lifecycle.observe(async() => { + // Clear timers + clearTimeout(statsTimer) + await Promise.all( Object.values(workers) .map(worker =>