diff --git a/lib/db/models/all/model.js b/lib/db/models/all/model.js index 0dd60610b3..327a2427a9 100644 --- a/lib/db/models/all/model.js +++ b/lib/db/models/all/model.js @@ -1374,7 +1374,7 @@ export const updateDevicesOriginGroup = async(serial, group) => { db.devices.updateOne({serial}, update) ) - if (stats.modifiedCount) { + if (stats.modifiedCount || stats.matchedCount) { log.info( '[updateDevicesOriginGroup] Successfully updated origin group in device [serial: "%s", group: "%s", name: "%s"]', serial, diff --git a/lib/db/proxiedModel.js b/lib/db/proxiedModel.js deleted file mode 100644 index c78417cea6..0000000000 --- a/lib/db/proxiedModel.js +++ /dev/null @@ -1,56 +0,0 @@ -import * as Sentry from '@sentry/node' - -// ----------------------------------Proxy all methods for Sentry error tracing---------------------------------------// - -const STRIP_COMMENTS = /((\/\/.*$)|(\/\*[\s\S]*?\*\/))/mg -const ARGUMENT_NAMES = /([^\s,]+)/g - -// TODO: argument names can be simplified after build -function getArgumentsNames(fn) { - const fnStr = fn.toString().replace(STRIP_COMMENTS, '') - let result = fnStr.slice(fnStr.indexOf('(') + 1, fnStr.indexOf(')')).match(ARGUMENT_NAMES) - return result || [] -} - -const getAddedAttributes = (fn, args) => Object.fromEntries( - getArgumentsNames(fn).map((argument, i) => [ - `dbapi.${argument}`, - args[i] - ]) -) - -/** - * @template ModelType - * @param {ModelType} model - * @return {ModelType} - */ -// @ts-ignore -export default (model) => new Proxy(model, { - - /** @param {string} prop */ - get(target, prop) { - return (...args) => Sentry.startSpan( - { - op: 'dbapi', - name: prop, - attributes: args?.length ? getAddedAttributes(target[prop], args) : {} - } - , () => target[prop](...args) - ) - } -}) - -// export default (model) => model ? Object.keys(model).reduce((proxiedModel, method) => { -// db.connect() -// // console.log('\n\n', method, '\n\n') -// proxiedModel[method] = (...args) => Sentry.startSpan( -// { -// op: 'dbapi' -// , name: method -// , attributes: getAddedAttributes(model[method], args) -// } -// , () => model[method](...args) -// ) -// return proxiedModel -// }, Object.create({})) : model - diff --git a/lib/db/proxiedModel.ts b/lib/db/proxiedModel.ts new file mode 100644 index 0000000000..a1c80dd255 --- /dev/null +++ b/lib/db/proxiedModel.ts @@ -0,0 +1,38 @@ +import * as Sentry from '@sentry/node' + +// ----------------------------------Proxy all methods for Sentry error tracing---------------------------------------// + +const STRIP_COMMENTS = /((\/\/.*$)|(\/\*[\s\S]*?\*\/))/mg +const ARGUMENT_NAMES = /([^\s,]+)/g + +// TODO: argument names can be simplified after build +function getArgumentsNames(fn: Function) { + const fnStr = fn.toString().replace(STRIP_COMMENTS, '') + let result = fnStr.slice(fnStr.indexOf('(') + 1, fnStr.indexOf(')')).match(ARGUMENT_NAMES) + return result || [] +} + +const getAddedAttributes = (fn: Function, args: any[]) => Object.fromEntries( + getArgumentsNames(fn).map((argument, i) => [ + `dbapi.${argument}`, + args[i] + ]) +) + +export default >(model: T) => Object.keys(model).reduce((proxiedModel, method) => { + if (typeof model[method] !== 'function') { + proxiedModel[method] = model[method] + return proxiedModel + } + + proxiedModel[method] = (...args: any[]) => Sentry.startSpan( + { + op: 'dbapi', + name: method, + attributes: getAddedAttributes(model[method], args) + } + , () => model[method](...args) + ) + return proxiedModel +}, {} as any) as T + diff --git a/lib/units/api/controllers/devices.js b/lib/units/api/controllers/devices.js index 7d9f916420..58a037c5b0 100644 --- a/lib/units/api/controllers/devices.js +++ b/lib/units/api/controllers/devices.js @@ -16,7 +16,7 @@ import useDevice, {UseDeviceError} from '../helpers/useDevice.js' import * as Sentry from '@sentry/node' import {accessTokenAuth} from '../helpers/securityHandlers.js' import {DeviceOriginGroupMessage} from '../../../wire/wire.js' -var log = logger.createLogger('api:controllers:devices') +const log = logger.createLogger('api:controllers:devices') /* ------------------------------------ PRIVATE FUNCTIONS ------------------------------- */ function filterGenericDevices(req, res, devices) { @@ -237,8 +237,8 @@ function getDevices(req, res) { } } function getDeviceBySerial(req, res) { - var serial = req.params.serial - var fields = req.query.fields + const serial = req.params.serial + const fields = req.query.fields dbapi.loadDevice(req.user.groups.subscribed, serial) .then(device => { if (!device) { @@ -263,8 +263,8 @@ function getDeviceBySerial(req, res) { }) } function getDeviceSize(req, res) { - var serial = req.params.serial - dbapi.getDeviceDisplaySize(serial) + const serial = req.params.serial + return dbapi.getDeviceDisplaySize(serial) .then(response => { return res.status(200).json(response.display) }) @@ -295,8 +295,8 @@ function getDeviceGroups(req, res) { }) } function getDeviceOwner(req, res) { - var serial = req.params.serial - dbapi.getDeviceGroupOwner(serial) + const serial = req.params.serial + return dbapi.getDeviceGroupOwner(serial) .then(response => { if (!response) { return res.status(404).json({ @@ -312,8 +312,8 @@ function getDeviceOwner(req, res) { }) } function getDeviceType(req, res) { - var serial = req.params.serial - dbapi.getDeviceType(serial) + const serial = req.params.serial + return dbapi.getDeviceType(serial) .then(response => { return res.status(200).json(response) }) @@ -481,12 +481,12 @@ function removeOriginGroupDevice(req, res) { function putDeviceInfoBySerial(req, res) { const serial = req.params.serial const body = req.body - dbapi.loadDeviceBySerial(serial) + return dbapi.loadDeviceBySerial(serial) .then((data) => { if (!data) { return apiutil.respond(res, 404, `Not Found (${serial})`) } - var updates = [] + const updates = [] // Update fields based on given body if (_.has(body, 'note')) { updates.push(dbapi.setDeviceNote(serial, body.note)) diff --git a/lib/units/api/controllers/user.js b/lib/units/api/controllers/user.js index 8aef72fb95..390d7c7c5d 100644 --- a/lib/units/api/controllers/user.js +++ b/lib/units/api/controllers/user.js @@ -48,7 +48,7 @@ function getUser(req, res) { } function getUserDevices(req, res) { - var fields = req.query.fields + const fields = req.query.fields log.info('Loading user devices') dbapi.loadUserDevices(req.user.email) .then(list => { @@ -78,9 +78,9 @@ function getUserDevices(req, res) { } function getUserDeviceBySerial(req, res) { - var serial = req.params.serial - var fields = req.query.fields - dbapi.loadDevice(req.user.groups.subscribed, serial) + const serial = req.params.serial + const fields = req.query.fields + return dbapi.loadDevice(req.user.groups.subscribed, serial) .then(function(device) { if (!device) { return res.status(404).json({ @@ -95,7 +95,7 @@ function getUserDeviceBySerial(req, res) { description: 'Device is not owned by you' }) } - var responseDevice = device + let responseDevice = device if (fields) { responseDevice = _.pick(device, fields.split(',')) } @@ -116,7 +116,7 @@ function addUserDevice(req, res) { let timeout = Object.prototype.hasOwnProperty.call(req, 'body') ? req.body.timeout || null : req.query.timeout || null const lock = {} - lockutil.lockGenericDevice(req, res, lock, dbapi.lockDeviceByCurrent) + return lockutil.lockGenericDevice(req, res, lock, dbapi.lockDeviceByCurrent) .then(function(lockingSuccessed) { if (lockingSuccessed) { const device = lock.device @@ -183,7 +183,7 @@ function deleteUserDeviceBySerial(req, res) { else { serial = req.params.serial } - dbapi.loadDevice(req.user.groups.subscribed, serial) + return dbapi.loadDevice(req.user.groups.subscribed, serial) .then(async function(device) { if (!device) { if (isInternal) { @@ -216,13 +216,18 @@ function deleteUserDeviceBySerial(req, res) { } } - await runTransaction(device.channel, UngroupMessage.create({ + await runTransaction(device.channel, UngroupMessage, { requirements: wireutil.toDeviceRequirements({ serial: { value: serial, match: 'exact' } - })})) + }) + }, { + sub: req.options.sub, + push: req.options.push, + channelRouter: req.options.channelRouter + }) }) .catch(function(err) { let errSerial @@ -244,7 +249,7 @@ function deleteUserDeviceBySerial(req, res) { function remoteConnectUserDeviceBySerial(req, res) { let serial = req.params.serial - dbapi.loadDevice(req.user.groups.subscribed, serial) + return dbapi.loadDevice(req.user.groups.subscribed, serial) .then(function(device) { if (!device) { return res.status(404).json({ @@ -302,7 +307,7 @@ function remoteDisconnectUserDeviceBySerial(req, res) { else { serial = req.params.serial } - dbapi.loadDevice(req.user.groups.subscribed, serial) + return dbapi.loadDevice(req.user.groups.subscribed, serial) .then(function(device) { if (!device) { if (isInternal) { @@ -327,10 +332,10 @@ function remoteDisconnectUserDeviceBySerial(req, res) { }) } } - var responseChannel = 'txn_' + uuidv4() + const responseChannel = 'txn_' + uuidv4() req.options.sub.subscribe(responseChannel) // Timer will be called if no JoinGroupMessage is received till 5 seconds - var timer = setTimeoutS(function() { + const timer = setTimeoutS(function() { req.options.channelRouter.removeListener(responseChannel, messageListener) req.options.sub.unsubscribe(responseChannel) if (isInternal) { @@ -340,7 +345,7 @@ function remoteDisconnectUserDeviceBySerial(req, res) { return apiutil.respond(res, 504, 'Device is not responding') } }, apiutil.GRPC_WAIT_TIMEOUT) - var messageListener = new WireRouter() + const messageListener = new WireRouter() .on(ConnectStoppedMessage, function(channel, message) { if (message.serial === serial) { clearTimeout(timer) diff --git a/lib/units/api/helpers/useDevice.js b/lib/units/api/helpers/useDevice.js index 93c3a267a7..01b3ffba78 100644 --- a/lib/units/api/helpers/useDevice.js +++ b/lib/units/api/helpers/useDevice.js @@ -9,7 +9,7 @@ import wire from '../../../wire/index.js' import {v4 as uuidv4} from 'uuid' import {Log} from '../../../util/logger.js' import {runTransaction} from '../../../wire/transmanager.js' -import {ConnectStartedMessage, JoinGroupMessage} from '../../../wire/wire.js' +import {ConnectStartedMessage, GroupMessage, JoinGroupMessage, OwnerMessage, UngroupMessage} from '../../../wire/wire.js' export const UseDeviceError = Object.freeze({ NOT_FOUND: 0, @@ -50,10 +50,12 @@ const useDevice = ({user, device, channelRouter, push, sub, usage = null, log}) }) try { - await runTransaction(device.channel, new wire.UngroupMessage(deviceRequirements), {sub, push, channelRouter}) + await runTransaction(device.channel, UngroupMessage, { + requirements: deviceRequirements + }, {sub, push, channelRouter}) } catch (/** @type {any} */e) { - log?.info('Transaction failed: $s', e?.message) + log?.info('Transaction failed: %s', e?.message) } const responseTimeout = setTimeout(function() { @@ -73,7 +75,7 @@ const useDevice = ({user, device, channelRouter, push, sub, usage = null, log}) sub.subscribe(responseChannel) const connectTimeout = setTimeout(function() { - channelRouter.removeListener(responseChannel, useDeviceMessageListener) + channelRouter.removeListener(responseChannel, messageListener) sub.unsubscribe(responseChannel) reject(UseDeviceError.FAILED_CONNECT) @@ -102,13 +104,24 @@ const useDevice = ({user, device, channelRouter, push, sub, usage = null, log}) channelRouter.on(wireutil.global, useDeviceMessageListener) - await runTransaction(device.channel, new wire.GroupMessage( - new wire.OwnerMessage(user.email, user.name, user.group) - , timeout - , deviceRequirements - , usage - , user.adbKeys.map((/** @type {{ fingerprint: string }} */ k) => k.fingerprint) - ), {sub, push, channelRouter}) + try { + await runTransaction(device.channel, GroupMessage, { + owner: OwnerMessage.create({ + email: user.email, + name: user.name, + group: user.group + }), + requirements: deviceRequirements, + usage: usage || undefined, + keys: user.adbKeys?.map((/** @type {{ fingerprint: string }} */ k) => k.fingerprint) || [] + }, {sub, push, channelRouter}) + } + catch (/** @type {any} */e) { + log?.info('Transaction failed: %s', e?.message) + clearTimeout(responseTimeout) + channelRouter.removeListener(wireutil.global, useDeviceMessageListener) + return reject(UseDeviceError.FAILED_JOIN) + } }) export default useDevice diff --git a/lib/units/poorxy/index.js b/lib/units/poorxy/index.js index 6a6efc00d9..3b8320dd83 100644 --- a/lib/units/poorxy/index.js +++ b/lib/units/poorxy/index.js @@ -8,7 +8,7 @@ export default (function(options) { let server = http.createServer(app) let proxy = httpProxy.createProxyServer() proxy.on('error', function(err) { - log.error('Proxy had an error', err.stack) + log.error('Proxy had an error %s', err?.message) }) app.use(function(req, res, next) { res.setHeader('X-devicehub-unit', 'poorxy') diff --git a/lib/util/zmqutil.js b/lib/util/zmqutil.js index 6098ff06d7..3a4d67b438 100644 --- a/lib/util/zmqutil.js +++ b/lib/util/zmqutil.js @@ -20,9 +20,32 @@ const socketTypeMap = { reply: zmq.Reply } +// Shared ZMQ context to avoid creating multiple contexts with thread pools +// Each context creates ioThreads (4 by default), so sharing saves resources +/** @type {zmq.Context | null} */ +let sharedContext = null +const getSharedContext = () => { + if (!sharedContext) { + sharedContext = new zmq.Context({ + blocky: true, + ioThreads: 4, + ipv6: true, + maxSockets: 8192, + }) + } + return sharedContext +} + export class SocketWrapper extends EventEmitter { #sendQueue = Promise.resolve() + /** @type {AsyncIterator | null} */ + #iterator = null + + /** + * @param {string} type + * @param {number} keepAliveInterval + */ constructor(type, keepAliveInterval = 30) { super() @@ -34,18 +57,14 @@ export class SocketWrapper extends EventEmitter { this.isActive = true this.endpoints = new Set() + // @ts-ignore const SocketClass = socketTypeMap[type] this.socket = new SocketClass({ tcpKeepalive: 1, tcpKeepaliveIdle: keepAliveInterval, tcpKeepaliveInterval: keepAliveInterval, tcpKeepaliveCount: 100 - }, new zmq.Context({ - blocky: true, - ioThreads: 4, - ipv6: true, - maxSockets: 8192, - })) + }, getSharedContext()) } bindSync = (address) => this.socket.bindSync(address) @@ -86,17 +105,42 @@ export class SocketWrapper extends EventEmitter { } catch (/** @type {any} */ err) { log.error('Error on send: %s', err?.message || err?.toString() || JSON.stringify(err)) + throw err // Re-throw to properly handle in the promise chain } } + /** + * @param {any} args + */ send(args) { this.#sendQueue = this.#sendQueue.then(() => this.sendAsync(args)) return this } - close() { + async close() { this.isActive = false + + // Close async iterator if it exists + if (this.#iterator && typeof this.#iterator.return === 'function') { + try { + await this.#iterator.return() + } + catch { + // Ignore errors during cleanup + } + this.#iterator = null + } + + // Wait for send queue to drain before closing socket + try { + await this.#sendQueue.catch(() => {}) + } + catch { + // Ignore errors during cleanup + } + this.socket.close() + this.removeAllListeners() return this } @@ -118,10 +162,10 @@ export class SocketWrapper extends EventEmitter { } try { - const iterator = this.socket[Symbol.asyncIterator]() + this.#iterator = this.socket[Symbol.asyncIterator]() let result - while (this.isActive && !(result = await iterator.next()).done) { + while (this.isActive && this.#iterator && !(result = await this.#iterator.next()).done) { const message = result.value if (Array.isArray(message) && !!message[0]?.toString) {